Fix memory leak (#3066)

This commit is contained in:
Tobie Morgan Hitchcock 2023-12-05 09:28:29 +00:00 committed by GitHub
parent 3b5a7411cf
commit 08ec62cbe1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -180,19 +180,17 @@ impl Connection {
// Check if this has shutdown
_ = canceller.cancelled() => break,
// Wait for the next message to send
msg = internal_receiver.next() => {
if let Some(res) = msg {
// Send the message to the client
if let Err(err) = sender.send(res).await {
// Output any errors if not a close error
if err.to_string() != CONN_CLOSED_ERR {
debug!("WebSocket error: {:?}", err);
}
// Cancel the WebSocket tasks
rpc.read().await.canceller.cancel();
// Exit out of the loop
break;
Some(res) = internal_receiver.next() => {
// Send the message to the client
if let Err(err) = sender.send(res).await {
// Output any errors if not a close error
if err.to_string() != CONN_CLOSED_ERR {
debug!("WebSocket error: {:?}", err);
}
// Cancel the WebSocket tasks
rpc.read().await.canceller.cancel();
// Exit out of the loop
break;
}
},
}
@ -216,42 +214,51 @@ impl Connection {
biased;
// Check if this has shutdown
_ = canceller.cancelled() => break,
// Wait for the next message to read
msg = receiver.next() => {
if let Some(msg) = msg {
// Process the received WebSocket message
match msg {
// We've received a message from the client
Ok(msg) => match msg {
Message::Text(_) => {
tasks.spawn(Connection::handle_message(rpc.clone(), msg, internal_sender.clone()));
}
Message::Binary(_) => {
tasks.spawn(Connection::handle_message(rpc.clone(), msg, internal_sender.clone()));
}
Message::Close(_) => {
// Respond with a close message
if let Err(err) = internal_sender.send(Message::Close(None)).await {
trace!("WebSocket error when replying to the Close frame: {:?}", err);
};
// Cancel the WebSocket tasks
rpc.read().await.canceller.cancel();
// Exit out of the loop
break;
}
_ => {
// Ignore everything else
}
},
Err(err) => {
// There was an error with the WebSocket
trace!("WebSocket error: {:?}", err);
// Cancel the WebSocket tasks
rpc.read().await.canceller.cancel();
// Exit out of the loop
break;
}
// Remove any completed tasks
Some(out) = tasks.join_next() => match out {
// The task completed successfully
Ok(_) => continue,
// There was an uncaught panic in the task
Err(err) => {
// There was an error with the task
trace!("WebSocket request error: {:?}", err);
// Cancel the WebSocket tasks
rpc.read().await.canceller.cancel();
// Exit out of the loop
break;
}
},
// Wait for the next received message
Some(msg) = receiver.next() => match msg {
// We've received a message from the client
Ok(msg) => match msg {
Message::Text(_) => {
tasks.spawn(Connection::handle_message(rpc.clone(), msg, internal_sender.clone()));
}
Message::Binary(_) => {
tasks.spawn(Connection::handle_message(rpc.clone(), msg, internal_sender.clone()));
}
Message::Close(_) => {
// Respond with a close message
if let Err(err) = internal_sender.send(Message::Close(None)).await {
trace!("WebSocket error when replying to the Close frame: {:?}", err);
};
// Cancel the WebSocket tasks
rpc.read().await.canceller.cancel();
// Exit out of the loop
break;
}
_ => {
// Ignore everything else
}
},
Err(err) => {
// There was an error with the WebSocket
trace!("WebSocket error: {:?}", err);
// Cancel the WebSocket tasks
rpc.read().await.canceller.cancel();
// Exit out of the loop
break;
}
}
}
@ -259,9 +266,12 @@ impl Connection {
// Wait for all tasks to finish
while let Some(res) = tasks.join_next().await {
if let Err(err) = res {
error!("Error while handling RPC message: {}", err);
// There was an error with the task
trace!("WebSocket request error: {:?}", err);
}
}
// Abort all tasks
tasks.shutdown().await;
}
/// Send live query notifications to the client