diff --git a/lib/src/api/engine/local/native.rs b/lib/src/api/engine/local/native.rs index 9ded5208..fdabd4a5 100644 --- a/lib/src/api/engine/local/native.rs +++ b/lib/src/api/engine/local/native.rs @@ -219,8 +219,8 @@ pub(crate) fn router( // Stop maintenance tasks for chan in task_chans { - if let Err(e) = chan.send(()) { - error!("Error sending shutdown signal to task: {}", e); + if let Err(_empty_tuple) = chan.send(()) { + error!("Error sending shutdown signal to task"); } } tasks.resolve().await.unwrap(); diff --git a/lib/src/api/engine/local/wasm.rs b/lib/src/api/engine/local/wasm.rs index 2dfd4983..70d134b9 100644 --- a/lib/src/api/engine/local/wasm.rs +++ b/lib/src/api/engine/local/wasm.rs @@ -198,8 +198,8 @@ pub(crate) fn router( // Stop maintenance tasks for chan in task_chans { - if let Err(e) = chan.send(()) { - error!("Error sending shutdown signal to maintenance task: {e}"); + if let Err(_empty_tuple) = chan.send(()) { + error!("Error sending shutdown signal to maintenance task"); } } }); diff --git a/lib/src/api/engine/tasks.rs b/lib/src/api/engine/tasks.rs index 13a223a7..f5365506 100644 --- a/lib/src/api/engine/tasks.rs +++ b/lib/src/api/engine/tasks.rs @@ -1,5 +1,4 @@ -use flume::Sender; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; use futures_concurrency::stream::Merge; use reblessive::TreeStack; #[cfg(target_arch = "wasm32")] @@ -20,6 +19,7 @@ use crate::engine::IntervalStream; use crate::Error as RootError; #[cfg(not(target_arch = "wasm32"))] use tokio::spawn as spawn_future; +use tokio::sync::oneshot; #[cfg(target_arch = "wasm32")] use wasm_bindgen_futures::spawn_local as spawn_future; @@ -63,7 +63,7 @@ impl Tasks { } /// Starts tasks that are required for the correct running of the engine -pub fn start_tasks(opt: &EngineOptions, dbs: Arc) -> (Tasks, [Sender<()>; 2]) { +pub fn start_tasks(opt: &EngineOptions, dbs: Arc) -> (Tasks, [oneshot::Sender<()>; 2]) { let nd = init(opt, dbs.clone()); let lq = live_query_change_feed(opt, dbs); let cancellation_channels = [nd.1, lq.1]; @@ -82,7 +82,7 @@ pub fn start_tasks(opt: &EngineOptions, dbs: Arc) -> (Tasks, [Sender< // // This function needs to be called before after the dbs::init and before the net::init functions. // It needs to be before net::init because the net::init function blocks until the web server stops. -fn init(opt: &EngineOptions, dbs: Arc) -> (FutureTask, Sender<()>) { +fn init(opt: &EngineOptions, dbs: Arc) -> (FutureTask, oneshot::Sender<()>) { let _init = crate::dbs::LoggingLifecycle::new("node agent initialisation".to_string()); let tick_interval = opt.tick_interval; @@ -93,7 +93,7 @@ fn init(opt: &EngineOptions, dbs: Arc) -> (FutureTask, Sender<()>) { let ret_status = completed_status.clone(); // We create a channel that can be streamed that will indicate termination - let (tx, rx) = flume::bounded(1); + let (tx, rx) = oneshot::channel(); let _fut = spawn_future(async move { let _lifecycle = crate::dbs::LoggingLifecycle::new("heartbeat task".to_string()); @@ -124,7 +124,10 @@ fn init(opt: &EngineOptions, dbs: Arc) -> (FutureTask, Sender<()>) { } // Start live query on change feeds notification processing -fn live_query_change_feed(opt: &EngineOptions, dbs: Arc) -> (FutureTask, Sender<()>) { +fn live_query_change_feed( + opt: &EngineOptions, + dbs: Arc, +) -> (FutureTask, oneshot::Sender<()>) { let tick_interval = opt.tick_interval; #[cfg(target_arch = "wasm32")] @@ -133,7 +136,7 @@ fn live_query_change_feed(opt: &EngineOptions, dbs: Arc) -> (FutureTa let ret_status = completed_status.clone(); // We create a channel that can be streamed that will indicate termination - let (tx, rx) = flume::bounded(1); + let (tx, rx) = oneshot::channel(); let _fut = spawn_future(async move { let mut stack = TreeStack::new(); @@ -193,6 +196,7 @@ mod test { use crate::kvs::Datastore; use crate::options::EngineOptions; use std::sync::Arc; + use std::time::Duration; #[test_log::test(tokio::test)] pub async fn tasks_complete() { @@ -204,4 +208,20 @@ mod test { } val.resolve().await.unwrap(); } + + #[test_log::test(tokio::test)] + pub async fn tasks_complete_channel_closed() { + let opt = EngineOptions::default(); + let dbs = Arc::new(Datastore::new("memory").await.unwrap()); + let val = { + let (val, _chans) = start_tasks(&opt, dbs.clone()); + val + }; + tokio::time::timeout(Duration::from_secs(10), val.resolve()) + .await + .map_err(|e| format!("Timed out after {e}")) + .unwrap() + .map_err(|e| format!("Resolution failed: {e}")) + .unwrap(); + } } diff --git a/src/cli/start.rs b/src/cli/start.rs index fd42dde1..bd0c2fd4 100644 --- a/src/cli/start.rs +++ b/src/cli/start.rs @@ -196,8 +196,8 @@ pub async fn init( net::init(ct.clone()).await?; // Shutdown and stop closed tasks task_chans.into_iter().for_each(|chan| { - if let Err(e) = chan.send(()) { - error!("Failed to send shutdown signal to task: {}", e); + if let Err(_empty_tuple) = chan.send(()) { + error!("Failed to send shutdown signal to task"); } }); ct.cancel();