Minor improvements to live queries (#2230)
This commit is contained in:
parent
cf4cfc1908
commit
ea76b01ce4
2 changed files with 50 additions and 63 deletions
|
@ -185,10 +185,14 @@ impl<'a> Executor<'a> {
|
|||
}
|
||||
// Get the statement start time
|
||||
let now = Instant::now();
|
||||
// Check if this is a LIVE statement
|
||||
let is_stm_live = matches!(stm, Statement::Live(_));
|
||||
// Check if this is a KILL statement
|
||||
let is_stm_kill = matches!(stm, Statement::Kill(_));
|
||||
// Check if this is a RETURN statement
|
||||
let clr = matches!(stm, Statement::Output(_));
|
||||
let is_stm_output = matches!(stm, Statement::Output(_));
|
||||
// Process a single statement
|
||||
let res = match stm.clone() {
|
||||
let res = match stm {
|
||||
// Specify runtime options
|
||||
Statement::Option(mut stm) => {
|
||||
// Selected DB?
|
||||
|
@ -399,15 +403,15 @@ impl<'a> Executor<'a> {
|
|||
self.err = true;
|
||||
e
|
||||
}),
|
||||
query_type: match stm {
|
||||
Statement::Live(_) => QueryType::Live,
|
||||
Statement::Kill(_) => QueryType::Kill,
|
||||
query_type: match (is_stm_live, is_stm_kill) {
|
||||
(true, _) => QueryType::Live,
|
||||
(_, true) => QueryType::Kill,
|
||||
_ => QueryType::Other,
|
||||
},
|
||||
};
|
||||
// Output the response
|
||||
if self.txn.is_some() {
|
||||
if clr {
|
||||
if is_stm_output {
|
||||
buf.clear();
|
||||
}
|
||||
buf.push(res);
|
||||
|
|
|
@ -30,6 +30,7 @@ use uuid::Uuid;
|
|||
use warp::ws::{Message, WebSocket, Ws};
|
||||
use warp::Filter;
|
||||
|
||||
// Mapping of WebSocketID to WebSocket
|
||||
type WebSockets = RwLock<HashMap<Uuid, Sender<Message>>>;
|
||||
// Mapping of LiveQueryID to WebSocketID
|
||||
type LiveQueries = RwLock<HashMap<Uuid, Uuid>>;
|
||||
|
@ -125,35 +126,17 @@ impl Rpc {
|
|||
tokio::task::spawn(async move {
|
||||
let rpc = moved_rpc;
|
||||
if let Some(channel) = DB.get().unwrap().notifications() {
|
||||
while let Ok(v) = channel.recv().await {
|
||||
trace!("Received notification: {:?}", v);
|
||||
// Find which websocket the notification belongs to
|
||||
match LIVE_QUERIES.read().await.get(&v.id) {
|
||||
Some(ws_id) => {
|
||||
while let Ok(notification) = channel.recv().await {
|
||||
// Find which WebSocket the notification belongs to
|
||||
if let Some(ws_id) = LIVE_QUERIES.read().await.get(¬ification.id) {
|
||||
// Check to see if the WebSocket exists
|
||||
if let Some(websocket) = WEBSOCKETS.read().await.get(ws_id) {
|
||||
// Serialize the message to send
|
||||
let message = res::success(None, notification);
|
||||
// Get the current output format
|
||||
let format = rpc.read().await.format.clone();
|
||||
// Send the notification to the client
|
||||
let msg_text = res::success(None, v.clone());
|
||||
let ws_write = WEBSOCKETS.write().await;
|
||||
match ws_write.get(ws_id) {
|
||||
None => {
|
||||
error!(
|
||||
"Tracked WebSocket {:?} not found for lq: {:?}",
|
||||
ws_id, &v.id
|
||||
);
|
||||
}
|
||||
Some(ws_sender) => {
|
||||
msg_text
|
||||
.send(rpc.read().await.format.clone(), ws_sender.clone())
|
||||
.await;
|
||||
trace!(
|
||||
"Sent notification to WebSocket {:?} for lq: {:?}",
|
||||
ws_id,
|
||||
&v.id
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
error!("Unknown websocket for live query: {:?}", v.id);
|
||||
message.send(format, websocket.clone()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -213,17 +196,13 @@ impl Rpc {
|
|||
// Remove this WebSocket from the list of WebSockets
|
||||
WEBSOCKETS.write().await.remove(&id);
|
||||
// Remove all live queries
|
||||
let mut locked_lq_map = LIVE_QUERIES.write().await;
|
||||
let mut live_query_to_gc: Vec<Uuid> = vec![];
|
||||
for (key, value) in locked_lq_map.iter() {
|
||||
LIVE_QUERIES.write().await.retain(|key, value| {
|
||||
if value == &id {
|
||||
trace!("Removing live query: {}", key);
|
||||
live_query_to_gc.push(*key);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
for key in live_query_to_gc {
|
||||
locked_lq_map.remove(&key);
|
||||
}
|
||||
true
|
||||
});
|
||||
}
|
||||
|
||||
/// Call RPC methods from the WebSocket
|
||||
|
@ -720,27 +699,6 @@ impl Rpc {
|
|||
Ok(res)
|
||||
}
|
||||
|
||||
async fn handle_live_query_results(&self, res: &Response) {
|
||||
match &res.query_type {
|
||||
QueryType::Live => {
|
||||
if let Ok(Value::Uuid(lqid)) = &res.result {
|
||||
// Match on Uuid type
|
||||
LIVE_QUERIES.write().await.insert(lqid.0, self.uuid);
|
||||
trace!("Registered live query {} on websocket {}", lqid, self.uuid);
|
||||
}
|
||||
}
|
||||
QueryType::Kill => {
|
||||
if let Ok(Value::Uuid(lqid)) = &res.result {
|
||||
let ws_id = LIVE_QUERIES.write().await.remove(&lqid.0);
|
||||
if let Some(ws_id) = ws_id {
|
||||
trace!("Unregistered live query {} on websocket {}", lqid, ws_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------------------
|
||||
// Methods for querying
|
||||
// ------------------------------
|
||||
|
@ -776,4 +734,29 @@ impl Rpc {
|
|||
// Return the result to the client
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
// ------------------------------
|
||||
// Private methods
|
||||
// ------------------------------
|
||||
|
||||
async fn handle_live_query_results(&self, res: &Response) {
|
||||
match &res.query_type {
|
||||
QueryType::Live => {
|
||||
if let Ok(Value::Uuid(lqid)) = &res.result {
|
||||
// Match on Uuid type
|
||||
LIVE_QUERIES.write().await.insert(lqid.0, self.uuid);
|
||||
trace!("Registered live query {} on websocket {}", lqid, self.uuid);
|
||||
}
|
||||
}
|
||||
QueryType::Kill => {
|
||||
if let Ok(Value::Uuid(lqid)) = &res.result {
|
||||
let ws_id = LIVE_QUERIES.write().await.remove(&lqid.0);
|
||||
if let Some(ws_id) = ws_id {
|
||||
trace!("Unregistered live query {} on websocket {}", lqid, ws_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue