From 19a04c86baeac47d817e72ac7d91055ade49015c Mon Sep 17 00:00:00 2001 From: Przemyslaw Hugh Kaznowski Date: Mon, 12 Feb 2024 11:54:38 +0000 Subject: [PATCH] Add context to live query notifications (#3461) --- src/rpc/connection.rs | 16 +++++++++++----- src/rpc/mod.rs | 9 ++++++++- src/rpc/response.rs | 5 +++-- src/telemetry/metrics/ws/mod.rs | 27 +++++++++++++++++++++++++++ 4 files changed, 49 insertions(+), 8 deletions(-) diff --git a/src/rpc/connection.rs b/src/rpc/connection.rs index 033cbc5e..89577e6b 100644 --- a/src/rpc/connection.rs +++ b/src/rpc/connection.rs @@ -293,7 +293,7 @@ impl Connection { async move { let span = Span::current(); let req_cx = RequestContext::default(); - let otel_cx = TelemetryContext::new().with_value(req_cx.clone()); + let otel_cx = Arc::new(TelemetryContext::new().with_value(req_cx.clone())); // Parse the RPC request structure match fmt.req(msg) { Ok(req) => { @@ -304,18 +304,24 @@ impl Connection { "rpc.request_id", req.id.clone().map(Value::as_string).unwrap_or_default(), ); - let otel_cx = TelemetryContext::current_with_value( + let otel_cx = Arc::new(TelemetryContext::current_with_value( req_cx.with_method(&req.method).with_size(len), - ); + )); // Process the message let res = Connection::process_message(rpc.clone(), &req.method, req.params).await; // Process the response - res.into_response(req.id).send(fmt, &chn).with_context(otel_cx).await + res.into_response(req.id) + .send(otel_cx.clone(), fmt, &chn) + .with_context(otel_cx.as_ref().clone()) + .await } Err(err) => { // Process the response - failure(None, err).send(fmt, &chn).with_context(otel_cx).await + failure(None, err) + .send(otel_cx.clone(), fmt, &chn) + .with_context(otel_cx.as_ref().clone()) + .await } } } diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index f146033c..35b87b23 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -8,7 +8,9 @@ pub mod response; use crate::dbs::DB; use crate::rpc::connection::Connection; use crate::rpc::response::success; +use crate::telemetry::metrics::ws::NotificationContext; use once_cell::sync::Lazy; +use opentelemetry::Context as TelemetryContext; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -48,12 +50,17 @@ pub(crate) async fn notifications(canceller: CancellationToken) { if let Some(rpc) = WEBSOCKETS.read().await.get(id) { // Serialize the message to send let message = success(None, notification); + // Add metrics + let cx = TelemetryContext::new(); + let not_ctx = NotificationContext::default() + .with_live_id(id.to_string()); + let cx = Arc::new(cx.with_value(not_ctx)); // Get the WebSocket output format let format = rpc.read().await.format; // get the WebSocket sending channel let sender = rpc.read().await.channels.0.clone(); // Send the notification to the client - message.send(format, &sender).await + message.send(cx, format, &sender).await } } }, diff --git a/src/rpc/response.rs b/src/rpc/response.rs index f0bb27fd..f012bc46 100644 --- a/src/rpc/response.rs +++ b/src/rpc/response.rs @@ -6,6 +6,7 @@ use opentelemetry::Context as TelemetryContext; use revision::revisioned; use serde::Serialize; use serde_json::Value as Json; +use std::sync::Arc; use surrealdb::channel::Sender; use surrealdb::dbs; use surrealdb::dbs::Notification; @@ -93,7 +94,7 @@ impl Response { } /// Send the response to the WebSocket channel - pub async fn send(self, fmt: Format, chn: &Sender) { + pub async fn send(self, cx: Arc, fmt: Format, chn: &Sender) { // Create a new tracing span let span = Span::current(); // Log the rpc response call @@ -113,7 +114,7 @@ impl Response { let (len, msg) = fmt.res(self).unwrap(); // Send the message to the write channel if chn.send(msg).await.is_ok() { - record_rpc(&TelemetryContext::current(), len, is_error); + record_rpc(cx.as_ref(), len, is_error); }; } } diff --git a/src/telemetry/metrics/ws/mod.rs b/src/telemetry/metrics/ws/mod.rs index 6e8c0551..99880d60 100644 --- a/src/telemetry/metrics/ws/mod.rs +++ b/src/telemetry/metrics/ws/mod.rs @@ -98,6 +98,27 @@ impl RequestContext { } } +#[derive(Clone, Debug, PartialEq)] +pub struct NotificationContext { + pub live_id: String, +} + +impl Default for NotificationContext { + fn default() -> Self { + Self { + live_id: "unknown".to_string(), + } + } +} + +impl NotificationContext { + pub fn with_live_id(self, live_id: String) -> Self { + Self { + live_id, + } + } +} + /// Updates the request and response metrics for an RPC method. pub fn record_rpc(cx: &TelemetryContext, res_size: usize, is_error: bool) { let mut attrs = otel_common_attrs(); @@ -111,6 +132,12 @@ pub fn record_rpc(cx: &TelemetryContext, res_size: usize, is_error: bool) { ]); duration = cx.start.elapsed().as_millis() as u64; req_size = cx.size as u64; + } else if let Some(cx) = cx.get::() { + attrs.extend_from_slice(&[ + KeyValue::new("rpc.method", "notification"), + KeyValue::new("rpc.error", is_error), + KeyValue::new("rpc.live_id", cx.live_id.clone()), + ]); } else { // If a bug causes the RequestContent to be empty, we still want to record the metrics to avoid a silent failure. warn!("record_rpc: no request context found, resulting metrics will be invalid");