diff --git a/lib/src/api/conn.rs b/lib/src/api/conn.rs index 61a92d99..420ff5ed 100644 --- a/lib/src/api/conn.rs +++ b/lib/src/api/conn.rs @@ -1,6 +1,7 @@ use crate::api; use crate::api::err::Error; use crate::api::method::query::Response; +use crate::api::method::BoxFuture; use crate::api::opt::Endpoint; use crate::api::ExtraFeatures; use crate::api::Result; @@ -15,9 +16,7 @@ use serde::de::DeserializeOwned; use serde::Serialize; use std::collections::BTreeMap; use std::collections::HashSet; -use std::future::Future; use std::path::PathBuf; -use std::pin::Pin; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; @@ -40,6 +39,141 @@ impl Router { pub(crate) fn next_id(&self) -> i64 { self.last_id.fetch_add(1, Ordering::SeqCst) } + + pub(crate) fn send( + &self, + method: Method, + param: Param, + ) -> BoxFuture<'_, Result>>> { + Box::pin(async move { + let id = self.next_id(); + let (sender, receiver) = flume::bounded(1); + let route = Route { + request: (id, method, param), + response: sender, + }; + self.sender.send_async(route).await?; + Ok(receiver) + }) + } + + /// Receive responses for all methods except `query` + pub(crate) fn recv( + &self, + receiver: Receiver>, + ) -> BoxFuture<'_, Result> { + Box::pin(async move { + let response = receiver.into_recv_async().await?; + match response? { + DbResponse::Other(value) => Ok(value), + DbResponse::Query(..) => unreachable!(), + } + }) + } + + /// Receive the response of the `query` method + pub(crate) fn recv_query( + &self, + receiver: Receiver>, + ) -> BoxFuture<'_, Result> { + Box::pin(async move { + let response = receiver.into_recv_async().await?; + match response? { + DbResponse::Query(results) => Ok(results), + DbResponse::Other(..) => unreachable!(), + } + }) + } + + /// Execute all methods except `query` + pub(crate) fn execute(&self, method: Method, param: Param) -> BoxFuture<'_, Result> + where + R: DeserializeOwned, + { + Box::pin(async move { + let rx = self.send(method, param).await?; + let value = self.recv(rx).await?; + from_value(value).map_err(Into::into) + }) + } + + /// Execute methods that return an optional single response + pub(crate) fn execute_opt( + &self, + method: Method, + param: Param, + ) -> BoxFuture<'_, Result>> + where + R: DeserializeOwned, + { + Box::pin(async move { + let rx = self.send(method, param).await?; + match self.recv(rx).await? { + Value::None | Value::Null => Ok(None), + value => from_value(value).map_err(Into::into), + } + }) + } + + /// Execute methods that return multiple responses + pub(crate) fn execute_vec( + &self, + method: Method, + param: Param, + ) -> BoxFuture<'_, Result>> + where + R: DeserializeOwned, + { + Box::pin(async move { + let rx = self.send(method, param).await?; + let value = match self.recv(rx).await? { + Value::None | Value::Null => Value::Array(Default::default()), + Value::Array(array) => Value::Array(array), + value => vec![value].into(), + }; + from_value(value).map_err(Into::into) + }) + } + + /// Execute methods that return nothing + pub(crate) fn execute_unit(&self, method: Method, param: Param) -> BoxFuture<'_, Result<()>> { + Box::pin(async move { + let rx = self.send(method, param).await?; + match self.recv(rx).await? { + Value::None | Value::Null => Ok(()), + Value::Array(array) if array.is_empty() => Ok(()), + value => Err(Error::FromValue { + value, + error: "expected the database to return nothing".to_owned(), + } + .into()), + } + }) + } + + /// Execute methods that return a raw value + pub(crate) fn execute_value( + &self, + method: Method, + param: Param, + ) -> BoxFuture<'_, Result> { + Box::pin(async move { + let rx = self.send(method, param).await?; + self.recv(rx).await + }) + } + + /// Execute the `query` method + pub(crate) fn execute_query( + &self, + method: Method, + param: Param, + ) -> BoxFuture<'_, Result> { + Box::pin(async move { + let rx = self.send(method, param).await?; + self.recv_query(rx).await + }) + } } /// The query method @@ -164,162 +298,8 @@ impl Param { /// Connection trait implemented by supported protocols pub trait Connection: Sized + Send + Sync + 'static { - /// Constructs a new client without connecting to the server - fn new(method: Method) -> Self; - /// Connect to the server - fn connect( - address: Endpoint, - capacity: usize, - ) -> Pin>> + Send + Sync + 'static>> + fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> where Self: api::Connection; - - /// Send a query to the server - #[allow(clippy::type_complexity)] - fn send<'r>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin>>> + Send + Sync + 'r>> - where - Self: api::Connection; - - /// Receive responses for all methods except `query` - fn recv( - &mut self, - receiver: Receiver>, - ) -> Pin> + Send + Sync + '_>> { - Box::pin(async move { - let response = receiver.into_recv_async().await?; - match response? { - DbResponse::Other(value) => Ok(value), - DbResponse::Query(..) => unreachable!(), - } - }) - } - - /// Receive the response of the `query` method - fn recv_query( - &mut self, - receiver: Receiver>, - ) -> Pin> + Send + Sync + '_>> { - Box::pin(async move { - let response = receiver.into_recv_async().await?; - match response? { - DbResponse::Query(results) => Ok(results), - DbResponse::Other(..) => unreachable!(), - } - }) - } - - /// Execute all methods except `query` - fn execute<'r, R>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin> + Send + Sync + 'r>> - where - R: DeserializeOwned, - Self: api::Connection, - { - Box::pin(async move { - let rx = self.send(router, param).await?; - let value = self.recv(rx).await?; - from_value(value).map_err(Into::into) - }) - } - - /// Execute methods that return an optional single response - fn execute_opt<'r, R>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin>> + Send + Sync + 'r>> - where - R: DeserializeOwned, - Self: api::Connection, - { - Box::pin(async move { - let rx = self.send(router, param).await?; - match self.recv(rx).await? { - Value::None | Value::Null => Ok(None), - value => from_value(value).map_err(Into::into), - } - }) - } - - /// Execute methods that return multiple responses - fn execute_vec<'r, R>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin>> + Send + Sync + 'r>> - where - R: DeserializeOwned, - Self: api::Connection, - { - Box::pin(async move { - let rx = self.send(router, param).await?; - let value = match self.recv(rx).await? { - Value::None | Value::Null => Value::Array(Default::default()), - Value::Array(array) => Value::Array(array), - value => vec![value].into(), - }; - from_value(value).map_err(Into::into) - }) - } - - /// Execute methods that return nothing - fn execute_unit<'r>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin> + Send + Sync + 'r>> - where - Self: api::Connection, - { - Box::pin(async move { - let rx = self.send(router, param).await?; - match self.recv(rx).await? { - Value::None | Value::Null => Ok(()), - Value::Array(array) if array.is_empty() => Ok(()), - value => Err(Error::FromValue { - value, - error: "expected the database to return nothing".to_owned(), - } - .into()), - } - }) - } - - /// Execute methods that return a raw value - fn execute_value<'r>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin> + Send + Sync + 'r>> - where - Self: api::Connection, - { - Box::pin(async move { - let rx = self.send(router, param).await?; - self.recv(rx).await - }) - } - - /// Execute the `query` method - fn execute_query<'r>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin> + Send + Sync + 'r>> - where - Self: api::Connection, - { - Box::pin(async move { - let rx = self.send(router, param).await?; - self.recv_query(rx).await - }) - } } diff --git a/lib/src/api/engine/any/mod.rs b/lib/src/api/engine/any/mod.rs index d94d05c4..06336a8c 100644 --- a/lib/src/api/engine/any/mod.rs +++ b/lib/src/api/engine/any/mod.rs @@ -133,7 +133,6 @@ mod native; #[cfg(target_arch = "wasm32")] mod wasm; -use crate::api::conn::Method; use crate::api::err::Error; use crate::api::opt::Config; use crate::api::opt::Endpoint; @@ -211,10 +210,7 @@ where /// A dynamic connection that supports any engine and allows you to pick at runtime #[derive(Debug, Clone)] -pub struct Any { - id: i64, - method: Method, -} +pub struct Any(()); impl Surreal { /// Connects to a specific database endpoint, saving the connection on the static client diff --git a/lib/src/api/engine/any/native.rs b/lib/src/api/engine/any/native.rs index 1b088f05..9b7a3576 100644 --- a/lib/src/api/engine/any/native.rs +++ b/lib/src/api/engine/any/native.rs @@ -1,7 +1,4 @@ use crate::api::conn::Connection; -use crate::api::conn::Method; -use crate::api::conn::Param; -use crate::api::conn::Route; use crate::api::conn::Router; #[allow(unused_imports)] // used by the DB engines use crate::api::engine; @@ -9,11 +6,11 @@ use crate::api::engine::any::Any; #[cfg(feature = "protocol-http")] use crate::api::engine::remote::http; use crate::api::err::Error; +use crate::api::method::BoxFuture; #[cfg(any(feature = "native-tls", feature = "rustls"))] #[cfg(feature = "protocol-http")] use crate::api::opt::Tls; use crate::api::opt::{Endpoint, EndpointKind}; -use crate::api::DbResponse; #[allow(unused_imports)] // used by the DB engines use crate::api::ExtraFeatures; use crate::api::OnceLockExt; @@ -22,12 +19,9 @@ use crate::api::Surreal; #[allow(unused_imports)] use crate::error::Db as DbError; use crate::opt::WaitFor; -use flume::Receiver; #[cfg(feature = "protocol-http")] use reqwest::ClientBuilder; use std::collections::HashSet; -use std::future::Future; -use std::pin::Pin; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::sync::OnceLock; @@ -41,18 +35,8 @@ use tokio_tungstenite::Connector; impl crate::api::Connection for Any {} impl Connection for Any { - fn new(method: Method) -> Self { - Self { - method, - id: 0, - } - } - #[allow(unused_variables, unreachable_code, unused_mut)] // these are all used depending on feature - fn connect( - address: Endpoint, - capacity: usize, - ) -> Pin>> + Send + Sync + 'static>> { + fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> { Box::pin(async move { let (route_tx, route_rx) = match capacity { 0 => flume::unbounded(), @@ -159,7 +143,7 @@ impl Connection for Any { let client = builder.build()?; let base_url = address.url; engine::remote::http::health( - client.get(base_url.join(Method::Health.as_str())?), + client.get(base_url.join(crate::api::conn::Method::Health.as_str())?), ) .await?; tokio::spawn(engine::remote::http::native::run_router( @@ -226,21 +210,4 @@ impl Connection for Any { )) }) } - - fn send<'r>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin>>> + Send + Sync + 'r>> { - Box::pin(async move { - let (sender, receiver) = flume::bounded(1); - self.id = router.next_id(); - let route = Route { - request: (self.id, self.method, param), - response: sender, - }; - router.sender.send_async(route).await?; - Ok(receiver) - }) - } } diff --git a/lib/src/api/engine/any/wasm.rs b/lib/src/api/engine/any/wasm.rs index aef4058e..60bbf269 100644 --- a/lib/src/api/engine/any/wasm.rs +++ b/lib/src/api/engine/any/wasm.rs @@ -1,24 +1,18 @@ use crate::api::conn::Connection; -use crate::api::conn::Method; -use crate::api::conn::Param; -use crate::api::conn::Route; use crate::api::conn::Router; #[allow(unused_imports)] // used by the DB engines use crate::api::engine; use crate::api::engine::any::Any; use crate::api::err::Error; +use crate::api::method::BoxFuture; use crate::api::opt::{Endpoint, EndpointKind}; -use crate::api::DbResponse; use crate::api::ExtraFeatures; use crate::api::OnceLockExt; use crate::api::Result; use crate::api::Surreal; use crate::error::Db as DbError; use crate::opt::WaitFor; -use flume::Receiver; use std::collections::HashSet; -use std::future::Future; -use std::pin::Pin; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::sync::OnceLock; @@ -28,18 +22,8 @@ use wasm_bindgen_futures::spawn_local; impl crate::api::Connection for Any {} impl Connection for Any { - fn new(method: Method) -> Self { - Self { - method, - id: 0, - } - } - #[allow(unused_variables, unreachable_code, unused_mut)] // these are all used depending on feature - fn connect( - address: Endpoint, - capacity: usize, - ) -> Pin>> + Send + Sync + 'static>> { + fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> { Box::pin(async move { let (route_tx, route_rx) = match capacity { 0 => flume::unbounded(), @@ -183,21 +167,4 @@ impl Connection for Any { )) }) } - - fn send<'r>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin>>> + Send + Sync + 'r>> { - Box::pin(async move { - let (sender, receiver) = flume::bounded(1); - self.id = router.next_id(); - let route = Route { - request: (self.id, self.method, param), - response: sender, - }; - router.sender.send_async(route).await?; - Ok(receiver) - }) - } } diff --git a/lib/src/api/engine/local/mod.rs b/lib/src/api/engine/local/mod.rs index 31a279d8..6da8ead8 100644 --- a/lib/src/api/engine/local/mod.rs +++ b/lib/src/api/engine/local/mod.rs @@ -373,9 +373,7 @@ pub struct SurrealKV; /// An embedded database #[derive(Debug, Clone)] -pub struct Db { - pub(crate) method: crate::api::conn::Method, -} +pub struct Db(()); impl Surreal { /// Connects to a specific database endpoint, saving the connection on the static client diff --git a/lib/src/api/engine/local/native.rs b/lib/src/api/engine/local/native.rs index 4ab58a4b..16bf3155 100644 --- a/lib/src/api/engine/local/native.rs +++ b/lib/src/api/engine/local/native.rs @@ -1,10 +1,8 @@ use crate::api::conn::Connection; -use crate::api::conn::DbResponse; -use crate::api::conn::Method; -use crate::api::conn::Param; use crate::api::conn::Route; use crate::api::conn::Router; use crate::api::engine::local::Db; +use crate::api::method::BoxFuture; use crate::api::opt::{Endpoint, EndpointKind}; use crate::api::ExtraFeatures; use crate::api::OnceLockExt; @@ -24,8 +22,6 @@ use futures::StreamExt; use std::collections::BTreeMap; use std::collections::HashMap; use std::collections::HashSet; -use std::future::Future; -use std::pin::Pin; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::sync::OnceLock; @@ -35,16 +31,7 @@ use tokio::sync::watch; impl crate::api::Connection for Db {} impl Connection for Db { - fn new(method: Method) -> Self { - Self { - method, - } - } - - fn connect( - address: Endpoint, - capacity: usize, - ) -> Pin>> + Send + Sync + 'static>> { + fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> { Box::pin(async move { let (route_tx, route_rx) = match capacity { 0 => flume::unbounded(), @@ -71,22 +58,6 @@ impl Connection for Db { )) }) } - - fn send<'r>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin>>> + Send + Sync + 'r>> { - Box::pin(async move { - let (sender, receiver) = flume::bounded(1); - let route = Route { - request: (0, self.method, param), - response: sender, - }; - router.sender.send_async(route).await?; - Ok(receiver) - }) - } } pub(crate) async fn run_router( diff --git a/lib/src/api/engine/local/wasm.rs b/lib/src/api/engine/local/wasm.rs index 73a850c6..0e2fc7b3 100644 --- a/lib/src/api/engine/local/wasm.rs +++ b/lib/src/api/engine/local/wasm.rs @@ -1,11 +1,9 @@ use crate::api::conn::Connection; -use crate::api::conn::DbResponse; -use crate::api::conn::Method; -use crate::api::conn::Param; use crate::api::conn::Route; use crate::api::conn::Router; use crate::api::engine::local::Db; use crate::api::engine::local::DEFAULT_TICK_INTERVAL; +use crate::api::method::BoxFuture; use crate::api::opt::Endpoint; use crate::api::ExtraFeatures; use crate::api::OnceLockExt; @@ -26,8 +24,6 @@ use futures::StreamExt; use std::collections::BTreeMap; use std::collections::HashMap; use std::collections::HashSet; -use std::future::Future; -use std::pin::Pin; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::sync::OnceLock; @@ -38,16 +34,7 @@ use wasm_bindgen_futures::spawn_local; impl crate::api::Connection for Db {} impl Connection for Db { - fn new(method: Method) -> Self { - Self { - method, - } - } - - fn connect( - address: Endpoint, - capacity: usize, - ) -> Pin>> + Send + Sync + 'static>> { + fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> { Box::pin(async move { let (route_tx, route_rx) = match capacity { 0 => flume::unbounded(), @@ -73,22 +60,6 @@ impl Connection for Db { )) }) } - - fn send<'r>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin>>> + Send + Sync + 'r>> { - Box::pin(async move { - let (sender, receiver) = flume::bounded(1); - let route = Route { - request: (0, self.method, param), - response: sender, - }; - router.sender.send_async(route).await?; - Ok(receiver) - }) - } } pub(crate) async fn run_router( diff --git a/lib/src/api/engine/remote/http/mod.rs b/lib/src/api/engine/remote/http/mod.rs index 2d9ab874..9f043f07 100644 --- a/lib/src/api/engine/remote/http/mod.rs +++ b/lib/src/api/engine/remote/http/mod.rs @@ -71,9 +71,7 @@ pub struct Https; /// An HTTP client for communicating with the server via HTTP #[derive(Debug, Clone)] -pub struct Client { - method: Method, -} +pub struct Client(()); impl Surreal { /// Connects to a specific database endpoint, saving the connection on the static client diff --git a/lib/src/api/engine/remote/http/native.rs b/lib/src/api/engine/remote/http/native.rs index feb57f1e..9696ec7f 100644 --- a/lib/src/api/engine/remote/http/native.rs +++ b/lib/src/api/engine/remote/http/native.rs @@ -1,10 +1,9 @@ use super::Client; use crate::api::conn::Connection; -use crate::api::conn::DbResponse; use crate::api::conn::Method; -use crate::api::conn::Param; use crate::api::conn::Route; use crate::api::conn::Router; +use crate::api::method::BoxFuture; use crate::api::opt::Endpoint; #[cfg(any(feature = "native-tls", feature = "rustls"))] use crate::api::opt::Tls; @@ -19,8 +18,6 @@ use indexmap::IndexMap; use reqwest::header::HeaderMap; use reqwest::ClientBuilder; use std::collections::HashSet; -use std::future::Future; -use std::pin::Pin; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::sync::OnceLock; @@ -30,16 +27,7 @@ use url::Url; impl crate::api::Connection for Client {} impl Connection for Client { - fn new(method: Method) -> Self { - Self { - method, - } - } - - fn connect( - address: Endpoint, - capacity: usize, - ) -> Pin>> + Send + Sync + 'static>> { + fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> { Box::pin(async move { let headers = super::default_headers(); @@ -82,22 +70,6 @@ impl Connection for Client { )) }) } - - fn send<'r>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin>>> + Send + Sync + 'r>> { - Box::pin(async move { - let (sender, receiver) = flume::bounded(1); - let route = Route { - request: (0, self.method, param), - response: sender, - }; - router.sender.send_async(route).await?; - Ok(receiver) - }) - } } pub(crate) async fn run_router(base_url: Url, client: reqwest::Client, route_rx: Receiver) { diff --git a/lib/src/api/engine/remote/http/wasm.rs b/lib/src/api/engine/remote/http/wasm.rs index ce621185..07b638a4 100644 --- a/lib/src/api/engine/remote/http/wasm.rs +++ b/lib/src/api/engine/remote/http/wasm.rs @@ -1,10 +1,9 @@ use super::Client; use crate::api::conn::Connection; -use crate::api::conn::DbResponse; use crate::api::conn::Method; -use crate::api::conn::Param; use crate::api::conn::Route; use crate::api::conn::Router; +use crate::api::method::BoxFuture; use crate::api::opt::Endpoint; use crate::api::OnceLockExt; use crate::api::Result; @@ -17,8 +16,6 @@ use indexmap::IndexMap; use reqwest::header::HeaderMap; use reqwest::ClientBuilder; use std::collections::HashSet; -use std::future::Future; -use std::pin::Pin; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::sync::OnceLock; @@ -29,16 +26,7 @@ use wasm_bindgen_futures::spawn_local; impl crate::api::Connection for Client {} impl Connection for Client { - fn new(method: Method) -> Self { - Self { - method, - } - } - - fn connect( - address: Endpoint, - capacity: usize, - ) -> Pin>> + Send + Sync + 'static>> { + fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> { Box::pin(async move { let (route_tx, route_rx) = match capacity { 0 => flume::unbounded(), @@ -61,23 +49,6 @@ impl Connection for Client { )) }) } - - fn send<'r>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin>>> + Send + Sync + 'r>> { - Box::pin(async move { - let (sender, receiver) = flume::bounded(1); - trace!("{param:?}"); - let route = Route { - request: (0, self.method, param), - response: sender, - }; - router.sender.send_async(route).await?; - Ok(receiver) - }) - } } async fn client(base_url: &Url) -> Result { diff --git a/lib/src/api/engine/remote/ws/mod.rs b/lib/src/api/engine/remote/ws/mod.rs index 97fc6d66..c1a47dd6 100644 --- a/lib/src/api/engine/remote/ws/mod.rs +++ b/lib/src/api/engine/remote/ws/mod.rs @@ -184,10 +184,7 @@ pub struct Wss; /// A WebSocket client for communicating with the server via WebSockets #[derive(Debug, Clone)] -pub struct Client { - pub(crate) id: i64, - method: Method, -} +pub struct Client(()); impl Surreal { /// Connects to a specific database endpoint, saving the connection on the static client diff --git a/lib/src/api/engine/remote/ws/native.rs b/lib/src/api/engine/remote/ws/native.rs index 2fc6af38..849e72aa 100644 --- a/lib/src/api/engine/remote/ws/native.rs +++ b/lib/src/api/engine/remote/ws/native.rs @@ -4,7 +4,6 @@ use super::{HandleResult, RouterRequest}; use crate::api::conn::Connection; use crate::api::conn::DbResponse; use crate::api::conn::Method; -use crate::api::conn::Param; use crate::api::conn::Route; use crate::api::conn::Router; use crate::api::engine::remote::ws::Client; @@ -12,6 +11,7 @@ use crate::api::engine::remote::ws::Response; use crate::api::engine::remote::ws::PING_INTERVAL; use crate::api::engine::remote::ws::PING_METHOD; use crate::api::err::Error; +use crate::api::method::BoxFuture; use crate::api::opt::Endpoint; #[cfg(any(feature = "native-tls", feature = "rustls"))] use crate::api::opt::Tls; @@ -31,9 +31,7 @@ use revision::revisioned; use serde::Deserialize; use std::collections::hash_map::Entry; use std::collections::HashSet; -use std::future::Future; use std::mem; -use std::pin::Pin; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::sync::OnceLock; @@ -105,17 +103,10 @@ pub(crate) async fn connect( impl crate::api::Connection for Client {} impl Connection for Client { - fn new(method: Method) -> Self { - Self { - id: 0, - method, - } - } - fn connect( mut address: Endpoint, capacity: usize, - ) -> Pin>> + Send + Sync + 'static>> { + ) -> BoxFuture<'static, Result>> { Box::pin(async move { address.url = address.url.join(PATH)?; #[cfg(any(feature = "native-tls", feature = "rustls"))] @@ -152,23 +143,6 @@ impl Connection for Client { )) }) } - - fn send<'r>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin>>> + Send + Sync + 'r>> { - Box::pin(async move { - self.id = router.next_id(); - let (sender, receiver) = flume::bounded(1); - let route = Route { - request: (self.id, self.method, param), - response: sender, - }; - router.sender.send_async(route).await?; - Ok(receiver) - }) - } } async fn router_handle_route( diff --git a/lib/src/api/engine/remote/ws/wasm.rs b/lib/src/api/engine/remote/ws/wasm.rs index f6813b40..2cad1d33 100644 --- a/lib/src/api/engine/remote/ws/wasm.rs +++ b/lib/src/api/engine/remote/ws/wasm.rs @@ -3,7 +3,6 @@ use super::{HandleResult, PATH}; use crate::api::conn::Connection; use crate::api::conn::DbResponse; use crate::api::conn::Method; -use crate::api::conn::Param; use crate::api::conn::Route; use crate::api::conn::Router; use crate::api::engine::remote::ws::Client; @@ -11,6 +10,7 @@ use crate::api::engine::remote::ws::Response; use crate::api::engine::remote::ws::PING_INTERVAL; use crate::api::engine::remote::ws::PING_METHOD; use crate::api::err::Error; +use crate::api::method::BoxFuture; use crate::api::opt::Endpoint; use crate::api::ExtraFeatures; use crate::api::OnceLockExt; @@ -35,9 +35,7 @@ use serde::Deserialize; use std::collections::hash_map::Entry; use std::collections::BTreeMap; use std::collections::HashSet; -use std::future::Future; use std::mem; -use std::pin::Pin; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::sync::OnceLock; @@ -58,17 +56,10 @@ type RouterState = super::RouterState; impl crate::api::Connection for Client {} impl Connection for Client { - fn new(method: Method) -> Self { - Self { - id: 0, - method, - } - } - fn connect( mut address: Endpoint, capacity: usize, - ) -> Pin>> + Send + Sync + 'static>> { + ) -> BoxFuture<'static, Result>> { Box::pin(async move { address.url = address.url.join(PATH)?; @@ -96,23 +87,6 @@ impl Connection for Client { )) }) } - - fn send<'r>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin>>> + Send + Sync + 'r>> { - Box::pin(async move { - self.id = router.next_id(); - let (sender, receiver) = flume::bounded(1); - let route = Route { - request: (self.id, self.method, param), - response: sender, - }; - router.sender.send_async(route).await?; - Ok(receiver) - }) - } } async fn router_handle_request( diff --git a/lib/src/api/method/authenticate.rs b/lib/src/api/method/authenticate.rs index b2205407..62779cae 100644 --- a/lib/src/api/method/authenticate.rs +++ b/lib/src/api/method/authenticate.rs @@ -1,3 +1,5 @@ +use crate::api::method::BoxFuture; + use crate::api::conn::Method; use crate::api::conn::Param; use crate::api::method::OnceLockExt; @@ -6,9 +8,7 @@ use crate::api::Connection; use crate::api::Result; use crate::Surreal; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; -use std::pin::Pin; /// An authentication future #[derive(Debug)] @@ -23,13 +23,12 @@ where Client: Connection, { type Output = Result<()>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { let router = self.client.router.extract()?; - let mut conn = Client::new(Method::Authenticate); - conn.execute_unit(router, Param::new(vec![self.token.0.into()])).await + router.execute_unit(Method::Authenticate, Param::new(vec![self.token.0.into()])).await }) } } diff --git a/lib/src/api/method/begin.rs b/lib/src/api/method/begin.rs index 683e97b3..06f15f8e 100644 --- a/lib/src/api/method/begin.rs +++ b/lib/src/api/method/begin.rs @@ -1,13 +1,13 @@ +use crate::api::method::BoxFuture; + use crate::api::method::Cancel; use crate::api::method::Commit; use crate::api::Connection; use crate::api::Result; use crate::api::Surreal; use crate::sql::statements::BeginStatement; -use std::future::Future; use std::future::IntoFuture; use std::ops::Deref; -use std::pin::Pin; /// A beginning of a transaction #[derive(Debug)] @@ -21,7 +21,7 @@ where C: Connection, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'static>>; + type IntoFuture = BoxFuture<'static, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { diff --git a/lib/src/api/method/cancel.rs b/lib/src/api/method/cancel.rs index d75e201f..a2b26391 100644 --- a/lib/src/api/method/cancel.rs +++ b/lib/src/api/method/cancel.rs @@ -1,10 +1,10 @@ +use crate::api::method::BoxFuture; + use crate::api::Connection; use crate::api::Result; use crate::api::Surreal; use crate::sql::statements::CancelStatement; -use std::future::Future; use std::future::IntoFuture; -use std::pin::Pin; /// A transaction cancellation future #[derive(Debug)] @@ -18,7 +18,7 @@ where C: Connection, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'static>>; + type IntoFuture = BoxFuture<'static, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { diff --git a/lib/src/api/method/commit.rs b/lib/src/api/method/commit.rs index be6e4548..8c52816d 100644 --- a/lib/src/api/method/commit.rs +++ b/lib/src/api/method/commit.rs @@ -1,10 +1,10 @@ +use crate::api::method::BoxFuture; + use crate::api::Connection; use crate::api::Result; use crate::api::Surreal; use crate::sql::statements::CommitStatement; -use std::future::Future; use std::future::IntoFuture; -use std::pin::Pin; /// A transaction commit future #[derive(Debug)] @@ -18,7 +18,7 @@ where C: Connection, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'static>>; + type IntoFuture = BoxFuture<'static, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { diff --git a/lib/src/api/method/content.rs b/lib/src/api/method/content.rs index f5d2eaa9..98e1a47e 100644 --- a/lib/src/api/method/content.rs +++ b/lib/src/api/method/content.rs @@ -1,5 +1,6 @@ use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::opt::Range; use crate::api::opt::Resource; use crate::api::Connection; @@ -12,10 +13,8 @@ use crate::Surreal; use serde::de::DeserializeOwned; use serde::Serialize; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; -use std::pin::Pin; /// A content future /// @@ -61,12 +60,12 @@ macro_rules! into_future { Some(range) => resource?.with_range(range)?.into(), None => resource?.into(), }; - let mut conn = Client::new(method); let params = match content? { Value::None | Value::Null => vec![param], content => vec![param, content], }; - conn.$method(client.router.extract()?, Param::new(params)).await + let router = client.router.extract()?; + router.$method(method, Param::new(params)).await }) } }; @@ -78,7 +77,7 @@ where D: Serialize, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_value} } @@ -90,7 +89,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_opt} } @@ -102,7 +101,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_vec} } diff --git a/lib/src/api/method/create.rs b/lib/src/api/method/create.rs index eb588cd3..8bf4c69a 100644 --- a/lib/src/api/method/create.rs +++ b/lib/src/api/method/create.rs @@ -1,5 +1,6 @@ use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::method::Content; use crate::api::opt::Resource; use crate::api::Connection; @@ -10,10 +11,8 @@ use crate::Surreal; use serde::de::DeserializeOwned; use serde::Serialize; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; -use std::pin::Pin; /// A record create future #[derive(Debug)] @@ -46,8 +45,8 @@ macro_rules! into_future { .. } = self; Box::pin(async move { - let mut conn = Client::new(Method::Create); - conn.$method(client.router.extract()?, Param::new(vec![resource?.into()])).await + let router = client.router.extract()?; + router.$method(Method::Create, Param::new(vec![resource?.into()])).await }) } }; @@ -58,7 +57,7 @@ where Client: Connection, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_value} } @@ -69,7 +68,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_opt} } @@ -80,7 +79,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_vec} } diff --git a/lib/src/api/method/delete.rs b/lib/src/api/method/delete.rs index 3b1a8d99..4509e2db 100644 --- a/lib/src/api/method/delete.rs +++ b/lib/src/api/method/delete.rs @@ -1,5 +1,6 @@ use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::opt::Range; use crate::api::opt::Resource; use crate::api::Connection; @@ -10,10 +11,8 @@ use crate::sql::Value; use crate::Surreal; use serde::de::DeserializeOwned; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; -use std::pin::Pin; /// A record delete future #[derive(Debug)] @@ -52,8 +51,8 @@ macro_rules! into_future { Some(range) => resource?.with_range(range)?.into(), None => resource?.into(), }; - let mut conn = Client::new(Method::Delete); - conn.$method(client.router.extract()?, Param::new(vec![param])).await + let router = client.router.extract()?; + router.$method(Method::Delete, Param::new(vec![param])).await }) } }; @@ -64,7 +63,7 @@ where Client: Connection, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_value} } @@ -75,7 +74,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_opt} } @@ -86,7 +85,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_vec} } diff --git a/lib/src/api/method/export.rs b/lib/src/api/method/export.rs index 357a3612..c67c2454 100644 --- a/lib/src/api/method/export.rs +++ b/lib/src/api/method/export.rs @@ -1,6 +1,7 @@ use crate::api::conn::Method; use crate::api::conn::MlConfig; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::Connection; use crate::api::Error; use crate::api::ExtraFeatures; @@ -14,7 +15,6 @@ use futures::Stream; use futures::StreamExt; use semver::Version; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; use std::path::PathBuf; @@ -70,7 +70,7 @@ where Client: Connection, { type Output = Result<()>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { @@ -78,13 +78,12 @@ where if !router.features.contains(&ExtraFeatures::Backup) { return Err(Error::BackupsNotSupported.into()); } - let mut conn = Client::new(Method::Export); let mut param = match self.target { ExportDestination::File(path) => Param::file(path), ExportDestination::Memory => unreachable!(), }; param.ml_config = self.ml_config; - conn.execute_unit(router, param).await + router.execute_unit(Method::Export, param).await }) } } @@ -94,7 +93,7 @@ where Client: Connection, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { @@ -103,13 +102,12 @@ where return Err(Error::BackupsNotSupported.into()); } let (tx, rx) = crate::channel::bounded(1); - let mut conn = Client::new(Method::Export); let ExportDestination::Memory = self.target else { unreachable!(); }; let mut param = Param::bytes_sender(tx); param.ml_config = self.ml_config; - conn.execute_unit(router, param).await?; + router.execute_unit(Method::Export, param).await?; Ok(Backup { rx, }) diff --git a/lib/src/api/method/health.rs b/lib/src/api/method/health.rs index 5d0eeeb6..676dfc7f 100644 --- a/lib/src/api/method/health.rs +++ b/lib/src/api/method/health.rs @@ -1,3 +1,5 @@ +use crate::api::method::BoxFuture; + use crate::api::conn::Method; use crate::api::conn::Param; use crate::api::Connection; @@ -5,9 +7,7 @@ use crate::api::Result; use crate::method::OnceLockExt; use crate::Surreal; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; -use std::pin::Pin; /// A health check future #[derive(Debug)] @@ -33,12 +33,12 @@ where Client: Connection, { type Output = Result<()>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { - let mut conn = Client::new(Method::Health); - conn.execute_unit(self.client.router.extract()?, Param::new(Vec::new())).await + let router = self.client.router.extract()?; + router.execute_unit(Method::Health, Param::new(Vec::new())).await }) } } diff --git a/lib/src/api/method/import.rs b/lib/src/api/method/import.rs index c4f3ee64..0e8ceec7 100644 --- a/lib/src/api/method/import.rs +++ b/lib/src/api/method/import.rs @@ -1,3 +1,5 @@ +use crate::api::method::BoxFuture; + use crate::api::conn::Method; use crate::api::conn::MlConfig; use crate::api::conn::Param; @@ -9,11 +11,9 @@ use crate::method::Model; use crate::method::OnceLockExt; use crate::Surreal; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; use std::path::PathBuf; -use std::pin::Pin; /// An database import future #[derive(Debug)] @@ -58,7 +58,7 @@ where Client: Connection, { type Output = Result<()>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { @@ -66,10 +66,9 @@ where if !router.features.contains(&ExtraFeatures::Backup) { return Err(Error::BackupsNotSupported.into()); } - let mut conn = Client::new(Method::Import); let mut param = Param::file(self.file); param.ml_config = self.ml_config; - conn.execute_unit(router, param).await + router.execute_unit(Method::Import, param).await }) } } diff --git a/lib/src/api/method/insert.rs b/lib/src/api/method/insert.rs index 04e8e2b8..974e4b78 100644 --- a/lib/src/api/method/insert.rs +++ b/lib/src/api/method/insert.rs @@ -1,6 +1,7 @@ use crate::api::conn::Method; use crate::api::conn::Param; use crate::api::err::Error; +use crate::api::method::BoxFuture; use crate::api::method::Content; use crate::api::opt::Resource; use crate::api::Connection; @@ -14,10 +15,8 @@ use crate::Surreal; use serde::de::DeserializeOwned; use serde::Serialize; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; -use std::pin::Pin; /// An insert future #[derive(Debug)] @@ -61,9 +60,9 @@ macro_rules! into_future { Resource::Array(arr) => return Err(Error::InsertOnArray(arr).into()), Resource::Edges(edges) => return Err(Error::InsertOnEdges(edges).into()), }; - let mut conn = Client::new(Method::Insert); let param = vec![table, data]; - conn.$method(client.router.extract()?, Param::new(param)).await + let router = client.router.extract()?; + router.$method(Method::Insert, Param::new(param)).await }) } }; @@ -74,7 +73,7 @@ where Client: Connection, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_value} } @@ -85,7 +84,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_opt} } @@ -96,7 +95,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_vec} } diff --git a/lib/src/api/method/invalidate.rs b/lib/src/api/method/invalidate.rs index 8b21f128..7f4548f1 100644 --- a/lib/src/api/method/invalidate.rs +++ b/lib/src/api/method/invalidate.rs @@ -1,3 +1,5 @@ +use crate::api::method::BoxFuture; + use crate::api::conn::Method; use crate::api::conn::Param; use crate::api::Connection; @@ -5,9 +7,7 @@ use crate::api::Result; use crate::method::OnceLockExt; use crate::Surreal; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; -use std::pin::Pin; /// A session invalidate future #[derive(Debug)] @@ -33,13 +33,12 @@ where Client: Connection, { type Output = Result<()>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { let router = self.client.router.extract()?; - let mut conn = Client::new(Method::Invalidate); - conn.execute_unit(router, Param::new(Vec::new())).await + router.execute_unit(Method::Invalidate, Param::new(Vec::new())).await }) } } diff --git a/lib/src/api/method/live.rs b/lib/src/api/method/live.rs index 0dacb708..99ec4994 100644 --- a/lib/src/api/method/live.rs +++ b/lib/src/api/method/live.rs @@ -2,6 +2,7 @@ use crate::api::conn::Method; use crate::api::conn::Param; use crate::api::conn::Router; use crate::api::err::Error; +use crate::api::method::BoxFuture; use crate::api::Connection; use crate::api::ExtraFeatures; use crate::api::Result; @@ -30,7 +31,6 @@ use crate::Surreal; use channel::Receiver; use futures::StreamExt; use serde::de::DeserializeOwned; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; use std::mem; @@ -100,7 +100,7 @@ macro_rules! into_future { false, ); let id: Value = query.await?.take(0)?; - let rx = register::(router, id.clone()).await?; + let rx = register(router, id.clone()).await?; Ok(Stream::new( Surreal::new_from_router_waiter(client.router.clone(), client.waiter.clone()), id, @@ -111,18 +111,11 @@ macro_rules! into_future { }; } -pub(crate) async fn register( - router: &Router, - id: Value, -) -> Result> -where - Client: Connection, -{ - let mut conn = Client::new(Method::Live); +pub(crate) async fn register(router: &Router, id: Value) -> Result> { let (tx, rx) = channel::unbounded(); let mut param = Param::notification_sender(tx); param.other = vec![id]; - conn.execute_unit(router, param).await?; + router.execute_unit(Method::Live, param).await?; Ok(rx) } @@ -131,7 +124,7 @@ where Client: Connection, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {} } @@ -142,7 +135,7 @@ where R: DeserializeOwned, { type Output = Result>>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {} } @@ -153,7 +146,7 @@ where R: DeserializeOwned, { type Output = Result>>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {} } @@ -261,8 +254,7 @@ where let client = client.clone(); spawn(async move { if let Ok(router) = client.router.extract() { - let mut conn = Client::new(Method::Kill); - conn.execute_unit(router, Param::new(vec![id.clone()])).await.ok(); + router.execute_unit(Method::Kill, Param::new(vec![id.clone()])).await.ok(); } }); } diff --git a/lib/src/api/method/merge.rs b/lib/src/api/method/merge.rs index 927aa24c..f3c1ccd1 100644 --- a/lib/src/api/method/merge.rs +++ b/lib/src/api/method/merge.rs @@ -1,5 +1,6 @@ use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::opt::Range; use crate::api::opt::Resource; use crate::api::Connection; @@ -12,10 +13,8 @@ use crate::Surreal; use serde::de::DeserializeOwned; use serde::Serialize; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; -use std::pin::Pin; /// A merge future #[derive(Debug)] @@ -57,8 +56,8 @@ macro_rules! into_future { Some(range) => resource?.with_range(range)?.into(), None => resource?.into(), }; - let mut conn = Client::new(Method::Merge); - conn.$method(client.router.extract()?, Param::new(vec![param, content?])).await + let router = client.router.extract()?; + router.$method(Method::Merge, Param::new(vec![param, content?])).await }) } }; @@ -70,7 +69,7 @@ where D: Serialize, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_value} } @@ -82,7 +81,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_opt} } @@ -94,7 +93,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_vec} } diff --git a/lib/src/api/method/mod.rs b/lib/src/api/method/mod.rs index 6d305ff5..5e383d77 100644 --- a/lib/src/api/method/mod.rs +++ b/lib/src/api/method/mod.rs @@ -45,6 +45,7 @@ pub use create::Create; pub use delete::Delete; pub use export::Backup; pub use export::Export; +use futures::Future; pub use health::Health; pub use import::Import; pub use insert::Insert; @@ -84,12 +85,16 @@ use serde::Serialize; use std::borrow::Cow; use std::marker::PhantomData; use std::path::Path; +use std::pin::Pin; use std::sync::Arc; use std::sync::OnceLock; use std::time::Duration; use self::query::ValidQuery; +/// A alias for an often used type of future returned by async methods in this library. +pub(crate) type BoxFuture<'a, T> = Pin + Send + Sync + 'a>>; + /// Query statistics #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] #[non_exhaustive] diff --git a/lib/src/api/method/patch.rs b/lib/src/api/method/patch.rs index ea989fe0..d0f830e2 100644 --- a/lib/src/api/method/patch.rs +++ b/lib/src/api/method/patch.rs @@ -1,5 +1,6 @@ use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::opt::PatchOp; use crate::api::opt::Range; use crate::api::opt::Resource; @@ -11,10 +12,8 @@ use crate::sql::Value; use crate::Surreal; use serde::de::DeserializeOwned; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; -use std::pin::Pin; use std::result::Result as StdResult; /// A patch future @@ -61,8 +60,8 @@ macro_rules! into_future { vec.push(result?); } let patches = vec.into(); - let mut conn = Client::new(Method::Patch); - conn.$method(client.router.extract()?, Param::new(vec![param, patches])).await + let router = client.router.extract()?; + router.$method(Method::Patch, Param::new(vec![param, patches])).await }) } }; @@ -73,7 +72,7 @@ where Client: Connection, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_value} } @@ -84,7 +83,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_opt} } @@ -95,7 +94,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_vec} } diff --git a/lib/src/api/method/query.rs b/lib/src/api/method/query.rs index 6c154235..532439ce 100644 --- a/lib/src/api/method/query.rs +++ b/lib/src/api/method/query.rs @@ -4,6 +4,7 @@ use super::Stream; use crate::api::conn::Method; use crate::api::conn::Param; use crate::api::err::Error; +use crate::api::method::BoxFuture; use crate::api::opt; use crate::api::Connection; use crate::api::ExtraFeatures; @@ -27,7 +28,6 @@ use serde::Serialize; use std::borrow::Cow; use std::collections::BTreeMap; use std::collections::HashMap; -use std::future::Future; use std::future::IntoFuture; use std::mem; use std::pin::Pin; @@ -111,7 +111,7 @@ where Client: Connection, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { let ValidQuery { @@ -159,8 +159,7 @@ where query.0 .0 = query_statements; let param = Param::query(query, bindings); - let mut conn = Client::new(Method::Query); - let mut response = conn.execute_query(router, param).await?; + let mut response = router.execute_query(Method::Query, param).await?; for idx in query_indicies { let Some((_, result)) = response.results.get(&idx) else { @@ -170,7 +169,7 @@ where // This is a live query. We are using this as a workaround to avoid // creating another public error variant for this internal error. let res = match result { - Ok(id) => live::register::(router, id.clone()).await.map(|rx| { + Ok(id) => live::register(router, id.clone()).await.map(|rx| { Stream::new( Surreal::new_from_router_waiter( client.router.clone(), @@ -197,7 +196,7 @@ where Client: Connection, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { diff --git a/lib/src/api/method/select.rs b/lib/src/api/method/select.rs index 2065df06..6612ff62 100644 --- a/lib/src/api/method/select.rs +++ b/lib/src/api/method/select.rs @@ -1,5 +1,6 @@ use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::method::OnceLockExt; use crate::api::opt::Range; use crate::api::opt::Resource; @@ -11,10 +12,8 @@ use crate::sql::Value; use crate::Surreal; use serde::de::DeserializeOwned; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; -use std::pin::Pin; /// A select future #[derive(Debug)] @@ -54,8 +53,8 @@ macro_rules! into_future { Some(range) => resource?.with_range(range)?.into(), None => resource?.into(), }; - let mut conn = Client::new(Method::Select); - conn.$method(client.router.extract()?, Param::new(vec![param])).await + let router = client.router.extract()?; + router.$method(Method::Select, Param::new(vec![param])).await }) } }; @@ -66,7 +65,7 @@ where Client: Connection, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_value} } @@ -77,7 +76,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_opt} } @@ -88,7 +87,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_vec} } diff --git a/lib/src/api/method/set.rs b/lib/src/api/method/set.rs index 135e702d..6a33b1ae 100644 --- a/lib/src/api/method/set.rs +++ b/lib/src/api/method/set.rs @@ -1,3 +1,5 @@ +use crate::api::method::BoxFuture; + use crate::api::conn::Method; use crate::api::conn::Param; use crate::api::Connection; @@ -6,9 +8,7 @@ use crate::method::OnceLockExt; use crate::sql::Value; use crate::Surreal; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; -use std::pin::Pin; /// A set future #[derive(Debug)] @@ -37,16 +37,12 @@ where Client: Connection, { type Output = Result<()>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { - let mut conn = Client::new(Method::Set); - conn.execute_unit( - self.client.router.extract()?, - Param::new(vec![self.key.into(), self.value?]), - ) - .await + let router = self.client.router.extract()?; + router.execute_unit(Method::Set, Param::new(vec![self.key.into(), self.value?])).await }) } } diff --git a/lib/src/api/method/signin.rs b/lib/src/api/method/signin.rs index 5574750a..971b8e54 100644 --- a/lib/src/api/method/signin.rs +++ b/lib/src/api/method/signin.rs @@ -1,5 +1,6 @@ use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::Connection; use crate::api::Result; use crate::method::OnceLockExt; @@ -7,10 +8,8 @@ use crate::sql::Value; use crate::Surreal; use serde::de::DeserializeOwned; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; -use std::pin::Pin; /// A signin future #[derive(Debug)] @@ -40,7 +39,7 @@ where R: DeserializeOwned, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { let Signin { @@ -50,8 +49,7 @@ where } = self; Box::pin(async move { let router = client.router.extract()?; - let mut conn = Client::new(Method::Signin); - conn.execute(router, Param::new(vec![credentials?])).await + router.execute(Method::Signin, Param::new(vec![credentials?])).await }) } } diff --git a/lib/src/api/method/signup.rs b/lib/src/api/method/signup.rs index dc1ad6a5..c38eaa10 100644 --- a/lib/src/api/method/signup.rs +++ b/lib/src/api/method/signup.rs @@ -1,5 +1,6 @@ use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::Connection; use crate::api::Result; use crate::method::OnceLockExt; @@ -7,10 +8,8 @@ use crate::sql::Value; use crate::Surreal; use serde::de::DeserializeOwned; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; -use std::pin::Pin; /// A signup future #[derive(Debug)] @@ -40,7 +39,7 @@ where R: DeserializeOwned, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { let Signup { @@ -50,8 +49,7 @@ where } = self; Box::pin(async move { let router = client.router.extract()?; - let mut conn = Client::new(Method::Signup); - conn.execute(router, Param::new(vec![credentials?])).await + router.execute(Method::Signup, Param::new(vec![credentials?])).await }) } } diff --git a/lib/src/api/method/tests/mod.rs b/lib/src/api/method/tests/mod.rs index de6f9099..cc8e0bd5 100644 --- a/lib/src/api/method/tests/mod.rs +++ b/lib/src/api/method/tests/mod.rs @@ -170,11 +170,11 @@ async fn api() { let _: Version = DB.version().await.unwrap(); } -fn send_and_sync(_: impl Send + Sync) {} +fn assert_send_sync(_: impl Send + Sync) {} #[test] -fn futures_are_send_and_sync() { - send_and_sync(async { +fn futures_are_send_sync() { + assert_send_sync(async { let db = Surreal::new::(()).await.unwrap(); db.signin(Root { username: "root", diff --git a/lib/src/api/method/tests/protocol.rs b/lib/src/api/method/tests/protocol.rs index 663d1b27..1ff3342c 100644 --- a/lib/src/api/method/tests/protocol.rs +++ b/lib/src/api/method/tests/protocol.rs @@ -1,10 +1,7 @@ use super::server; use crate::api::conn::Connection; -use crate::api::conn::DbResponse; -use crate::api::conn::Method; -use crate::api::conn::Param; -use crate::api::conn::Route; use crate::api::conn::Router; +use crate::api::method::BoxFuture; use crate::api::opt::Endpoint; use crate::api::opt::IntoEndpoint; use crate::api::Connect; @@ -12,11 +9,8 @@ use crate::api::ExtraFeatures; use crate::api::OnceLockExt; use crate::api::Result; use crate::api::Surreal; -use flume::Receiver; use std::collections::HashSet; -use std::future::Future; use std::marker::PhantomData; -use std::pin::Pin; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::sync::OnceLock; @@ -35,9 +29,7 @@ impl IntoEndpoint for () { } #[derive(Debug, Clone)] -pub struct Client { - method: Method, -} +pub struct Client(()); impl Surreal { pub fn connect

( @@ -58,16 +50,7 @@ impl Surreal { impl crate::api::Connection for Client {} impl Connection for Client { - fn new(method: Method) -> Self { - Self { - method, - } - } - - fn connect( - _address: Endpoint, - capacity: usize, - ) -> Pin>> + Send + Sync + 'static>> { + fn connect(_address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> { Box::pin(async move { let (route_tx, route_rx) = flume::bounded(capacity); let mut features = HashSet::new(); @@ -84,20 +67,4 @@ impl Connection for Client { )) }) } - - fn send<'r>( - &'r mut self, - router: &'r Router, - param: Param, - ) -> Pin>>> + Send + Sync + 'r>> { - Box::pin(async move { - let (sender, receiver) = flume::bounded(1); - let route = Route { - request: (0, self.method, param), - response: sender, - }; - router.sender.send_async(route).await.as_ref().map_err(ToString::to_string).unwrap(); - Ok(receiver) - }) - } } diff --git a/lib/src/api/method/unset.rs b/lib/src/api/method/unset.rs index a909039c..c233f352 100644 --- a/lib/src/api/method/unset.rs +++ b/lib/src/api/method/unset.rs @@ -1,3 +1,5 @@ +use crate::api::method::BoxFuture; + use crate::api::conn::Method; use crate::api::conn::Param; use crate::api::Connection; @@ -5,9 +7,7 @@ use crate::api::Result; use crate::method::OnceLockExt; use crate::Surreal; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; -use std::pin::Pin; /// An unset future #[derive(Debug)] @@ -35,13 +35,12 @@ where Client: Connection, { type Output = Result<()>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { - let mut conn = Client::new(Method::Unset); - conn.execute_unit(self.client.router.extract()?, Param::new(vec![self.key.into()])) - .await + let router = self.client.router.extract()?; + router.execute_unit(Method::Unset, Param::new(vec![self.key.into()])).await }) } } diff --git a/lib/src/api/method/update.rs b/lib/src/api/method/update.rs index 503559b1..eb96e3db 100644 --- a/lib/src/api/method/update.rs +++ b/lib/src/api/method/update.rs @@ -1,5 +1,6 @@ use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::method::Content; use crate::api::method::Merge; use crate::api::method::Patch; @@ -15,10 +16,8 @@ use crate::Surreal; use serde::de::DeserializeOwned; use serde::Serialize; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; -use std::pin::Pin; /// An update future #[derive(Debug)] @@ -57,8 +56,8 @@ macro_rules! into_future { Some(range) => resource?.with_range(range)?.into(), None => resource?.into(), }; - let mut conn = Client::new(Method::Update); - conn.$method(client.router.extract()?, Param::new(vec![param])).await + let router = client.router.extract()?; + router.$method(Method::Upsert, Param::new(vec![param])).await }) } }; @@ -69,7 +68,7 @@ where Client: Connection, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_value} } @@ -80,7 +79,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_opt} } @@ -91,7 +90,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_vec} } diff --git a/lib/src/api/method/upsert.rs b/lib/src/api/method/upsert.rs index 843af64e..3ac103e7 100644 --- a/lib/src/api/method/upsert.rs +++ b/lib/src/api/method/upsert.rs @@ -1,5 +1,6 @@ use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::method::Content; use crate::api::method::Merge; use crate::api::method::Patch; @@ -15,10 +16,8 @@ use crate::Surreal; use serde::de::DeserializeOwned; use serde::Serialize; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; -use std::pin::Pin; /// An upsert future #[derive(Debug)] @@ -57,8 +56,8 @@ macro_rules! into_future { Some(range) => resource?.with_range(range)?.into(), None => resource?.into(), }; - let mut conn = Client::new(Method::Upsert); - conn.$method(client.router.extract()?, Param::new(vec![param])).await + let router = client.router.extract()?; + router.$method(Method::Upsert, Param::new(vec![param])).await }) } }; @@ -69,7 +68,7 @@ where Client: Connection, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_value} } @@ -80,7 +79,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_opt} } @@ -91,7 +90,7 @@ where R: DeserializeOwned, { type Output = Result>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; into_future! {execute_vec} } diff --git a/lib/src/api/method/use_db.rs b/lib/src/api/method/use_db.rs index 9be2510a..f4161413 100644 --- a/lib/src/api/method/use_db.rs +++ b/lib/src/api/method/use_db.rs @@ -1,3 +1,5 @@ +use crate::api::method::BoxFuture; + use crate::api::conn::Method; use crate::api::conn::Param; use crate::api::Connection; @@ -7,9 +9,7 @@ use crate::opt::WaitFor; use crate::sql::Value; use crate::Surreal; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; -use std::pin::Pin; #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] @@ -37,16 +37,12 @@ where Client: Connection, { type Output = Result<()>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { - let mut conn = Client::new(Method::Use); - conn.execute_unit( - self.client.router.extract()?, - Param::new(vec![self.ns, self.db.into()]), - ) - .await?; + let router = self.client.router.extract()?; + router.execute_unit(Method::Use, Param::new(vec![self.ns, self.db.into()])).await?; self.client.waiter.0.send(Some(WaitFor::Database)).ok(); Ok(()) }) diff --git a/lib/src/api/method/use_ns.rs b/lib/src/api/method/use_ns.rs index 421a63e8..bae8e39c 100644 --- a/lib/src/api/method/use_ns.rs +++ b/lib/src/api/method/use_ns.rs @@ -1,3 +1,5 @@ +use crate::api::method::BoxFuture; + use crate::api::conn::Method; use crate::api::conn::Param; use crate::api::method::UseDb; @@ -7,9 +9,7 @@ use crate::method::OnceLockExt; use crate::sql::Value; use crate::Surreal; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; -use std::pin::Pin; /// Stores the namespace to use #[derive(Debug)] @@ -51,16 +51,12 @@ where Client: Connection, { type Output = Result<()>; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { - let mut conn = Client::new(Method::Use); - conn.execute_unit( - self.client.router.extract()?, - Param::new(vec![self.ns.into(), Value::None]), - ) - .await + let router = self.client.router.extract()?; + router.execute_unit(Method::Use, Param::new(vec![self.ns.into(), Value::None])).await }) } } diff --git a/lib/src/api/method/version.rs b/lib/src/api/method/version.rs index 85b0d7ca..581a4801 100644 --- a/lib/src/api/method/version.rs +++ b/lib/src/api/method/version.rs @@ -1,3 +1,5 @@ +use crate::api::method::BoxFuture; + use crate::api::conn::Method; use crate::api::conn::Param; use crate::api::err::Error; @@ -6,9 +8,7 @@ use crate::api::Result; use crate::method::OnceLockExt; use crate::Surreal; use std::borrow::Cow; -use std::future::Future; use std::future::IntoFuture; -use std::pin::Pin; /// A version future #[derive(Debug)] @@ -34,13 +34,13 @@ where Client: Connection, { type Output = Result; - type IntoFuture = Pin + Send + Sync + 'r>>; + type IntoFuture = BoxFuture<'r, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { - let mut conn = Client::new(Method::Version); - let version = conn - .execute_value(self.client.router.extract()?, Param::new(Vec::new())) + let router = self.client.router.extract()?; + let version = router + .execute_value(Method::Version, Param::new(Vec::new())) .await? .convert_to_string()?; let semantic = version.trim_start_matches("surrealdb-"); diff --git a/lib/src/api/mod.rs b/lib/src/api/mod.rs index 6e926a1b..29fda453 100644 --- a/lib/src/api/mod.rs +++ b/lib/src/api/mod.rs @@ -10,10 +10,10 @@ pub mod opt; mod conn; pub use method::query::Response; +use method::BoxFuture; use semver::Version; use tokio::sync::watch; -use crate::api::conn::DbResponse; use crate::api::conn::Router; use crate::api::err::Error; use crate::api::opt::Endpoint; @@ -21,10 +21,8 @@ use semver::BuildMetadata; use semver::VersionReq; use std::fmt; use std::fmt::Debug; -use std::future::Future; use std::future::IntoFuture; use std::marker::PhantomData; -use std::pin::Pin; use std::sync::Arc; use std::sync::OnceLock; @@ -96,7 +94,7 @@ where Client: Connection, { type Output = Result>; - type IntoFuture = Pin + Send + Sync>>; + type IntoFuture = BoxFuture<'static, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { @@ -126,7 +124,7 @@ where Client: Connection, { type Output = Result<()>; - type IntoFuture = Pin + Send + Sync>>; + type IntoFuture = BoxFuture<'static, Self::Output>; fn into_future(self) -> Self::IntoFuture { Box::pin(async move {