diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index c9fe0f72..87927c84 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -802,7 +802,6 @@ impl Transaction { nxt = Some(k.clone()); } // Delete - trace!("Found getr {:?} {:?}", crate::key::debug::sprint_key(&k), v); out.push((k, v)); // Count num -= 1; diff --git a/src/rpc/connection.rs b/src/rpc/connection.rs index 1e42716f..a2057e3a 100644 --- a/src/rpc/connection.rs +++ b/src/rpc/connection.rs @@ -86,29 +86,13 @@ impl Connection { ws_id, WebSocketRef(internal_sender.clone(), rpc.read().await.graceful_shutdown.clone()), ); - let mut live_queries_to_gc = Vec::new(); - // Remove all live queries - LIVE_QUERIES.write().await.retain(|key, value| { - if value == &ws_id { - trace!("Removing live query: {}", key); - live_queries_to_gc.push(*key); - return false; - } - true - }); - - // Garbage collect Live Query - if let Err(e) = - DB.get().unwrap().garbage_collect_dead_session(live_queries_to_gc.as_slice()).await - { - error!("Failed to garbage collect dead sessions: {:?}", e); - } + // Spawn async tasks for the WebSocket let mut tasks = JoinSet::new(); tasks.spawn(Self::ping(rpc.clone(), internal_sender.clone())); tasks.spawn(Self::read(rpc.clone(), receiver, internal_sender.clone())); tasks.spawn(Self::write(rpc.clone(), sender, internal_receiver.clone())); - tasks.spawn(Self::lq_notifications(rpc.clone())); + tasks.spawn(Self::notifications(rpc.clone())); // Wait until all tasks finish while let Some(res) = tasks.join_next().await { @@ -117,10 +101,26 @@ impl Connection { } } + trace!("WebSocket {} disconnected", ws_id); + // Remove this WebSocket from the list WEBSOCKETS.write().await.remove(&ws_id); - trace!("WebSocket {} disconnected", ws_id); + // Remove all live queries + let mut gc = Vec::new(); + LIVE_QUERIES.write().await.retain(|key, value| { + if value == &ws_id { + trace!("Removing live query: {}", key); + gc.push(*key); + return false; + } + true + }); + + // Garbage collect queries + if let Err(e) = DB.get().unwrap().garbage_collect_dead_session(gc.as_slice()).await { + error!("Failed to garbage collect dead sessions: {:?}", e); + } if let Err(err) = telemetry::metrics::ws::on_disconnect() { error!("Error running metrics::ws::on_disconnect hook: {}", err); @@ -241,7 +241,7 @@ impl Connection { } /// Send live query notifications to the client - async fn lq_notifications(rpc: Arc>) { + async fn notifications(rpc: Arc>) { if let Some(channel) = DB.get().unwrap().notifications() { let cancel_token = rpc.read().await.graceful_shutdown.clone(); loop { diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index d201d01b..8b87e85c 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -15,8 +15,8 @@ use uuid::Uuid; static CONN_CLOSED_ERR: &str = "Connection closed normally"; -// Mapping of WebSocketID to WebSocket pub struct WebSocketRef(Sender, CancellationToken); +// Mapping of WebSocketID to WebSocket type WebSockets = RwLock>; // Mapping of LiveQueryID to WebSocketID type LiveQueries = RwLock>;