Gh 3906 fix incorrect shutdown (#4295)

This commit is contained in:
Przemyslaw Hugh Kaznowski 2024-07-03 16:06:44 +01:00 committed by GitHub
parent 7965c0031f
commit 85253812f7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 33 additions and 13 deletions

View file

@ -219,8 +219,8 @@ pub(crate) fn router(
// Stop maintenance tasks // Stop maintenance tasks
for chan in task_chans { for chan in task_chans {
if let Err(e) = chan.send(()) { if let Err(_empty_tuple) = chan.send(()) {
error!("Error sending shutdown signal to task: {}", e); error!("Error sending shutdown signal to task");
} }
} }
tasks.resolve().await.unwrap(); tasks.resolve().await.unwrap();

View file

@ -198,8 +198,8 @@ pub(crate) fn router(
// Stop maintenance tasks // Stop maintenance tasks
for chan in task_chans { for chan in task_chans {
if let Err(e) = chan.send(()) { if let Err(_empty_tuple) = chan.send(()) {
error!("Error sending shutdown signal to maintenance task: {e}"); error!("Error sending shutdown signal to maintenance task");
} }
} }
}); });

View file

@ -1,5 +1,4 @@
use flume::Sender; use futures::{FutureExt, StreamExt};
use futures::StreamExt;
use futures_concurrency::stream::Merge; use futures_concurrency::stream::Merge;
use reblessive::TreeStack; use reblessive::TreeStack;
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
@ -20,6 +19,7 @@ use crate::engine::IntervalStream;
use crate::Error as RootError; use crate::Error as RootError;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use tokio::spawn as spawn_future; use tokio::spawn as spawn_future;
use tokio::sync::oneshot;
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures::spawn_local as spawn_future; use wasm_bindgen_futures::spawn_local as spawn_future;
@ -63,7 +63,7 @@ impl Tasks {
} }
/// Starts tasks that are required for the correct running of the engine /// Starts tasks that are required for the correct running of the engine
pub fn start_tasks(opt: &EngineOptions, dbs: Arc<Datastore>) -> (Tasks, [Sender<()>; 2]) { pub fn start_tasks(opt: &EngineOptions, dbs: Arc<Datastore>) -> (Tasks, [oneshot::Sender<()>; 2]) {
let nd = init(opt, dbs.clone()); let nd = init(opt, dbs.clone());
let lq = live_query_change_feed(opt, dbs); let lq = live_query_change_feed(opt, dbs);
let cancellation_channels = [nd.1, lq.1]; let cancellation_channels = [nd.1, lq.1];
@ -82,7 +82,7 @@ pub fn start_tasks(opt: &EngineOptions, dbs: Arc<Datastore>) -> (Tasks, [Sender<
// //
// This function needs to be called before after the dbs::init and before the net::init functions. // This function needs to be called before after the dbs::init and before the net::init functions.
// It needs to be before net::init because the net::init function blocks until the web server stops. // It needs to be before net::init because the net::init function blocks until the web server stops.
fn init(opt: &EngineOptions, dbs: Arc<Datastore>) -> (FutureTask, Sender<()>) { fn init(opt: &EngineOptions, dbs: Arc<Datastore>) -> (FutureTask, oneshot::Sender<()>) {
let _init = crate::dbs::LoggingLifecycle::new("node agent initialisation".to_string()); let _init = crate::dbs::LoggingLifecycle::new("node agent initialisation".to_string());
let tick_interval = opt.tick_interval; let tick_interval = opt.tick_interval;
@ -93,7 +93,7 @@ fn init(opt: &EngineOptions, dbs: Arc<Datastore>) -> (FutureTask, Sender<()>) {
let ret_status = completed_status.clone(); let ret_status = completed_status.clone();
// We create a channel that can be streamed that will indicate termination // We create a channel that can be streamed that will indicate termination
let (tx, rx) = flume::bounded(1); let (tx, rx) = oneshot::channel();
let _fut = spawn_future(async move { let _fut = spawn_future(async move {
let _lifecycle = crate::dbs::LoggingLifecycle::new("heartbeat task".to_string()); let _lifecycle = crate::dbs::LoggingLifecycle::new("heartbeat task".to_string());
@ -124,7 +124,10 @@ fn init(opt: &EngineOptions, dbs: Arc<Datastore>) -> (FutureTask, Sender<()>) {
} }
// Start live query on change feeds notification processing // Start live query on change feeds notification processing
fn live_query_change_feed(opt: &EngineOptions, dbs: Arc<Datastore>) -> (FutureTask, Sender<()>) { fn live_query_change_feed(
opt: &EngineOptions,
dbs: Arc<Datastore>,
) -> (FutureTask, oneshot::Sender<()>) {
let tick_interval = opt.tick_interval; let tick_interval = opt.tick_interval;
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
@ -133,7 +136,7 @@ fn live_query_change_feed(opt: &EngineOptions, dbs: Arc<Datastore>) -> (FutureTa
let ret_status = completed_status.clone(); let ret_status = completed_status.clone();
// We create a channel that can be streamed that will indicate termination // We create a channel that can be streamed that will indicate termination
let (tx, rx) = flume::bounded(1); let (tx, rx) = oneshot::channel();
let _fut = spawn_future(async move { let _fut = spawn_future(async move {
let mut stack = TreeStack::new(); let mut stack = TreeStack::new();
@ -193,6 +196,7 @@ mod test {
use crate::kvs::Datastore; use crate::kvs::Datastore;
use crate::options::EngineOptions; use crate::options::EngineOptions;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
#[test_log::test(tokio::test)] #[test_log::test(tokio::test)]
pub async fn tasks_complete() { pub async fn tasks_complete() {
@ -204,4 +208,20 @@ mod test {
} }
val.resolve().await.unwrap(); val.resolve().await.unwrap();
} }
#[test_log::test(tokio::test)]
pub async fn tasks_complete_channel_closed() {
let opt = EngineOptions::default();
let dbs = Arc::new(Datastore::new("memory").await.unwrap());
let val = {
let (val, _chans) = start_tasks(&opt, dbs.clone());
val
};
tokio::time::timeout(Duration::from_secs(10), val.resolve())
.await
.map_err(|e| format!("Timed out after {e}"))
.unwrap()
.map_err(|e| format!("Resolution failed: {e}"))
.unwrap();
}
} }

View file

@ -196,8 +196,8 @@ pub async fn init(
net::init(ct.clone()).await?; net::init(ct.clone()).await?;
// Shutdown and stop closed tasks // Shutdown and stop closed tasks
task_chans.into_iter().for_each(|chan| { task_chans.into_iter().for_each(|chan| {
if let Err(e) = chan.send(()) { if let Err(_empty_tuple) = chan.send(()) {
error!("Failed to send shutdown signal to task: {}", e); error!("Failed to send shutdown signal to task");
} }
}); });
ct.cancel(); ct.cancel();