Fix warnings, simplify connection trait (#4366)

This commit is contained in:
Mees Delzenne 2024-07-18 15:33:22 +02:00 committed by GitHub
parent b37929b6ab
commit 3b1a1f0348
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
43 changed files with 302 additions and 636 deletions

View file

@ -1,6 +1,7 @@
use crate::api;
use crate::api::err::Error;
use crate::api::method::query::Response;
use crate::api::method::BoxFuture;
use crate::api::opt::Endpoint;
use crate::api::ExtraFeatures;
use crate::api::Result;
@ -15,9 +16,7 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
@ -40,6 +39,141 @@ impl Router {
pub(crate) fn next_id(&self) -> i64 {
self.last_id.fetch_add(1, Ordering::SeqCst)
}
pub(crate) fn send(
&self,
method: Method,
param: Param,
) -> BoxFuture<'_, Result<Receiver<Result<DbResponse>>>> {
Box::pin(async move {
let id = self.next_id();
let (sender, receiver) = flume::bounded(1);
let route = Route {
request: (id, method, param),
response: sender,
};
self.sender.send_async(route).await?;
Ok(receiver)
})
}
/// Receive responses for all methods except `query`
pub(crate) fn recv(
&self,
receiver: Receiver<Result<DbResponse>>,
) -> BoxFuture<'_, Result<Value>> {
Box::pin(async move {
let response = receiver.into_recv_async().await?;
match response? {
DbResponse::Other(value) => Ok(value),
DbResponse::Query(..) => unreachable!(),
}
})
}
/// Receive the response of the `query` method
pub(crate) fn recv_query(
&self,
receiver: Receiver<Result<DbResponse>>,
) -> BoxFuture<'_, Result<Response>> {
Box::pin(async move {
let response = receiver.into_recv_async().await?;
match response? {
DbResponse::Query(results) => Ok(results),
DbResponse::Other(..) => unreachable!(),
}
})
}
/// Execute all methods except `query`
pub(crate) fn execute<R>(&self, method: Method, param: Param) -> BoxFuture<'_, Result<R>>
where
R: DeserializeOwned,
{
Box::pin(async move {
let rx = self.send(method, param).await?;
let value = self.recv(rx).await?;
from_value(value).map_err(Into::into)
})
}
/// Execute methods that return an optional single response
pub(crate) fn execute_opt<R>(
&self,
method: Method,
param: Param,
) -> BoxFuture<'_, Result<Option<R>>>
where
R: DeserializeOwned,
{
Box::pin(async move {
let rx = self.send(method, param).await?;
match self.recv(rx).await? {
Value::None | Value::Null => Ok(None),
value => from_value(value).map_err(Into::into),
}
})
}
/// Execute methods that return multiple responses
pub(crate) fn execute_vec<R>(
&self,
method: Method,
param: Param,
) -> BoxFuture<'_, Result<Vec<R>>>
where
R: DeserializeOwned,
{
Box::pin(async move {
let rx = self.send(method, param).await?;
let value = match self.recv(rx).await? {
Value::None | Value::Null => Value::Array(Default::default()),
Value::Array(array) => Value::Array(array),
value => vec![value].into(),
};
from_value(value).map_err(Into::into)
})
}
/// Execute methods that return nothing
pub(crate) fn execute_unit(&self, method: Method, param: Param) -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
let rx = self.send(method, param).await?;
match self.recv(rx).await? {
Value::None | Value::Null => Ok(()),
Value::Array(array) if array.is_empty() => Ok(()),
value => Err(Error::FromValue {
value,
error: "expected the database to return nothing".to_owned(),
}
.into()),
}
})
}
/// Execute methods that return a raw value
pub(crate) fn execute_value(
&self,
method: Method,
param: Param,
) -> BoxFuture<'_, Result<Value>> {
Box::pin(async move {
let rx = self.send(method, param).await?;
self.recv(rx).await
})
}
/// Execute the `query` method
pub(crate) fn execute_query(
&self,
method: Method,
param: Param,
) -> BoxFuture<'_, Result<Response>> {
Box::pin(async move {
let rx = self.send(method, param).await?;
self.recv_query(rx).await
})
}
}
/// The query method
@ -164,162 +298,8 @@ impl Param {
/// Connection trait implemented by supported protocols
pub trait Connection: Sized + Send + Sync + 'static {
/// Constructs a new client without connecting to the server
fn new(method: Method) -> Self;
/// Connect to the server
fn connect(
address: Endpoint,
capacity: usize,
) -> Pin<Box<dyn Future<Output = Result<Surreal<Self>>> + Send + Sync + 'static>>
fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>>
where
Self: api::Connection;
/// Send a query to the server
#[allow(clippy::type_complexity)]
fn send<'r>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Receiver<Result<DbResponse>>>> + Send + Sync + 'r>>
where
Self: api::Connection;
/// Receive responses for all methods except `query`
fn recv(
&mut self,
receiver: Receiver<Result<DbResponse>>,
) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + Sync + '_>> {
Box::pin(async move {
let response = receiver.into_recv_async().await?;
match response? {
DbResponse::Other(value) => Ok(value),
DbResponse::Query(..) => unreachable!(),
}
})
}
/// Receive the response of the `query` method
fn recv_query(
&mut self,
receiver: Receiver<Result<DbResponse>>,
) -> Pin<Box<dyn Future<Output = Result<Response>> + Send + Sync + '_>> {
Box::pin(async move {
let response = receiver.into_recv_async().await?;
match response? {
DbResponse::Query(results) => Ok(results),
DbResponse::Other(..) => unreachable!(),
}
})
}
/// Execute all methods except `query`
fn execute<'r, R>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<R>> + Send + Sync + 'r>>
where
R: DeserializeOwned,
Self: api::Connection,
{
Box::pin(async move {
let rx = self.send(router, param).await?;
let value = self.recv(rx).await?;
from_value(value).map_err(Into::into)
})
}
/// Execute methods that return an optional single response
fn execute_opt<'r, R>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Option<R>>> + Send + Sync + 'r>>
where
R: DeserializeOwned,
Self: api::Connection,
{
Box::pin(async move {
let rx = self.send(router, param).await?;
match self.recv(rx).await? {
Value::None | Value::Null => Ok(None),
value => from_value(value).map_err(Into::into),
}
})
}
/// Execute methods that return multiple responses
fn execute_vec<'r, R>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Vec<R>>> + Send + Sync + 'r>>
where
R: DeserializeOwned,
Self: api::Connection,
{
Box::pin(async move {
let rx = self.send(router, param).await?;
let value = match self.recv(rx).await? {
Value::None | Value::Null => Value::Array(Default::default()),
Value::Array(array) => Value::Array(array),
value => vec![value].into(),
};
from_value(value).map_err(Into::into)
})
}
/// Execute methods that return nothing
fn execute_unit<'r>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'r>>
where
Self: api::Connection,
{
Box::pin(async move {
let rx = self.send(router, param).await?;
match self.recv(rx).await? {
Value::None | Value::Null => Ok(()),
Value::Array(array) if array.is_empty() => Ok(()),
value => Err(Error::FromValue {
value,
error: "expected the database to return nothing".to_owned(),
}
.into()),
}
})
}
/// Execute methods that return a raw value
fn execute_value<'r>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + Sync + 'r>>
where
Self: api::Connection,
{
Box::pin(async move {
let rx = self.send(router, param).await?;
self.recv(rx).await
})
}
/// Execute the `query` method
fn execute_query<'r>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Response>> + Send + Sync + 'r>>
where
Self: api::Connection,
{
Box::pin(async move {
let rx = self.send(router, param).await?;
self.recv_query(rx).await
})
}
}

View file

@ -133,7 +133,6 @@ mod native;
#[cfg(target_arch = "wasm32")]
mod wasm;
use crate::api::conn::Method;
use crate::api::err::Error;
use crate::api::opt::Config;
use crate::api::opt::Endpoint;
@ -211,10 +210,7 @@ where
/// A dynamic connection that supports any engine and allows you to pick at runtime
#[derive(Debug, Clone)]
pub struct Any {
id: i64,
method: Method,
}
pub struct Any(());
impl Surreal<Any> {
/// Connects to a specific database endpoint, saving the connection on the static client

View file

@ -1,7 +1,4 @@
use crate::api::conn::Connection;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::conn::Route;
use crate::api::conn::Router;
#[allow(unused_imports)] // used by the DB engines
use crate::api::engine;
@ -9,11 +6,11 @@ use crate::api::engine::any::Any;
#[cfg(feature = "protocol-http")]
use crate::api::engine::remote::http;
use crate::api::err::Error;
use crate::api::method::BoxFuture;
#[cfg(any(feature = "native-tls", feature = "rustls"))]
#[cfg(feature = "protocol-http")]
use crate::api::opt::Tls;
use crate::api::opt::{Endpoint, EndpointKind};
use crate::api::DbResponse;
#[allow(unused_imports)] // used by the DB engines
use crate::api::ExtraFeatures;
use crate::api::OnceLockExt;
@ -22,12 +19,9 @@ use crate::api::Surreal;
#[allow(unused_imports)]
use crate::error::Db as DbError;
use crate::opt::WaitFor;
use flume::Receiver;
#[cfg(feature = "protocol-http")]
use reqwest::ClientBuilder;
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::sync::OnceLock;
@ -41,18 +35,8 @@ use tokio_tungstenite::Connector;
impl crate::api::Connection for Any {}
impl Connection for Any {
fn new(method: Method) -> Self {
Self {
method,
id: 0,
}
}
#[allow(unused_variables, unreachable_code, unused_mut)] // these are all used depending on feature
fn connect(
address: Endpoint,
capacity: usize,
) -> Pin<Box<dyn Future<Output = Result<Surreal<Self>>> + Send + Sync + 'static>> {
fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move {
let (route_tx, route_rx) = match capacity {
0 => flume::unbounded(),
@ -159,7 +143,7 @@ impl Connection for Any {
let client = builder.build()?;
let base_url = address.url;
engine::remote::http::health(
client.get(base_url.join(Method::Health.as_str())?),
client.get(base_url.join(crate::api::conn::Method::Health.as_str())?),
)
.await?;
tokio::spawn(engine::remote::http::native::run_router(
@ -226,21 +210,4 @@ impl Connection for Any {
))
})
}
fn send<'r>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Receiver<Result<DbResponse>>>> + Send + Sync + 'r>> {
Box::pin(async move {
let (sender, receiver) = flume::bounded(1);
self.id = router.next_id();
let route = Route {
request: (self.id, self.method, param),
response: sender,
};
router.sender.send_async(route).await?;
Ok(receiver)
})
}
}

View file

@ -1,24 +1,18 @@
use crate::api::conn::Connection;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::conn::Route;
use crate::api::conn::Router;
#[allow(unused_imports)] // used by the DB engines
use crate::api::engine;
use crate::api::engine::any::Any;
use crate::api::err::Error;
use crate::api::method::BoxFuture;
use crate::api::opt::{Endpoint, EndpointKind};
use crate::api::DbResponse;
use crate::api::ExtraFeatures;
use crate::api::OnceLockExt;
use crate::api::Result;
use crate::api::Surreal;
use crate::error::Db as DbError;
use crate::opt::WaitFor;
use flume::Receiver;
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::sync::OnceLock;
@ -28,18 +22,8 @@ use wasm_bindgen_futures::spawn_local;
impl crate::api::Connection for Any {}
impl Connection for Any {
fn new(method: Method) -> Self {
Self {
method,
id: 0,
}
}
#[allow(unused_variables, unreachable_code, unused_mut)] // these are all used depending on feature
fn connect(
address: Endpoint,
capacity: usize,
) -> Pin<Box<dyn Future<Output = Result<Surreal<Self>>> + Send + Sync + 'static>> {
fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move {
let (route_tx, route_rx) = match capacity {
0 => flume::unbounded(),
@ -183,21 +167,4 @@ impl Connection for Any {
))
})
}
fn send<'r>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Receiver<Result<DbResponse>>>> + Send + Sync + 'r>> {
Box::pin(async move {
let (sender, receiver) = flume::bounded(1);
self.id = router.next_id();
let route = Route {
request: (self.id, self.method, param),
response: sender,
};
router.sender.send_async(route).await?;
Ok(receiver)
})
}
}

View file

@ -373,9 +373,7 @@ pub struct SurrealKV;
/// An embedded database
#[derive(Debug, Clone)]
pub struct Db {
pub(crate) method: crate::api::conn::Method,
}
pub struct Db(());
impl Surreal<Db> {
/// Connects to a specific database endpoint, saving the connection on the static client

View file

@ -1,10 +1,8 @@
use crate::api::conn::Connection;
use crate::api::conn::DbResponse;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::conn::Route;
use crate::api::conn::Router;
use crate::api::engine::local::Db;
use crate::api::method::BoxFuture;
use crate::api::opt::{Endpoint, EndpointKind};
use crate::api::ExtraFeatures;
use crate::api::OnceLockExt;
@ -24,8 +22,6 @@ use futures::StreamExt;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::sync::OnceLock;
@ -35,16 +31,7 @@ use tokio::sync::watch;
impl crate::api::Connection for Db {}
impl Connection for Db {
fn new(method: Method) -> Self {
Self {
method,
}
}
fn connect(
address: Endpoint,
capacity: usize,
) -> Pin<Box<dyn Future<Output = Result<Surreal<Self>>> + Send + Sync + 'static>> {
fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move {
let (route_tx, route_rx) = match capacity {
0 => flume::unbounded(),
@ -71,22 +58,6 @@ impl Connection for Db {
))
})
}
fn send<'r>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Receiver<Result<DbResponse>>>> + Send + Sync + 'r>> {
Box::pin(async move {
let (sender, receiver) = flume::bounded(1);
let route = Route {
request: (0, self.method, param),
response: sender,
};
router.sender.send_async(route).await?;
Ok(receiver)
})
}
}
pub(crate) async fn run_router(

View file

@ -1,11 +1,9 @@
use crate::api::conn::Connection;
use crate::api::conn::DbResponse;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::conn::Route;
use crate::api::conn::Router;
use crate::api::engine::local::Db;
use crate::api::engine::local::DEFAULT_TICK_INTERVAL;
use crate::api::method::BoxFuture;
use crate::api::opt::Endpoint;
use crate::api::ExtraFeatures;
use crate::api::OnceLockExt;
@ -26,8 +24,6 @@ use futures::StreamExt;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::sync::OnceLock;
@ -38,16 +34,7 @@ use wasm_bindgen_futures::spawn_local;
impl crate::api::Connection for Db {}
impl Connection for Db {
fn new(method: Method) -> Self {
Self {
method,
}
}
fn connect(
address: Endpoint,
capacity: usize,
) -> Pin<Box<dyn Future<Output = Result<Surreal<Self>>> + Send + Sync + 'static>> {
fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move {
let (route_tx, route_rx) = match capacity {
0 => flume::unbounded(),
@ -73,22 +60,6 @@ impl Connection for Db {
))
})
}
fn send<'r>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Receiver<Result<DbResponse>>>> + Send + Sync + 'r>> {
Box::pin(async move {
let (sender, receiver) = flume::bounded(1);
let route = Route {
request: (0, self.method, param),
response: sender,
};
router.sender.send_async(route).await?;
Ok(receiver)
})
}
}
pub(crate) async fn run_router(

View file

@ -71,9 +71,7 @@ pub struct Https;
/// An HTTP client for communicating with the server via HTTP
#[derive(Debug, Clone)]
pub struct Client {
method: Method,
}
pub struct Client(());
impl Surreal<Client> {
/// Connects to a specific database endpoint, saving the connection on the static client

View file

@ -1,10 +1,9 @@
use super::Client;
use crate::api::conn::Connection;
use crate::api::conn::DbResponse;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::conn::Route;
use crate::api::conn::Router;
use crate::api::method::BoxFuture;
use crate::api::opt::Endpoint;
#[cfg(any(feature = "native-tls", feature = "rustls"))]
use crate::api::opt::Tls;
@ -19,8 +18,6 @@ use indexmap::IndexMap;
use reqwest::header::HeaderMap;
use reqwest::ClientBuilder;
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::sync::OnceLock;
@ -30,16 +27,7 @@ use url::Url;
impl crate::api::Connection for Client {}
impl Connection for Client {
fn new(method: Method) -> Self {
Self {
method,
}
}
fn connect(
address: Endpoint,
capacity: usize,
) -> Pin<Box<dyn Future<Output = Result<Surreal<Self>>> + Send + Sync + 'static>> {
fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move {
let headers = super::default_headers();
@ -82,22 +70,6 @@ impl Connection for Client {
))
})
}
fn send<'r>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Receiver<Result<DbResponse>>>> + Send + Sync + 'r>> {
Box::pin(async move {
let (sender, receiver) = flume::bounded(1);
let route = Route {
request: (0, self.method, param),
response: sender,
};
router.sender.send_async(route).await?;
Ok(receiver)
})
}
}
pub(crate) async fn run_router(base_url: Url, client: reqwest::Client, route_rx: Receiver<Route>) {

View file

@ -1,10 +1,9 @@
use super::Client;
use crate::api::conn::Connection;
use crate::api::conn::DbResponse;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::conn::Route;
use crate::api::conn::Router;
use crate::api::method::BoxFuture;
use crate::api::opt::Endpoint;
use crate::api::OnceLockExt;
use crate::api::Result;
@ -17,8 +16,6 @@ use indexmap::IndexMap;
use reqwest::header::HeaderMap;
use reqwest::ClientBuilder;
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::sync::OnceLock;
@ -29,16 +26,7 @@ use wasm_bindgen_futures::spawn_local;
impl crate::api::Connection for Client {}
impl Connection for Client {
fn new(method: Method) -> Self {
Self {
method,
}
}
fn connect(
address: Endpoint,
capacity: usize,
) -> Pin<Box<dyn Future<Output = Result<Surreal<Self>>> + Send + Sync + 'static>> {
fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move {
let (route_tx, route_rx) = match capacity {
0 => flume::unbounded(),
@ -61,23 +49,6 @@ impl Connection for Client {
))
})
}
fn send<'r>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Receiver<Result<DbResponse>>>> + Send + Sync + 'r>> {
Box::pin(async move {
let (sender, receiver) = flume::bounded(1);
trace!("{param:?}");
let route = Route {
request: (0, self.method, param),
response: sender,
};
router.sender.send_async(route).await?;
Ok(receiver)
})
}
}
async fn client(base_url: &Url) -> Result<reqwest::Client> {

View file

@ -184,10 +184,7 @@ pub struct Wss;
/// A WebSocket client for communicating with the server via WebSockets
#[derive(Debug, Clone)]
pub struct Client {
pub(crate) id: i64,
method: Method,
}
pub struct Client(());
impl Surreal<Client> {
/// Connects to a specific database endpoint, saving the connection on the static client

View file

@ -4,7 +4,6 @@ use super::{HandleResult, RouterRequest};
use crate::api::conn::Connection;
use crate::api::conn::DbResponse;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::conn::Route;
use crate::api::conn::Router;
use crate::api::engine::remote::ws::Client;
@ -12,6 +11,7 @@ use crate::api::engine::remote::ws::Response;
use crate::api::engine::remote::ws::PING_INTERVAL;
use crate::api::engine::remote::ws::PING_METHOD;
use crate::api::err::Error;
use crate::api::method::BoxFuture;
use crate::api::opt::Endpoint;
#[cfg(any(feature = "native-tls", feature = "rustls"))]
use crate::api::opt::Tls;
@ -31,9 +31,7 @@ use revision::revisioned;
use serde::Deserialize;
use std::collections::hash_map::Entry;
use std::collections::HashSet;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::sync::OnceLock;
@ -105,17 +103,10 @@ pub(crate) async fn connect(
impl crate::api::Connection for Client {}
impl Connection for Client {
fn new(method: Method) -> Self {
Self {
id: 0,
method,
}
}
fn connect(
mut address: Endpoint,
capacity: usize,
) -> Pin<Box<dyn Future<Output = Result<Surreal<Self>>> + Send + Sync + 'static>> {
) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move {
address.url = address.url.join(PATH)?;
#[cfg(any(feature = "native-tls", feature = "rustls"))]
@ -152,23 +143,6 @@ impl Connection for Client {
))
})
}
fn send<'r>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Receiver<Result<DbResponse>>>> + Send + Sync + 'r>> {
Box::pin(async move {
self.id = router.next_id();
let (sender, receiver) = flume::bounded(1);
let route = Route {
request: (self.id, self.method, param),
response: sender,
};
router.sender.send_async(route).await?;
Ok(receiver)
})
}
}
async fn router_handle_route(

View file

@ -3,7 +3,6 @@ use super::{HandleResult, PATH};
use crate::api::conn::Connection;
use crate::api::conn::DbResponse;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::conn::Route;
use crate::api::conn::Router;
use crate::api::engine::remote::ws::Client;
@ -11,6 +10,7 @@ use crate::api::engine::remote::ws::Response;
use crate::api::engine::remote::ws::PING_INTERVAL;
use crate::api::engine::remote::ws::PING_METHOD;
use crate::api::err::Error;
use crate::api::method::BoxFuture;
use crate::api::opt::Endpoint;
use crate::api::ExtraFeatures;
use crate::api::OnceLockExt;
@ -35,9 +35,7 @@ use serde::Deserialize;
use std::collections::hash_map::Entry;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::sync::OnceLock;
@ -58,17 +56,10 @@ type RouterState = super::RouterState<MessageSink, MessageStream, Message>;
impl crate::api::Connection for Client {}
impl Connection for Client {
fn new(method: Method) -> Self {
Self {
id: 0,
method,
}
}
fn connect(
mut address: Endpoint,
capacity: usize,
) -> Pin<Box<dyn Future<Output = Result<Surreal<Self>>> + Send + Sync + 'static>> {
) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move {
address.url = address.url.join(PATH)?;
@ -96,23 +87,6 @@ impl Connection for Client {
))
})
}
fn send<'r>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Receiver<Result<DbResponse>>>> + Send + Sync + 'r>> {
Box::pin(async move {
self.id = router.next_id();
let (sender, receiver) = flume::bounded(1);
let route = Route {
request: (self.id, self.method, param),
response: sender,
};
router.sender.send_async(route).await?;
Ok(receiver)
})
}
}
async fn router_handle_request(

View file

@ -1,3 +1,5 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::method::OnceLockExt;
@ -6,9 +8,7 @@ use crate::api::Connection;
use crate::api::Result;
use crate::Surreal;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
/// An authentication future
#[derive(Debug)]
@ -23,13 +23,12 @@ where
Client: Connection,
{
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let router = self.client.router.extract()?;
let mut conn = Client::new(Method::Authenticate);
conn.execute_unit(router, Param::new(vec![self.token.0.into()])).await
router.execute_unit(Method::Authenticate, Param::new(vec![self.token.0.into()])).await
})
}
}

View file

@ -1,13 +1,13 @@
use crate::api::method::BoxFuture;
use crate::api::method::Cancel;
use crate::api::method::Commit;
use crate::api::Connection;
use crate::api::Result;
use crate::api::Surreal;
use crate::sql::statements::BeginStatement;
use std::future::Future;
use std::future::IntoFuture;
use std::ops::Deref;
use std::pin::Pin;
/// A beginning of a transaction
#[derive(Debug)]
@ -21,7 +21,7 @@ where
C: Connection,
{
type Output = Result<Transaction<C>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'static>>;
type IntoFuture = BoxFuture<'static, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {

View file

@ -1,10 +1,10 @@
use crate::api::method::BoxFuture;
use crate::api::Connection;
use crate::api::Result;
use crate::api::Surreal;
use crate::sql::statements::CancelStatement;
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
/// A transaction cancellation future
#[derive(Debug)]
@ -18,7 +18,7 @@ where
C: Connection,
{
type Output = Result<Surreal<C>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'static>>;
type IntoFuture = BoxFuture<'static, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {

View file

@ -1,10 +1,10 @@
use crate::api::method::BoxFuture;
use crate::api::Connection;
use crate::api::Result;
use crate::api::Surreal;
use crate::sql::statements::CommitStatement;
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
/// A transaction commit future
#[derive(Debug)]
@ -18,7 +18,7 @@ where
C: Connection,
{
type Output = Result<Surreal<C>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'static>>;
type IntoFuture = BoxFuture<'static, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {

View file

@ -1,5 +1,6 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::opt::Range;
use crate::api::opt::Resource;
use crate::api::Connection;
@ -12,10 +13,8 @@ use crate::Surreal;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::pin::Pin;
/// A content future
///
@ -61,12 +60,12 @@ macro_rules! into_future {
Some(range) => resource?.with_range(range)?.into(),
None => resource?.into(),
};
let mut conn = Client::new(method);
let params = match content? {
Value::None | Value::Null => vec![param],
content => vec![param, content],
};
conn.$method(client.router.extract()?, Param::new(params)).await
let router = client.router.extract()?;
router.$method(method, Param::new(params)).await
})
}
};
@ -78,7 +77,7 @@ where
D: Serialize,
{
type Output = Result<Value>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_value}
}
@ -90,7 +89,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Option<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_opt}
}
@ -102,7 +101,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Vec<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_vec}
}

View file

@ -1,5 +1,6 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::method::Content;
use crate::api::opt::Resource;
use crate::api::Connection;
@ -10,10 +11,8 @@ use crate::Surreal;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::pin::Pin;
/// A record create future
#[derive(Debug)]
@ -46,8 +45,8 @@ macro_rules! into_future {
..
} = self;
Box::pin(async move {
let mut conn = Client::new(Method::Create);
conn.$method(client.router.extract()?, Param::new(vec![resource?.into()])).await
let router = client.router.extract()?;
router.$method(Method::Create, Param::new(vec![resource?.into()])).await
})
}
};
@ -58,7 +57,7 @@ where
Client: Connection,
{
type Output = Result<Value>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_value}
}
@ -69,7 +68,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Option<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_opt}
}
@ -80,7 +79,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Vec<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_vec}
}

View file

@ -1,5 +1,6 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::opt::Range;
use crate::api::opt::Resource;
use crate::api::Connection;
@ -10,10 +11,8 @@ use crate::sql::Value;
use crate::Surreal;
use serde::de::DeserializeOwned;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::pin::Pin;
/// A record delete future
#[derive(Debug)]
@ -52,8 +51,8 @@ macro_rules! into_future {
Some(range) => resource?.with_range(range)?.into(),
None => resource?.into(),
};
let mut conn = Client::new(Method::Delete);
conn.$method(client.router.extract()?, Param::new(vec![param])).await
let router = client.router.extract()?;
router.$method(Method::Delete, Param::new(vec![param])).await
})
}
};
@ -64,7 +63,7 @@ where
Client: Connection,
{
type Output = Result<Value>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_value}
}
@ -75,7 +74,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Option<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_opt}
}
@ -86,7 +85,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Vec<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_vec}
}

View file

@ -1,6 +1,7 @@
use crate::api::conn::Method;
use crate::api::conn::MlConfig;
use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::Connection;
use crate::api::Error;
use crate::api::ExtraFeatures;
@ -14,7 +15,6 @@ use futures::Stream;
use futures::StreamExt;
use semver::Version;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::path::PathBuf;
@ -70,7 +70,7 @@ where
Client: Connection,
{
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
@ -78,13 +78,12 @@ where
if !router.features.contains(&ExtraFeatures::Backup) {
return Err(Error::BackupsNotSupported.into());
}
let mut conn = Client::new(Method::Export);
let mut param = match self.target {
ExportDestination::File(path) => Param::file(path),
ExportDestination::Memory => unreachable!(),
};
param.ml_config = self.ml_config;
conn.execute_unit(router, param).await
router.execute_unit(Method::Export, param).await
})
}
}
@ -94,7 +93,7 @@ where
Client: Connection,
{
type Output = Result<Backup>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
@ -103,13 +102,12 @@ where
return Err(Error::BackupsNotSupported.into());
}
let (tx, rx) = crate::channel::bounded(1);
let mut conn = Client::new(Method::Export);
let ExportDestination::Memory = self.target else {
unreachable!();
};
let mut param = Param::bytes_sender(tx);
param.ml_config = self.ml_config;
conn.execute_unit(router, param).await?;
router.execute_unit(Method::Export, param).await?;
Ok(Backup {
rx,
})

View file

@ -1,3 +1,5 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::Connection;
@ -5,9 +7,7 @@ use crate::api::Result;
use crate::method::OnceLockExt;
use crate::Surreal;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
/// A health check future
#[derive(Debug)]
@ -33,12 +33,12 @@ where
Client: Connection,
{
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let mut conn = Client::new(Method::Health);
conn.execute_unit(self.client.router.extract()?, Param::new(Vec::new())).await
let router = self.client.router.extract()?;
router.execute_unit(Method::Health, Param::new(Vec::new())).await
})
}
}

View file

@ -1,3 +1,5 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method;
use crate::api::conn::MlConfig;
use crate::api::conn::Param;
@ -9,11 +11,9 @@ use crate::method::Model;
use crate::method::OnceLockExt;
use crate::Surreal;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::path::PathBuf;
use std::pin::Pin;
/// An database import future
#[derive(Debug)]
@ -58,7 +58,7 @@ where
Client: Connection,
{
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
@ -66,10 +66,9 @@ where
if !router.features.contains(&ExtraFeatures::Backup) {
return Err(Error::BackupsNotSupported.into());
}
let mut conn = Client::new(Method::Import);
let mut param = Param::file(self.file);
param.ml_config = self.ml_config;
conn.execute_unit(router, param).await
router.execute_unit(Method::Import, param).await
})
}
}

View file

@ -1,6 +1,7 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::err::Error;
use crate::api::method::BoxFuture;
use crate::api::method::Content;
use crate::api::opt::Resource;
use crate::api::Connection;
@ -14,10 +15,8 @@ use crate::Surreal;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::pin::Pin;
/// An insert future
#[derive(Debug)]
@ -61,9 +60,9 @@ macro_rules! into_future {
Resource::Array(arr) => return Err(Error::InsertOnArray(arr).into()),
Resource::Edges(edges) => return Err(Error::InsertOnEdges(edges).into()),
};
let mut conn = Client::new(Method::Insert);
let param = vec![table, data];
conn.$method(client.router.extract()?, Param::new(param)).await
let router = client.router.extract()?;
router.$method(Method::Insert, Param::new(param)).await
})
}
};
@ -74,7 +73,7 @@ where
Client: Connection,
{
type Output = Result<Value>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_value}
}
@ -85,7 +84,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Option<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_opt}
}
@ -96,7 +95,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Vec<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_vec}
}

View file

@ -1,3 +1,5 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::Connection;
@ -5,9 +7,7 @@ use crate::api::Result;
use crate::method::OnceLockExt;
use crate::Surreal;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
/// A session invalidate future
#[derive(Debug)]
@ -33,13 +33,12 @@ where
Client: Connection,
{
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let router = self.client.router.extract()?;
let mut conn = Client::new(Method::Invalidate);
conn.execute_unit(router, Param::new(Vec::new())).await
router.execute_unit(Method::Invalidate, Param::new(Vec::new())).await
})
}
}

View file

@ -2,6 +2,7 @@ use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::conn::Router;
use crate::api::err::Error;
use crate::api::method::BoxFuture;
use crate::api::Connection;
use crate::api::ExtraFeatures;
use crate::api::Result;
@ -30,7 +31,6 @@ use crate::Surreal;
use channel::Receiver;
use futures::StreamExt;
use serde::de::DeserializeOwned;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::mem;
@ -100,7 +100,7 @@ macro_rules! into_future {
false,
);
let id: Value = query.await?.take(0)?;
let rx = register::<Client>(router, id.clone()).await?;
let rx = register(router, id.clone()).await?;
Ok(Stream::new(
Surreal::new_from_router_waiter(client.router.clone(), client.waiter.clone()),
id,
@ -111,18 +111,11 @@ macro_rules! into_future {
};
}
pub(crate) async fn register<Client>(
router: &Router,
id: Value,
) -> Result<Receiver<dbs::Notification>>
where
Client: Connection,
{
let mut conn = Client::new(Method::Live);
pub(crate) async fn register(router: &Router, id: Value) -> Result<Receiver<dbs::Notification>> {
let (tx, rx) = channel::unbounded();
let mut param = Param::notification_sender(tx);
param.other = vec![id];
conn.execute_unit(router, param).await?;
router.execute_unit(Method::Live, param).await?;
Ok(rx)
}
@ -131,7 +124,7 @@ where
Client: Connection,
{
type Output = Result<Stream<Value>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {}
}
@ -142,7 +135,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Stream<Option<R>>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {}
}
@ -153,7 +146,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Stream<Vec<R>>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {}
}
@ -261,8 +254,7 @@ where
let client = client.clone();
spawn(async move {
if let Ok(router) = client.router.extract() {
let mut conn = Client::new(Method::Kill);
conn.execute_unit(router, Param::new(vec![id.clone()])).await.ok();
router.execute_unit(Method::Kill, Param::new(vec![id.clone()])).await.ok();
}
});
}

View file

@ -1,5 +1,6 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::opt::Range;
use crate::api::opt::Resource;
use crate::api::Connection;
@ -12,10 +13,8 @@ use crate::Surreal;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::pin::Pin;
/// A merge future
#[derive(Debug)]
@ -57,8 +56,8 @@ macro_rules! into_future {
Some(range) => resource?.with_range(range)?.into(),
None => resource?.into(),
};
let mut conn = Client::new(Method::Merge);
conn.$method(client.router.extract()?, Param::new(vec![param, content?])).await
let router = client.router.extract()?;
router.$method(Method::Merge, Param::new(vec![param, content?])).await
})
}
};
@ -70,7 +69,7 @@ where
D: Serialize,
{
type Output = Result<Value>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_value}
}
@ -82,7 +81,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Option<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_opt}
}
@ -94,7 +93,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Vec<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_vec}
}

View file

@ -45,6 +45,7 @@ pub use create::Create;
pub use delete::Delete;
pub use export::Backup;
pub use export::Export;
use futures::Future;
pub use health::Health;
pub use import::Import;
pub use insert::Insert;
@ -84,12 +85,16 @@ use serde::Serialize;
use std::borrow::Cow;
use std::marker::PhantomData;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;
use self::query::ValidQuery;
/// A alias for an often used type of future returned by async methods in this library.
pub(crate) type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
/// Query statistics
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]

View file

@ -1,5 +1,6 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::opt::PatchOp;
use crate::api::opt::Range;
use crate::api::opt::Resource;
@ -11,10 +12,8 @@ use crate::sql::Value;
use crate::Surreal;
use serde::de::DeserializeOwned;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::pin::Pin;
use std::result::Result as StdResult;
/// A patch future
@ -61,8 +60,8 @@ macro_rules! into_future {
vec.push(result?);
}
let patches = vec.into();
let mut conn = Client::new(Method::Patch);
conn.$method(client.router.extract()?, Param::new(vec![param, patches])).await
let router = client.router.extract()?;
router.$method(Method::Patch, Param::new(vec![param, patches])).await
})
}
};
@ -73,7 +72,7 @@ where
Client: Connection,
{
type Output = Result<Value>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_value}
}
@ -84,7 +83,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Option<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_opt}
}
@ -95,7 +94,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Vec<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_vec}
}

View file

@ -4,6 +4,7 @@ use super::Stream;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::err::Error;
use crate::api::method::BoxFuture;
use crate::api::opt;
use crate::api::Connection;
use crate::api::ExtraFeatures;
@ -27,7 +28,6 @@ use serde::Serialize;
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::future::Future;
use std::future::IntoFuture;
use std::mem;
use std::pin::Pin;
@ -111,7 +111,7 @@ where
Client: Connection,
{
type Output = Result<Response>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
let ValidQuery {
@ -159,8 +159,7 @@ where
query.0 .0 = query_statements;
let param = Param::query(query, bindings);
let mut conn = Client::new(Method::Query);
let mut response = conn.execute_query(router, param).await?;
let mut response = router.execute_query(Method::Query, param).await?;
for idx in query_indicies {
let Some((_, result)) = response.results.get(&idx) else {
@ -170,7 +169,7 @@ where
// This is a live query. We are using this as a workaround to avoid
// creating another public error variant for this internal error.
let res = match result {
Ok(id) => live::register::<Client>(router, id.clone()).await.map(|rx| {
Ok(id) => live::register(router, id.clone()).await.map(|rx| {
Stream::new(
Surreal::new_from_router_waiter(
client.router.clone(),
@ -197,7 +196,7 @@ where
Client: Connection,
{
type Output = Result<WithStats<Response>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {

View file

@ -1,5 +1,6 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::method::OnceLockExt;
use crate::api::opt::Range;
use crate::api::opt::Resource;
@ -11,10 +12,8 @@ use crate::sql::Value;
use crate::Surreal;
use serde::de::DeserializeOwned;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::pin::Pin;
/// A select future
#[derive(Debug)]
@ -54,8 +53,8 @@ macro_rules! into_future {
Some(range) => resource?.with_range(range)?.into(),
None => resource?.into(),
};
let mut conn = Client::new(Method::Select);
conn.$method(client.router.extract()?, Param::new(vec![param])).await
let router = client.router.extract()?;
router.$method(Method::Select, Param::new(vec![param])).await
})
}
};
@ -66,7 +65,7 @@ where
Client: Connection,
{
type Output = Result<Value>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_value}
}
@ -77,7 +76,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Option<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_opt}
}
@ -88,7 +87,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Vec<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_vec}
}

View file

@ -1,3 +1,5 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::Connection;
@ -6,9 +8,7 @@ use crate::method::OnceLockExt;
use crate::sql::Value;
use crate::Surreal;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
/// A set future
#[derive(Debug)]
@ -37,16 +37,12 @@ where
Client: Connection,
{
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let mut conn = Client::new(Method::Set);
conn.execute_unit(
self.client.router.extract()?,
Param::new(vec![self.key.into(), self.value?]),
)
.await
let router = self.client.router.extract()?;
router.execute_unit(Method::Set, Param::new(vec![self.key.into(), self.value?])).await
})
}
}

View file

@ -1,5 +1,6 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::Connection;
use crate::api::Result;
use crate::method::OnceLockExt;
@ -7,10 +8,8 @@ use crate::sql::Value;
use crate::Surreal;
use serde::de::DeserializeOwned;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::pin::Pin;
/// A signin future
#[derive(Debug)]
@ -40,7 +39,7 @@ where
R: DeserializeOwned,
{
type Output = Result<R>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
let Signin {
@ -50,8 +49,7 @@ where
} = self;
Box::pin(async move {
let router = client.router.extract()?;
let mut conn = Client::new(Method::Signin);
conn.execute(router, Param::new(vec![credentials?])).await
router.execute(Method::Signin, Param::new(vec![credentials?])).await
})
}
}

View file

@ -1,5 +1,6 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::Connection;
use crate::api::Result;
use crate::method::OnceLockExt;
@ -7,10 +8,8 @@ use crate::sql::Value;
use crate::Surreal;
use serde::de::DeserializeOwned;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::pin::Pin;
/// A signup future
#[derive(Debug)]
@ -40,7 +39,7 @@ where
R: DeserializeOwned,
{
type Output = Result<R>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
let Signup {
@ -50,8 +49,7 @@ where
} = self;
Box::pin(async move {
let router = client.router.extract()?;
let mut conn = Client::new(Method::Signup);
conn.execute(router, Param::new(vec![credentials?])).await
router.execute(Method::Signup, Param::new(vec![credentials?])).await
})
}
}

View file

@ -170,11 +170,11 @@ async fn api() {
let _: Version = DB.version().await.unwrap();
}
fn send_and_sync(_: impl Send + Sync) {}
fn assert_send_sync(_: impl Send + Sync) {}
#[test]
fn futures_are_send_and_sync() {
send_and_sync(async {
fn futures_are_send_sync() {
assert_send_sync(async {
let db = Surreal::new::<Test>(()).await.unwrap();
db.signin(Root {
username: "root",

View file

@ -1,10 +1,7 @@
use super::server;
use crate::api::conn::Connection;
use crate::api::conn::DbResponse;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::conn::Route;
use crate::api::conn::Router;
use crate::api::method::BoxFuture;
use crate::api::opt::Endpoint;
use crate::api::opt::IntoEndpoint;
use crate::api::Connect;
@ -12,11 +9,8 @@ use crate::api::ExtraFeatures;
use crate::api::OnceLockExt;
use crate::api::Result;
use crate::api::Surreal;
use flume::Receiver;
use std::collections::HashSet;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::sync::OnceLock;
@ -35,9 +29,7 @@ impl IntoEndpoint<Test> for () {
}
#[derive(Debug, Clone)]
pub struct Client {
method: Method,
}
pub struct Client(());
impl Surreal<Client> {
pub fn connect<P>(
@ -58,16 +50,7 @@ impl Surreal<Client> {
impl crate::api::Connection for Client {}
impl Connection for Client {
fn new(method: Method) -> Self {
Self {
method,
}
}
fn connect(
_address: Endpoint,
capacity: usize,
) -> Pin<Box<dyn Future<Output = Result<Surreal<Self>>> + Send + Sync + 'static>> {
fn connect(_address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move {
let (route_tx, route_rx) = flume::bounded(capacity);
let mut features = HashSet::new();
@ -84,20 +67,4 @@ impl Connection for Client {
))
})
}
fn send<'r>(
&'r mut self,
router: &'r Router,
param: Param,
) -> Pin<Box<dyn Future<Output = Result<Receiver<Result<DbResponse>>>> + Send + Sync + 'r>> {
Box::pin(async move {
let (sender, receiver) = flume::bounded(1);
let route = Route {
request: (0, self.method, param),
response: sender,
};
router.sender.send_async(route).await.as_ref().map_err(ToString::to_string).unwrap();
Ok(receiver)
})
}
}

View file

@ -1,3 +1,5 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::Connection;
@ -5,9 +7,7 @@ use crate::api::Result;
use crate::method::OnceLockExt;
use crate::Surreal;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
/// An unset future
#[derive(Debug)]
@ -35,13 +35,12 @@ where
Client: Connection,
{
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let mut conn = Client::new(Method::Unset);
conn.execute_unit(self.client.router.extract()?, Param::new(vec![self.key.into()]))
.await
let router = self.client.router.extract()?;
router.execute_unit(Method::Unset, Param::new(vec![self.key.into()])).await
})
}
}

View file

@ -1,5 +1,6 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::method::Content;
use crate::api::method::Merge;
use crate::api::method::Patch;
@ -15,10 +16,8 @@ use crate::Surreal;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::pin::Pin;
/// An update future
#[derive(Debug)]
@ -57,8 +56,8 @@ macro_rules! into_future {
Some(range) => resource?.with_range(range)?.into(),
None => resource?.into(),
};
let mut conn = Client::new(Method::Update);
conn.$method(client.router.extract()?, Param::new(vec![param])).await
let router = client.router.extract()?;
router.$method(Method::Upsert, Param::new(vec![param])).await
})
}
};
@ -69,7 +68,7 @@ where
Client: Connection,
{
type Output = Result<Value>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_value}
}
@ -80,7 +79,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Option<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_opt}
}
@ -91,7 +90,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Vec<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_vec}
}

View file

@ -1,5 +1,6 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::method::Content;
use crate::api::method::Merge;
use crate::api::method::Patch;
@ -15,10 +16,8 @@ use crate::Surreal;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::pin::Pin;
/// An upsert future
#[derive(Debug)]
@ -57,8 +56,8 @@ macro_rules! into_future {
Some(range) => resource?.with_range(range)?.into(),
None => resource?.into(),
};
let mut conn = Client::new(Method::Upsert);
conn.$method(client.router.extract()?, Param::new(vec![param])).await
let router = client.router.extract()?;
router.$method(Method::Upsert, Param::new(vec![param])).await
})
}
};
@ -69,7 +68,7 @@ where
Client: Connection,
{
type Output = Result<Value>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_value}
}
@ -80,7 +79,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Option<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_opt}
}
@ -91,7 +90,7 @@ where
R: DeserializeOwned,
{
type Output = Result<Vec<R>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
into_future! {execute_vec}
}

View file

@ -1,3 +1,5 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::Connection;
@ -7,9 +9,7 @@ use crate::opt::WaitFor;
use crate::sql::Value;
use crate::Surreal;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
@ -37,16 +37,12 @@ where
Client: Connection,
{
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let mut conn = Client::new(Method::Use);
conn.execute_unit(
self.client.router.extract()?,
Param::new(vec![self.ns, self.db.into()]),
)
.await?;
let router = self.client.router.extract()?;
router.execute_unit(Method::Use, Param::new(vec![self.ns, self.db.into()])).await?;
self.client.waiter.0.send(Some(WaitFor::Database)).ok();
Ok(())
})

View file

@ -1,3 +1,5 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::method::UseDb;
@ -7,9 +9,7 @@ use crate::method::OnceLockExt;
use crate::sql::Value;
use crate::Surreal;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
/// Stores the namespace to use
#[derive(Debug)]
@ -51,16 +51,12 @@ where
Client: Connection,
{
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let mut conn = Client::new(Method::Use);
conn.execute_unit(
self.client.router.extract()?,
Param::new(vec![self.ns.into(), Value::None]),
)
.await
let router = self.client.router.extract()?;
router.execute_unit(Method::Use, Param::new(vec![self.ns.into(), Value::None])).await
})
}
}

View file

@ -1,3 +1,5 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::err::Error;
@ -6,9 +8,7 @@ use crate::api::Result;
use crate::method::OnceLockExt;
use crate::Surreal;
use std::borrow::Cow;
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
/// A version future
#[derive(Debug)]
@ -34,13 +34,13 @@ where
Client: Connection,
{
type Output = Result<semver::Version>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
type IntoFuture = BoxFuture<'r, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let mut conn = Client::new(Method::Version);
let version = conn
.execute_value(self.client.router.extract()?, Param::new(Vec::new()))
let router = self.client.router.extract()?;
let version = router
.execute_value(Method::Version, Param::new(Vec::new()))
.await?
.convert_to_string()?;
let semantic = version.trim_start_matches("surrealdb-");

View file

@ -10,10 +10,10 @@ pub mod opt;
mod conn;
pub use method::query::Response;
use method::BoxFuture;
use semver::Version;
use tokio::sync::watch;
use crate::api::conn::DbResponse;
use crate::api::conn::Router;
use crate::api::err::Error;
use crate::api::opt::Endpoint;
@ -21,10 +21,8 @@ use semver::BuildMetadata;
use semver::VersionReq;
use std::fmt;
use std::fmt::Debug;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::OnceLock;
@ -96,7 +94,7 @@ where
Client: Connection,
{
type Output = Result<Surreal<Client>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync>>;
type IntoFuture = BoxFuture<'static, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
@ -126,7 +124,7 @@ where
Client: Connection,
{
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync>>;
type IntoFuture = BoxFuture<'static, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {