Remove flume (#4367)

This commit is contained in:
Mees Delzenne 2024-07-19 12:09:54 +02:00 committed by GitHub
parent 3b1a1f0348
commit 209c145ad0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 133 additions and 186 deletions

22
Cargo.lock generated
View file

@ -2036,18 +2036,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8" checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flume"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"spin 0.9.8",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -3460,15 +3448,6 @@ dependencies = [
"rand 0.8.5", "rand 0.8.5",
] ]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom 0.2.12",
]
[[package]] [[package]]
name = "native-tls" name = "native-tls"
version = "0.2.11" version = "0.2.11"
@ -5963,7 +5942,6 @@ dependencies = [
"dmp", "dmp",
"env_logger 0.10.2", "env_logger 0.10.2",
"flate2", "flate2",
"flume",
"futures", "futures",
"geo 0.27.0", "geo 0.27.0",
"hashbrown 0.14.5", "hashbrown 0.14.5",

View file

@ -422,14 +422,13 @@ impl<'a> Executor<'a> {
} }
}, },
}; };
self.err = res.is_err();
// Produce the response // Produce the response
let res = Response { let res = Response {
// Get the statement end time // Get the statement end time
time: now.elapsed(), time: now.elapsed(),
result: res.inspect_err(|_| { result: res,
// Mark the error.
self.err = true;
}),
query_type: match (is_stm_live, is_stm_kill) { query_type: match (is_stm_live, is_stm_kill) {
(true, _) => QueryType::Live, (true, _) => QueryType::Live,
(_, true) => QueryType::Kill, (_, true) => QueryType::Kill,

View file

@ -431,14 +431,14 @@ mod tests {
assert_eq!(slice((initial_values, beg, lim)).unwrap(), expected_values.into()); assert_eq!(slice((initial_values, beg, lim)).unwrap(), expected_values.into());
} }
let array = &[b'a', b'b', b'c', b'd', b'e', b'f', b'g']; let array = b"abcdefg";
test(array, None, None, array); test(array, None, None, array);
test(array, Some(2), None, &array[2..]); test(array, Some(2), None, &array[2..]);
test(array, Some(2), Some(3), &array[2..5]); test(array, Some(2), Some(3), &array[2..5]);
test(array, Some(2), Some(-1), &[b'c', b'd', b'e', b'f']); test(array, Some(2), Some(-1), b"cdef");
test(array, Some(-2), None, &[b'f', b'g']); test(array, Some(-2), None, b"fg");
test(array, Some(-4), Some(2), &[b'd', b'e']); test(array, Some(-4), Some(2), b"de");
test(array, Some(-4), Some(-1), &[b'd', b'e', b'f']); test(array, Some(-4), Some(-1), b"def");
} }
#[test] #[test]

View file

@ -31,14 +31,14 @@ pub fn new<'a>(ns: &'a str, db: &'a str, ts: u64, tb: &'a str) -> Cf<'a> {
#[allow(unused)] #[allow(unused)]
pub fn versionstamped_key_prefix(ns: &str, db: &str) -> Vec<u8> { pub fn versionstamped_key_prefix(ns: &str, db: &str) -> Vec<u8> {
let mut k = crate::key::database::all::new(ns, db).encode().unwrap(); let mut k = crate::key::database::all::new(ns, db).encode().unwrap();
k.extend_from_slice(&[b'#']); k.extend_from_slice(b"#");
k k
} }
#[allow(unused)] #[allow(unused)]
pub fn versionstamped_key_suffix(tb: &str) -> Vec<u8> { pub fn versionstamped_key_suffix(tb: &str) -> Vec<u8> {
let mut k: Vec<u8> = vec![]; let mut k: Vec<u8> = vec![];
k.extend_from_slice(&[b'*']); k.extend_from_slice(b"*");
k.extend_from_slice(tb.as_bytes()); k.extend_from_slice(tb.as_bytes());
// Without this, decoding fails with UnexpectedEOF errors // Without this, decoding fails with UnexpectedEOF errors
k.extend_from_slice(&[0x00]); k.extend_from_slice(&[0x00]);
@ -50,7 +50,7 @@ pub fn versionstamped_key_suffix(tb: &str) -> Vec<u8> {
#[allow(unused)] #[allow(unused)]
pub fn prefix_ts(ns: &str, db: &str, vs: vs::Versionstamp) -> Vec<u8> { pub fn prefix_ts(ns: &str, db: &str, vs: vs::Versionstamp) -> Vec<u8> {
let mut k = crate::key::database::all::new(ns, db).encode().unwrap(); let mut k = crate::key::database::all::new(ns, db).encode().unwrap();
k.extend_from_slice(&[b'#']); k.extend_from_slice(b"#");
k.extend_from_slice(&vs); k.extend_from_slice(&vs);
k k
} }
@ -59,7 +59,7 @@ pub fn prefix_ts(ns: &str, db: &str, vs: vs::Versionstamp) -> Vec<u8> {
#[allow(unused)] #[allow(unused)]
pub fn prefix(ns: &str, db: &str) -> Vec<u8> { pub fn prefix(ns: &str, db: &str) -> Vec<u8> {
let mut k = crate::key::database::all::new(ns, db).encode().unwrap(); let mut k = crate::key::database::all::new(ns, db).encode().unwrap();
k.extend_from_slice(&[b'#']); k.extend_from_slice(b"#");
k k
} }

View file

@ -28,7 +28,7 @@ pub fn new<'a>(ns: &'a str, db: &'a str, ts: u64) -> Ts<'a> {
/// Returns the prefix for the whole database timestamps /// Returns the prefix for the whole database timestamps
pub fn prefix(ns: &str, db: &str) -> Vec<u8> { pub fn prefix(ns: &str, db: &str) -> Vec<u8> {
let mut k = crate::key::database::all::new(ns, db).encode().unwrap(); let mut k = crate::key::database::all::new(ns, db).encode().unwrap();
k.extend_from_slice(&[b'!', b't', b's']); k.extend_from_slice(b"!ts");
k k
} }

View file

@ -32,13 +32,13 @@ pub fn new<'a>(ns: &'a str, db: &'a str, tb: &'a str, lq: Uuid) -> Lq<'a> {
pub fn prefix(ns: &str, db: &str, tb: &str) -> Vec<u8> { pub fn prefix(ns: &str, db: &str, tb: &str) -> Vec<u8> {
let mut k = super::all::new(ns, db, tb).encode().unwrap(); let mut k = super::all::new(ns, db, tb).encode().unwrap();
k.extend_from_slice(&[b'!', b'l', b'q', 0x00]); k.extend_from_slice(b"!lq\0");
k k
} }
pub fn suffix(ns: &str, db: &str, tb: &str) -> Vec<u8> { pub fn suffix(ns: &str, db: &str, tb: &str) -> Vec<u8> {
let mut k = super::all::new(ns, db, tb).encode().unwrap(); let mut k = super::all::new(ns, db, tb).encode().unwrap();
k.extend_from_slice(&[b'!', b'l', b'q', 0xff]); k.extend_from_slice(b"!lq\xff");
k k
} }

View file

@ -70,7 +70,6 @@ bincode = "1.3.3"
channel = { version = "1.9.0", package = "async-channel" } channel = { version = "1.9.0", package = "async-channel" }
chrono = { version = "0.4.31", features = ["serde"] } chrono = { version = "0.4.31", features = ["serde"] }
dmp = "0.2.0" dmp = "0.2.0"
flume = "0.11.0"
futures = "0.3.29" futures = "0.3.29"
geo = { version = "0.27.0", features = ["use-serde"] } geo = { version = "0.27.0", features = ["use-serde"] }
indexmap = { version = "2.1.0", features = ["serde"] } indexmap = { version = "2.1.0", features = ["serde"] }

View file

@ -10,8 +10,8 @@ use crate::dbs::Notification;
use crate::sql::from_value; use crate::sql::from_value;
use crate::sql::Query; use crate::sql::Query;
use crate::sql::Value; use crate::sql::Value;
use flume::Receiver; use channel::Receiver;
use flume::Sender; use channel::Sender;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use std::collections::BTreeMap; use std::collections::BTreeMap;
@ -47,12 +47,12 @@ impl Router {
) -> BoxFuture<'_, Result<Receiver<Result<DbResponse>>>> { ) -> BoxFuture<'_, Result<Receiver<Result<DbResponse>>>> {
Box::pin(async move { Box::pin(async move {
let id = self.next_id(); let id = self.next_id();
let (sender, receiver) = flume::bounded(1); let (sender, receiver) = channel::bounded(1);
let route = Route { let route = Route {
request: (id, method, param), request: (id, method, param),
response: sender, response: sender,
}; };
self.sender.send_async(route).await?; self.sender.send(route).await?;
Ok(receiver) Ok(receiver)
}) })
} }
@ -63,7 +63,7 @@ impl Router {
receiver: Receiver<Result<DbResponse>>, receiver: Receiver<Result<DbResponse>>,
) -> BoxFuture<'_, Result<Value>> { ) -> BoxFuture<'_, Result<Value>> {
Box::pin(async move { Box::pin(async move {
let response = receiver.into_recv_async().await?; let response = receiver.recv().await?;
match response? { match response? {
DbResponse::Other(value) => Ok(value), DbResponse::Other(value) => Ok(value),
DbResponse::Query(..) => unreachable!(), DbResponse::Query(..) => unreachable!(),
@ -77,7 +77,7 @@ impl Router {
receiver: Receiver<Result<DbResponse>>, receiver: Receiver<Result<DbResponse>>,
) -> BoxFuture<'_, Result<Response>> { ) -> BoxFuture<'_, Result<Response>> {
Box::pin(async move { Box::pin(async move {
let response = receiver.into_recv_async().await?; let response = receiver.recv().await?;
match response? { match response? {
DbResponse::Query(results) => Ok(results), DbResponse::Query(results) => Ok(results),
DbResponse::Other(..) => unreachable!(), DbResponse::Other(..) => unreachable!(),

View file

@ -39,11 +39,11 @@ impl Connection for Any {
fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> { fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move { Box::pin(async move {
let (route_tx, route_rx) = match capacity { let (route_tx, route_rx) = match capacity {
0 => flume::unbounded(), 0 => channel::unbounded(),
capacity => flume::bounded(capacity), capacity => channel::bounded(capacity),
}; };
let (conn_tx, conn_rx) = flume::bounded::<Result<()>>(1); let (conn_tx, conn_rx) = channel::bounded::<Result<()>>(1);
let mut features = HashSet::new(); let mut features = HashSet::new();
match EndpointKind::from(address.url.scheme()) { match EndpointKind::from(address.url.scheme()) {
@ -53,7 +53,7 @@ impl Connection for Any {
features.insert(ExtraFeatures::Backup); features.insert(ExtraFeatures::Backup);
features.insert(ExtraFeatures::LiveQueries); features.insert(ExtraFeatures::LiveQueries);
tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx)); tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await?? conn_rx.recv().await??
} }
#[cfg(not(feature = "kv-fdb"))] #[cfg(not(feature = "kv-fdb"))]
@ -68,7 +68,7 @@ impl Connection for Any {
features.insert(ExtraFeatures::Backup); features.insert(ExtraFeatures::Backup);
features.insert(ExtraFeatures::LiveQueries); features.insert(ExtraFeatures::LiveQueries);
tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx)); tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await?? conn_rx.recv().await??
} }
#[cfg(not(feature = "kv-mem"))] #[cfg(not(feature = "kv-mem"))]
@ -83,7 +83,7 @@ impl Connection for Any {
features.insert(ExtraFeatures::Backup); features.insert(ExtraFeatures::Backup);
features.insert(ExtraFeatures::LiveQueries); features.insert(ExtraFeatures::LiveQueries);
tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx)); tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await?? conn_rx.recv().await??
} }
#[cfg(not(feature = "kv-rocksdb"))] #[cfg(not(feature = "kv-rocksdb"))]
@ -99,7 +99,7 @@ impl Connection for Any {
features.insert(ExtraFeatures::Backup); features.insert(ExtraFeatures::Backup);
features.insert(ExtraFeatures::LiveQueries); features.insert(ExtraFeatures::LiveQueries);
tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx)); tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await?? conn_rx.recv().await??
} }
#[cfg(not(feature = "kv-tikv"))] #[cfg(not(feature = "kv-tikv"))]
@ -114,7 +114,7 @@ impl Connection for Any {
features.insert(ExtraFeatures::Backup); features.insert(ExtraFeatures::Backup);
features.insert(ExtraFeatures::LiveQueries); features.insert(ExtraFeatures::LiveQueries);
tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx)); tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await?? conn_rx.recv().await??
} }
#[cfg(not(feature = "kv-surrealkv"))] #[cfg(not(feature = "kv-surrealkv"))]

View file

@ -26,11 +26,11 @@ impl Connection for Any {
fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> { fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move { Box::pin(async move {
let (route_tx, route_rx) = match capacity { let (route_tx, route_rx) = match capacity {
0 => flume::unbounded(), 0 => channel::unbounded(),
capacity => flume::bounded(capacity), capacity => channel::bounded(capacity),
}; };
let (conn_tx, conn_rx) = flume::bounded::<Result<()>>(1); let (conn_tx, conn_rx) = channel::bounded::<Result<()>>(1);
let mut features = HashSet::new(); let mut features = HashSet::new();
match EndpointKind::from(address.url.scheme()) { match EndpointKind::from(address.url.scheme()) {
@ -39,7 +39,7 @@ impl Connection for Any {
{ {
features.insert(ExtraFeatures::LiveQueries); features.insert(ExtraFeatures::LiveQueries);
spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx)); spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await??; conn_rx.recv().await??;
} }
#[cfg(not(feature = "kv-fdb"))] #[cfg(not(feature = "kv-fdb"))]
@ -53,7 +53,7 @@ impl Connection for Any {
{ {
features.insert(ExtraFeatures::LiveQueries); features.insert(ExtraFeatures::LiveQueries);
spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx)); spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await??; conn_rx.recv().await??;
} }
#[cfg(not(feature = "kv-indxdb"))] #[cfg(not(feature = "kv-indxdb"))]
@ -67,7 +67,7 @@ impl Connection for Any {
{ {
features.insert(ExtraFeatures::LiveQueries); features.insert(ExtraFeatures::LiveQueries);
spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx)); spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await??; conn_rx.recv().await??;
} }
#[cfg(not(feature = "kv-mem"))] #[cfg(not(feature = "kv-mem"))]
@ -81,7 +81,7 @@ impl Connection for Any {
{ {
features.insert(ExtraFeatures::LiveQueries); features.insert(ExtraFeatures::LiveQueries);
spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx)); spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await??; conn_rx.recv().await??;
} }
#[cfg(not(feature = "kv-rocksdb"))] #[cfg(not(feature = "kv-rocksdb"))]
@ -96,7 +96,7 @@ impl Connection for Any {
{ {
features.insert(ExtraFeatures::LiveQueries); features.insert(ExtraFeatures::LiveQueries);
spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx)); spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await??; conn_rx.recv().await??;
} }
#[cfg(not(feature = "kv-surrealkv"))] #[cfg(not(feature = "kv-surrealkv"))]
@ -111,7 +111,7 @@ impl Connection for Any {
{ {
features.insert(ExtraFeatures::LiveQueries); features.insert(ExtraFeatures::LiveQueries);
spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx)); spawn_local(engine::local::wasm::run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await??; conn_rx.recv().await??;
} }
#[cfg(not(feature = "kv-tikv"))] #[cfg(not(feature = "kv-tikv"))]
@ -144,7 +144,7 @@ impl Connection for Any {
spawn_local(engine::remote::ws::wasm::run_router( spawn_local(engine::remote::ws::wasm::run_router(
endpoint, capacity, conn_tx, route_rx, endpoint, capacity, conn_tx, route_rx,
)); ));
conn_rx.into_recv_async().await??; conn_rx.recv().await??;
} }
#[cfg(not(feature = "protocol-ws"))] #[cfg(not(feature = "protocol-ws"))]

View file

@ -15,8 +15,8 @@ use crate::kvs::Datastore;
use crate::opt::auth::Root; use crate::opt::auth::Root;
use crate::opt::WaitFor; use crate::opt::WaitFor;
use crate::options::EngineOptions; use crate::options::EngineOptions;
use flume::Receiver; use channel::Receiver;
use flume::Sender; use channel::Sender;
use futures::stream::poll_fn; use futures::stream::poll_fn;
use futures::StreamExt; use futures::StreamExt;
use std::collections::BTreeMap; use std::collections::BTreeMap;
@ -34,15 +34,15 @@ impl Connection for Db {
fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> { fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move { Box::pin(async move {
let (route_tx, route_rx) = match capacity { let (route_tx, route_rx) = match capacity {
0 => flume::unbounded(), 0 => channel::unbounded(),
capacity => flume::bounded(capacity), capacity => channel::bounded(capacity),
}; };
let (conn_tx, conn_rx) = flume::bounded(1); let (conn_tx, conn_rx) = channel::bounded(1);
tokio::spawn(run_router(address, conn_tx, route_rx)); tokio::spawn(run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await??; conn_rx.recv().await??;
let mut features = HashSet::new(); let mut features = HashSet::new();
features.insert(ExtraFeatures::Backup); features.insert(ExtraFeatures::Backup);
@ -81,21 +81,21 @@ pub(crate) async fn run_router(
let kvs = match Datastore::new(endpoint).await { let kvs = match Datastore::new(endpoint).await {
Ok(kvs) => { Ok(kvs) => {
if let Err(error) = kvs.bootstrap().await { if let Err(error) = kvs.bootstrap().await {
let _ = conn_tx.into_send_async(Err(error.into())).await; let _ = conn_tx.send(Err(error.into())).await;
return; return;
} }
// If a root user is specified, setup the initial datastore credentials // If a root user is specified, setup the initial datastore credentials
if let Some(root) = configured_root { if let Some(root) = configured_root {
if let Err(error) = kvs.setup_initial_creds(root.username, root.password).await { if let Err(error) = kvs.setup_initial_creds(root.username, root.password).await {
let _ = conn_tx.into_send_async(Err(error.into())).await; let _ = conn_tx.send(Err(error.into())).await;
return; return;
} }
} }
let _ = conn_tx.into_send_async(Ok(())).await; let _ = conn_tx.send(Ok(())).await;
kvs.with_auth_enabled(configured_root.is_some()) kvs.with_auth_enabled(configured_root.is_some())
} }
Err(error) => { Err(error) => {
let _ = conn_tx.into_send_async(Err(error.into())).await; let _ = conn_tx.send(Err(error.into())).await;
return; return;
} }
}; };
@ -142,22 +142,21 @@ pub(crate) async fn run_router(
// constantly polled. // constantly polled.
None => Poll::Pending, None => Poll::Pending,
}); });
let mut route_stream = route_rx.into_stream();
loop { loop {
tokio::select! { tokio::select! {
route = route_stream.next() => { route = route_rx.recv() => {
let Some(route) = route else { let Ok(route) = route else {
break break
}; };
match super::router(route.request, &kvs, &mut session, &mut vars, &mut live_queries) match super::router(route.request, &kvs, &mut session, &mut vars, &mut live_queries)
.await .await
{ {
Ok(value) => { Ok(value) => {
let _ = route.response.into_send_async(Ok(value)).await; let _ = route.response.send(Ok(value)).await;
} }
Err(error) => { Err(error) => {
let _ = route.response.into_send_async(Err(error)).await; let _ = route.response.send(Err(error)).await;
} }
} }
} }

View file

@ -16,8 +16,7 @@ use crate::kvs::Datastore;
use crate::opt::auth::Root; use crate::opt::auth::Root;
use crate::opt::WaitFor; use crate::opt::WaitFor;
use crate::options::EngineOptions; use crate::options::EngineOptions;
use flume::Receiver; use channel::{Receiver, Sender};
use flume::Sender;
use futures::stream::poll_fn; use futures::stream::poll_fn;
use futures::FutureExt; use futures::FutureExt;
use futures::StreamExt; use futures::StreamExt;
@ -37,15 +36,15 @@ impl Connection for Db {
fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> { fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move { Box::pin(async move {
let (route_tx, route_rx) = match capacity { let (route_tx, route_rx) = match capacity {
0 => flume::unbounded(), 0 => channel::unbounded(),
capacity => flume::bounded(capacity), capacity => channel::bounded(capacity),
}; };
let (conn_tx, conn_rx) = flume::bounded(1); let (conn_tx, conn_rx) = channel::bounded(1);
spawn_local(run_router(address, conn_tx, route_rx)); spawn_local(run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await??; conn_rx.recv().await??;
let mut features = HashSet::new(); let mut features = HashSet::new();
features.insert(ExtraFeatures::LiveQueries); features.insert(ExtraFeatures::LiveQueries);
@ -78,21 +77,21 @@ pub(crate) async fn run_router(
let kvs = match Datastore::new(&address.path).await { let kvs = match Datastore::new(&address.path).await {
Ok(kvs) => { Ok(kvs) => {
if let Err(error) = kvs.bootstrap().await { if let Err(error) = kvs.bootstrap().await {
let _ = conn_tx.into_send_async(Err(error.into())).await; let _ = conn_tx.send(Err(error.into())).await;
return; return;
} }
// If a root user is specified, setup the initial datastore credentials // If a root user is specified, setup the initial datastore credentials
if let Some(root) = configured_root { if let Some(root) = configured_root {
if let Err(error) = kvs.setup_initial_creds(root.username, root.password).await { if let Err(error) = kvs.setup_initial_creds(root.username, root.password).await {
let _ = conn_tx.into_send_async(Err(error.into())).await; let _ = conn_tx.send(Err(error.into())).await;
return; return;
} }
} }
let _ = conn_tx.into_send_async(Ok(())).await; let _ = conn_tx.send(Ok(())).await;
kvs.with_auth_enabled(configured_root.is_some()) kvs.with_auth_enabled(configured_root.is_some())
} }
Err(error) => { Err(error) => {
let _ = conn_tx.into_send_async(Err(error.into())).await; let _ = conn_tx.send(Err(error.into())).await;
return; return;
} }
}; };
@ -123,13 +122,11 @@ pub(crate) async fn run_router(
None => Poll::Pending, None => Poll::Pending,
}); });
let mut route_stream = route_rx.into_stream();
loop { loop {
// use the less ergonomic futures::select as tokio::select is not available. // use the less ergonomic futures::select as tokio::select is not available.
futures::select! { futures::select! {
route = route_stream.next().fuse() => { route = route_rx.recv().fuse() => {
let Some(route) = route else { let Ok(route) = route else {
// termination requested // termination requested
break break
}; };
@ -144,10 +141,10 @@ pub(crate) async fn run_router(
.await .await
{ {
Ok(value) => { Ok(value) => {
let _ = route.response.into_send_async(Ok(value)).await; let _ = route.response.send(Ok(value)).await;
} }
Err(error) => { Err(error) => {
let _ = route.response.into_send_async(Err(error)).await; let _ = route.response.send(Err(error)).await;
} }
} }
} }

View file

@ -12,8 +12,7 @@ use crate::api::OnceLockExt;
use crate::api::Result; use crate::api::Result;
use crate::api::Surreal; use crate::api::Surreal;
use crate::opt::WaitFor; use crate::opt::WaitFor;
use flume::Receiver; use channel::Receiver;
use futures::StreamExt;
use indexmap::IndexMap; use indexmap::IndexMap;
use reqwest::header::HeaderMap; use reqwest::header::HeaderMap;
use reqwest::ClientBuilder; use reqwest::ClientBuilder;
@ -51,8 +50,8 @@ impl Connection for Client {
super::health(client.get(base_url.join(Method::Health.as_str())?)).await?; super::health(client.get(base_url.join(Method::Health.as_str())?)).await?;
let (route_tx, route_rx) = match capacity { let (route_tx, route_rx) = match capacity {
0 => flume::unbounded(), 0 => channel::unbounded(),
capacity => flume::bounded(capacity), capacity => channel::bounded(capacity),
}; };
tokio::spawn(run_router(base_url, client, route_rx)); tokio::spawn(run_router(base_url, client, route_rx));
@ -76,12 +75,11 @@ pub(crate) async fn run_router(base_url: Url, client: reqwest::Client, route_rx:
let mut headers = HeaderMap::new(); let mut headers = HeaderMap::new();
let mut vars = IndexMap::new(); let mut vars = IndexMap::new();
let mut auth = None; let mut auth = None;
let mut stream = route_rx.into_stream();
while let Some(route) = stream.next().await { while let Ok(route) = route_rx.recv().await {
let result = let result =
super::router(route.request, &base_url, &client, &mut headers, &mut vars, &mut auth) super::router(route.request, &base_url, &client, &mut headers, &mut vars, &mut auth)
.await; .await;
let _ = route.response.into_send_async(result).await; let _ = route.response.send(result).await;
} }
} }

View file

@ -9,9 +9,7 @@ use crate::api::OnceLockExt;
use crate::api::Result; use crate::api::Result;
use crate::api::Surreal; use crate::api::Surreal;
use crate::opt::WaitFor; use crate::opt::WaitFor;
use flume::Receiver; use channel::{Receiver, Sender};
use flume::Sender;
use futures::StreamExt;
use indexmap::IndexMap; use indexmap::IndexMap;
use reqwest::header::HeaderMap; use reqwest::header::HeaderMap;
use reqwest::ClientBuilder; use reqwest::ClientBuilder;
@ -29,15 +27,15 @@ impl Connection for Client {
fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> { fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move { Box::pin(async move {
let (route_tx, route_rx) = match capacity { let (route_tx, route_rx) = match capacity {
0 => flume::unbounded(), 0 => channel::unbounded(),
capacity => flume::bounded(capacity), capacity => channel::bounded(capacity),
}; };
let (conn_tx, conn_rx) = flume::bounded(1); let (conn_tx, conn_rx) = channel::bounded(1);
spawn_local(run_router(address, conn_tx, route_rx)); spawn_local(run_router(address, conn_tx, route_rx));
conn_rx.into_recv_async().await??; conn_rx.recv().await??;
Ok(Surreal::new_from_router_waiter( Ok(Surreal::new_from_router_waiter(
Arc::new(OnceLock::with_value(Router { Arc::new(OnceLock::with_value(Router {
@ -69,11 +67,11 @@ pub(crate) async fn run_router(
let client = match client(&base_url).await { let client = match client(&base_url).await {
Ok(client) => { Ok(client) => {
let _ = conn_tx.into_send_async(Ok(())).await; let _ = conn_tx.send(Ok(())).await;
client client
} }
Err(error) => { Err(error) => {
let _ = conn_tx.into_send_async(Err(error)).await; let _ = conn_tx.send(Err(error)).await;
return; return;
} }
}; };
@ -81,17 +79,16 @@ pub(crate) async fn run_router(
let mut headers = HeaderMap::new(); let mut headers = HeaderMap::new();
let mut vars = IndexMap::new(); let mut vars = IndexMap::new();
let mut auth = None; let mut auth = None;
let mut stream = route_rx.into_stream();
while let Some(route) = stream.next().await { while let Ok(route) = route_rx.recv().await {
match super::router(route.request, &base_url, &client, &mut headers, &mut vars, &mut auth) match super::router(route.request, &base_url, &client, &mut headers, &mut vars, &mut auth)
.await .await
{ {
Ok(value) => { Ok(value) => {
let _ = route.response.into_send_async(Ok(value)).await; let _ = route.response.send(Ok(value)).await;
} }
Err(error) => { Err(error) => {
let _ = route.response.into_send_async(Err(error)).await; let _ = route.response.send(Err(error)).await;
} }
} }
} }

View file

@ -21,7 +21,7 @@ use crate::method::Stats;
use crate::opt::IntoEndpoint; use crate::opt::IntoEndpoint;
use crate::sql::Value; use crate::sql::Value;
use bincode::Options as _; use bincode::Options as _;
use flume::Sender; use channel::Sender;
use indexmap::IndexMap; use indexmap::IndexMap;
use revision::revisioned; use revision::revisioned;
use revision::Revisioned; use revision::Revisioned;

View file

@ -23,7 +23,7 @@ use crate::engine::remote::ws::Data;
use crate::engine::IntervalStream; use crate::engine::IntervalStream;
use crate::opt::WaitFor; use crate::opt::WaitFor;
use crate::sql::Value; use crate::sql::Value;
use flume::Receiver; use channel::Receiver;
use futures::stream::{SplitSink, SplitStream}; use futures::stream::{SplitSink, SplitStream};
use futures::SinkExt; use futures::SinkExt;
use futures::StreamExt; use futures::StreamExt;
@ -124,8 +124,8 @@ impl Connection for Client {
let socket = connect(&address, Some(config), maybe_connector.clone()).await?; let socket = connect(&address, Some(config), maybe_connector.clone()).await?;
let (route_tx, route_rx) = match capacity { let (route_tx, route_rx) = match capacity {
0 => flume::unbounded(), 0 => channel::unbounded(),
capacity => flume::bounded(capacity), capacity => channel::bounded(capacity),
}; };
tokio::spawn(run_router(address, maybe_connector, capacity, config, socket, route_rx)); tokio::spawn(run_router(address, maybe_connector, capacity, config, socket, route_rx));
@ -177,7 +177,7 @@ async fn router_handle_route(
state.live_queries.insert(id.0, sender); state.live_queries.insert(id.0, sender);
} }
} }
if response.clone().into_send_async(Ok(DbResponse::Other(Value::None))).await.is_err() { if response.clone().send(Ok(DbResponse::Other(Value::None))).await.is_err() {
trace!("Receiver dropped"); trace!("Receiver dropped");
} }
// There is nothing to send to the server here // There is nothing to send to the server here
@ -222,7 +222,7 @@ async fn router_handle_route(
} }
Entry::Occupied(..) => { Entry::Occupied(..) => {
let error = Error::DuplicateRequestId(id); let error = Error::DuplicateRequestId(id);
if response.into_send_async(Err(error.into())).await.is_err() { if response.send(Err(error.into())).await.is_err() {
trace!("Receiver dropped"); trace!("Receiver dropped");
} }
} }
@ -230,7 +230,7 @@ async fn router_handle_route(
} }
Err(error) => { Err(error) => {
let error = Error::Ws(error.to_string()); let error = Error::Ws(error.to_string());
if response.into_send_async(Err(error.into())).await.is_err() { if response.send(Err(error.into())).await.is_err() {
trace!("Receiver dropped"); trace!("Receiver dropped");
} }
return HandleResult::Disconnected; return HandleResult::Disconnected;
@ -270,7 +270,7 @@ async fn router_handle_response(
} }
} }
} }
let _res = sender.into_send_async(DbResponse::from(response)).await; let _res = sender.send(DbResponse::from(response)).await;
} }
} }
} }
@ -327,7 +327,7 @@ async fn router_handle_response(
// Return an error if an ID was returned // Return an error if an ID was returned
if let Some(Ok(id)) = id.map(Value::coerce_to_i64) { if let Some(Ok(id)) = id.map(Value::coerce_to_i64) {
if let Some((_method, sender)) = state.routes.remove(&id) { if let Some((_method, sender)) = state.routes.remove(&id) {
let _res = sender.into_send_async(Err(error)).await; let _res = sender.send(Err(error)).await;
} }
} }
} else { } else {
@ -405,7 +405,6 @@ pub(crate) async fn run_router(
let (socket_sink, socket_stream) = socket.split(); let (socket_sink, socket_stream) = socket.split();
let mut state = RouterState::new(socket_sink, socket_stream); let mut state = RouterState::new(socket_sink, socket_stream);
let mut route_stream = route_rx.into_stream();
'router: loop { 'router: loop {
let mut interval = time::interval(PING_INTERVAL); let mut interval = time::interval(PING_INTERVAL);
@ -423,11 +422,11 @@ pub(crate) async fn run_router(
loop { loop {
tokio::select! { tokio::select! {
route = route_stream.next() => { route = route_rx.recv() => {
// handle incoming route // handle incoming route
let Some(response) = route else { let Ok(response) = route else {
// route returned none, frontend dropped the channel, meaning the router // route returned Err, frontend dropped the channel, meaning the router
// should quit. // should quit.
match state.sink.send(Message::Close(None)).await { match state.sink.send(Message::Close(None)).await {
Ok(..) => trace!("Connection closed successfully"), Ok(..) => trace!("Connection closed successfully"),

View file

@ -20,8 +20,7 @@ use crate::engine::remote::ws::{Data, RouterRequest};
use crate::engine::IntervalStream; use crate::engine::IntervalStream;
use crate::opt::WaitFor; use crate::opt::WaitFor;
use crate::sql::Value; use crate::sql::Value;
use flume::Receiver; use channel::{Receiver, Sender};
use flume::Sender;
use futures::stream::{SplitSink, SplitStream}; use futures::stream::{SplitSink, SplitStream};
use futures::FutureExt; use futures::FutureExt;
use futures::SinkExt; use futures::SinkExt;
@ -64,15 +63,15 @@ impl Connection for Client {
address.url = address.url.join(PATH)?; address.url = address.url.join(PATH)?;
let (route_tx, route_rx) = match capacity { let (route_tx, route_rx) = match capacity {
0 => flume::unbounded(), 0 => channel::unbounded(),
capacity => flume::bounded(capacity), capacity => channel::bounded(capacity),
}; };
let (conn_tx, conn_rx) = flume::bounded(1); let (conn_tx, conn_rx) = channel::bounded(1);
spawn_local(run_router(address, capacity, conn_tx, route_rx)); spawn_local(run_router(address, capacity, conn_tx, route_rx));
conn_rx.into_recv_async().await??; conn_rx.recv().await??;
let mut features = HashSet::new(); let mut features = HashSet::new();
features.insert(ExtraFeatures::LiveQueries); features.insert(ExtraFeatures::LiveQueries);
@ -121,7 +120,7 @@ async fn router_handle_request(
state.live_queries.insert(id.0, sender); state.live_queries.insert(id.0, sender);
} }
} }
if response.into_send_async(Ok(DbResponse::Other(Value::None))).await.is_err() { if response.send(Ok(DbResponse::Other(Value::None))).await.is_err() {
trace!("Receiver dropped"); trace!("Receiver dropped");
} }
// There is nothing to send to the server here // There is nothing to send to the server here
@ -165,7 +164,7 @@ async fn router_handle_request(
} }
Entry::Occupied(..) => { Entry::Occupied(..) => {
let error = Error::DuplicateRequestId(id); let error = Error::DuplicateRequestId(id);
if response.into_send_async(Err(error.into())).await.is_err() { if response.send(Err(error.into())).await.is_err() {
trace!("Receiver dropped"); trace!("Receiver dropped");
} }
} }
@ -173,7 +172,7 @@ async fn router_handle_request(
} }
Err(error) => { Err(error) => {
let error = Error::Ws(error.to_string()); let error = Error::Ws(error.to_string());
if response.into_send_async(Err(error.into())).await.is_err() { if response.send(Err(error.into())).await.is_err() {
trace!("Receiver dropped"); trace!("Receiver dropped");
} }
return HandleResult::Disconnected; return HandleResult::Disconnected;
@ -213,7 +212,7 @@ async fn router_handle_response(
} }
} }
} }
let _res = sender.into_send_async(DbResponse::from(response)).await; let _res = sender.send(DbResponse::from(response)).await;
} }
} }
} }
@ -267,7 +266,7 @@ async fn router_handle_response(
// Return an error if an ID was returned // Return an error if an ID was returned
if let Some(Ok(id)) = id.map(Value::coerce_to_i64) { if let Some(Ok(id)) = id.map(Value::coerce_to_i64) {
if let Some((_method, sender)) = state.routes.remove(&id) { if let Some((_method, sender)) = state.routes.remove(&id) {
let _res = sender.into_send_async(Err(error)).await; let _res = sender.send(Err(error)).await;
} }
} }
} else { } else {
@ -356,7 +355,7 @@ pub(crate) async fn run_router(
let (mut ws, socket) = match connect { let (mut ws, socket) = match connect {
Ok(pair) => pair, Ok(pair) => pair,
Err(error) => { Err(error) => {
let _ = conn_tx.into_send_async(Err(error.into())).await; let _ = conn_tx.send(Err(error.into())).await;
return; return;
} }
}; };
@ -369,13 +368,13 @@ pub(crate) async fn run_router(
match result { match result {
Ok(events) => events, Ok(events) => events,
Err(error) => { Err(error) => {
let _ = conn_tx.into_send_async(Err(error.into())).await; let _ = conn_tx.send(Err(error.into())).await;
return; return;
} }
} }
}; };
let _ = conn_tx.into_send_async(Ok(())).await; let _ = conn_tx.send(Ok(())).await;
let ping = { let ping = {
let mut request = BTreeMap::new(); let mut request = BTreeMap::new();
@ -389,8 +388,6 @@ pub(crate) async fn run_router(
let mut state = RouterState::new(socket_sink, socket_stream); let mut state = RouterState::new(socket_sink, socket_stream);
let mut route_stream = route_rx.into_stream();
'router: loop { 'router: loop {
let mut interval = time::interval(PING_INTERVAL); let mut interval = time::interval(PING_INTERVAL);
// don't bombard the server with pings if we miss some ticks // don't bombard the server with pings if we miss some ticks
@ -404,8 +401,8 @@ pub(crate) async fn run_router(
loop { loop {
futures::select! { futures::select! {
route = route_stream.next() => { route = route_rx.recv().fuse() => {
let Some(route) = route else { let Ok(route) = route else {
match ws.close().await { match ws.close().await {
Ok(..) => trace!("Connection closed successfully"), Ok(..) => trace!("Connection closed successfully"),
Err(error) => { Err(error) => {

View file

@ -242,14 +242,14 @@ impl From<tokio_tungstenite::tungstenite::Error> for crate::Error {
} }
} }
impl<T> From<flume::SendError<T>> for crate::Error { impl<T> From<channel::SendError<T>> for crate::Error {
fn from(error: flume::SendError<T>) -> Self { fn from(error: channel::SendError<T>) -> Self {
Self::Api(Error::InternalError(error.to_string())) Self::Api(Error::InternalError(error.to_string()))
} }
} }
impl From<flume::RecvError> for crate::Error { impl From<channel::RecvError> for crate::Error {
fn from(error: flume::RecvError) -> Self { fn from(error: channel::RecvError) -> Self {
Self::Api(Error::InternalError(error.to_string())) Self::Api(Error::InternalError(error.to_string()))
} }
} }

View file

@ -1,7 +1,6 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method; use crate::api::conn::Method;
use crate::api::conn::Param; use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::method::OnceLockExt; use crate::api::method::OnceLockExt;
use crate::api::opt::auth::Jwt; use crate::api::opt::auth::Jwt;
use crate::api::Connection; use crate::api::Connection;

View file

@ -1,5 +1,4 @@
use crate::api::method::BoxFuture; use crate::api::method::BoxFuture;
use crate::api::method::Cancel; use crate::api::method::Cancel;
use crate::api::method::Commit; use crate::api::method::Commit;
use crate::api::Connection; use crate::api::Connection;

View file

@ -1,5 +1,4 @@
use crate::api::method::BoxFuture; use crate::api::method::BoxFuture;
use crate::api::Connection; use crate::api::Connection;
use crate::api::Result; use crate::api::Result;
use crate::api::Surreal; use crate::api::Surreal;

View file

@ -1,5 +1,4 @@
use crate::api::method::BoxFuture; use crate::api::method::BoxFuture;
use crate::api::Connection; use crate::api::Connection;
use crate::api::Result; use crate::api::Result;
use crate::api::Surreal; use crate::api::Surreal;

View file

@ -1,7 +1,6 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method; use crate::api::conn::Method;
use crate::api::conn::Param; use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::Connection; use crate::api::Connection;
use crate::api::Result; use crate::api::Result;
use crate::method::OnceLockExt; use crate::method::OnceLockExt;

View file

@ -1,8 +1,7 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method; use crate::api::conn::Method;
use crate::api::conn::MlConfig; use crate::api::conn::MlConfig;
use crate::api::conn::Param; use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::Connection; use crate::api::Connection;
use crate::api::Error; use crate::api::Error;
use crate::api::ExtraFeatures; use crate::api::ExtraFeatures;

View file

@ -1,7 +1,6 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method; use crate::api::conn::Method;
use crate::api::conn::Param; use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::Connection; use crate::api::Connection;
use crate::api::Result; use crate::api::Result;
use crate::method::OnceLockExt; use crate::method::OnceLockExt;

View file

@ -1,7 +1,6 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method; use crate::api::conn::Method;
use crate::api::conn::Param; use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::Connection; use crate::api::Connection;
use crate::api::Result; use crate::api::Result;
use crate::method::OnceLockExt; use crate::method::OnceLockExt;

View file

@ -52,7 +52,7 @@ impl crate::api::Connection for Client {}
impl Connection for Client { impl Connection for Client {
fn connect(_address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> { fn connect(_address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
Box::pin(async move { Box::pin(async move {
let (route_tx, route_rx) = flume::bounded(capacity); let (route_tx, route_rx) = channel::bounded(capacity);
let mut features = HashSet::new(); let mut features = HashSet::new();
features.insert(ExtraFeatures::Backup); features.insert(ExtraFeatures::Backup);
let router = Router { let router = Router {

View file

@ -1,3 +1,5 @@
use channel::Receiver;
use super::types::User; use super::types::User;
use crate::api::conn::DbResponse; use crate::api::conn::DbResponse;
use crate::api::conn::Method; use crate::api::conn::Method;
@ -5,17 +7,13 @@ use crate::api::conn::Route;
use crate::api::Response as QueryResponse; use crate::api::Response as QueryResponse;
use crate::sql::to_value; use crate::sql::to_value;
use crate::sql::Value; use crate::sql::Value;
use flume::Receiver;
use futures::StreamExt;
pub(super) fn mock(route_rx: Receiver<Route>) { pub(super) fn mock(route_rx: Receiver<Route>) {
tokio::spawn(async move { tokio::spawn(async move {
let mut stream = route_rx.into_stream(); while let Ok(Route {
while let Some(Route {
request, request,
response, response,
}) = stream.next().await }) = route_rx.recv().await
{ {
let (_, method, param) = request; let (_, method, param) = request;
let mut params = param.other; let mut params = param.other;
@ -94,7 +92,7 @@ pub(super) fn mock(route_rx: Receiver<Route>) {
}, },
}; };
if let Err(message) = response.into_send_async(result).await { if let Err(message) = response.send(result).await {
panic!("message dropped; {message:?}"); panic!("message dropped; {message:?}");
} }
} }

View file

@ -1,7 +1,6 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method; use crate::api::conn::Method;
use crate::api::conn::Param; use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::Connection; use crate::api::Connection;
use crate::api::Result; use crate::api::Result;
use crate::method::OnceLockExt; use crate::method::OnceLockExt;

View file

@ -1,7 +1,6 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method; use crate::api::conn::Method;
use crate::api::conn::Param; use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::Connection; use crate::api::Connection;
use crate::api::Result; use crate::api::Result;
use crate::method::OnceLockExt; use crate::method::OnceLockExt;

View file

@ -1,7 +1,6 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method; use crate::api::conn::Method;
use crate::api::conn::Param; use crate::api::conn::Param;
use crate::api::method::BoxFuture;
use crate::api::method::UseDb; use crate::api::method::UseDb;
use crate::api::Connection; use crate::api::Connection;
use crate::api::Result; use crate::api::Result;

View file

@ -1,8 +1,7 @@
use crate::api::method::BoxFuture;
use crate::api::conn::Method; use crate::api::conn::Method;
use crate::api::conn::Param; use crate::api::conn::Param;
use crate::api::err::Error; use crate::api::err::Error;
use crate::api::method::BoxFuture;
use crate::api::Connection; use crate::api::Connection;
use crate::api::Result; use crate::api::Result;
use crate::method::OnceLockExt; use crate::method::OnceLockExt;

View file

@ -9,7 +9,6 @@ use surrealdb::method::QueryStream;
use surrealdb::Action; use surrealdb::Action;
use surrealdb::Notification; use surrealdb::Notification;
use surrealdb_core::sql::Object; use surrealdb_core::sql::Object;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tracing::info; use tracing::info;
@ -429,17 +428,16 @@ async fn receive_all_pending_notifications<
stream: Arc<RwLock<S>>, stream: Arc<RwLock<S>>,
timeout: Duration, timeout: Duration,
) -> Vec<Notification<I>> { ) -> Vec<Notification<I>> {
let (send, mut recv) = channel::<Notification<I>>(MAX_NOTIFICATIONS); let mut results = Vec::new();
let we_expect_timeout = tokio::time::timeout(timeout, async move { let we_expect_timeout = tokio::time::timeout(timeout, async {
while let Some(notification) = stream.write().await.next().await { while let Some(notification) = stream.write().await.next().await {
send.send(notification.unwrap()).await.unwrap(); if results.len() >= MAX_NOTIFICATIONS {
panic!("too many notification!")
}
results.push(notification.unwrap())
} }
}) })
.await; .await;
assert!(we_expect_timeout.is_err()); assert!(we_expect_timeout.is_err());
let mut results = Vec::new();
while let Ok(notification) = recv.try_recv() {
results.push(notification);
}
results results
} }