Add context to live query notifications (#3461)
This commit is contained in:
parent
d55d1a3b6e
commit
19a04c86ba
4 changed files with 49 additions and 8 deletions
|
@ -293,7 +293,7 @@ impl Connection {
|
||||||
async move {
|
async move {
|
||||||
let span = Span::current();
|
let span = Span::current();
|
||||||
let req_cx = RequestContext::default();
|
let req_cx = RequestContext::default();
|
||||||
let otel_cx = TelemetryContext::new().with_value(req_cx.clone());
|
let otel_cx = Arc::new(TelemetryContext::new().with_value(req_cx.clone()));
|
||||||
// Parse the RPC request structure
|
// Parse the RPC request structure
|
||||||
match fmt.req(msg) {
|
match fmt.req(msg) {
|
||||||
Ok(req) => {
|
Ok(req) => {
|
||||||
|
@ -304,18 +304,24 @@ impl Connection {
|
||||||
"rpc.request_id",
|
"rpc.request_id",
|
||||||
req.id.clone().map(Value::as_string).unwrap_or_default(),
|
req.id.clone().map(Value::as_string).unwrap_or_default(),
|
||||||
);
|
);
|
||||||
let otel_cx = TelemetryContext::current_with_value(
|
let otel_cx = Arc::new(TelemetryContext::current_with_value(
|
||||||
req_cx.with_method(&req.method).with_size(len),
|
req_cx.with_method(&req.method).with_size(len),
|
||||||
);
|
));
|
||||||
// Process the message
|
// Process the message
|
||||||
let res =
|
let res =
|
||||||
Connection::process_message(rpc.clone(), &req.method, req.params).await;
|
Connection::process_message(rpc.clone(), &req.method, req.params).await;
|
||||||
// Process the response
|
// Process the response
|
||||||
res.into_response(req.id).send(fmt, &chn).with_context(otel_cx).await
|
res.into_response(req.id)
|
||||||
|
.send(otel_cx.clone(), fmt, &chn)
|
||||||
|
.with_context(otel_cx.as_ref().clone())
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
// Process the response
|
// Process the response
|
||||||
failure(None, err).send(fmt, &chn).with_context(otel_cx).await
|
failure(None, err)
|
||||||
|
.send(otel_cx.clone(), fmt, &chn)
|
||||||
|
.with_context(otel_cx.as_ref().clone())
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,9 @@ pub mod response;
|
||||||
use crate::dbs::DB;
|
use crate::dbs::DB;
|
||||||
use crate::rpc::connection::Connection;
|
use crate::rpc::connection::Connection;
|
||||||
use crate::rpc::response::success;
|
use crate::rpc::response::success;
|
||||||
|
use crate::telemetry::metrics::ws::NotificationContext;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
|
use opentelemetry::Context as TelemetryContext;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -48,12 +50,17 @@ pub(crate) async fn notifications(canceller: CancellationToken) {
|
||||||
if let Some(rpc) = WEBSOCKETS.read().await.get(id) {
|
if let Some(rpc) = WEBSOCKETS.read().await.get(id) {
|
||||||
// Serialize the message to send
|
// Serialize the message to send
|
||||||
let message = success(None, notification);
|
let message = success(None, notification);
|
||||||
|
// Add metrics
|
||||||
|
let cx = TelemetryContext::new();
|
||||||
|
let not_ctx = NotificationContext::default()
|
||||||
|
.with_live_id(id.to_string());
|
||||||
|
let cx = Arc::new(cx.with_value(not_ctx));
|
||||||
// Get the WebSocket output format
|
// Get the WebSocket output format
|
||||||
let format = rpc.read().await.format;
|
let format = rpc.read().await.format;
|
||||||
// get the WebSocket sending channel
|
// get the WebSocket sending channel
|
||||||
let sender = rpc.read().await.channels.0.clone();
|
let sender = rpc.read().await.channels.0.clone();
|
||||||
// Send the notification to the client
|
// Send the notification to the client
|
||||||
message.send(format, &sender).await
|
message.send(cx, format, &sender).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
@ -6,6 +6,7 @@ use opentelemetry::Context as TelemetryContext;
|
||||||
use revision::revisioned;
|
use revision::revisioned;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json::Value as Json;
|
use serde_json::Value as Json;
|
||||||
|
use std::sync::Arc;
|
||||||
use surrealdb::channel::Sender;
|
use surrealdb::channel::Sender;
|
||||||
use surrealdb::dbs;
|
use surrealdb::dbs;
|
||||||
use surrealdb::dbs::Notification;
|
use surrealdb::dbs::Notification;
|
||||||
|
@ -93,7 +94,7 @@ impl Response {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send the response to the WebSocket channel
|
/// Send the response to the WebSocket channel
|
||||||
pub async fn send(self, fmt: Format, chn: &Sender<Message>) {
|
pub async fn send(self, cx: Arc<TelemetryContext>, fmt: Format, chn: &Sender<Message>) {
|
||||||
// Create a new tracing span
|
// Create a new tracing span
|
||||||
let span = Span::current();
|
let span = Span::current();
|
||||||
// Log the rpc response call
|
// Log the rpc response call
|
||||||
|
@ -113,7 +114,7 @@ impl Response {
|
||||||
let (len, msg) = fmt.res(self).unwrap();
|
let (len, msg) = fmt.res(self).unwrap();
|
||||||
// Send the message to the write channel
|
// Send the message to the write channel
|
||||||
if chn.send(msg).await.is_ok() {
|
if chn.send(msg).await.is_ok() {
|
||||||
record_rpc(&TelemetryContext::current(), len, is_error);
|
record_rpc(cx.as_ref(), len, is_error);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,6 +98,27 @@ impl RequestContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
|
pub struct NotificationContext {
|
||||||
|
pub live_id: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for NotificationContext {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
live_id: "unknown".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NotificationContext {
|
||||||
|
pub fn with_live_id(self, live_id: String) -> Self {
|
||||||
|
Self {
|
||||||
|
live_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Updates the request and response metrics for an RPC method.
|
/// Updates the request and response metrics for an RPC method.
|
||||||
pub fn record_rpc(cx: &TelemetryContext, res_size: usize, is_error: bool) {
|
pub fn record_rpc(cx: &TelemetryContext, res_size: usize, is_error: bool) {
|
||||||
let mut attrs = otel_common_attrs();
|
let mut attrs = otel_common_attrs();
|
||||||
|
@ -111,6 +132,12 @@ pub fn record_rpc(cx: &TelemetryContext, res_size: usize, is_error: bool) {
|
||||||
]);
|
]);
|
||||||
duration = cx.start.elapsed().as_millis() as u64;
|
duration = cx.start.elapsed().as_millis() as u64;
|
||||||
req_size = cx.size as u64;
|
req_size = cx.size as u64;
|
||||||
|
} else if let Some(cx) = cx.get::<NotificationContext>() {
|
||||||
|
attrs.extend_from_slice(&[
|
||||||
|
KeyValue::new("rpc.method", "notification"),
|
||||||
|
KeyValue::new("rpc.error", is_error),
|
||||||
|
KeyValue::new("rpc.live_id", cx.live_id.clone()),
|
||||||
|
]);
|
||||||
} else {
|
} else {
|
||||||
// If a bug causes the RequestContent to be empty, we still want to record the metrics to avoid a silent failure.
|
// If a bug causes the RequestContent to be empty, we still want to record the metrics to avoid a silent failure.
|
||||||
warn!("record_rpc: no request context found, resulting metrics will be invalid");
|
warn!("record_rpc: no request context found, resulting metrics will be invalid");
|
||||||
|
|
Loading…
Reference in a new issue