[otel] Fix memory leak caused by OTEL callbacks (#2771)

This commit is contained in:
Salvador Girones Gil 2023-10-02 14:27:18 +02:00 committed by GitHub
parent 74ad693709
commit 920cdfc71a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 28 deletions

View file

@ -1,7 +1,7 @@
pub(super) mod tower_layer; pub(super) mod tower_layer;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use opentelemetry::metrics::{Histogram, MetricsError, ObservableUpDownCounter, Unit}; use opentelemetry::metrics::{Histogram, MetricsError, Unit, UpDownCounter};
use opentelemetry::Context as TelemetryContext; use opentelemetry::Context as TelemetryContext;
use self::tower_layer::HttpCallMetricTracker; use self::tower_layer::HttpCallMetricTracker;
@ -16,9 +16,9 @@ pub static HTTP_SERVER_DURATION: Lazy<Histogram<u64>> = Lazy::new(|| {
.init() .init()
}); });
pub static HTTP_SERVER_ACTIVE_REQUESTS: Lazy<ObservableUpDownCounter<i64>> = Lazy::new(|| { pub static HTTP_SERVER_ACTIVE_REQUESTS: Lazy<UpDownCounter<i64>> = Lazy::new(|| {
METER_DURATION METER_DURATION
.i64_observable_up_down_counter("http.server.active_requests") .i64_up_down_counter("http.server.active_requests")
.with_description("The number of active HTTP requests.") .with_description("The number of active HTTP requests.")
.init() .init()
}); });
@ -39,19 +39,11 @@ pub static HTTP_SERVER_RESPONSE_SIZE: Lazy<Histogram<u64>> = Lazy::new(|| {
.init() .init()
}); });
fn observe_request_start(tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> {
observe_active_request(1, tracker)
}
fn observe_request_finish(tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> {
observe_active_request(-1, tracker)
}
fn observe_active_request(value: i64, tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> { fn observe_active_request(value: i64, tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> {
let attrs = tracker.active_req_attrs(); let attrs = tracker.active_req_attrs();
METER_DURATION HTTP_SERVER_ACTIVE_REQUESTS.add(&TelemetryContext::current(), value, &attrs);
.register_callback(move |ctx| HTTP_SERVER_ACTIVE_REQUESTS.observe(ctx, value, &attrs)) Ok(())
} }
fn record_request_duration(tracker: &HttpCallMetricTracker) { fn record_request_duration(tracker: &HttpCallMetricTracker) {

View file

@ -81,13 +81,16 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project(); let this = self.project();
this.tracker.set_state(ResultState::Started); // Initialize the metrics if not already done.
if this.tracker.state.get_mut() == &ResultState::None {
this.tracker.set_state(ResultState::Started);
if let Err(err) = on_request_start(this.tracker) { if let Err(err) = on_request_start(this.tracker) {
error!("Failed to setup metrics when request started: {}", err); error!("Failed to setup metrics when request started: {}", err);
// Consider this request not tracked: reset the state to None, so that the drop handler does not decrease the counter. // Consider this request not tracked: reset the state to None, so that the drop handler does not decrease the counter.
this.tracker.set_state(ResultState::None); this.tracker.set_state(ResultState::None);
}; };
}
let response = futures_util::ready!(this.inner.poll(cx)); let response = futures_util::ready!(this.inner.poll(cx));
@ -123,6 +126,7 @@ pub struct HttpCallMetricTracker {
finish: Option<Instant>, finish: Option<Instant>,
} }
#[derive(PartialEq, Eq)]
pub enum ResultState { pub enum ResultState {
/// The result was already processed. /// The result was already processed.
None, None,
@ -241,13 +245,13 @@ impl Drop for HttpCallMetricTracker {
} }
pub fn on_request_start(tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> { pub fn on_request_start(tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> {
// Setup the active_requests observer // Increase the number of active requests.
super::observe_request_start(tracker) super::observe_active_request(1, tracker)
} }
pub fn on_request_finish(tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> { pub fn on_request_finish(tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> {
// Setup the active_requests observer // Decrease the number of active requests.
super::observe_request_finish(tracker)?; super::observe_active_request(-1, tracker)?;
// Record the duration of the request. // Record the duration of the request.
super::record_request_duration(tracker); super::record_request_duration(tracker);

View file

@ -3,7 +3,7 @@ use std::time::Instant;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use opentelemetry::KeyValue; use opentelemetry::KeyValue;
use opentelemetry::{ use opentelemetry::{
metrics::{Histogram, MetricsError, ObservableUpDownCounter, Unit}, metrics::{Histogram, MetricsError, Unit, UpDownCounter},
Context as TelemetryContext, Context as TelemetryContext,
}; };
@ -17,9 +17,9 @@ pub static RPC_SERVER_DURATION: Lazy<Histogram<u64>> = Lazy::new(|| {
.init() .init()
}); });
pub static RPC_SERVER_ACTIVE_CONNECTIONS: Lazy<ObservableUpDownCounter<i64>> = Lazy::new(|| { pub static RPC_SERVER_ACTIVE_CONNECTIONS: Lazy<UpDownCounter<i64>> = Lazy::new(|| {
METER_DURATION METER_DURATION
.i64_observable_up_down_counter("rpc.server.active_connections") .i64_up_down_counter("rpc.server.active_connections")
.with_description("The number of active WebSocket connections.") .with_description("The number of active WebSocket connections.")
.init() .init()
}); });
@ -57,8 +57,8 @@ pub fn on_disconnect() -> Result<(), MetricsError> {
pub(super) fn observe_active_connection(value: i64) -> Result<(), MetricsError> { pub(super) fn observe_active_connection(value: i64) -> Result<(), MetricsError> {
let attrs = otel_common_attrs(); let attrs = otel_common_attrs();
METER_DURATION RPC_SERVER_ACTIVE_CONNECTIONS.add(&TelemetryContext::current(), value, &attrs);
.register_callback(move |cx| RPC_SERVER_ACTIVE_CONNECTIONS.observe(cx, value, &attrs)) Ok(())
} }
// //