Improve logging, tracing, and provider integration (#4772)

Tobie Morgan Hitchcock 2024-09-15 23:31:10 +01:00 committed by GitHub
parent 00be48674d
commit f7a8b5dff9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 292 additions and 266 deletions

Cargo.lock (generated)

@@ -5990,9 +5990,9 @@ dependencies = [
[[package]]
name = "surrealcs"
version = "0.3.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a77651296e07271c28b82ff9eb12ce7e834565d1b7f4607241f41b82d343fbe"
checksum = "457eb2adc59f9a9ba337e1fa3a6af63e09524e4adf01921b9d7079917cd7f694"
dependencies = [
"bincode",
"bytes",
@@ -6011,9 +6011,9 @@ dependencies = [
[[package]]
name = "surrealcs-kernel"
version = "0.3.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b47d835a86fe1080acbeb24352dbce92e4019f84bf28fe76c16d25b477eef032"
checksum = "f4a79da58bfc886b93a431a463f956184d4b0a9d3672833544a1b718ff86b0df"
dependencies = [
"bincode",
"chrono",


@@ -10,8 +10,8 @@ reduce_output = true
default_to_workspace = false
[env]
ALL_FEATURES={ value = "storage-mem,storage-surrealkv,storage-rocksdb,storage-tikv,storage-fdb,scripting,http,jwks,ml,storage-fdb-7_1", condition = { env_not_set = ["ALL_FEATURES"] } }
DEV_FEATURES={ value = "storage-mem,storage-surrealkv,scripting,http,jwks,ml", condition = { env_not_set = ["DEV_FEATURES"] } }
ALL_FEATURES={ value = "storage-mem,storage-surrealkv,storage-surrealcs,storage-rocksdb,storage-tikv,storage-fdb,scripting,http,jwks,ml,storage-fdb-7_1", condition = { env_not_set = ["ALL_FEATURES"] } }
DEV_FEATURES={ value = "storage-mem,storage-surrealkv,storage-surrealcs,scripting,http,jwks,ml", condition = { env_not_set = ["DEV_FEATURES"] } }
SURREAL_LOG={ value = "full", condition = { env_not_set = ["SURREAL_LOG"] } }
SURREAL_USER={ value = "root", condition = { env_not_set = ["SURREAL_USER"] } }
SURREAL_PASS={ value = "root", condition = { env_not_set = ["SURREAL_PASS"] } }


@@ -145,7 +145,7 @@ sha2 = "0.10.8"
snap = "1.1.0"
storekey = "0.5.0"
subtle = "2.6"
surrealcs = { version = "0.3.0", optional = true }
surrealcs = { version = "0.3.1", optional = true }
surrealkv = { version = "0.3.4", optional = true }
surrealml = { version = "0.1.1", optional = true, package = "surrealml-core" }
tempfile = { version = "3.10.1", optional = true }


@@ -27,6 +27,8 @@ use trice::Instant;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures::spawn_local as spawn;
const TARGET: &str = "surrealdb::core::dbs";
pub(crate) struct Executor<'a> {
err: bool,
kvs: &'a Datastore,
@@ -184,7 +186,7 @@ impl<'a> Executor<'a> {
Ok(ctx.freeze())
}
#[instrument(level = "debug", name = "executor", skip_all)]
#[instrument(level = "debug", name = "executor", target = "surrealdb::core::dbs", skip_all)]
pub async fn execute(
&mut self,
mut ctx: Context,
@@ -207,7 +209,7 @@ impl<'a> Executor<'a> {
// Process all statements in query
for stm in qry.into_iter() {
// Log the statement
debug!("Executing: {}", stm);
trace!(target: TARGET, statement = %stm, "Executing statement");
// Reset errors
if self.txn.is_none() {
self.err = false;


@@ -22,6 +22,8 @@ use reblessive::TreeStack;
use std::mem;
use std::sync::Arc;
const TARGET: &str = "surrealdb::core::dbs";
#[derive(Clone)]
pub(crate) enum Iterable {
Value(Value),
@@ -290,7 +292,7 @@ impl Iterator {
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Log the statement
trace!("Iterating: {}", stm);
trace!(target: TARGET, statement = %stm, "Iterating statement");
// Enable context override
let mut cancel_ctx = MutableContext::new(ctx);
self.run = cancel_ctx.add_cancel();


@@ -919,7 +919,7 @@ fn gql_to_sql_kind(val: &GqlValue, kind: Kind) -> Result<SqlValue, GqlError> {
_ => Err(type_error(kind, val)),
},
Kind::Datetime => match val {
GqlValue::String(s) => match syn::datetime_raw(s) {
GqlValue::String(s) => match syn::datetime(s) {
Ok(dt) => Ok(dt.into()),
Err(_) => Err(type_error(kind, val)),
},


@@ -7,7 +7,6 @@ use crate::dbs::node::Timestamp;
use crate::doc::CursorValue;
use crate::err::Error;
use crate::idg::u32::U32;
#[cfg(debug_assertions)]
use crate::key::debug::Sprintable;
use crate::kvs::batch::Batch;
use crate::kvs::clock::SizedClock;
@@ -30,6 +29,8 @@ use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
const TARGET: &str = "surrealdb::core::kvs::tr";
/// Used to determine the behaviour when a transaction is not closed correctly
#[derive(Debug, Default)]
pub enum Check {
@@ -184,6 +185,7 @@ impl Transactor {
/// in a [`Error::TxFinished`] error.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
pub async fn closed(&self) -> bool {
trace!(target: TARGET, "Closed");
expand_inner!(&self.inner, v => { v.closed() })
}
@@ -192,6 +194,7 @@
/// This reverses all changes made within the transaction.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
pub async fn cancel(&mut self) -> Result<(), Error> {
trace!(target: TARGET, "Cancel");
expand_inner!(&mut self.inner, v => { v.cancel().await })
}
@@ -200,6 +203,7 @@
/// This attempts to commit all changes made within the transaction.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
pub async fn commit(&mut self) -> Result<(), Error> {
trace!(target: TARGET, "Commit");
expand_inner!(&mut self.inner, v => { v.commit().await })
}
@@ -210,6 +214,7 @@
K: Into<Key> + Debug,
{
let key = key.into();
trace!(target: TARGET, key = key.sprint(), "Exists");
expand_inner!(&mut self.inner, v => { v.exists(key).await })
}
@@ -220,6 +225,7 @@
K: Into<Key> + Debug,
{
let key = key.into();
trace!(target: TARGET, key = key.sprint(), version = version, "Get");
expand_inner!(&mut self.inner, v => { v.get(key, version).await })
}
@@ -230,6 +236,7 @@
K: Into<Key> + Debug,
{
let keys = keys.into_iter().map(Into::into).collect::<Vec<Key>>();
trace!(target: TARGET, keys = keys.sprint(), "GetM");
expand_inner!(&mut self.inner, v => { v.getm(keys).await })
}
@@ -247,6 +254,8 @@
{
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
let rng = beg.as_slice()..end.as_slice();
trace!(target: TARGET, rng = rng.sprint(), version = version, "GetR");
expand_inner!(&mut self.inner, v => { v.getr(beg..end, version).await })
}
@@ -259,6 +268,7 @@
K: Into<Key> + Debug,
{
let key: Key = key.into();
trace!(target: TARGET, key = key.sprint(), "GetP");
expand_inner!(&mut self.inner, v => { v.getp(key).await })
}
@@ -270,6 +280,7 @@
V: Into<Val> + Debug,
{
let key = key.into();
trace!(target: TARGET, key = key.sprint(), version = version, "Set");
expand_inner!(&mut self.inner, v => { v.set(key, val, version).await })
}
@@ -281,6 +292,7 @@
V: Into<Val> + Debug,
{
let key = key.into();
trace!(target: TARGET, key = key.sprint(), version = version, "Put");
expand_inner!(&mut self.inner, v => { v.put(key, val, version).await })
}
@@ -292,6 +304,7 @@
V: Into<Val> + Debug,
{
let key = key.into();
trace!(target: TARGET, key = key.sprint(), "PutC");
expand_inner!(&mut self.inner, v => { v.putc(key, val, chk).await })
}
@@ -302,6 +315,7 @@
K: Into<Key> + Debug,
{
let key = key.into();
trace!(target: TARGET, key = key.sprint(), "Del");
expand_inner!(&mut self.inner, v => { v.del(key).await })
}
@@ -313,6 +327,7 @@
V: Into<Val> + Debug,
{
let key = key.into();
trace!(target: TARGET, key = key.sprint(), "DelC");
expand_inner!(&mut self.inner, v => { v.delc(key, chk).await })
}
@@ -326,6 +341,8 @@
{
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
let rng = beg.as_slice()..end.as_slice();
trace!(target: TARGET, rng = rng.sprint(), "DelR");
expand_inner!(&mut self.inner, v => { v.delr(beg..end).await })
}
@@ -338,6 +355,7 @@
K: Into<Key> + Debug,
{
let key: Key = key.into();
trace!(target: TARGET, key = key.sprint(), "DelP");
expand_inner!(&mut self.inner, v => { v.delp(key).await })
}
@@ -351,6 +369,11 @@
{
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
let rng = beg.as_slice()..end.as_slice();
trace!(target: TARGET, rng = rng.sprint(), limit = limit, "Keys");
if beg > end {
return Ok(vec![]);
}
expand_inner!(&mut self.inner, v => { v.keys(beg..end, limit).await })
}
@@ -369,6 +392,8 @@
{
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
let rng = beg.as_slice()..end.as_slice();
trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Scan");
if beg > end {
return Ok(vec![]);
}
@@ -391,6 +416,8 @@
{
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
let rng = beg.as_slice()..end.as_slice();
trace!(target: TARGET, rng = rng.sprint(), values = values, version = version, "Batch");
expand_inner!(&mut self.inner, v => { v.batch(beg..end, batch, values, version).await })
}
@@ -587,8 +614,8 @@
// on other concurrent transactions that can write to the ts_key or the keys after it.
let key = crate::key::database::vs::new(ns, db);
let vst = self.get_timestamp(key).await?;
#[cfg(debug_assertions)]
trace!(
target: TARGET,
"Setting timestamp {} for versionstamp {:?} in ns: {}, db: {}",
ts,
crate::vs::conv::versionstamp_to_u64(&vst),
@@ -604,8 +631,8 @@
let ts_pairs: Vec<(Vec<u8>, Vec<u8>)> = self.getr(begin..end, None).await?;
let latest_ts_pair = ts_pairs.last();
if let Some((k, _)) = latest_ts_pair {
#[cfg(debug_assertions)]
trace!(
target: TARGET,
"There already was a greater committed timestamp {} in ns: {}, db: {} found: {}",
ts,
ns,
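
Each transactor operation above now pairs a trace-level `#[instrument]` span with a single event naming the operation and its key or range. A condensed sketch of that pairing (the `del` function and the `Key` alias are illustrative; the real code renders keys with the crate's `Sprintable` trait):

```rust
use tracing::{instrument, trace};

const TARGET: &str = "surrealdb::core::kvs::tr";

// Stand-in for the crate's binary key type.
type Key = Vec<u8>;

// skip_all keeps the raw argument bytes out of the span fields; the
// event inside records a printable rendering of the key instead.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
async fn del(key: Key) {
    trace!(target: TARGET, key = ?key, "Del");
}
```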


@@ -62,7 +62,7 @@ impl TryFrom<Strand> for Datetime {
impl TryFrom<&str> for Datetime {
type Error = ();
fn try_from(v: &str) -> Result<Self, Self::Error> {
match syn::datetime_raw(v) {
match syn::datetime(v) {
Ok(v) => Ok(v),
_ => Err(()),
}


@@ -23,6 +23,8 @@ use parser::Parser;
use reblessive::Stack;
use token::t;
const TARGET: &str = "surrealdb::core::syn";
/// Takes a string and returns if it could be a reserved keyword in certain contexts.
pub fn could_be_reserved_keyword(s: &str) -> bool {
lexer::keywords::could_be_reserved(s)
@@ -38,9 +40,9 @@ pub fn could_be_reserved_keyword(s: &str) -> bool {
///
/// If you encounter this limit and believe that it should be increased,
/// please [open an issue](https://github.com/surrealdb/surrealdb/issues)!
#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))]
#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))]
pub fn parse(input: &str) -> Result<Query, Error> {
debug!("parsing query, input = {input}");
trace!(target: TARGET, "Parsing SurrealQL query");
let mut parser = Parser::new(input.as_bytes());
let mut stack = Stack::new();
stack
@@ -51,9 +53,9 @@
}
/// Parses a SurrealQL [`Value`].
#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))]
#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))]
pub fn value(input: &str) -> Result<Value, Error> {
debug!("parsing value, input = {input}");
trace!(target: TARGET, "Parsing SurrealQL value");
let mut parser = Parser::new(input.as_bytes());
let mut stack = Stack::new();
stack
@@ -64,40 +66,10 @@
.map_err(Error::InvalidQuery)
}
/// Parses a SurrealQL [`Value`].
#[cfg(test)]
#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))]
pub(crate) fn value_field(input: &str) -> Result<Value, Error> {
debug!("parsing value, input = {input}");
let mut parser = Parser::new(input.as_bytes());
let mut stack = Stack::new();
stack
.enter(|stk| parser.parse_value_field(stk))
.finish()
.and_then(|e| parser.assert_finished().map(|_| e))
.map_err(|e| e.render_on(input))
.map_err(Error::InvalidQuery)
}
/// Parses a SurrealQL [`Value`].
#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))]
pub fn value_legacy_strand(input: &str) -> Result<Value, Error> {
debug!("parsing value with legacy strings, input = {input}");
let mut parser = Parser::new(input.as_bytes());
let mut stack = Stack::new();
parser.allow_legacy_strand(true);
stack
.enter(|stk| parser.parse_value_table(stk))
.finish()
.and_then(|e| parser.assert_finished().map(|_| e))
.map_err(|e| e.render_on(input))
.map_err(Error::InvalidQuery)
}
/// Parses JSON into an inert SurrealQL [`Value`]
#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))]
#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))]
pub fn json(input: &str) -> Result<Value, Error> {
debug!("parsing json, input = {input}");
trace!(target: TARGET, "Parsing inert JSON value");
let mut parser = Parser::new(input.as_bytes());
let mut stack = Stack::new();
stack
@@ -108,24 +80,10 @@
.map_err(Error::InvalidQuery)
}
/// Parses JSON into an inert SurrealQL [`Value`]
#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))]
pub fn json_legacy_strand(input: &str) -> Result<Value, Error> {
debug!("parsing json with legacy strings, input = {input}");
let mut parser = Parser::new(input.as_bytes());
let mut stack = Stack::new();
parser.allow_legacy_strand(true);
stack
.enter(|stk| parser.parse_json(stk))
.finish()
.and_then(|e| parser.assert_finished().map(|_| e))
.map_err(|e| e.render_on(input))
.map_err(Error::InvalidQuery)
}
/// Parses a SurrealQL Subquery [`Subquery`]
#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))]
#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))]
pub fn subquery(input: &str) -> Result<Subquery, Error> {
debug!("parsing subquery, input = {input}");
trace!(target: TARGET, "Parsing SurrealQL subquery");
let mut parser = Parser::new(input.as_bytes());
let mut stack = Stack::new();
stack
@@ -137,9 +95,9 @@
}
/// Parses a SurrealQL [`Idiom`]
#[instrument(level = "debug", name = "parser", skip_all, fields(length = input.len()))]
#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))]
pub fn idiom(input: &str) -> Result<Idiom, Error> {
debug!("parsing idiom, input = {input}");
trace!(target: TARGET, "Parsing SurrealQL idiom");
let mut parser = Parser::new(input.as_bytes());
parser.table_as_field = true;
let mut stack = Stack::new();
@@ -152,8 +110,9 @@
}
/// Parse a datetime without enclosing delimiters from a string.
pub fn datetime_raw(input: &str) -> Result<Datetime, Error> {
debug!("parsing datetime, input = {input}");
#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))]
pub fn datetime(input: &str) -> Result<Datetime, Error> {
trace!(target: TARGET, "Parsing SurrealQL datetime");
let mut lexer = Lexer::new(input.as_bytes());
let res = compound::datetime_inner(&mut lexer);
if let Err(e) = lexer.assert_finished() {
@@ -163,8 +122,9 @@
}
/// Parse a duration from a string.
#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))]
pub fn duration(input: &str) -> Result<Duration, Error> {
debug!("parsing duration, input = {input}");
trace!(target: TARGET, "Parsing SurrealQL duration");
let mut parser = Parser::new(input.as_bytes());
parser
.next_token_value::<Duration>()
@@ -173,9 +133,10 @@
.map_err(Error::InvalidQuery)
}
/// Parse a range
/// Parse a range.
#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))]
pub fn range(input: &str) -> Result<Range, Error> {
debug!("parsing range, input = {input}");
trace!(target: TARGET, "Parsing SurrealQL range");
let mut parser = Parser::new(input.as_bytes());
let mut stack = Stack::new();
stack
@@ -187,8 +148,9 @@
}
/// Parse a record id.
#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))]
pub fn thing(input: &str) -> Result<Thing, Error> {
debug!("parsing thing, input = {input}");
trace!(target: TARGET, "Parsing SurrealQL thing");
let mut parser = Parser::new(input.as_bytes());
let mut stack = Stack::new();
stack
@@ -200,12 +162,11 @@
}
/// Parse a block, expects the value to be wrapped in `{}`.
#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))]
pub fn block(input: &str) -> Result<Block, Error> {
debug!("parsing block, input = {input}");
trace!(target: TARGET, "Parsing SurrealQL block");
let mut parser = Parser::new(input.as_bytes());
let mut stack = Stack::new();
let token = parser.peek();
match token.kind {
t!("{") => {
@@ -224,3 +185,33 @@
)),
}
}
/// Parses a SurrealQL [`Value`] and parses values within strings.
#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))]
pub fn value_legacy_strand(input: &str) -> Result<Value, Error> {
trace!(target: TARGET, "Parsing SurrealQL value, with legacy strings");
let mut parser = Parser::new(input.as_bytes());
let mut stack = Stack::new();
parser.allow_legacy_strand(true);
stack
.enter(|stk| parser.parse_value_table(stk))
.finish()
.and_then(|e| parser.assert_finished().map(|_| e))
.map_err(|e| e.render_on(input))
.map_err(Error::InvalidQuery)
}
/// Parses JSON into an inert SurrealQL [`Value`] and parses values within strings.
#[instrument(level = "trace", target = "surrealdb::core::syn", fields(length = input.len()))]
pub fn json_legacy_strand(input: &str) -> Result<Value, Error> {
trace!(target: TARGET, "Parsing inert JSON value, with legacy strings");
let mut parser = Parser::new(input.as_bytes());
let mut stack = Stack::new();
parser.allow_legacy_strand(true);
stack
.enter(|stk| parser.parse_json(stk))
.finish()
.and_then(|e| parser.assert_finished().map(|_| e))
.map_err(|e| e.render_on(input))
.map_err(Error::InvalidQuery)
}
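
Two things change throughout this module: the undelimited datetime parser is renamed from `datetime_raw` to `datetime`, and every public entry point now opens its own trace-level span via `#[instrument]` instead of logging the raw input. A brief calling sketch, assuming the module paths used inside the core crate:

```rust
use crate::err::Error;
use crate::syn;

fn parse_examples() -> Result<(), Error> {
    // Formerly syn::datetime_raw; parses a datetime without delimiters.
    let dt = syn::datetime("2024-09-15T23:31:10Z")?;
    // The other entry points keep their names but gain instrumented spans.
    let dur = syn::duration("1h")?;
    let qry = syn::parse("SELECT * FROM person")?;
    println!("{dt} {dur} {qry}");
    Ok(())
}
```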


@@ -4,12 +4,22 @@ use super::lexer::Lexer;
use super::parse;
use super::parser::Parser;
use super::Parse;
use crate::err::Error;
use crate::sql::{Array, Expression, Ident, Idiom, Param, Script, Thing, Value};
use crate::syn::token::{t, TokenKind};
impl Parse<Self> for Value {
fn parse(val: &str) -> Self {
super::value_field(val).inspect_err(|e| eprintln!("{e}")).unwrap()
let mut parser = Parser::new(val.as_bytes());
let mut stack = Stack::new();
stack
.enter(|stk| parser.parse_value_field(stk))
.finish()
.and_then(|e| parser.assert_finished().map(|_| e))
.map_err(|e| e.render_on(val))
.map_err(Error::InvalidQuery)
.inspect_err(|e| eprintln!("{e}"))
.unwrap()
}
}


@@ -8,7 +8,7 @@ For local development, you can start the observability stack defined in `dev/doc
```
$ docker-compose -f dev/docker/compose.yaml up -d
$ SURREAL_TRACING_TRACER=otlp OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" surreal start
$ SURREAL_TELEMETRY_PROVIDER=otlp OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" surreal start
```
Now you can use the SurrealDB server and view the telemetry data by opening this URL in the browser: http://localhost:3000


@@ -14,7 +14,6 @@ pub struct ExportCommandArguments {
#[arg(default_value = "-")]
#[arg(index = 1)]
file: String,
#[command(flatten)]
conn: DatabaseConnectionArguments,
#[command(flatten)]
@@ -42,7 +41,7 @@
}: ExportCommandArguments,
) -> Result<(), Error> {
// Initialize opentelemetry and logging
crate::telemetry::builder().with_log_level("error").init();
crate::telemetry::builder().with_log_level("info").init()?;
// If username and password are specified, and we are connecting to a remote SurrealDB server, then we need to authenticate.
// If we are connecting directly to a datastore (i.e. surrealkv://local.skv or tikv://...), then we don't need to authenticate because we use an embedded (local) SurrealDB instance with auth disabled.


@@ -26,9 +26,7 @@ pub async fn init(
}: FixCommandArguments,
) -> Result<(), Error> {
// Initialize opentelemetry and logging
crate::telemetry::builder().with_filter(log).init();
// Start metrics subsystem
crate::telemetry::metrics::init().expect("failed to initialize metrics");
crate::telemetry::builder().with_filter(log).init()?;
// Clean the path
let endpoint = path.into_endpoint()?;
let path = if endpoint.path.is_empty() {


@@ -39,10 +39,9 @@ pub async fn init(
}: ImportCommandArguments,
) -> Result<(), Error> {
// Initialize opentelemetry and logging
crate::telemetry::builder().with_log_level("info").init();
crate::telemetry::builder().with_log_level("info").init()?;
// Default datastore configuration for local engines
let config = Config::new().capabilities(Capabilities::all());
// If username and password are specified, and we are connecting to a remote SurrealDB server, then we need to authenticate.
// If we are connecting directly to a datastore (i.e. surrealkv://local.skv or tikv://...), then we don't need to authenticate because we use an embedded (local) SurrealDB instance with auth disabled.
let client = if username.is_some()


@@ -17,7 +17,7 @@ pub async fn init(
}: IsReadyCommandArguments,
) -> Result<(), Error> {
// Initialize opentelemetry and logging
crate::telemetry::builder().with_log_level("error").init();
crate::telemetry::builder().with_log_level("error").init()?;
// Connect to the database engine
connect(endpoint).await?;
println!("OK");


@@ -58,8 +58,7 @@ pub async fn init(
}: ExportCommandArguments,
) -> Result<(), Error> {
// Initialize opentelemetry and logging
crate::telemetry::builder().with_log_level("error").init();
crate::telemetry::builder().with_log_level("info").init()?;
// If username and password are specified, and we are connecting to a remote SurrealDB server, then we need to authenticate.
// If we are connecting directly to a datastore (i.e. surrealkv://local.skv or tikv://...), then we don't need to authenticate because we use an embedded (local) SurrealDB instance with auth disabled.
let client = if username.is_some()


@@ -39,10 +39,9 @@ pub async fn init(
}: ImportCommandArguments,
) -> Result<(), Error> {
// Initialize opentelemetry and logging
crate::telemetry::builder().with_log_level("info").init();
crate::telemetry::builder().with_log_level("info").init()?;
// Default datastore configuration for local engines
let config = Config::new().capabilities(Capabilities::all());
// If username and password are specified, and we are connecting to a remote SurrealDB server, then we need to authenticate.
// If we are connecting directly to a datastore (i.e. surrealkv://local.skv or tikv://...), then we don't need to authenticate because we use an embedded (local) SurrealDB instance with auth disabled.
let client = if username.is_some()


@@ -62,10 +62,9 @@ pub async fn init(
}: SqlCommandArguments,
) -> Result<(), Error> {
// Initialize opentelemetry and logging
crate::telemetry::builder().with_log_level("warn").init();
crate::telemetry::builder().with_log_level("warn").init()?;
// Default datastore configuration for local engines
let config = Config::new().capabilities(Capabilities::all());
// If username and password are specified, and we are connecting to a remote SurrealDB server, then we need to authenticate.
// If we are connecting directly to a datastore (i.e. surrealkv://local.skv or tikv://...), then we don't need to authenticate because we use an embedded (local) SurrealDB instance with auth disabled.
let client = if username.is_some()


@@ -39,7 +39,9 @@ pub struct StartCommandArguments {
#[arg(value_parser = super::validator::key_valid)]
#[arg(hide = true)] // Not currently in use
key: Option<String>,
//
// Tasks
//
#[arg(
help = "The interval at which to run node agent tick (including garbage collection)",
help_heading = "Database"
@@ -47,7 +49,6 @@
#[arg(env = "SURREAL_TICK_INTERVAL", long = "tick-interval", value_parser = super::validator::duration)]
#[arg(default_value = "10s")]
tick_interval: Duration,
//
// Authentication
//
@@ -75,14 +76,12 @@
requires = "username"
)]
password: Option<String>,
//
// Datastore connection
//
#[command(next_help_heading = "Datastore connection")]
#[command(flatten)]
kvs: Option<StartCommandRemoteTlsOptions>,
//
// HTTP Server
//
@@ -151,10 +150,7 @@
}: StartCommandArguments,
) -> Result<(), Error> {
// Initialize opentelemetry and logging
crate::telemetry::builder().with_filter(log).init();
// Start metrics subsystem
crate::telemetry::metrics::init().expect("failed to initialize metrics");
crate::telemetry::builder().with_filter(log).init()?;
// Check if we should output a banner
if !no_banner {
println!("{LOGO}");


@@ -88,7 +88,7 @@ pub(crate) fn parse_version(input: &str) -> Result<Version, Error> {
pub async fn init(args: UpgradeCommandArguments) -> Result<(), Error> {
// Initialize opentelemetry and logging
crate::telemetry::builder().with_log_level("error").init();
crate::telemetry::builder().with_log_level("error").init()?;
// Upgrading overwrites the existing executable
let exe = std::env::current_exe()?;


@@ -18,7 +18,7 @@ pub async fn init(
}: VersionCommandArguments,
) -> Result<(), Error> {
// Initialize opentelemetry and logging
crate::telemetry::builder().with_log_level("error").init();
crate::telemetry::builder().with_log_level("error").init()?;
// Print server version if endpoint supplied else CLI version
if let Some(e) = endpoint {
// Print remote server version


@@ -19,14 +19,14 @@ pub const DEBUG_BUILD_WARNING: &str = "\
!!! THIS IS A DEVELOPMENT BUILD !!!
Development builds are not intended for production use and include
tooling and features that may affect the performance of the database. |
tooling and features that may affect the performance of the database.
";
/// The publicly visible name of the server
pub const PKG_NAME: &str = "surrealdb";
/// The public endpoint for the administration interface
pub const APP_ENDPOINT: &str = "https://surrealdb.com/app";
pub const APP_ENDPOINT: &str = "https://surrealdb.com/surrealist";
/// The maximum HTTP body size of the HTTP /ml endpoints (defaults to 4 GiB)
pub static HTTP_MAX_ML_BODY_SIZE: LazyLock<usize> =
@@ -86,6 +86,10 @@ pub static RUNTIME_STACK_SIZE: LazyLock<usize> =
pub static RUNTIME_MAX_BLOCKING_THREADS: LazyLock<usize> =
lazy_env_parse!("SURREAL_RUNTIME_MAX_BLOCKING_THREADS", usize, 512);
/// How many threads which can be started for blocking operations (defaults to 512)
pub static TELEMETRY_PROVIDER: LazyLock<String> =
lazy_env_parse!("SURREAL_TELEMETRY_PROVIDER", String, String::new());
/// The version identifier of this build
pub static PKG_VERSION: LazyLock<String> =
LazyLock::new(|| match option_env!("SURREAL_BUILD_METADATA") {
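
The new `TELEMETRY_PROVIDER` static is read once from `SURREAL_TELEMETRY_PROVIDER` and defaults to an empty string. A sketch of how the metrics and traces modules gate on it later in this commit (`telemetry_enabled` is a hypothetical helper, not part of the change):

```rust
// Hypothetical helper mirroring the match arms in telemetry::metrics
// and telemetry::traces: anything other than a case-insensitive "otlp"
// leaves telemetry export disabled.
fn telemetry_enabled() -> bool {
    TELEMETRY_PROVIDER.trim().eq_ignore_ascii_case("otlp")
}
```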


@@ -4,6 +4,7 @@ use axum::Error as AxumError;
use axum::Json;
use base64::DecodeError as Base64Error;
use http::{HeaderName, StatusCode};
use opentelemetry::global::Error as OpentelemetryError;
use reqwest::Error as ReqwestError;
use serde::Serialize;
use std::io::Error as IoError;
@@ -63,8 +64,8 @@
#[error("There was an error with auth: {0}")]
Auth(#[from] SurrealAuthError),
#[error("There was an error with the node agent")]
NodeAgent,
#[error("There was an error with opentelemetry: {0}")]
Otel(#[from] OpentelemetryError),
/// Statement has been deprecated
#[error("{0}")]
@@ -113,6 +114,24 @@
}
}
impl From<opentelemetry::logs::LogError> for Error {
fn from(e: opentelemetry::logs::LogError) -> Error {
Error::Otel(OpentelemetryError::Log(e))
}
}
impl From<opentelemetry::trace::TraceError> for Error {
fn from(e: opentelemetry::trace::TraceError) -> Error {
Error::Otel(OpentelemetryError::Trace(e))
}
}
impl From<opentelemetry::metrics::MetricsError> for Error {
fn from(e: opentelemetry::metrics::MetricsError) -> Error {
Error::Otel(OpentelemetryError::Metric(e))
}
}
impl<T: std::fmt::Debug> From<ciborium::de::Error<T>> for Error {
fn from(e: ciborium::de::Error<T>) -> Error {
Error::Cbor(format!("{e}"))
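
Between the `#[from]` attribute on `Error::Otel` and the three manual `From` impls, any OpenTelemetry log, trace, or metrics failure now converts into the server's `Error` type through `?`. A sketch of the call-site effect, reusing the exporter-builder calls from the traces hunk later in this commit (`build_exporter` itself is illustrative):

```rust
use opentelemetry_otlp::SpanExporterBuilder;

fn build_exporter() -> Result<(), Error> {
    let exporter = opentelemetry_otlp::new_exporter().tonic();
    // build_span_exporter returns a TraceError, which `?` now converts
    // into Error::Otel via the From impl added above.
    let _span_exporter = SpanExporterBuilder::Tonic(exporter).build_span_exporter()?;
    Ok(())
}
```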


@@ -1,26 +1,31 @@
use crate::cli::validator::parser::env_filter::CustomEnvFilter;
use crate::err::Error;
use tracing::Subscriber;
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::Layer;
pub fn new<S>(filter: CustomEnvFilter) -> Box<dyn Layer<S> + Send + Sync>
pub fn new<S>(filter: CustomEnvFilter) -> Result<Box<dyn Layer<S> + Send + Sync>, Error>
where
S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a> + Send + Sync,
{
#[cfg(not(debug_assertions))]
{
tracing_subscriber::fmt::layer()
Ok(tracing_subscriber::fmt::layer()
.compact()
.with_ansi(true)
.with_file(false)
.with_target(true)
.with_line_number(false)
.with_thread_ids(false)
.with_thread_names(false)
.with_span_events(FmtSpan::NONE)
.with_writer(std::io::stderr)
.with_filter(filter.0)
.boxed()
.boxed())
}
#[cfg(debug_assertions)]
{
tracing_subscriber::fmt::layer()
Ok(tracing_subscriber::fmt::layer()
.compact()
.with_ansi(true)
.with_file(true)
@@ -28,9 +33,9 @@ where
.with_line_number(true)
.with_thread_ids(false)
.with_thread_names(false)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_span_events(FmtSpan::NONE)
.with_writer(std::io::stderr)
.with_filter(filter.0)
.boxed()
.boxed())
}
}


@@ -41,13 +41,11 @@ pub static HTTP_SERVER_RESPONSE_SIZE: LazyLock<Histogram<u64>> = LazyLock::new(|
fn observe_active_request(value: i64, tracker: &HttpCallMetricTracker) -> Result<(), MetricsError> {
let attrs = tracker.active_req_attrs();
HTTP_SERVER_ACTIVE_REQUESTS.add(value, &attrs);
Ok(())
}
fn record_request_duration(tracker: &HttpCallMetricTracker) {
// Record the duration of the request.
HTTP_SERVER_DURATION
.record(tracker.duration().as_millis() as u64, &tracker.request_duration_attrs());
}


@@ -1,8 +1,8 @@
pub mod http;
pub mod ws;
use crate::cnf::TELEMETRY_PROVIDER;
use opentelemetry::metrics::MetricsError;
use opentelemetry::{global, Context as TelemetryContext};
use opentelemetry_otlp::MetricsExporterBuilder;
use opentelemetry_sdk::metrics::reader::{DefaultAggregationSelector, DefaultTemporalitySelector};
use opentelemetry_sdk::metrics::{
@@ -24,31 +24,37 @@
const KB: f64 = 1024.0;
const MB: f64 = 1024.0 * KB;
const HISTOGRAM_BUCKETS_BYTES: &[f64] = &[
1.0 * KB, // 1 KB
2.0 * KB, // 2 KB
5.0 * KB, // 5 KB
10.0 * KB, // 10 KB
100.0 * KB, // 100 KB
500.0 * KB, // 500 KB
1.0 * MB, // 1 MB
2.5 * MB, // 2 MB
5.0 * MB, // 5 MB
10.0 * MB, // 10 MB
25.0 * MB, // 25 MB
50.0 * MB, // 50 MB
100.0 * MB, // 100 MB
1.0 * KB,
2.0 * KB,
5.0 * KB,
10.0 * KB,
100.0 * KB,
500.0 * KB,
1.0 * MB,
2.5 * MB,
5.0 * MB,
10.0 * MB,
25.0 * MB,
50.0 * MB,
100.0 * MB,
];
fn build_controller() -> Result<SdkMeterProvider, MetricsError> {
// Returns a metrics configuration based on the SURREAL_TELEMETRY_PROVIDER environment variable
pub fn init() -> Result<Option<SdkMeterProvider>, MetricsError> {
match TELEMETRY_PROVIDER.trim() {
// The OTLP telemetry provider has been specified
s if s.eq_ignore_ascii_case("otlp") => {
// Create a new metrics exporter using tonic
let exporter = MetricsExporterBuilder::from(opentelemetry_otlp::new_exporter().tonic())
.build_metrics_exporter(
Box::new(DefaultTemporalitySelector::new()),
Box::new(DefaultAggregationSelector::new()),
)
.unwrap();
// Create the reader to run with Tokio
let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
let histo_duration_view = {
// Add a view for metering durations
let histogram_duration_view = {
let criteria = Instrument::new().name("*.duration");
let mask = Stream::new().aggregation(Aggregation::ExplicitBucketHistogram {
boundaries: HISTOGRAM_BUCKETS_MS.to_vec(),
@@ -56,8 +62,8 @@ fn build_controller() -> Result<SdkMeterProvider, MetricsError> {
});
opentelemetry_sdk::metrics::new_view(criteria, mask)?
};
let histo_size_view = {
// Add a view for metering sizes
let histogram_size_view = {
let criteria = Instrument::new().name("*.size");
let mask = Stream::new().aggregation(Aggregation::ExplicitBucketHistogram {
boundaries: HISTOGRAM_BUCKETS_BYTES.to_vec(),
@@ -65,31 +71,17 @@
});
opentelemetry_sdk::metrics::new_view(criteria, mask)?
};
Ok(SdkMeterProvider::builder()
// Create the new metrics provider
Ok(Some(
SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(OTEL_DEFAULT_RESOURCE.clone())
.with_view(histo_duration_view)
.with_view(histo_size_view)
.build())
.with_view(histogram_duration_view)
.with_view(histogram_size_view)
.build(),
))
}
// Initialize the metrics subsystem
// Panics if initialization fails
pub fn init() -> Result<(), MetricsError> {
let meter_provider = build_controller()?;
global::set_meter_provider(meter_provider);
Ok(())
// No matching telemetry provider was found
_ => Ok(None),
}
//
// Shutdown the metrics providers
//
pub fn shutdown(_cx: &TelemetryContext) -> Result<(), MetricsError> {
// TODO(sgirones): The stop method hangs forever, so we are not calling it until we figure out why
// METER_PROVIDER_DURATION.stop(cx)?;
// METER_PROVIDER_SIZE.stop(cx)?;
Ok(())
}


@@ -57,7 +57,6 @@ pub fn on_disconnect() -> Result<(), MetricsError> {
pub(super) fn observe_active_connection(value: i64) -> Result<(), MetricsError> {
let attrs = otel_common_attrs();
RPC_SERVER_ACTIVE_CONNECTIONS.add(value, &attrs);
Ok(())
}


@@ -3,8 +3,8 @@ pub mod metrics;
pub mod traces;
use crate::cli::validator::parser::env_filter::CustomEnvFilter;
use opentelemetry::metrics::MetricsError;
use opentelemetry::Context;
use crate::err::Error;
use opentelemetry::global;
use opentelemetry::KeyValue;
use opentelemetry_sdk::resource::{
EnvResourceDetector, SdkProvidedResourceDetector, TelemetryResourceDetector,
@@ -19,6 +19,7 @@ use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
pub static OTEL_DEFAULT_RESOURCE: LazyLock<Resource> = LazyLock::new(|| {
// Set the default otel metadata if available
let res = Resource::from_detectors(
Duration::from_secs(5),
vec![
@@ -30,7 +31,6 @@ pub static OTEL_DEFAULT_RESOURCE: LazyLock<Resource> = LazyLock::new(|| {
Box::new(TelemetryResourceDetector),
],
);
// If no external service.name is set, set it to surrealdb
if res.get("service.name".into()).unwrap_or("".into()).as_str() == "unknown_service" {
res.merge(&Resource::new([KeyValue::new("service.name", "surrealdb")]))
@@ -57,6 +57,20 @@
}
impl Builder {
/// Install the tracing dispatcher globally
pub fn init(self) -> Result<(), Error> {
// Setup logs, tracing, and metrics
self.build()?.init();
// Everything ok
Ok(())
}
/// Set the log filter on the builder
pub fn with_filter(mut self, filter: CustomEnvFilter) -> Self {
self.filter = filter;
self
}
/// Set the log level on the builder
pub fn with_log_level(mut self, log_level: &str) -> Self {
if let Ok(filter) = filter_from_value(log_level) {
@@ -65,35 +79,28 @@ impl Builder {
self
}
/// Set the filter on the builder
pub fn with_filter(mut self, filter: CustomEnvFilter) -> Self {
self.filter = filter;
self
}
/// Build a tracing dispatcher with the fmt subscriber (logs) and the chosen tracer subscriber
pub fn build(self) -> Box<dyn Subscriber + Send + Sync + 'static> {
/// Build a tracing dispatcher with the logs and tracer subscriber
pub fn build(&self) -> Result<Box<dyn Subscriber + Send + Sync + 'static>, Error> {
// Setup a registry for composing layers
let registry = tracing_subscriber::registry();
// Setup logging layer
let registry = registry.with(logs::new(self.filter.clone()));
let registry = registry.with(logs::new(self.filter.clone())?);
// Setup tracing layer
let registry = registry.with(traces::new(self.filter));
let registry = registry.with(traces::new(self.filter.clone())?);
// Setup the metrics layer
if let Some(provider) = metrics::init()? {
global::set_meter_provider(provider);
}
// Return the registry
Box::new(registry)
}
/// Install the tracing dispatcher globally
pub fn init(self) {
self.build().init();
Ok(Box::new(registry))
}
}
pub fn shutdown() -> Result<(), MetricsError> {
pub fn shutdown() -> Result<(), Error> {
// Flush all telemetry data and block until done
opentelemetry::global::shutdown_tracer_provider();
// Shutdown the metrics provider fully
metrics::shutdown(&Context::current())
// Everything ok
Ok(())
}
/// Create an EnvFilter from the given value. If the value is not a valid log level, it will be treated as EnvFilter directives.
@@ -114,7 +121,8 @@ pub fn filter_from_value(v: &str) -> Result<EnvFilter, ParseError> {
"trace" => EnvFilter::builder()
.parse("warn,surreal=trace,surrealdb=trace,surrealcs=warn,surrealdb::core::kvs=debug"),
// Check if we should show all surreal logs
"full" => EnvFilter::builder().parse("debug,surreal=trace,surrealdb=trace,surrealcs=debug"),
"full" => EnvFilter::builder()
.parse("debug,surreal=trace,surrealdb=trace,surrealcs=debug,surrealdb::core=trace"),
// Check if we should show all module logs
"all" => Ok(EnvFilter::default().add_directive(Level::TRACE.into())),
// Let's try to parse the custom log level
@@ -139,11 +147,12 @@ mod tests {
let otlp_endpoint = format!("http://{addr}");
temp_env::with_vars(
vec![
("SURREAL_TRACING_TRACER", Some("otlp")),
("SURREAL_TELEMETRY_PROVIDER", Some("otlp")),
("OTEL_EXPORTER_OTLP_ENDPOINT", Some(otlp_endpoint.as_str())),
],
|| {
let _enter = telemetry::builder().with_log_level("info").build().set_default();
let _enter =
telemetry::builder().with_log_level("info").build().unwrap().set_default();
println!("Sending span...");
@@ -180,11 +189,12 @@
let otlp_endpoint = format!("http://{addr}");
temp_env::with_vars(
vec![
("SURREAL_TRACING_TRACER", Some("otlp")),
("SURREAL_TELEMETRY_PROVIDER", Some("otlp")),
("OTEL_EXPORTER_OTLP_ENDPOINT", Some(otlp_endpoint.as_str())),
],
|| {
let _enter = telemetry::builder().with_log_level("debug").build().set_default();
let _enter =
telemetry::builder().with_log_level("debug").build().unwrap().set_default();
println!("Sending spans...");
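
Telemetry setup is now fallible end to end: `build()` returns a `Result`, `init()` propagates it, and the tests install the subscriber only for a scope. A condensed sketch of both call shapes (the function names are illustrative):

```rust
use crate::err::Error;
use tracing_subscriber::util::SubscriberInitExt;

// Global installation, as in the CLI entry points above.
fn install_global() -> Result<(), Error> {
    crate::telemetry::builder().with_log_level("info").init()
}

// Scoped installation, as in the updated tests: dropping the guard
// uninstalls the subscriber again.
fn install_scoped() -> Result<tracing::subscriber::DefaultGuard, Error> {
    Ok(crate::telemetry::builder().with_log_level("debug").build()?.set_default())
}
```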


@@ -1,45 +1,46 @@
pub mod rpc;
use crate::cli::validator::parser::env_filter::CustomEnvFilter;
use crate::cnf::TELEMETRY_PROVIDER;
use crate::err::Error;
use crate::telemetry::OTEL_DEFAULT_RESOURCE;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::SpanExporterBuilder;
use opentelemetry_sdk::trace::{Config, TracerProvider};
use tracing::Subscriber;
use tracing_subscriber::Layer;
use crate::cli::validator::parser::env_filter::CustomEnvFilter;
use opentelemetry::trace::TracerProvider as _;
pub mod otlp;
pub mod rpc;
const TRACING_TRACER_VAR: &str = "SURREAL_TRACING_TRACER";
// Returns a tracer based on the value of the TRACING_TRACER_VAR env var
pub fn new<S>(filter: CustomEnvFilter) -> Option<Box<dyn Layer<S> + Send + Sync>>
// Returns a tracer provider based on the SURREAL_TELEMETRY_PROVIDER environment variable
pub fn new<S>(filter: CustomEnvFilter) -> Result<Option<Box<dyn Layer<S> + Send + Sync>>, Error>
where
S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a> + Send + Sync,
{
match std::env::var(TRACING_TRACER_VAR).unwrap_or_default().trim().to_ascii_lowercase().as_str()
{
// If no tracer is selected, init with the fmt subscriber only
"noop" | "" => {
debug!("No tracer selected");
None
}
// Init the registry with the OTLP tracer
"otlp" => {
// Create the OTLP tracer provider
let tracer_provider =
otlp::build_tracer_provider().expect("Failed to initialize OTLP tracer provider");
match TELEMETRY_PROVIDER.trim() {
// The OTLP telemetry provider has been specified
s if s.eq_ignore_ascii_case("otlp") => {
// Create a new OTLP exporter using gRPC
let exporter = opentelemetry_otlp::new_exporter().tonic();
// Build a new span exporter which uses gRPC
let span_exporter = SpanExporterBuilder::Tonic(exporter).build_span_exporter()?;
// Define the OTEL metadata configuration
let config = Config::default().with_resource(OTEL_DEFAULT_RESOURCE.clone());
// Create the provider with the Tokio runtime
let provider = TracerProvider::builder()
.with_batch_exporter(span_exporter, opentelemetry_sdk::runtime::Tokio)
.with_config(config)
.build();
// Set it as the global tracer provider
let _ = opentelemetry::global::set_tracer_provider(tracer_provider.clone());
// Returns a tracing subscriber layer built with the selected tracer and filter.
// It will be used by the `tracing` crate to decide what spans to send to the global tracer provider
Some(
let _ = opentelemetry::global::set_tracer_provider(provider.clone());
// Return the tracing layer with the specified filter
Ok(Some(
tracing_opentelemetry::layer()
.with_tracer(tracer_provider.tracer("surealdb"))
.with_tracer(provider.tracer("surealdb"))
.with_filter(filter.0)
.boxed(),
)
}
tracer => {
panic!("unsupported tracer {tracer}");
))
}
// No matching telemetry provider was found
_ => Ok(None),
}
}


@@ -1,23 +0,0 @@
use opentelemetry::trace::TraceError;
// use opentelemetry::{
// trace::{Span, SpanBuilder, Tracer as _, TracerProvider as _},
// Context,
// };
use opentelemetry_otlp::SpanExporterBuilder;
use opentelemetry_sdk::trace::{Config, TracerProvider};
// use tracing_subscriber::prelude::*;
use crate::telemetry::OTEL_DEFAULT_RESOURCE;
pub(super) fn build_tracer_provider() -> Result<TracerProvider, TraceError> {
let exporter = opentelemetry_otlp::new_exporter().tonic();
let span_exporter = SpanExporterBuilder::Tonic(exporter).build_span_exporter()?;
let config = Config::default().with_resource(OTEL_DEFAULT_RESOURCE.clone());
let provider = TracerProvider::builder()
.with_batch_exporter(span_exporter, opentelemetry_sdk::runtime::Tokio)
.with_config(config)
.build();
Ok(provider)
}


@@ -1473,7 +1473,7 @@ fn remove_debug_info(output: String) -> String {
!!! THIS IS A DEVELOPMENT BUILD !!!
Development builds are not intended for production use and include
tooling and features that may affect the performance of the database. |
tooling and features that may affect the performance of the database.
";
// The last line in the above is important