From f7a8b5dff9849768eb18530c5686ee5668011642 Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Sun, 15 Sep 2024 23:31:10 +0100 Subject: [PATCH] Improve logging, tracing, and provider integration (#4772) --- Cargo.lock | 8 +- Makefile.toml | 4 +- core/Cargo.toml | 2 +- core/src/dbs/executor.rs | 6 +- core/src/dbs/iterator.rs | 4 +- core/src/gql/schema.rs | 2 +- core/src/kvs/tr.rs | 33 +++++++- core/src/sql/datetime.rs | 2 +- core/src/syn/mod.rs | 117 +++++++++++++-------------- core/src/syn/test.rs | 12 ++- doc/TELEMETRY.md | 2 +- src/cli/export.rs | 3 +- src/cli/fix.rs | 4 +- src/cli/import.rs | 3 +- src/cli/isready.rs | 2 +- src/cli/ml/export.rs | 3 +- src/cli/ml/import.rs | 3 +- src/cli/sql.rs | 3 +- src/cli/start.rs | 12 +-- src/cli/upgrade.rs | 2 +- src/cli/version.rs | 2 +- src/cnf/mod.rs | 8 +- src/err/mod.rs | 23 +++++- src/telemetry/logs/mod.rs | 17 ++-- src/telemetry/metrics/http/mod.rs | 2 - src/telemetry/metrics/mod.rs | 126 ++++++++++++++---------------- src/telemetry/metrics/ws/mod.rs | 1 - src/telemetry/mod.rs | 64 ++++++++------- src/telemetry/traces/mod.rs | 63 +++++++-------- src/telemetry/traces/otlp.rs | 23 ------ tests/cli_integration.rs | 2 +- 31 files changed, 292 insertions(+), 266 deletions(-) delete mode 100644 src/telemetry/traces/otlp.rs diff --git a/Cargo.lock b/Cargo.lock index bf210582..6d54927f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5990,9 +5990,9 @@ dependencies = [ [[package]] name = "surrealcs" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a77651296e07271c28b82ff9eb12ce7e834565d1b7f4607241f41b82d343fbe" +checksum = "457eb2adc59f9a9ba337e1fa3a6af63e09524e4adf01921b9d7079917cd7f694" dependencies = [ "bincode", "bytes", @@ -6011,9 +6011,9 @@ dependencies = [ [[package]] name = "surrealcs-kernel" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b47d835a86fe1080acbeb24352dbce92e4019f84bf28fe76c16d25b477eef032" +checksum = "f4a79da58bfc886b93a431a463f956184d4b0a9d3672833544a1b718ff86b0df" dependencies = [ "bincode", "chrono", diff --git a/Makefile.toml b/Makefile.toml index 76be2f30..fc2586aa 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -10,8 +10,8 @@ reduce_output = true default_to_workspace = false [env] -ALL_FEATURES={ value = "storage-mem,storage-surrealkv,storage-rocksdb,storage-tikv,storage-fdb,scripting,http,jwks,ml,storage-fdb-7_1", condition = { env_not_set = ["ALL_FEATURES"] } } -DEV_FEATURES={ value = "storage-mem,storage-surrealkv,scripting,http,jwks,ml", condition = { env_not_set = ["DEV_FEATURES"] } } +ALL_FEATURES={ value = "storage-mem,storage-surrealkv,storage-surrealcs,storage-rocksdb,storage-tikv,storage-fdb,scripting,http,jwks,ml,storage-fdb-7_1", condition = { env_not_set = ["ALL_FEATURES"] } } +DEV_FEATURES={ value = "storage-mem,storage-surrealkv,storage-surrealcs,scripting,http,jwks,ml", condition = { env_not_set = ["DEV_FEATURES"] } } SURREAL_LOG={ value = "full", condition = { env_not_set = ["SURREAL_LOG"] } } SURREAL_USER={ value = "root", condition = { env_not_set = ["SURREAL_USER"] } } SURREAL_PASS={ value = "root", condition = { env_not_set = ["SURREAL_PASS"] } } diff --git a/core/Cargo.toml b/core/Cargo.toml index e84f7129..daf60a01 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -145,7 +145,7 @@ sha2 = "0.10.8" snap = "1.1.0" storekey = "0.5.0" subtle = "2.6" -surrealcs = { version = "0.3.0", optional = true } +surrealcs = { version = "0.3.1", optional = true } surrealkv = { version = "0.3.4", optional = true } surrealml = { version = "0.1.1", optional = true, package = "surrealml-core" } tempfile = { version = "3.10.1", optional = true } diff --git a/core/src/dbs/executor.rs b/core/src/dbs/executor.rs index 03413e6a..3fa1528f 100644 --- a/core/src/dbs/executor.rs +++ b/core/src/dbs/executor.rs @@ -27,6 +27,8 @@ use trice::Instant; #[cfg(target_arch = "wasm32")] use wasm_bindgen_futures::spawn_local as spawn; +const TARGET: &str = "surrealdb::core::dbs"; + pub(crate) struct Executor<'a> { err: bool, kvs: &'a Datastore, @@ -184,7 +186,7 @@ impl<'a> Executor<'a> { Ok(ctx.freeze()) } - #[instrument(level = "debug", name = "executor", skip_all)] + #[instrument(level = "debug", name = "executor", target = "surrealdb::core::dbs", skip_all)] pub async fn execute( &mut self, mut ctx: Context, @@ -207,7 +209,7 @@ impl<'a> Executor<'a> { // Process all statements in query for stm in qry.into_iter() { // Log the statement - debug!("Executing: {}", stm); + trace!(target: TARGET, statement = %stm, "Executing statement"); // Reset errors if self.txn.is_none() { self.err = false; diff --git a/core/src/dbs/iterator.rs b/core/src/dbs/iterator.rs index dc1587d9..6aad6301 100644 --- a/core/src/dbs/iterator.rs +++ b/core/src/dbs/iterator.rs @@ -22,6 +22,8 @@ use reblessive::TreeStack; use std::mem; use std::sync::Arc; +const TARGET: &str = "surrealdb::core::dbs"; + #[derive(Clone)] pub(crate) enum Iterable { Value(Value), @@ -290,7 +292,7 @@ impl Iterator { stm: &Statement<'_>, ) -> Result { // Log the statement - trace!("Iterating: {}", stm); + trace!(target: TARGET, statement = %stm, "Iterating statement"); // Enable context override let mut cancel_ctx = MutableContext::new(ctx); self.run = cancel_ctx.add_cancel(); diff --git a/core/src/gql/schema.rs b/core/src/gql/schema.rs index ba585b38..8d127ab4 100644 --- a/core/src/gql/schema.rs +++ b/core/src/gql/schema.rs @@ -919,7 +919,7 @@ fn gql_to_sql_kind(val: &GqlValue, kind: Kind) -> Result { _ => Err(type_error(kind, val)), }, Kind::Datetime => match val { - GqlValue::String(s) => match syn::datetime_raw(s) { + GqlValue::String(s) => match syn::datetime(s) { Ok(dt) => Ok(dt.into()), Err(_) => Err(type_error(kind, val)), }, diff --git a/core/src/kvs/tr.rs b/core/src/kvs/tr.rs index 3abd0bf9..04c0c972 100644 --- a/core/src/kvs/tr.rs +++ b/core/src/kvs/tr.rs @@ -7,7 +7,6 @@ use crate::dbs::node::Timestamp; use crate::doc::CursorValue; use crate::err::Error; use crate::idg::u32::U32; -#[cfg(debug_assertions)] use crate::key::debug::Sprintable; use crate::kvs::batch::Batch; use crate::kvs::clock::SizedClock; @@ -30,6 +29,8 @@ use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; +const TARGET: &str = "surrealdb::core::kvs::tr"; + /// Used to determine the behaviour when a transaction is not closed correctly #[derive(Debug, Default)] pub enum Check { @@ -184,6 +185,7 @@ impl Transactor { /// in a [`Error::TxFinished`] error. #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)] pub async fn closed(&self) -> bool { + trace!(target: TARGET, "Closed"); expand_inner!(&self.inner, v => { v.closed() }) } @@ -192,6 +194,7 @@ impl Transactor { /// This reverses all changes made within the transaction. #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)] pub async fn cancel(&mut self) -> Result<(), Error> { + trace!(target: TARGET, "Cancel"); expand_inner!(&mut self.inner, v => { v.cancel().await }) } @@ -200,6 +203,7 @@ impl Transactor { /// This attempts to commit all changes made within the transaction. #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)] pub async fn commit(&mut self) -> Result<(), Error> { + trace!(target: TARGET, "Commit"); expand_inner!(&mut self.inner, v => { v.commit().await }) } @@ -210,6 +214,7 @@ impl Transactor { K: Into + Debug, { let key = key.into(); + trace!(target: TARGET, key = key.sprint(), "Exists"); expand_inner!(&mut self.inner, v => { v.exists(key).await }) } @@ -220,6 +225,7 @@ impl Transactor { K: Into + Debug, { let key = key.into(); + trace!(target: TARGET, key = key.sprint(), version = version, "Get"); expand_inner!(&mut self.inner, v => { v.get(key, version).await }) } @@ -230,6 +236,7 @@ impl Transactor { K: Into + Debug, { let keys = keys.into_iter().map(Into::into).collect::>(); + trace!(target: TARGET, keys = keys.sprint(), "GetM"); expand_inner!(&mut self.inner, v => { v.getm(keys).await }) } @@ -247,6 +254,8 @@ impl Transactor { { let beg: Key = rng.start.into(); let end: Key = rng.end.into(); + let rng = beg.as_slice()..end.as_slice(); + trace!(target: TARGET, rng = rng.sprint(), version = version, "GetR"); expand_inner!(&mut self.inner, v => { v.getr(beg..end, version).await }) } @@ -259,6 +268,7 @@ impl Transactor { K: Into + Debug, { let key: Key = key.into(); + trace!(target: TARGET, key = key.sprint(), "GetP"); expand_inner!(&mut self.inner, v => { v.getp(key).await }) } @@ -270,6 +280,7 @@ impl Transactor { V: Into + Debug, { let key = key.into(); + trace!(target: TARGET, key = key.sprint(), version = version, "Set"); expand_inner!(&mut self.inner, v => { v.set(key, val, version).await }) } @@ -281,6 +292,7 @@ impl Transactor { V: Into + Debug, { let key = key.into(); + trace!(target: TARGET, key = key.sprint(), version = version, "Put"); expand_inner!(&mut self.inner, v => { v.put(key, val, version).await }) } @@ -292,6 +304,7 @@ impl Transactor { V: Into + Debug, { let key = key.into(); + trace!(target: TARGET, key = key.sprint(), "PutC"); expand_inner!(&mut self.inner, v => { v.putc(key, val, chk).await }) } @@ -302,6 +315,7 @@ impl Transactor { K: Into + Debug, { let key = key.into(); + trace!(target: TARGET, key = key.sprint(), "Del"); expand_inner!(&mut self.inner, v => { v.del(key).await }) } @@ -313,6 +327,7 @@ impl Transactor { V: Into + Debug, { let key = key.into(); + trace!(target: TARGET, key = key.sprint(), "DelC"); expand_inner!(&mut self.inner, v => { v.delc(key, chk).await }) } @@ -326,6 +341,8 @@ impl Transactor { { let beg: Key = rng.start.into(); let end: Key = rng.end.into(); + let rng = beg.as_slice()..end.as_slice(); + trace!(target: TARGET, rng = rng.sprint(), "DelR"); expand_inner!(&mut self.inner, v => { v.delr(beg..end).await }) } @@ -338,6 +355,7 @@ impl Transactor { K: Into + Debug, { let key: Key = key.into(); + trace!(target: TARGET, key = key.sprint(), "DelP"); expand_inner!(&mut self.inner, v => { v.delp(key).await }) } @@ -351,6 +369,11 @@ impl Transactor { { let beg: Key = rng.start.into(); let end: Key = rng.end.into(); + let rng = beg.as_slice()..end.as_slice(); + trace!(target: TARGET, rng = rng.sprint(), limit = limit, "Keys"); + if beg > end { + return Ok(vec![]); + } expand_inner!(&mut self.inner, v => { v.keys(beg..end, limit).await }) } @@ -369,6 +392,8 @@ impl Transactor { { let beg: Key = rng.start.into(); let end: Key = rng.end.into(); + let rng = beg.as_slice()..end.as_slice(); + trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Scan"); if beg > end { return Ok(vec![]); } @@ -391,6 +416,8 @@ impl Transactor { { let beg: Key = rng.start.into(); let end: Key = rng.end.into(); + let rng = beg.as_slice()..end.as_slice(); + trace!(target: TARGET, rng = rng.sprint(), values = values, version = version, "Batch"); expand_inner!(&mut self.inner, v => { v.batch(beg..end, batch, values, version).await }) } @@ -587,8 +614,8 @@ impl Transactor { // on other concurrent transactions that can write to the ts_key or the keys after it. let key = crate::key::database::vs::new(ns, db); let vst = self.get_timestamp(key).await?; - #[cfg(debug_assertions)] trace!( + target: TARGET, "Setting timestamp {} for versionstamp {:?} in ns: {}, db: {}", ts, crate::vs::conv::versionstamp_to_u64(&vst), @@ -604,8 +631,8 @@ impl Transactor { let ts_pairs: Vec<(Vec, Vec)> = self.getr(begin..end, None).await?; let latest_ts_pair = ts_pairs.last(); if let Some((k, _)) = latest_ts_pair { - #[cfg(debug_assertions)] trace!( + target: TARGET, "There already was a greater committed timestamp {} in ns: {}, db: {} found: {}", ts, ns, diff --git a/core/src/sql/datetime.rs b/core/src/sql/datetime.rs index a62ea7ce..82120a24 100644 --- a/core/src/sql/datetime.rs +++ b/core/src/sql/datetime.rs @@ -62,7 +62,7 @@ impl TryFrom for Datetime { impl TryFrom<&str> for Datetime { type Error = (); fn try_from(v: &str) -> Result { - match syn::datetime_raw(v) { + match syn::datetime(v) { Ok(v) => Ok(v), _ => Err(()), } diff --git a/core/src/syn/mod.rs b/core/src/syn/mod.rs index 3ad1558b..1dd7fee0 100644 --- a/core/src/syn/mod.rs +++ b/core/src/syn/mod.rs @@ -23,6 +23,8 @@ use parser::Parser; use reblessive::Stack; use token::t; +const TARGET: &str = "surrealdb::core::syn"; + /// Takes a string and returns if it could be a reserved keyword in certain contexts. pub fn could_be_reserved_keyword(s: &str) -> bool { lexer::keywords::could_be_reserved(s) @@ -38,9 +40,9 @@ pub fn could_be_reserved_keyword(s: &str) -> bool { /// /// If you encounter this limit and believe that it should be increased, /// please [open an issue](https://github.com/surrealdb/surrealdb/issues)! -#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))] +#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))] pub fn parse(input: &str) -> Result { - debug!("parsing query, input = {input}"); + trace!(target: TARGET, "Parsing SurrealQL query"); let mut parser = Parser::new(input.as_bytes()); let mut stack = Stack::new(); stack @@ -51,9 +53,9 @@ pub fn parse(input: &str) -> Result { } /// Parses a SurrealQL [`Value`]. -#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))] +#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))] pub fn value(input: &str) -> Result { - debug!("parsing value, input = {input}"); + trace!(target: TARGET, "Parsing SurrealQL value"); let mut parser = Parser::new(input.as_bytes()); let mut stack = Stack::new(); stack @@ -64,40 +66,10 @@ pub fn value(input: &str) -> Result { .map_err(Error::InvalidQuery) } -/// Parses a SurrealQL [`Value`]. -#[cfg(test)] -#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))] -pub(crate) fn value_field(input: &str) -> Result { - debug!("parsing value, input = {input}"); - let mut parser = Parser::new(input.as_bytes()); - let mut stack = Stack::new(); - stack - .enter(|stk| parser.parse_value_field(stk)) - .finish() - .and_then(|e| parser.assert_finished().map(|_| e)) - .map_err(|e| e.render_on(input)) - .map_err(Error::InvalidQuery) -} - -/// Parses a SurrealQL [`Value`]. -#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))] -pub fn value_legacy_strand(input: &str) -> Result { - debug!("parsing value with legacy strings, input = {input}"); - let mut parser = Parser::new(input.as_bytes()); - let mut stack = Stack::new(); - parser.allow_legacy_strand(true); - stack - .enter(|stk| parser.parse_value_table(stk)) - .finish() - .and_then(|e| parser.assert_finished().map(|_| e)) - .map_err(|e| e.render_on(input)) - .map_err(Error::InvalidQuery) -} - /// Parses JSON into an inert SurrealQL [`Value`] -#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))] +#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))] pub fn json(input: &str) -> Result { - debug!("parsing json, input = {input}"); + trace!(target: TARGET, "Parsing inert JSON value"); let mut parser = Parser::new(input.as_bytes()); let mut stack = Stack::new(); stack @@ -108,24 +80,10 @@ pub fn json(input: &str) -> Result { .map_err(Error::InvalidQuery) } -/// Parses JSON into an inert SurrealQL [`Value`] -#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))] -pub fn json_legacy_strand(input: &str) -> Result { - debug!("parsing json with legacy strings, input = {input}"); - let mut parser = Parser::new(input.as_bytes()); - let mut stack = Stack::new(); - parser.allow_legacy_strand(true); - stack - .enter(|stk| parser.parse_json(stk)) - .finish() - .and_then(|e| parser.assert_finished().map(|_| e)) - .map_err(|e| e.render_on(input)) - .map_err(Error::InvalidQuery) -} /// Parses a SurrealQL Subquery [`Subquery`] -#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))] +#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))] pub fn subquery(input: &str) -> Result { - debug!("parsing subquery, input = {input}"); + trace!(target: TARGET, "Parsing SurrealQL subquery"); let mut parser = Parser::new(input.as_bytes()); let mut stack = Stack::new(); stack @@ -137,9 +95,9 @@ pub fn subquery(input: &str) -> Result { } /// Parses a SurrealQL [`Idiom`] -#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))] +#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))] pub fn idiom(input: &str) -> Result { - debug!("parsing idiom, input = {input}"); + trace!(target: TARGET, "Parsing SurrealQL idiom"); let mut parser = Parser::new(input.as_bytes()); parser.table_as_field = true; let mut stack = Stack::new(); @@ -152,8 +110,9 @@ pub fn idiom(input: &str) -> Result { } /// Parse a datetime without enclosing delimiters from a string. -pub fn datetime_raw(input: &str) -> Result { - debug!("parsing datetime, input = {input}"); +#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))] +pub fn datetime(input: &str) -> Result { + trace!(target: TARGET, "Parsing SurrealQL datetime"); let mut lexer = Lexer::new(input.as_bytes()); let res = compound::datetime_inner(&mut lexer); if let Err(e) = lexer.assert_finished() { @@ -163,8 +122,9 @@ pub fn datetime_raw(input: &str) -> Result { } /// Parse a duration from a string. +#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))] pub fn duration(input: &str) -> Result { - debug!("parsing duration, input = {input}"); + trace!(target: TARGET, "Parsing SurrealQL duration"); let mut parser = Parser::new(input.as_bytes()); parser .next_token_value::() @@ -173,9 +133,10 @@ pub fn duration(input: &str) -> Result { .map_err(Error::InvalidQuery) } -/// Parse a range +/// Parse a range. +#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))] pub fn range(input: &str) -> Result { - debug!("parsing range, input = {input}"); + trace!(target: TARGET, "Parsing SurrealQL range"); let mut parser = Parser::new(input.as_bytes()); let mut stack = Stack::new(); stack @@ -187,8 +148,9 @@ pub fn range(input: &str) -> Result { } /// Parse a record id. +#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))] pub fn thing(input: &str) -> Result { - debug!("parsing thing, input = {input}"); + trace!(target: TARGET, "Parsing SurrealQL thing"); let mut parser = Parser::new(input.as_bytes()); let mut stack = Stack::new(); stack @@ -200,12 +162,11 @@ pub fn thing(input: &str) -> Result { } /// Parse a block, expects the value to be wrapped in `{}`. +#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))] pub fn block(input: &str) -> Result { - debug!("parsing block, input = {input}"); - + trace!(target: TARGET, "Parsing SurrealQL block"); let mut parser = Parser::new(input.as_bytes()); let mut stack = Stack::new(); - let token = parser.peek(); match token.kind { t!("{") => { @@ -224,3 +185,33 @@ pub fn block(input: &str) -> Result { )), } } + +/// Parses a SurrealQL [`Value`] and parses values within strings. +#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))] +pub fn value_legacy_strand(input: &str) -> Result { + trace!(target: TARGET, "Parsing SurrealQL value, with legacy strings"); + let mut parser = Parser::new(input.as_bytes()); + let mut stack = Stack::new(); + parser.allow_legacy_strand(true); + stack + .enter(|stk| parser.parse_value_table(stk)) + .finish() + .and_then(|e| parser.assert_finished().map(|_| e)) + .map_err(|e| e.render_on(input)) + .map_err(Error::InvalidQuery) +} + +/// Parses JSON into an inert SurrealQL [`Value`] and parses values within strings. +#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))] +pub fn json_legacy_strand(input: &str) -> Result { + trace!(target: TARGET, "Parsing inert JSON value, with legacy strings"); + let mut parser = Parser::new(input.as_bytes()); + let mut stack = Stack::new(); + parser.allow_legacy_strand(true); + stack + .enter(|stk| parser.parse_json(stk)) + .finish() + .and_then(|e| parser.assert_finished().map(|_| e)) + .map_err(|e| e.render_on(input)) + .map_err(Error::InvalidQuery) +} diff --git a/core/src/syn/test.rs b/core/src/syn/test.rs index 0650e6a7..f278d5f0 100644 --- a/core/src/syn/test.rs +++ b/core/src/syn/test.rs @@ -4,12 +4,22 @@ use super::lexer::Lexer; use super::parse; use super::parser::Parser; use super::Parse; +use crate::err::Error; use crate::sql::{Array, Expression, Ident, Idiom, Param, Script, Thing, Value}; use crate::syn::token::{t, TokenKind}; impl Parse for Value { fn parse(val: &str) -> Self { - super::value_field(val).inspect_err(|e| eprintln!("{e}")).unwrap() + let mut parser = Parser::new(val.as_bytes()); + let mut stack = Stack::new(); + stack + .enter(|stk| parser.parse_value_field(stk)) + .finish() + .and_then(|e| parser.assert_finished().map(|_| e)) + .map_err(|e| e.render_on(val)) + .map_err(Error::InvalidQuery) + .inspect_err(|e| eprintln!("{e}")) + .unwrap() } } diff --git a/doc/TELEMETRY.md b/doc/TELEMETRY.md index 82e4d2ce..7fc375e3 100644 --- a/doc/TELEMETRY.md +++ b/doc/TELEMETRY.md @@ -8,7 +8,7 @@ For local development, you can start the observability stack defined in `dev/doc ``` $ docker-compose -f dev/docker/compose.yaml up -d -$ SURREAL_TRACING_TRACER=otlp OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" surreal start +$ SURREAL_TELEMETRY_PROVIDER=otlp OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" surreal start ``` Now you can use the SurrealDB server and see the telemetry data opening this URL in the browser: http://localhost:3000 diff --git a/src/cli/export.rs b/src/cli/export.rs index f7a37e70..74ebb24d 100644 --- a/src/cli/export.rs +++ b/src/cli/export.rs @@ -14,7 +14,6 @@ pub struct ExportCommandArguments { #[arg(default_value = "-")] #[arg(index = 1)] file: String, - #[command(flatten)] conn: DatabaseConnectionArguments, #[command(flatten)] @@ -42,7 +41,7 @@ pub async fn init( }: ExportCommandArguments, ) -> Result<(), Error> { // Initialize opentelemetry and logging - crate::telemetry::builder().with_log_level("error").init(); + crate::telemetry::builder().with_log_level("info").init()?; // If username and password are specified, and we are connecting to a remote SurrealDB server, then we need to authenticate. // If we are connecting directly to a datastore (i.e. surrealkv://local.skv or tikv://...), then we don't need to authenticate because we use an embedded (local) SurrealDB instance with auth disabled. diff --git a/src/cli/fix.rs b/src/cli/fix.rs index c4cc3e73..9c9a5937 100644 --- a/src/cli/fix.rs +++ b/src/cli/fix.rs @@ -26,9 +26,7 @@ pub async fn init( }: FixCommandArguments, ) -> Result<(), Error> { // Initialize opentelemetry and logging - crate::telemetry::builder().with_filter(log).init(); - // Start metrics subsystem - crate::telemetry::metrics::init().expect("failed to initialize metrics"); + crate::telemetry::builder().with_filter(log).init()?; // Clean the path let endpoint = path.into_endpoint()?; let path = if endpoint.path.is_empty() { diff --git a/src/cli/import.rs b/src/cli/import.rs index a892e355..451499e2 100644 --- a/src/cli/import.rs +++ b/src/cli/import.rs @@ -39,10 +39,9 @@ pub async fn init( }: ImportCommandArguments, ) -> Result<(), Error> { // Initialize opentelemetry and logging - crate::telemetry::builder().with_log_level("info").init(); + crate::telemetry::builder().with_log_level("info").init()?; // Default datastore configuration for local engines let config = Config::new().capabilities(Capabilities::all()); - // If username and password are specified, and we are connecting to a remote SurrealDB server, then we need to authenticate. // If we are connecting directly to a datastore (i.e. surrealkv://local.skv or tikv://...), then we don't need to authenticate because we use an embedded (local) SurrealDB instance with auth disabled. let client = if username.is_some() diff --git a/src/cli/isready.rs b/src/cli/isready.rs index 4ee517e6..bd707d9e 100644 --- a/src/cli/isready.rs +++ b/src/cli/isready.rs @@ -17,7 +17,7 @@ pub async fn init( }: IsReadyCommandArguments, ) -> Result<(), Error> { // Initialize opentelemetry and logging - crate::telemetry::builder().with_log_level("error").init(); + crate::telemetry::builder().with_log_level("error").init()?; // Connect to the database engine connect(endpoint).await?; println!("OK"); diff --git a/src/cli/ml/export.rs b/src/cli/ml/export.rs index 55a0d6bb..313c594d 100644 --- a/src/cli/ml/export.rs +++ b/src/cli/ml/export.rs @@ -58,8 +58,7 @@ pub async fn init( }: ExportCommandArguments, ) -> Result<(), Error> { // Initialize opentelemetry and logging - crate::telemetry::builder().with_log_level("error").init(); - + crate::telemetry::builder().with_log_level("info").init()?; // If username and password are specified, and we are connecting to a remote SurrealDB server, then we need to authenticate. // If we are connecting directly to a datastore (i.e. surrealkv://local.skv or tikv://...), then we don't need to authenticate because we use an embedded (local) SurrealDB instance with auth disabled. let client = if username.is_some() diff --git a/src/cli/ml/import.rs b/src/cli/ml/import.rs index e1d32167..23f5518f 100644 --- a/src/cli/ml/import.rs +++ b/src/cli/ml/import.rs @@ -39,10 +39,9 @@ pub async fn init( }: ImportCommandArguments, ) -> Result<(), Error> { // Initialize opentelemetry and logging - crate::telemetry::builder().with_log_level("info").init(); + crate::telemetry::builder().with_log_level("info").init()?; // Default datastore configuration for local engines let config = Config::new().capabilities(Capabilities::all()); - // If username and password are specified, and we are connecting to a remote SurrealDB server, then we need to authenticate. // If we are connecting directly to a datastore (i.e. surrealkv://local.skv or tikv://...), then we don't need to authenticate because we use an embedded (local) SurrealDB instance with auth disabled. let client = if username.is_some() diff --git a/src/cli/sql.rs b/src/cli/sql.rs index b2e1a7e2..7e13dde1 100644 --- a/src/cli/sql.rs +++ b/src/cli/sql.rs @@ -62,10 +62,9 @@ pub async fn init( }: SqlCommandArguments, ) -> Result<(), Error> { // Initialize opentelemetry and logging - crate::telemetry::builder().with_log_level("warn").init(); + crate::telemetry::builder().with_log_level("warn").init()?; // Default datastore configuration for local engines let config = Config::new().capabilities(Capabilities::all()); - // If username and password are specified, and we are connecting to a remote SurrealDB server, then we need to authenticate. // If we are connecting directly to a datastore (i.e. surrealkv://local.skv or tikv://...), then we don't need to authenticate because we use an embedded (local) SurrealDB instance with auth disabled. let client = if username.is_some() diff --git a/src/cli/start.rs b/src/cli/start.rs index 96ae064b..3493f24e 100644 --- a/src/cli/start.rs +++ b/src/cli/start.rs @@ -39,7 +39,9 @@ pub struct StartCommandArguments { #[arg(value_parser = super::validator::key_valid)] #[arg(hide = true)] // Not currently in use key: Option, - + // + // Tasks + // #[arg( help = "The interval at which to run node agent tick (including garbage collection)", help_heading = "Database" @@ -47,7 +49,6 @@ pub struct StartCommandArguments { #[arg(env = "SURREAL_TICK_INTERVAL", long = "tick-interval", value_parser = super::validator::duration)] #[arg(default_value = "10s")] tick_interval: Duration, - // // Authentication // @@ -75,14 +76,12 @@ pub struct StartCommandArguments { requires = "username" )] password: Option, - // // Datastore connection // #[command(next_help_heading = "Datastore connection")] #[command(flatten)] kvs: Option, - // // HTTP Server // @@ -151,10 +150,7 @@ pub async fn init( }: StartCommandArguments, ) -> Result<(), Error> { // Initialize opentelemetry and logging - crate::telemetry::builder().with_filter(log).init(); - // Start metrics subsystem - crate::telemetry::metrics::init().expect("failed to initialize metrics"); - + crate::telemetry::builder().with_filter(log).init()?; // Check if we should output a banner if !no_banner { println!("{LOGO}"); diff --git a/src/cli/upgrade.rs b/src/cli/upgrade.rs index 951c8099..46860c83 100644 --- a/src/cli/upgrade.rs +++ b/src/cli/upgrade.rs @@ -88,7 +88,7 @@ pub(crate) fn parse_version(input: &str) -> Result { pub async fn init(args: UpgradeCommandArguments) -> Result<(), Error> { // Initialize opentelemetry and logging - crate::telemetry::builder().with_log_level("error").init(); + crate::telemetry::builder().with_log_level("error").init()?; // Upgrading overwrites the existing executable let exe = std::env::current_exe()?; diff --git a/src/cli/version.rs b/src/cli/version.rs index 488f6a9f..b3c22b49 100644 --- a/src/cli/version.rs +++ b/src/cli/version.rs @@ -18,7 +18,7 @@ pub async fn init( }: VersionCommandArguments, ) -> Result<(), Error> { // Initialize opentelemetry and logging - crate::telemetry::builder().with_log_level("error").init(); + crate::telemetry::builder().with_log_level("error").init()?; // Print server version if endpoint supplied else CLI version if let Some(e) = endpoint { // Print remote server version diff --git a/src/cnf/mod.rs b/src/cnf/mod.rs index 9cd920fc..b5a2d2b7 100644 --- a/src/cnf/mod.rs +++ b/src/cnf/mod.rs @@ -19,14 +19,14 @@ pub const DEBUG_BUILD_WARNING: &str = "\ ┌─────────────────────────────────────────────────────────────────────────────┐ │ !!! THIS IS A DEVELOPMENT BUILD !!! │ │ Development builds are not intended for production use and include │ -│ tooling and features that may affect the performance of the database. | +│ tooling and features that may affect the performance of the database. │ └─────────────────────────────────────────────────────────────────────────────┘"; /// The publicly visible name of the server pub const PKG_NAME: &str = "surrealdb"; /// The public endpoint for the administration interface -pub const APP_ENDPOINT: &str = "https://surrealdb.com/app"; +pub const APP_ENDPOINT: &str = "https://surrealdb.com/surrealist"; /// The maximum HTTP body size of the HTTP /ml endpoints (defaults to 4 GiB) pub static HTTP_MAX_ML_BODY_SIZE: LazyLock = @@ -86,6 +86,10 @@ pub static RUNTIME_STACK_SIZE: LazyLock = pub static RUNTIME_MAX_BLOCKING_THREADS: LazyLock = lazy_env_parse!("SURREAL_RUNTIME_MAX_BLOCKING_THREADS", usize, 512); +/// How many threads which can be started for blocking operations (defaults to 512) +pub static TELEMETRY_PROVIDER: LazyLock = + lazy_env_parse!("SURREAL_TELEMETRY_PROVIDER", String, String::new()); + /// The version identifier of this build pub static PKG_VERSION: LazyLock = LazyLock::new(|| match option_env!("SURREAL_BUILD_METADATA") { diff --git a/src/err/mod.rs b/src/err/mod.rs index 670f4718..de6a6449 100644 --- a/src/err/mod.rs +++ b/src/err/mod.rs @@ -4,6 +4,7 @@ use axum::Error as AxumError; use axum::Json; use base64::DecodeError as Base64Error; use http::{HeaderName, StatusCode}; +use opentelemetry::global::Error as OpentelemetryError; use reqwest::Error as ReqwestError; use serde::Serialize; use std::io::Error as IoError; @@ -63,8 +64,8 @@ pub enum Error { #[error("There was an error with auth: {0}")] Auth(#[from] SurrealAuthError), - #[error("There was an error with the node agent")] - NodeAgent, + #[error("There was an error with opentelemetry: {0}")] + Otel(#[from] OpentelemetryError), /// Statement has been deprecated #[error("{0}")] @@ -113,6 +114,24 @@ impl From for Error { } } +impl From for Error { + fn from(e: opentelemetry::logs::LogError) -> Error { + Error::Otel(OpentelemetryError::Log(e)) + } +} + +impl From for Error { + fn from(e: opentelemetry::trace::TraceError) -> Error { + Error::Otel(OpentelemetryError::Trace(e)) + } +} + +impl From for Error { + fn from(e: opentelemetry::metrics::MetricsError) -> Error { + Error::Otel(OpentelemetryError::Metric(e)) + } +} + impl From> for Error { fn from(e: ciborium::de::Error) -> Error { Error::Cbor(format!("{e}")) diff --git a/src/telemetry/logs/mod.rs b/src/telemetry/logs/mod.rs index ff70e6fc..5a81f3b9 100644 --- a/src/telemetry/logs/mod.rs +++ b/src/telemetry/logs/mod.rs @@ -1,26 +1,31 @@ use crate::cli::validator::parser::env_filter::CustomEnvFilter; +use crate::err::Error; use tracing::Subscriber; use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::Layer; -pub fn new(filter: CustomEnvFilter) -> Box + Send + Sync> +pub fn new(filter: CustomEnvFilter) -> Result + Send + Sync>, Error> where S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a> + Send + Sync, { #[cfg(not(debug_assertions))] { - tracing_subscriber::fmt::layer() + Ok(tracing_subscriber::fmt::layer() .compact() .with_ansi(true) + .with_file(false) .with_target(true) + .with_line_number(false) + .with_thread_ids(false) + .with_thread_names(false) .with_span_events(FmtSpan::NONE) .with_writer(std::io::stderr) .with_filter(filter.0) - .boxed() + .boxed()) } #[cfg(debug_assertions)] { - tracing_subscriber::fmt::layer() + Ok(tracing_subscriber::fmt::layer() .compact() .with_ansi(true) .with_file(true) @@ -28,9 +33,9 @@ where .with_line_number(true) .with_thread_ids(false) .with_thread_names(false) - .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) + .with_span_events(FmtSpan::NONE) .with_writer(std::io::stderr) .with_filter(filter.0) - .boxed() + .boxed()) } } diff --git a/src/telemetry/metrics/http/mod.rs b/src/telemetry/metrics/http/mod.rs index e8234556..c0cdee98 100644 --- a/src/telemetry/metrics/http/mod.rs +++ b/src/telemetry/metrics/http/mod.rs @@ -41,13 +41,11 @@ pub static HTTP_SERVER_RESPONSE_SIZE: LazyLock> = LazyLock::new(| fn observe_active_request(value: i64, tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> { let attrs = tracker.active_req_attrs(); - HTTP_SERVER_ACTIVE_REQUESTS.add(value, &attrs); Ok(()) } fn record_request_duration(tracker: &HttpCallMetricTracker) { - // Record the duration of the request. HTTP_SERVER_DURATION .record(tracker.duration().as_millis() as u64, &tracker.request_duration_attrs()); } diff --git a/src/telemetry/metrics/mod.rs b/src/telemetry/metrics/mod.rs index 3c365a2a..42235bcc 100644 --- a/src/telemetry/metrics/mod.rs +++ b/src/telemetry/metrics/mod.rs @@ -1,8 +1,8 @@ pub mod http; pub mod ws; +use crate::cnf::TELEMETRY_PROVIDER; use opentelemetry::metrics::MetricsError; -use opentelemetry::{global, Context as TelemetryContext}; use opentelemetry_otlp::MetricsExporterBuilder; use opentelemetry_sdk::metrics::reader::{DefaultAggregationSelector, DefaultTemporalitySelector}; use opentelemetry_sdk::metrics::{ @@ -24,72 +24,64 @@ static HISTOGRAM_BUCKETS_MS: &[f64] = &[ const KB: f64 = 1024.0; const MB: f64 = 1024.0 * KB; const HISTOGRAM_BUCKETS_BYTES: &[f64] = &[ - 1.0 * KB, // 1 KB - 2.0 * KB, // 2 KB - 5.0 * KB, // 5 KB - 10.0 * KB, // 10 KB - 100.0 * KB, // 100 KB - 500.0 * KB, // 500 KB - 1.0 * MB, // 1 MB - 2.5 * MB, // 2 MB - 5.0 * MB, // 5 MB - 10.0 * MB, // 10 MB - 25.0 * MB, // 25 MB - 50.0 * MB, // 50 MB - 100.0 * MB, // 100 MB + 1.0 * KB, + 2.0 * KB, + 5.0 * KB, + 10.0 * KB, + 100.0 * KB, + 500.0 * KB, + 1.0 * MB, + 2.5 * MB, + 5.0 * MB, + 10.0 * MB, + 25.0 * MB, + 50.0 * MB, + 100.0 * MB, ]; -fn build_controller() -> Result { - let exporter = MetricsExporterBuilder::from(opentelemetry_otlp::new_exporter().tonic()) - .build_metrics_exporter( - Box::new(DefaultTemporalitySelector::new()), - Box::new(DefaultAggregationSelector::new()), - ) - .unwrap(); - let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); - - let histo_duration_view = { - let criteria = Instrument::new().name("*.duration"); - let mask = Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { - boundaries: HISTOGRAM_BUCKETS_MS.to_vec(), - record_min_max: true, - }); - opentelemetry_sdk::metrics::new_view(criteria, mask)? - }; - - let histo_size_view = { - let criteria = Instrument::new().name("*.size"); - let mask = Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { - boundaries: HISTOGRAM_BUCKETS_BYTES.to_vec(), - record_min_max: true, - }); - opentelemetry_sdk::metrics::new_view(criteria, mask)? - }; - - Ok(SdkMeterProvider::builder() - .with_reader(reader) - .with_resource(OTEL_DEFAULT_RESOURCE.clone()) - .with_view(histo_duration_view) - .with_view(histo_size_view) - .build()) -} - -// Initialize the metrics subsystem -// Panics if initialization fails -pub fn init() -> Result<(), MetricsError> { - let meter_provider = build_controller()?; - - global::set_meter_provider(meter_provider); - Ok(()) -} - -// -// Shutdown the metrics providers -// -pub fn shutdown(_cx: &TelemetryContext) -> Result<(), MetricsError> { - // TODO(sgirones): The stop method hangs forever, so we are not calling it until we figure out why - // METER_PROVIDER_DURATION.stop(cx)?; - // METER_PROVIDER_SIZE.stop(cx)?; - - Ok(()) +// Returns a metrics configuration based on the SURREAL_TELEMETRY_PROVIDER environment variable +pub fn init() -> Result, MetricsError> { + match TELEMETRY_PROVIDER.trim() { + // The OTLP telemetry provider has been specified + s if s.eq_ignore_ascii_case("otlp") => { + // Create a new metrics exporter using tonic + let exporter = MetricsExporterBuilder::from(opentelemetry_otlp::new_exporter().tonic()) + .build_metrics_exporter( + Box::new(DefaultTemporalitySelector::new()), + Box::new(DefaultAggregationSelector::new()), + ) + .unwrap(); + // Create the reader to run with Tokio + let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); + // Add a view for metering durations + let histogram_duration_view = { + let criteria = Instrument::new().name("*.duration"); + let mask = Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { + boundaries: HISTOGRAM_BUCKETS_MS.to_vec(), + record_min_max: true, + }); + opentelemetry_sdk::metrics::new_view(criteria, mask)? + }; + // Add a view for metering sizes + let histogram_size_view = { + let criteria = Instrument::new().name("*.size"); + let mask = Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { + boundaries: HISTOGRAM_BUCKETS_BYTES.to_vec(), + record_min_max: true, + }); + opentelemetry_sdk::metrics::new_view(criteria, mask)? + }; + // Create the new metrics provider + Ok(Some( + SdkMeterProvider::builder() + .with_reader(reader) + .with_resource(OTEL_DEFAULT_RESOURCE.clone()) + .with_view(histogram_duration_view) + .with_view(histogram_size_view) + .build(), + )) + } + // No matching telemetry provider was found + _ => Ok(None), + } } diff --git a/src/telemetry/metrics/ws/mod.rs b/src/telemetry/metrics/ws/mod.rs index d0e121f6..ee2390dc 100644 --- a/src/telemetry/metrics/ws/mod.rs +++ b/src/telemetry/metrics/ws/mod.rs @@ -57,7 +57,6 @@ pub fn on_disconnect() -> Result<(), MetricsError> { pub(super) fn observe_active_connection(value: i64) -> Result<(), MetricsError> { let attrs = otel_common_attrs(); - RPC_SERVER_ACTIVE_CONNECTIONS.add(value, &attrs); Ok(()) } diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs index d839f04b..a4e28109 100644 --- a/src/telemetry/mod.rs +++ b/src/telemetry/mod.rs @@ -3,8 +3,8 @@ pub mod metrics; pub mod traces; use crate::cli::validator::parser::env_filter::CustomEnvFilter; -use opentelemetry::metrics::MetricsError; -use opentelemetry::Context; +use crate::err::Error; +use opentelemetry::global; use opentelemetry::KeyValue; use opentelemetry_sdk::resource::{ EnvResourceDetector, SdkProvidedResourceDetector, TelemetryResourceDetector, @@ -19,6 +19,7 @@ use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; pub static OTEL_DEFAULT_RESOURCE: LazyLock = LazyLock::new(|| { + // Set the default otel metadata if available let res = Resource::from_detectors( Duration::from_secs(5), vec![ @@ -30,7 +31,6 @@ pub static OTEL_DEFAULT_RESOURCE: LazyLock = LazyLock::new(|| { Box::new(TelemetryResourceDetector), ], ); - // If no external service.name is set, set it to surrealdb if res.get("service.name".into()).unwrap_or("".into()).as_str() == "unknown_service" { res.merge(&Resource::new([KeyValue::new("service.name", "surrealdb")])) @@ -57,6 +57,20 @@ impl Default for Builder { } impl Builder { + /// Install the tracing dispatcher globally + pub fn init(self) -> Result<(), Error> { + // Setup logs, tracing, and metrics + self.build()?.init(); + // Everything ok + Ok(()) + } + + /// Set the log filter on the builder + pub fn with_filter(mut self, filter: CustomEnvFilter) -> Self { + self.filter = filter; + self + } + /// Set the log level on the builder pub fn with_log_level(mut self, log_level: &str) -> Self { if let Ok(filter) = filter_from_value(log_level) { @@ -65,35 +79,28 @@ impl Builder { self } - /// Set the filter on the builder - pub fn with_filter(mut self, filter: CustomEnvFilter) -> Self { - self.filter = filter; - self - } - - /// Build a tracing dispatcher with the fmt subscriber (logs) and the chosen tracer subscriber - pub fn build(self) -> Box { + /// Build a tracing dispatcher with the logs and tracer subscriber + pub fn build(&self) -> Result, Error> { // Setup a registry for composing layers let registry = tracing_subscriber::registry(); // Setup logging layer - let registry = registry.with(logs::new(self.filter.clone())); + let registry = registry.with(logs::new(self.filter.clone())?); // Setup tracing layer - let registry = registry.with(traces::new(self.filter)); + let registry = registry.with(traces::new(self.filter.clone())?); + // Setup the metrics layer + if let Some(provider) = metrics::init()? { + global::set_meter_provider(provider); + } // Return the registry - Box::new(registry) - } - - /// Install the tracing dispatcher globally - pub fn init(self) { - self.build().init(); + Ok(Box::new(registry)) } } -pub fn shutdown() -> Result<(), MetricsError> { +pub fn shutdown() -> Result<(), Error> { // Flush all telemetry data and block until done opentelemetry::global::shutdown_tracer_provider(); - // Shutdown the metrics provider fully - metrics::shutdown(&Context::current()) + // Everything ok + Ok(()) } /// Create an EnvFilter from the given value. If the value is not a valid log level, it will be treated as EnvFilter directives. @@ -114,7 +121,8 @@ pub fn filter_from_value(v: &str) -> Result { "trace" => EnvFilter::builder() .parse("warn,surreal=trace,surrealdb=trace,surrealcs=warn,surrealdb::core::kvs=debug"), // Check if we should show all surreal logs - "full" => EnvFilter::builder().parse("debug,surreal=trace,surrealdb=trace,surrealcs=debug"), + "full" => EnvFilter::builder() + .parse("debug,surreal=trace,surrealdb=trace,surrealcs=debug,surrealdb::core=trace"), // Check if we should show all module logs "all" => Ok(EnvFilter::default().add_directive(Level::TRACE.into())), // Let's try to parse the custom log level @@ -139,11 +147,12 @@ mod tests { let otlp_endpoint = format!("http://{addr}"); temp_env::with_vars( vec![ - ("SURREAL_TRACING_TRACER", Some("otlp")), + ("SURREAL_TELEMETRY_PROVIDER", Some("otlp")), ("OTEL_EXPORTER_OTLP_ENDPOINT", Some(otlp_endpoint.as_str())), ], || { - let _enter = telemetry::builder().with_log_level("info").build().set_default(); + let _enter = + telemetry::builder().with_log_level("info").build().unwrap().set_default(); println!("Sending span..."); @@ -180,11 +189,12 @@ mod tests { let otlp_endpoint = format!("http://{addr}"); temp_env::with_vars( vec![ - ("SURREAL_TRACING_TRACER", Some("otlp")), + ("SURREAL_TELEMETRY_PROVIDER", Some("otlp")), ("OTEL_EXPORTER_OTLP_ENDPOINT", Some(otlp_endpoint.as_str())), ], || { - let _enter = telemetry::builder().with_log_level("debug").build().set_default(); + let _enter = + telemetry::builder().with_log_level("debug").build().unwrap().set_default(); println!("Sending spans..."); diff --git a/src/telemetry/traces/mod.rs b/src/telemetry/traces/mod.rs index 2e1ba1d9..d3054ce4 100644 --- a/src/telemetry/traces/mod.rs +++ b/src/telemetry/traces/mod.rs @@ -1,45 +1,46 @@ +pub mod rpc; + +use crate::cli::validator::parser::env_filter::CustomEnvFilter; +use crate::cnf::TELEMETRY_PROVIDER; +use crate::err::Error; +use crate::telemetry::OTEL_DEFAULT_RESOURCE; +use opentelemetry::trace::TracerProvider as _; +use opentelemetry_otlp::SpanExporterBuilder; +use opentelemetry_sdk::trace::{Config, TracerProvider}; use tracing::Subscriber; use tracing_subscriber::Layer; -use crate::cli::validator::parser::env_filter::CustomEnvFilter; -use opentelemetry::trace::TracerProvider as _; - -pub mod otlp; -pub mod rpc; - -const TRACING_TRACER_VAR: &str = "SURREAL_TRACING_TRACER"; - -// Returns a tracer based on the value of the TRACING_TRACER_VAR env var -pub fn new(filter: CustomEnvFilter) -> Option + Send + Sync>> +// Returns a tracer provider based on the SURREAL_TELEMETRY_PROVIDER environment variable +pub fn new(filter: CustomEnvFilter) -> Result + Send + Sync>>, Error> where S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a> + Send + Sync, { - match std::env::var(TRACING_TRACER_VAR).unwrap_or_default().trim().to_ascii_lowercase().as_str() - { - // If no tracer is selected, init with the fmt subscriber only - "noop" | "" => { - debug!("No tracer selected"); - None - } - // Init the registry with the OTLP tracer - "otlp" => { - // Create the OTLP tracer provider - let tracer_provider = - otlp::build_tracer_provider().expect("Failed to initialize OTLP tracer provider"); + match TELEMETRY_PROVIDER.trim() { + // The OTLP telemetry provider has been specified + s if s.eq_ignore_ascii_case("otlp") => { + // Create a new OTLP exporter using gRPC + let exporter = opentelemetry_otlp::new_exporter().tonic(); + // Build a new span exporter which uses gRPC + let span_exporter = SpanExporterBuilder::Tonic(exporter).build_span_exporter()?; + // Define the OTEL metadata configuration + let config = Config::default().with_resource(OTEL_DEFAULT_RESOURCE.clone()); + // Create the provider with the Tokio runtime + let provider = TracerProvider::builder() + .with_batch_exporter(span_exporter, opentelemetry_sdk::runtime::Tokio) + .with_config(config) + .build(); // Set it as the global tracer provider - let _ = opentelemetry::global::set_tracer_provider(tracer_provider.clone()); - // Returns a tracing subscriber layer built with the selected tracer and filter. - // It will be used by the `tracing` crate to decide what spans to send to the global tracer provider - Some( + let _ = opentelemetry::global::set_tracer_provider(provider.clone()); + // Return the tracing layer with the specified filter + Ok(Some( tracing_opentelemetry::layer() - .with_tracer(tracer_provider.tracer("surealdb")) + .with_tracer(provider.tracer("surealdb")) .with_filter(filter.0) .boxed(), - ) - } - tracer => { - panic!("unsupported tracer {tracer}"); + )) } + // No matching telemetry provider was found + _ => Ok(None), } } diff --git a/src/telemetry/traces/otlp.rs b/src/telemetry/traces/otlp.rs deleted file mode 100644 index a77bb60c..00000000 --- a/src/telemetry/traces/otlp.rs +++ /dev/null @@ -1,23 +0,0 @@ -use opentelemetry::trace::TraceError; -// use opentelemetry::{ -// trace::{Span, SpanBuilder, Tracer as _, TracerProvider as _}, -// Context, -// }; -use opentelemetry_otlp::SpanExporterBuilder; -use opentelemetry_sdk::trace::{Config, TracerProvider}; -// use tracing_subscriber::prelude::*; - -use crate::telemetry::OTEL_DEFAULT_RESOURCE; - -pub(super) fn build_tracer_provider() -> Result { - let exporter = opentelemetry_otlp::new_exporter().tonic(); - let span_exporter = SpanExporterBuilder::Tonic(exporter).build_span_exporter()?; - let config = Config::default().with_resource(OTEL_DEFAULT_RESOURCE.clone()); - - let provider = TracerProvider::builder() - .with_batch_exporter(span_exporter, opentelemetry_sdk::runtime::Tokio) - .with_config(config) - .build(); - - Ok(provider) -} diff --git a/tests/cli_integration.rs b/tests/cli_integration.rs index 9d68ec63..6bf1adeb 100644 --- a/tests/cli_integration.rs +++ b/tests/cli_integration.rs @@ -1473,7 +1473,7 @@ fn remove_debug_info(output: String) -> String { ┌─────────────────────────────────────────────────────────────────────────────┐ │ !!! THIS IS A DEVELOPMENT BUILD !!! │ │ Development builds are not intended for production use and include │ -│ tooling and features that may affect the performance of the database. | +│ tooling and features that may affect the performance of the database. │ └─────────────────────────────────────────────────────────────────────────────┘ "; // The last line in the above is important