diff --git a/src/rpc/connection.rs b/src/rpc/connection.rs index 8a601173..4262bccf 100644 --- a/src/rpc/connection.rs +++ b/src/rpc/connection.rs @@ -180,19 +180,17 @@ impl Connection { // Check if this has shutdown _ = canceller.cancelled() => break, // Wait for the next message to send - msg = internal_receiver.next() => { - if let Some(res) = msg { - // Send the message to the client - if let Err(err) = sender.send(res).await { - // Output any errors if not a close error - if err.to_string() != CONN_CLOSED_ERR { - debug!("WebSocket error: {:?}", err); - } - // Cancel the WebSocket tasks - rpc.read().await.canceller.cancel(); - // Exit out of the loop - break; + Some(res) = internal_receiver.next() => { + // Send the message to the client + if let Err(err) = sender.send(res).await { + // Output any errors if not a close error + if err.to_string() != CONN_CLOSED_ERR { + debug!("WebSocket error: {:?}", err); } + // Cancel the WebSocket tasks + rpc.read().await.canceller.cancel(); + // Exit out of the loop + break; } }, } @@ -216,42 +214,51 @@ impl Connection { biased; // Check if this has shutdown _ = canceller.cancelled() => break, - // Wait for the next message to read - msg = receiver.next() => { - if let Some(msg) = msg { - // Process the received WebSocket message - match msg { - // We've received a message from the client - Ok(msg) => match msg { - Message::Text(_) => { - tasks.spawn(Connection::handle_message(rpc.clone(), msg, internal_sender.clone())); - } - Message::Binary(_) => { - tasks.spawn(Connection::handle_message(rpc.clone(), msg, internal_sender.clone())); - } - Message::Close(_) => { - // Respond with a close message - if let Err(err) = internal_sender.send(Message::Close(None)).await { - trace!("WebSocket error when replying to the Close frame: {:?}", err); - }; - // Cancel the WebSocket tasks - rpc.read().await.canceller.cancel(); - // Exit out of the loop - break; - } - _ => { - // Ignore everything else - } - }, - Err(err) => { - // There was an error with the WebSocket - trace!("WebSocket error: {:?}", err); - // Cancel the WebSocket tasks - rpc.read().await.canceller.cancel(); - // Exit out of the loop - break; - } + // Remove any completed tasks + Some(out) = tasks.join_next() => match out { + // The task completed successfully + Ok(_) => continue, + // There was an uncaught panic in the task + Err(err) => { + // There was an error with the task + trace!("WebSocket request error: {:?}", err); + // Cancel the WebSocket tasks + rpc.read().await.canceller.cancel(); + // Exit out of the loop + break; + } + }, + // Wait for the next received message + Some(msg) = receiver.next() => match msg { + // We've received a message from the client + Ok(msg) => match msg { + Message::Text(_) => { + tasks.spawn(Connection::handle_message(rpc.clone(), msg, internal_sender.clone())); } + Message::Binary(_) => { + tasks.spawn(Connection::handle_message(rpc.clone(), msg, internal_sender.clone())); + } + Message::Close(_) => { + // Respond with a close message + if let Err(err) = internal_sender.send(Message::Close(None)).await { + trace!("WebSocket error when replying to the Close frame: {:?}", err); + }; + // Cancel the WebSocket tasks + rpc.read().await.canceller.cancel(); + // Exit out of the loop + break; + } + _ => { + // Ignore everything else + } + }, + Err(err) => { + // There was an error with the WebSocket + trace!("WebSocket error: {:?}", err); + // Cancel the WebSocket tasks + rpc.read().await.canceller.cancel(); + // Exit out of the loop + break; } } } @@ -259,9 +266,12 @@ impl Connection { // Wait for all tasks to finish while let Some(res) = tasks.join_next().await { if let Err(err) = res { - error!("Error while handling RPC message: {}", err); + // There was an error with the task + trace!("WebSocket request error: {:?}", err); } } + // Abort all tasks + tasks.shutdown().await; } /// Send live query notifications to the client