diff --git a/Cargo.lock b/Cargo.lock index d8bdd219..8028450e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2036,18 +2036,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8" -[[package]] -name = "flume" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" -dependencies = [ - "futures-core", - "futures-sink", - "nanorand", - "spin 0.9.8", -] - [[package]] name = "fnv" version = "1.0.7" @@ -3460,15 +3448,6 @@ dependencies = [ "rand 0.8.5", ] -[[package]] -name = "nanorand" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" -dependencies = [ - "getrandom 0.2.12", -] - [[package]] name = "native-tls" version = "0.2.11" @@ -5963,7 +5942,6 @@ dependencies = [ "dmp", "env_logger 0.10.2", "flate2", - "flume", "futures", "geo 0.27.0", "hashbrown 0.14.5", diff --git a/core/src/dbs/executor.rs b/core/src/dbs/executor.rs index 8ac4cf31..0d8aba63 100644 --- a/core/src/dbs/executor.rs +++ b/core/src/dbs/executor.rs @@ -422,14 +422,13 @@ impl<'a> Executor<'a> { } }, }; + + self.err = res.is_err(); // Produce the response let res = Response { // Get the statement end time time: now.elapsed(), - result: res.inspect_err(|_| { - // Mark the error. - self.err = true; - }), + result: res, query_type: match (is_stm_live, is_stm_kill) { (true, _) => QueryType::Live, (_, true) => QueryType::Kill, diff --git a/core/src/fnc/array.rs b/core/src/fnc/array.rs index 1363b3ba..11837e08 100644 --- a/core/src/fnc/array.rs +++ b/core/src/fnc/array.rs @@ -431,14 +431,14 @@ mod tests { assert_eq!(slice((initial_values, beg, lim)).unwrap(), expected_values.into()); } - let array = &[b'a', b'b', b'c', b'd', b'e', b'f', b'g']; + let array = b"abcdefg"; test(array, None, None, array); test(array, Some(2), None, &array[2..]); test(array, Some(2), Some(3), &array[2..5]); - test(array, Some(2), Some(-1), &[b'c', b'd', b'e', b'f']); - test(array, Some(-2), None, &[b'f', b'g']); - test(array, Some(-4), Some(2), &[b'd', b'e']); - test(array, Some(-4), Some(-1), &[b'd', b'e', b'f']); + test(array, Some(2), Some(-1), b"cdef"); + test(array, Some(-2), None, b"fg"); + test(array, Some(-4), Some(2), b"de"); + test(array, Some(-4), Some(-1), b"def"); } #[test] diff --git a/core/src/key/change/mod.rs b/core/src/key/change/mod.rs index 1830ba29..0de7e365 100644 --- a/core/src/key/change/mod.rs +++ b/core/src/key/change/mod.rs @@ -31,14 +31,14 @@ pub fn new<'a>(ns: &'a str, db: &'a str, ts: u64, tb: &'a str) -> Cf<'a> { #[allow(unused)] pub fn versionstamped_key_prefix(ns: &str, db: &str) -> Vec { let mut k = crate::key::database::all::new(ns, db).encode().unwrap(); - k.extend_from_slice(&[b'#']); + k.extend_from_slice(b"#"); k } #[allow(unused)] pub fn versionstamped_key_suffix(tb: &str) -> Vec { let mut k: Vec = vec![]; - k.extend_from_slice(&[b'*']); + k.extend_from_slice(b"*"); k.extend_from_slice(tb.as_bytes()); // Without this, decoding fails with UnexpectedEOF errors k.extend_from_slice(&[0x00]); @@ -50,7 +50,7 @@ pub fn versionstamped_key_suffix(tb: &str) -> Vec { #[allow(unused)] pub fn prefix_ts(ns: &str, db: &str, vs: vs::Versionstamp) -> Vec { let mut k = crate::key::database::all::new(ns, db).encode().unwrap(); - k.extend_from_slice(&[b'#']); + k.extend_from_slice(b"#"); k.extend_from_slice(&vs); k } @@ -59,7 +59,7 @@ pub fn prefix_ts(ns: &str, db: &str, vs: vs::Versionstamp) -> Vec { #[allow(unused)] pub fn prefix(ns: &str, db: &str) -> Vec { let mut k = crate::key::database::all::new(ns, db).encode().unwrap(); - k.extend_from_slice(&[b'#']); + k.extend_from_slice(b"#"); k } diff --git a/core/src/key/database/ts.rs b/core/src/key/database/ts.rs index 9b6f93af..ec45e263 100644 --- a/core/src/key/database/ts.rs +++ b/core/src/key/database/ts.rs @@ -28,7 +28,7 @@ pub fn new<'a>(ns: &'a str, db: &'a str, ts: u64) -> Ts<'a> { /// Returns the prefix for the whole database timestamps pub fn prefix(ns: &str, db: &str) -> Vec { let mut k = crate::key::database::all::new(ns, db).encode().unwrap(); - k.extend_from_slice(&[b'!', b't', b's']); + k.extend_from_slice(b"!ts"); k } diff --git a/core/src/key/table/lq.rs b/core/src/key/table/lq.rs index 6a29a475..9d40103a 100644 --- a/core/src/key/table/lq.rs +++ b/core/src/key/table/lq.rs @@ -32,13 +32,13 @@ pub fn new<'a>(ns: &'a str, db: &'a str, tb: &'a str, lq: Uuid) -> Lq<'a> { pub fn prefix(ns: &str, db: &str, tb: &str) -> Vec { let mut k = super::all::new(ns, db, tb).encode().unwrap(); - k.extend_from_slice(&[b'!', b'l', b'q', 0x00]); + k.extend_from_slice(b"!lq\0"); k } pub fn suffix(ns: &str, db: &str, tb: &str) -> Vec { let mut k = super::all::new(ns, db, tb).encode().unwrap(); - k.extend_from_slice(&[b'!', b'l', b'q', 0xff]); + k.extend_from_slice(b"!lq\xff"); k } diff --git a/lib/Cargo.toml b/lib/Cargo.toml index eb282b16..6b685b39 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -70,7 +70,6 @@ bincode = "1.3.3" channel = { version = "1.9.0", package = "async-channel" } chrono = { version = "0.4.31", features = ["serde"] } dmp = "0.2.0" -flume = "0.11.0" futures = "0.3.29" geo = { version = "0.27.0", features = ["use-serde"] } indexmap = { version = "2.1.0", features = ["serde"] } diff --git a/lib/src/api/conn.rs b/lib/src/api/conn.rs index 420ff5ed..785dfe48 100644 --- a/lib/src/api/conn.rs +++ b/lib/src/api/conn.rs @@ -10,8 +10,8 @@ use crate::dbs::Notification; use crate::sql::from_value; use crate::sql::Query; use crate::sql::Value; -use flume::Receiver; -use flume::Sender; +use channel::Receiver; +use channel::Sender; use serde::de::DeserializeOwned; use serde::Serialize; use std::collections::BTreeMap; @@ -47,12 +47,12 @@ impl Router { ) -> BoxFuture<'_, Result>>> { Box::pin(async move { let id = self.next_id(); - let (sender, receiver) = flume::bounded(1); + let (sender, receiver) = channel::bounded(1); let route = Route { request: (id, method, param), response: sender, }; - self.sender.send_async(route).await?; + self.sender.send(route).await?; Ok(receiver) }) } @@ -63,7 +63,7 @@ impl Router { receiver: Receiver>, ) -> BoxFuture<'_, Result> { Box::pin(async move { - let response = receiver.into_recv_async().await?; + let response = receiver.recv().await?; match response? { DbResponse::Other(value) => Ok(value), DbResponse::Query(..) => unreachable!(), @@ -77,7 +77,7 @@ impl Router { receiver: Receiver>, ) -> BoxFuture<'_, Result> { Box::pin(async move { - let response = receiver.into_recv_async().await?; + let response = receiver.recv().await?; match response? { DbResponse::Query(results) => Ok(results), DbResponse::Other(..) => unreachable!(), diff --git a/lib/src/api/engine/any/native.rs b/lib/src/api/engine/any/native.rs index 9b7a3576..cc7dc785 100644 --- a/lib/src/api/engine/any/native.rs +++ b/lib/src/api/engine/any/native.rs @@ -39,11 +39,11 @@ impl Connection for Any { fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> { Box::pin(async move { let (route_tx, route_rx) = match capacity { - 0 => flume::unbounded(), - capacity => flume::bounded(capacity), + 0 => channel::unbounded(), + capacity => channel::bounded(capacity), }; - let (conn_tx, conn_rx) = flume::bounded::>(1); + let (conn_tx, conn_rx) = channel::bounded::>(1); let mut features = HashSet::new(); match EndpointKind::from(address.url.scheme()) { @@ -53,7 +53,7 @@ impl Connection for Any { features.insert(ExtraFeatures::Backup); features.insert(ExtraFeatures::LiveQueries); tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await?? + conn_rx.recv().await?? } #[cfg(not(feature = "kv-fdb"))] @@ -68,7 +68,7 @@ impl Connection for Any { features.insert(ExtraFeatures::Backup); features.insert(ExtraFeatures::LiveQueries); tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await?? + conn_rx.recv().await?? } #[cfg(not(feature = "kv-mem"))] @@ -83,7 +83,7 @@ impl Connection for Any { features.insert(ExtraFeatures::Backup); features.insert(ExtraFeatures::LiveQueries); tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await?? + conn_rx.recv().await?? } #[cfg(not(feature = "kv-rocksdb"))] @@ -99,7 +99,7 @@ impl Connection for Any { features.insert(ExtraFeatures::Backup); features.insert(ExtraFeatures::LiveQueries); tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await?? + conn_rx.recv().await?? } #[cfg(not(feature = "kv-tikv"))] @@ -114,7 +114,7 @@ impl Connection for Any { features.insert(ExtraFeatures::Backup); features.insert(ExtraFeatures::LiveQueries); tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await?? + conn_rx.recv().await?? } #[cfg(not(feature = "kv-surrealkv"))] diff --git a/lib/src/api/engine/any/wasm.rs b/lib/src/api/engine/any/wasm.rs index 60bbf269..a4e9e972 100644 --- a/lib/src/api/engine/any/wasm.rs +++ b/lib/src/api/engine/any/wasm.rs @@ -26,11 +26,11 @@ impl Connection for Any { fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> { Box::pin(async move { let (route_tx, route_rx) = match capacity { - 0 => flume::unbounded(), - capacity => flume::bounded(capacity), + 0 => channel::unbounded(), + capacity => channel::bounded(capacity), }; - let (conn_tx, conn_rx) = flume::bounded::>(1); + let (conn_tx, conn_rx) = channel::bounded::>(1); let mut features = HashSet::new(); match EndpointKind::from(address.url.scheme()) { @@ -39,7 +39,7 @@ impl Connection for Any { { features.insert(ExtraFeatures::LiveQueries); spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await??; + conn_rx.recv().await??; } #[cfg(not(feature = "kv-fdb"))] @@ -53,7 +53,7 @@ impl Connection for Any { { features.insert(ExtraFeatures::LiveQueries); spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await??; + conn_rx.recv().await??; } #[cfg(not(feature = "kv-indxdb"))] @@ -67,7 +67,7 @@ impl Connection for Any { { features.insert(ExtraFeatures::LiveQueries); spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await??; + conn_rx.recv().await??; } #[cfg(not(feature = "kv-mem"))] @@ -81,7 +81,7 @@ impl Connection for Any { { features.insert(ExtraFeatures::LiveQueries); spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await??; + conn_rx.recv().await??; } #[cfg(not(feature = "kv-rocksdb"))] @@ -96,7 +96,7 @@ impl Connection for Any { { features.insert(ExtraFeatures::LiveQueries); spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await??; + conn_rx.recv().await??; } #[cfg(not(feature = "kv-surrealkv"))] @@ -111,7 +111,7 @@ impl Connection for Any { { features.insert(ExtraFeatures::LiveQueries); spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await??; + conn_rx.recv().await??; } #[cfg(not(feature = "kv-tikv"))] @@ -144,7 +144,7 @@ impl Connection for Any { spawn_local(engine::remote::ws::wasm::run_router( endpoint, capacity, conn_tx, route_rx, )); - conn_rx.into_recv_async().await??; + conn_rx.recv().await??; } #[cfg(not(feature = "protocol-ws"))] diff --git a/lib/src/api/engine/local/native.rs b/lib/src/api/engine/local/native.rs index 16bf3155..29367ffe 100644 --- a/lib/src/api/engine/local/native.rs +++ b/lib/src/api/engine/local/native.rs @@ -15,8 +15,8 @@ use crate::kvs::Datastore; use crate::opt::auth::Root; use crate::opt::WaitFor; use crate::options::EngineOptions; -use flume::Receiver; -use flume::Sender; +use channel::Receiver; +use channel::Sender; use futures::stream::poll_fn; use futures::StreamExt; use std::collections::BTreeMap; @@ -34,15 +34,15 @@ impl Connection for Db { fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> { Box::pin(async move { let (route_tx, route_rx) = match capacity { - 0 => flume::unbounded(), - capacity => flume::bounded(capacity), + 0 => channel::unbounded(), + capacity => channel::bounded(capacity), }; - let (conn_tx, conn_rx) = flume::bounded(1); + let (conn_tx, conn_rx) = channel::bounded(1); tokio::spawn(run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await??; + conn_rx.recv().await??; let mut features = HashSet::new(); features.insert(ExtraFeatures::Backup); @@ -81,21 +81,21 @@ pub(crate) async fn run_router( let kvs = match Datastore::new(endpoint).await { Ok(kvs) => { if let Err(error) = kvs.bootstrap().await { - let _ = conn_tx.into_send_async(Err(error.into())).await; + let _ = conn_tx.send(Err(error.into())).await; return; } // If a root user is specified, setup the initial datastore credentials if let Some(root) = configured_root { if let Err(error) = kvs.setup_initial_creds(root.username, root.password).await { - let _ = conn_tx.into_send_async(Err(error.into())).await; + let _ = conn_tx.send(Err(error.into())).await; return; } } - let _ = conn_tx.into_send_async(Ok(())).await; + let _ = conn_tx.send(Ok(())).await; kvs.with_auth_enabled(configured_root.is_some()) } Err(error) => { - let _ = conn_tx.into_send_async(Err(error.into())).await; + let _ = conn_tx.send(Err(error.into())).await; return; } }; @@ -142,22 +142,21 @@ pub(crate) async fn run_router( // constantly polled. None => Poll::Pending, }); - let mut route_stream = route_rx.into_stream(); loop { tokio::select! { - route = route_stream.next() => { - let Some(route) = route else { + route = route_rx.recv() => { + let Ok(route) = route else { break }; match super::router(route.request, &kvs, &mut session, &mut vars, &mut live_queries) .await { Ok(value) => { - let _ = route.response.into_send_async(Ok(value)).await; + let _ = route.response.send(Ok(value)).await; } Err(error) => { - let _ = route.response.into_send_async(Err(error)).await; + let _ = route.response.send(Err(error)).await; } } } diff --git a/lib/src/api/engine/local/wasm.rs b/lib/src/api/engine/local/wasm.rs index 0e2fc7b3..1708f2bc 100644 --- a/lib/src/api/engine/local/wasm.rs +++ b/lib/src/api/engine/local/wasm.rs @@ -16,8 +16,7 @@ use crate::kvs::Datastore; use crate::opt::auth::Root; use crate::opt::WaitFor; use crate::options::EngineOptions; -use flume::Receiver; -use flume::Sender; +use channel::{Receiver, Sender}; use futures::stream::poll_fn; use futures::FutureExt; use futures::StreamExt; @@ -37,15 +36,15 @@ impl Connection for Db { fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> { Box::pin(async move { let (route_tx, route_rx) = match capacity { - 0 => flume::unbounded(), - capacity => flume::bounded(capacity), + 0 => channel::unbounded(), + capacity => channel::bounded(capacity), }; - let (conn_tx, conn_rx) = flume::bounded(1); + let (conn_tx, conn_rx) = channel::bounded(1); spawn_local(run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await??; + conn_rx.recv().await??; let mut features = HashSet::new(); features.insert(ExtraFeatures::LiveQueries); @@ -78,21 +77,21 @@ pub(crate) async fn run_router( let kvs = match Datastore::new(&address.path).await { Ok(kvs) => { if let Err(error) = kvs.bootstrap().await { - let _ = conn_tx.into_send_async(Err(error.into())).await; + let _ = conn_tx.send(Err(error.into())).await; return; } // If a root user is specified, setup the initial datastore credentials if let Some(root) = configured_root { if let Err(error) = kvs.setup_initial_creds(root.username, root.password).await { - let _ = conn_tx.into_send_async(Err(error.into())).await; + let _ = conn_tx.send(Err(error.into())).await; return; } } - let _ = conn_tx.into_send_async(Ok(())).await; + let _ = conn_tx.send(Ok(())).await; kvs.with_auth_enabled(configured_root.is_some()) } Err(error) => { - let _ = conn_tx.into_send_async(Err(error.into())).await; + let _ = conn_tx.send(Err(error.into())).await; return; } }; @@ -123,13 +122,11 @@ pub(crate) async fn run_router( None => Poll::Pending, }); - let mut route_stream = route_rx.into_stream(); - loop { // use the less ergonomic futures::select as tokio::select is not available. futures::select! { - route = route_stream.next().fuse() => { - let Some(route) = route else { + route = route_rx.recv().fuse() => { + let Ok(route) = route else { // termination requested break }; @@ -144,10 +141,10 @@ pub(crate) async fn run_router( .await { Ok(value) => { - let _ = route.response.into_send_async(Ok(value)).await; + let _ = route.response.send(Ok(value)).await; } Err(error) => { - let _ = route.response.into_send_async(Err(error)).await; + let _ = route.response.send(Err(error)).await; } } } diff --git a/lib/src/api/engine/remote/http/native.rs b/lib/src/api/engine/remote/http/native.rs index 9696ec7f..1882afd3 100644 --- a/lib/src/api/engine/remote/http/native.rs +++ b/lib/src/api/engine/remote/http/native.rs @@ -12,8 +12,7 @@ use crate::api::OnceLockExt; use crate::api::Result; use crate::api::Surreal; use crate::opt::WaitFor; -use flume::Receiver; -use futures::StreamExt; +use channel::Receiver; use indexmap::IndexMap; use reqwest::header::HeaderMap; use reqwest::ClientBuilder; @@ -51,8 +50,8 @@ impl Connection for Client { super::health(client.get(base_url.join(Method::Health.as_str())?)).await?; let (route_tx, route_rx) = match capacity { - 0 => flume::unbounded(), - capacity => flume::bounded(capacity), + 0 => channel::unbounded(), + capacity => channel::bounded(capacity), }; tokio::spawn(run_router(base_url, client, route_rx)); @@ -76,12 +75,11 @@ pub(crate) async fn run_router(base_url: Url, client: reqwest::Client, route_rx: let mut headers = HeaderMap::new(); let mut vars = IndexMap::new(); let mut auth = None; - let mut stream = route_rx.into_stream(); - while let Some(route) = stream.next().await { + while let Ok(route) = route_rx.recv().await { let result = super::router(route.request, &base_url, &client, &mut headers, &mut vars, &mut auth) .await; - let _ = route.response.into_send_async(result).await; + let _ = route.response.send(result).await; } } diff --git a/lib/src/api/engine/remote/http/wasm.rs b/lib/src/api/engine/remote/http/wasm.rs index 07b638a4..e94f9a08 100644 --- a/lib/src/api/engine/remote/http/wasm.rs +++ b/lib/src/api/engine/remote/http/wasm.rs @@ -9,9 +9,7 @@ use crate::api::OnceLockExt; use crate::api::Result; use crate::api::Surreal; use crate::opt::WaitFor; -use flume::Receiver; -use flume::Sender; -use futures::StreamExt; +use channel::{Receiver, Sender}; use indexmap::IndexMap; use reqwest::header::HeaderMap; use reqwest::ClientBuilder; @@ -29,15 +27,15 @@ impl Connection for Client { fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> { Box::pin(async move { let (route_tx, route_rx) = match capacity { - 0 => flume::unbounded(), - capacity => flume::bounded(capacity), + 0 => channel::unbounded(), + capacity => channel::bounded(capacity), }; - let (conn_tx, conn_rx) = flume::bounded(1); + let (conn_tx, conn_rx) = channel::bounded(1); spawn_local(run_router(address, conn_tx, route_rx)); - conn_rx.into_recv_async().await??; + conn_rx.recv().await??; Ok(Surreal::new_from_router_waiter( Arc::new(OnceLock::with_value(Router { @@ -69,11 +67,11 @@ pub(crate) async fn run_router( let client = match client(&base_url).await { Ok(client) => { - let _ = conn_tx.into_send_async(Ok(())).await; + let _ = conn_tx.send(Ok(())).await; client } Err(error) => { - let _ = conn_tx.into_send_async(Err(error)).await; + let _ = conn_tx.send(Err(error)).await; return; } }; @@ -81,17 +79,16 @@ pub(crate) async fn run_router( let mut headers = HeaderMap::new(); let mut vars = IndexMap::new(); let mut auth = None; - let mut stream = route_rx.into_stream(); - while let Some(route) = stream.next().await { + while let Ok(route) = route_rx.recv().await { match super::router(route.request, &base_url, &client, &mut headers, &mut vars, &mut auth) .await { Ok(value) => { - let _ = route.response.into_send_async(Ok(value)).await; + let _ = route.response.send(Ok(value)).await; } Err(error) => { - let _ = route.response.into_send_async(Err(error)).await; + let _ = route.response.send(Err(error)).await; } } } diff --git a/lib/src/api/engine/remote/ws/mod.rs b/lib/src/api/engine/remote/ws/mod.rs index c1a47dd6..65522a7b 100644 --- a/lib/src/api/engine/remote/ws/mod.rs +++ b/lib/src/api/engine/remote/ws/mod.rs @@ -21,7 +21,7 @@ use crate::method::Stats; use crate::opt::IntoEndpoint; use crate::sql::Value; use bincode::Options as _; -use flume::Sender; +use channel::Sender; use indexmap::IndexMap; use revision::revisioned; use revision::Revisioned; diff --git a/lib/src/api/engine/remote/ws/native.rs b/lib/src/api/engine/remote/ws/native.rs index 849e72aa..0d8208b2 100644 --- a/lib/src/api/engine/remote/ws/native.rs +++ b/lib/src/api/engine/remote/ws/native.rs @@ -23,7 +23,7 @@ use crate::engine::remote::ws::Data; use crate::engine::IntervalStream; use crate::opt::WaitFor; use crate::sql::Value; -use flume::Receiver; +use channel::Receiver; use futures::stream::{SplitSink, SplitStream}; use futures::SinkExt; use futures::StreamExt; @@ -124,8 +124,8 @@ impl Connection for Client { let socket = connect(&address, Some(config), maybe_connector.clone()).await?; let (route_tx, route_rx) = match capacity { - 0 => flume::unbounded(), - capacity => flume::bounded(capacity), + 0 => channel::unbounded(), + capacity => channel::bounded(capacity), }; tokio::spawn(run_router(address, maybe_connector, capacity, config, socket, route_rx)); @@ -177,7 +177,7 @@ async fn router_handle_route( state.live_queries.insert(id.0, sender); } } - if response.clone().into_send_async(Ok(DbResponse::Other(Value::None))).await.is_err() { + if response.clone().send(Ok(DbResponse::Other(Value::None))).await.is_err() { trace!("Receiver dropped"); } // There is nothing to send to the server here @@ -222,7 +222,7 @@ async fn router_handle_route( } Entry::Occupied(..) => { let error = Error::DuplicateRequestId(id); - if response.into_send_async(Err(error.into())).await.is_err() { + if response.send(Err(error.into())).await.is_err() { trace!("Receiver dropped"); } } @@ -230,7 +230,7 @@ async fn router_handle_route( } Err(error) => { let error = Error::Ws(error.to_string()); - if response.into_send_async(Err(error.into())).await.is_err() { + if response.send(Err(error.into())).await.is_err() { trace!("Receiver dropped"); } return HandleResult::Disconnected; @@ -270,7 +270,7 @@ async fn router_handle_response( } } } - let _res = sender.into_send_async(DbResponse::from(response)).await; + let _res = sender.send(DbResponse::from(response)).await; } } } @@ -327,7 +327,7 @@ async fn router_handle_response( // Return an error if an ID was returned if let Some(Ok(id)) = id.map(Value::coerce_to_i64) { if let Some((_method, sender)) = state.routes.remove(&id) { - let _res = sender.into_send_async(Err(error)).await; + let _res = sender.send(Err(error)).await; } } } else { @@ -405,7 +405,6 @@ pub(crate) async fn run_router( let (socket_sink, socket_stream) = socket.split(); let mut state = RouterState::new(socket_sink, socket_stream); - let mut route_stream = route_rx.into_stream(); 'router: loop { let mut interval = time::interval(PING_INTERVAL); @@ -423,11 +422,11 @@ pub(crate) async fn run_router( loop { tokio::select! { - route = route_stream.next() => { + route = route_rx.recv() => { // handle incoming route - let Some(response) = route else { - // route returned none, frontend dropped the channel, meaning the router + let Ok(response) = route else { + // route returned Err, frontend dropped the channel, meaning the router // should quit. match state.sink.send(Message::Close(None)).await { Ok(..) => trace!("Connection closed successfully"), diff --git a/lib/src/api/engine/remote/ws/wasm.rs b/lib/src/api/engine/remote/ws/wasm.rs index 2cad1d33..d342ed28 100644 --- a/lib/src/api/engine/remote/ws/wasm.rs +++ b/lib/src/api/engine/remote/ws/wasm.rs @@ -20,8 +20,7 @@ use crate::engine::remote::ws::{Data, RouterRequest}; use crate::engine::IntervalStream; use crate::opt::WaitFor; use crate::sql::Value; -use flume::Receiver; -use flume::Sender; +use channel::{Receiver, Sender}; use futures::stream::{SplitSink, SplitStream}; use futures::FutureExt; use futures::SinkExt; @@ -64,15 +63,15 @@ impl Connection for Client { address.url = address.url.join(PATH)?; let (route_tx, route_rx) = match capacity { - 0 => flume::unbounded(), - capacity => flume::bounded(capacity), + 0 => channel::unbounded(), + capacity => channel::bounded(capacity), }; - let (conn_tx, conn_rx) = flume::bounded(1); + let (conn_tx, conn_rx) = channel::bounded(1); spawn_local(run_router(address, capacity, conn_tx, route_rx)); - conn_rx.into_recv_async().await??; + conn_rx.recv().await??; let mut features = HashSet::new(); features.insert(ExtraFeatures::LiveQueries); @@ -121,7 +120,7 @@ async fn router_handle_request( state.live_queries.insert(id.0, sender); } } - if response.into_send_async(Ok(DbResponse::Other(Value::None))).await.is_err() { + if response.send(Ok(DbResponse::Other(Value::None))).await.is_err() { trace!("Receiver dropped"); } // There is nothing to send to the server here @@ -165,7 +164,7 @@ async fn router_handle_request( } Entry::Occupied(..) => { let error = Error::DuplicateRequestId(id); - if response.into_send_async(Err(error.into())).await.is_err() { + if response.send(Err(error.into())).await.is_err() { trace!("Receiver dropped"); } } @@ -173,7 +172,7 @@ async fn router_handle_request( } Err(error) => { let error = Error::Ws(error.to_string()); - if response.into_send_async(Err(error.into())).await.is_err() { + if response.send(Err(error.into())).await.is_err() { trace!("Receiver dropped"); } return HandleResult::Disconnected; @@ -213,7 +212,7 @@ async fn router_handle_response( } } } - let _res = sender.into_send_async(DbResponse::from(response)).await; + let _res = sender.send(DbResponse::from(response)).await; } } } @@ -267,7 +266,7 @@ async fn router_handle_response( // Return an error if an ID was returned if let Some(Ok(id)) = id.map(Value::coerce_to_i64) { if let Some((_method, sender)) = state.routes.remove(&id) { - let _res = sender.into_send_async(Err(error)).await; + let _res = sender.send(Err(error)).await; } } } else { @@ -356,7 +355,7 @@ pub(crate) async fn run_router( let (mut ws, socket) = match connect { Ok(pair) => pair, Err(error) => { - let _ = conn_tx.into_send_async(Err(error.into())).await; + let _ = conn_tx.send(Err(error.into())).await; return; } }; @@ -369,13 +368,13 @@ pub(crate) async fn run_router( match result { Ok(events) => events, Err(error) => { - let _ = conn_tx.into_send_async(Err(error.into())).await; + let _ = conn_tx.send(Err(error.into())).await; return; } } }; - let _ = conn_tx.into_send_async(Ok(())).await; + let _ = conn_tx.send(Ok(())).await; let ping = { let mut request = BTreeMap::new(); @@ -389,8 +388,6 @@ pub(crate) async fn run_router( let mut state = RouterState::new(socket_sink, socket_stream); - let mut route_stream = route_rx.into_stream(); - 'router: loop { let mut interval = time::interval(PING_INTERVAL); // don't bombard the server with pings if we miss some ticks @@ -404,8 +401,8 @@ pub(crate) async fn run_router( loop { futures::select! { - route = route_stream.next() => { - let Some(route) = route else { + route = route_rx.recv().fuse() => { + let Ok(route) = route else { match ws.close().await { Ok(..) => trace!("Connection closed successfully"), Err(error) => { diff --git a/lib/src/api/err/mod.rs b/lib/src/api/err/mod.rs index e9cec299..c9c17fed 100644 --- a/lib/src/api/err/mod.rs +++ b/lib/src/api/err/mod.rs @@ -242,14 +242,14 @@ impl From for crate::Error { } } -impl From> for crate::Error { - fn from(error: flume::SendError) -> Self { +impl From> for crate::Error { + fn from(error: channel::SendError) -> Self { Self::Api(Error::InternalError(error.to_string())) } } -impl From for crate::Error { - fn from(error: flume::RecvError) -> Self { +impl From for crate::Error { + fn from(error: channel::RecvError) -> Self { Self::Api(Error::InternalError(error.to_string())) } } diff --git a/lib/src/api/method/authenticate.rs b/lib/src/api/method/authenticate.rs index 62779cae..745a9699 100644 --- a/lib/src/api/method/authenticate.rs +++ b/lib/src/api/method/authenticate.rs @@ -1,7 +1,6 @@ -use crate::api::method::BoxFuture; - use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::method::OnceLockExt; use crate::api::opt::auth::Jwt; use crate::api::Connection; diff --git a/lib/src/api/method/begin.rs b/lib/src/api/method/begin.rs index 06f15f8e..475f8c85 100644 --- a/lib/src/api/method/begin.rs +++ b/lib/src/api/method/begin.rs @@ -1,5 +1,4 @@ use crate::api::method::BoxFuture; - use crate::api::method::Cancel; use crate::api::method::Commit; use crate::api::Connection; diff --git a/lib/src/api/method/cancel.rs b/lib/src/api/method/cancel.rs index a2b26391..c5c28a37 100644 --- a/lib/src/api/method/cancel.rs +++ b/lib/src/api/method/cancel.rs @@ -1,5 +1,4 @@ use crate::api::method::BoxFuture; - use crate::api::Connection; use crate::api::Result; use crate::api::Surreal; diff --git a/lib/src/api/method/commit.rs b/lib/src/api/method/commit.rs index 8c52816d..c3408d83 100644 --- a/lib/src/api/method/commit.rs +++ b/lib/src/api/method/commit.rs @@ -1,5 +1,4 @@ use crate::api::method::BoxFuture; - use crate::api::Connection; use crate::api::Result; use crate::api::Surreal; diff --git a/lib/src/api/method/health.rs b/lib/src/api/method/health.rs index 676dfc7f..25909214 100644 --- a/lib/src/api/method/health.rs +++ b/lib/src/api/method/health.rs @@ -1,7 +1,6 @@ -use crate::api::method::BoxFuture; - use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::Connection; use crate::api::Result; use crate::method::OnceLockExt; diff --git a/lib/src/api/method/import.rs b/lib/src/api/method/import.rs index 0e8ceec7..8ee5415a 100644 --- a/lib/src/api/method/import.rs +++ b/lib/src/api/method/import.rs @@ -1,8 +1,7 @@ -use crate::api::method::BoxFuture; - use crate::api::conn::Method; use crate::api::conn::MlConfig; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::Connection; use crate::api::Error; use crate::api::ExtraFeatures; diff --git a/lib/src/api/method/invalidate.rs b/lib/src/api/method/invalidate.rs index 7f4548f1..4df83690 100644 --- a/lib/src/api/method/invalidate.rs +++ b/lib/src/api/method/invalidate.rs @@ -1,7 +1,6 @@ -use crate::api::method::BoxFuture; - use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::Connection; use crate::api::Result; use crate::method::OnceLockExt; diff --git a/lib/src/api/method/set.rs b/lib/src/api/method/set.rs index 6a33b1ae..f615af7d 100644 --- a/lib/src/api/method/set.rs +++ b/lib/src/api/method/set.rs @@ -1,7 +1,6 @@ -use crate::api::method::BoxFuture; - use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::Connection; use crate::api::Result; use crate::method::OnceLockExt; diff --git a/lib/src/api/method/tests/protocol.rs b/lib/src/api/method/tests/protocol.rs index 1ff3342c..21bfcfa9 100644 --- a/lib/src/api/method/tests/protocol.rs +++ b/lib/src/api/method/tests/protocol.rs @@ -52,7 +52,7 @@ impl crate::api::Connection for Client {} impl Connection for Client { fn connect(_address: Endpoint, capacity: usize) -> BoxFuture<'static, Result>> { Box::pin(async move { - let (route_tx, route_rx) = flume::bounded(capacity); + let (route_tx, route_rx) = channel::bounded(capacity); let mut features = HashSet::new(); features.insert(ExtraFeatures::Backup); let router = Router { diff --git a/lib/src/api/method/tests/server.rs b/lib/src/api/method/tests/server.rs index 4f32783a..55a6838c 100644 --- a/lib/src/api/method/tests/server.rs +++ b/lib/src/api/method/tests/server.rs @@ -1,3 +1,5 @@ +use channel::Receiver; + use super::types::User; use crate::api::conn::DbResponse; use crate::api::conn::Method; @@ -5,17 +7,13 @@ use crate::api::conn::Route; use crate::api::Response as QueryResponse; use crate::sql::to_value; use crate::sql::Value; -use flume::Receiver; -use futures::StreamExt; pub(super) fn mock(route_rx: Receiver) { tokio::spawn(async move { - let mut stream = route_rx.into_stream(); - - while let Some(Route { + while let Ok(Route { request, response, - }) = stream.next().await + }) = route_rx.recv().await { let (_, method, param) = request; let mut params = param.other; @@ -94,7 +92,7 @@ pub(super) fn mock(route_rx: Receiver) { }, }; - if let Err(message) = response.into_send_async(result).await { + if let Err(message) = response.send(result).await { panic!("message dropped; {message:?}"); } } diff --git a/lib/src/api/method/unset.rs b/lib/src/api/method/unset.rs index c233f352..65b834d0 100644 --- a/lib/src/api/method/unset.rs +++ b/lib/src/api/method/unset.rs @@ -1,7 +1,6 @@ -use crate::api::method::BoxFuture; - use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::Connection; use crate::api::Result; use crate::method::OnceLockExt; diff --git a/lib/src/api/method/use_db.rs b/lib/src/api/method/use_db.rs index f4161413..6adf9d6d 100644 --- a/lib/src/api/method/use_db.rs +++ b/lib/src/api/method/use_db.rs @@ -1,7 +1,6 @@ -use crate::api::method::BoxFuture; - use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::Connection; use crate::api::Result; use crate::method::OnceLockExt; diff --git a/lib/src/api/method/use_ns.rs b/lib/src/api/method/use_ns.rs index bae8e39c..ca872f65 100644 --- a/lib/src/api/method/use_ns.rs +++ b/lib/src/api/method/use_ns.rs @@ -1,7 +1,6 @@ -use crate::api::method::BoxFuture; - use crate::api::conn::Method; use crate::api::conn::Param; +use crate::api::method::BoxFuture; use crate::api::method::UseDb; use crate::api::Connection; use crate::api::Result; diff --git a/lib/src/api/method/version.rs b/lib/src/api/method/version.rs index 581a4801..39ae6855 100644 --- a/lib/src/api/method/version.rs +++ b/lib/src/api/method/version.rs @@ -1,8 +1,7 @@ -use crate::api::method::BoxFuture; - use crate::api::conn::Method; use crate::api::conn::Param; use crate::api::err::Error; +use crate::api::method::BoxFuture; use crate::api::Connection; use crate::api::Result; use crate::method::OnceLockExt; diff --git a/lib/tests/api/live.rs b/lib/tests/api/live.rs index 7a5c909a..cfaaec35 100644 --- a/lib/tests/api/live.rs +++ b/lib/tests/api/live.rs @@ -9,7 +9,6 @@ use surrealdb::method::QueryStream; use surrealdb::Action; use surrealdb::Notification; use surrealdb_core::sql::Object; -use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; use tracing::info; @@ -429,17 +428,16 @@ async fn receive_all_pending_notifications< stream: Arc>, timeout: Duration, ) -> Vec> { - let (send, mut recv) = channel::>(MAX_NOTIFICATIONS); - let we_expect_timeout = tokio::time::timeout(timeout, async move { + let mut results = Vec::new(); + let we_expect_timeout = tokio::time::timeout(timeout, async { while let Some(notification) = stream.write().await.next().await { - send.send(notification.unwrap()).await.unwrap(); + if results.len() >= MAX_NOTIFICATIONS { + panic!("too many notification!") + } + results.push(notification.unwrap()) } }) .await; assert!(we_expect_timeout.is_err()); - let mut results = Vec::new(); - while let Ok(notification) = recv.try_recv() { - results.push(notification); - } results }