diff --git a/src/telemetry/metrics/http/mod.rs b/src/telemetry/metrics/http/mod.rs index 6847631d..0af71028 100644 --- a/src/telemetry/metrics/http/mod.rs +++ b/src/telemetry/metrics/http/mod.rs @@ -1,7 +1,7 @@ pub(super) mod tower_layer; use once_cell::sync::Lazy; -use opentelemetry::metrics::{Histogram, MetricsError, ObservableUpDownCounter, Unit}; +use opentelemetry::metrics::{Histogram, MetricsError, Unit, UpDownCounter}; use opentelemetry::Context as TelemetryContext; use self::tower_layer::HttpCallMetricTracker; @@ -16,9 +16,9 @@ pub static HTTP_SERVER_DURATION: Lazy> = Lazy::new(|| { .init() }); -pub static HTTP_SERVER_ACTIVE_REQUESTS: Lazy> = Lazy::new(|| { +pub static HTTP_SERVER_ACTIVE_REQUESTS: Lazy> = Lazy::new(|| { METER_DURATION - .i64_observable_up_down_counter("http.server.active_requests") + .i64_up_down_counter("http.server.active_requests") .with_description("The number of active HTTP requests.") .init() }); @@ -39,19 +39,11 @@ pub static HTTP_SERVER_RESPONSE_SIZE: Lazy> = Lazy::new(|| { .init() }); -fn observe_request_start(tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> { - observe_active_request(1, tracker) -} - -fn observe_request_finish(tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> { - observe_active_request(-1, tracker) -} - fn observe_active_request(value: i64, tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> { let attrs = tracker.active_req_attrs(); - METER_DURATION - .register_callback(move |ctx| HTTP_SERVER_ACTIVE_REQUESTS.observe(ctx, value, &attrs)) + HTTP_SERVER_ACTIVE_REQUESTS.add(&TelemetryContext::current(), value, &attrs); + Ok(()) } fn record_request_duration(tracker: &HttpCallMetricTracker) { diff --git a/src/telemetry/metrics/http/tower_layer.rs b/src/telemetry/metrics/http/tower_layer.rs index 55dfe67f..157322e9 100644 --- a/src/telemetry/metrics/http/tower_layer.rs +++ b/src/telemetry/metrics/http/tower_layer.rs @@ -81,13 +81,16 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - this.tracker.set_state(ResultState::Started); + // Initialize the metrics if not already done. + if this.tracker.state.get_mut() == &ResultState::None { + this.tracker.set_state(ResultState::Started); - if let Err(err) = on_request_start(this.tracker) { - error!("Failed to setup metrics when request started: {}", err); - // Consider this request not tracked: reset the state to None, so that the drop handler does not decrease the counter. - this.tracker.set_state(ResultState::None); - }; + if let Err(err) = on_request_start(this.tracker) { + error!("Failed to setup metrics when request started: {}", err); + // Consider this request not tracked: reset the state to None, so that the drop handler does not decrease the counter. + this.tracker.set_state(ResultState::None); + }; + } let response = futures_util::ready!(this.inner.poll(cx)); @@ -123,6 +126,7 @@ pub struct HttpCallMetricTracker { finish: Option, } +#[derive(PartialEq, Eq)] pub enum ResultState { /// The result was already processed. None, @@ -241,13 +245,13 @@ impl Drop for HttpCallMetricTracker { } pub fn on_request_start(tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> { - // Setup the active_requests observer - super::observe_request_start(tracker) + // Increase the number of active requests. + super::observe_active_request(1, tracker) } pub fn on_request_finish(tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> { - // Setup the active_requests observer - super::observe_request_finish(tracker)?; + // Decrease the number of active requests. + super::observe_active_request(-1, tracker)?; // Record the duration of the request. super::record_request_duration(tracker); diff --git a/src/telemetry/metrics/ws/mod.rs b/src/telemetry/metrics/ws/mod.rs index 144a223e..cc0e87e2 100644 --- a/src/telemetry/metrics/ws/mod.rs +++ b/src/telemetry/metrics/ws/mod.rs @@ -3,7 +3,7 @@ use std::time::Instant; use once_cell::sync::Lazy; use opentelemetry::KeyValue; use opentelemetry::{ - metrics::{Histogram, MetricsError, ObservableUpDownCounter, Unit}, + metrics::{Histogram, MetricsError, Unit, UpDownCounter}, Context as TelemetryContext, }; @@ -17,9 +17,9 @@ pub static RPC_SERVER_DURATION: Lazy> = Lazy::new(|| { .init() }); -pub static RPC_SERVER_ACTIVE_CONNECTIONS: Lazy> = Lazy::new(|| { +pub static RPC_SERVER_ACTIVE_CONNECTIONS: Lazy> = Lazy::new(|| { METER_DURATION - .i64_observable_up_down_counter("rpc.server.active_connections") + .i64_up_down_counter("rpc.server.active_connections") .with_description("The number of active WebSocket connections.") .init() }); @@ -57,8 +57,8 @@ pub fn on_disconnect() -> Result<(), MetricsError> { pub(super) fn observe_active_connection(value: i64) -> Result<(), MetricsError> { let attrs = otel_common_attrs(); - METER_DURATION - .register_callback(move |cx| RPC_SERVER_ACTIVE_CONNECTIONS.observe(cx, value, &attrs)) + RPC_SERVER_ACTIVE_CONNECTIONS.add(&TelemetryContext::current(), value, &attrs); + Ok(()) } //