From 565717659aaf587a57b90682bb9ed228f15c8514 Mon Sep 17 00:00:00 2001 From: Salvador Girones Gil Date: Wed, 29 Mar 2023 20:16:18 +0200 Subject: [PATCH] Add functionality for open telemetry tracing (#1727) --- .github/workflows/ci.yml | 15 +- .github/workflows/nightly.yml | 22 +- .github/workflows/release.yml | 22 +- Cargo.lock | 335 ++++++++++++++++++++- Cargo.toml | 14 +- lib/Cargo.toml | 3 +- lib/src/dbs/executor.rs | 2 + lib/src/kvs/ds.rs | 23 ++ lib/src/kvs/tx.rs | 18 ++ lib/src/sql/parser.rs | 4 + pkg/nix/spec/aarch64-unknown-linux-gnu.nix | 3 + pkg/nix/spec/x86_64-apple-darwin.nix | 3 + pkg/nix/spec/x86_64-unknown-linux-gnu.nix | 3 + pkg/nix/spec/x86_64-unknown-linux-musl.nix | 3 + src/cli/backup.rs | 2 + src/cli/export.rs | 4 +- src/cli/import.rs | 4 +- src/cli/isready.rs | 4 +- src/cli/log.rs | 55 ---- src/cli/mod.rs | 55 ++-- src/cli/sql.rs | 4 +- src/cli/start.rs | 14 +- src/main.rs | 1 + src/net/mod.rs | 2 + src/net/rpc.rs | 20 +- src/o11y/logger.rs | 16 + src/o11y/mod.rs | 140 +++++++++ src/o11y/tracers/mod.rs | 88 ++++++ src/o11y/tracers/otlp.rs | 38 +++ 29 files changed, 805 insertions(+), 112 deletions(-) delete mode 100644 src/cli/log.rs create mode 100644 src/o11y/logger.rs create mode 100644 src/o11y/mod.rs create mode 100644 src/o11y/tracers/mod.rs create mode 100644 src/o11y/tracers/otlp.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 117ae312..6c323aee 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,12 +12,16 @@ jobs: test: name: Test - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - - name: Checkout sources uses: actions/checkout@v3 + - name: Install dependencies + run: | + sudo apt-get -y update + sudo apt-get -y install protobuf-compiler + - name: Install stable toolchain uses: dtolnay/rust-toolchain@stable @@ -28,12 +32,17 @@ jobs: lint: name: Lint - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - name: Checkout sources uses: actions/checkout@v3 + - name: Install dependencies + run: | + sudo apt-get -y update + sudo apt-get -y install protobuf-compiler + - name: Install stable toolchain uses: dtolnay/rust-toolchain@stable with: diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index eea4f2d0..c75e00cc 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -8,12 +8,17 @@ jobs: test: name: Test - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - name: Checkout sources uses: actions/checkout@v3 + - name: Install dependencies + run: | + sudo apt-get -y update + sudo apt-get -y install protobuf-compiler + - name: Install stable toolchain uses: dtolnay/rust-toolchain@stable @@ -24,12 +29,17 @@ jobs: lint: name: Lint - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - name: Checkout sources uses: actions/checkout@v3 + - name: Install dependencies + run: | + sudo apt-get -y update + sudo apt-get -y install protobuf-compiler + - name: Install stable toolchain uses: dtolnay/rust-toolchain@stable with: @@ -63,11 +73,11 @@ jobs: file: surreal-nightly.darwin-arm64 opts: --features storage-tikv - arch: x86_64-unknown-linux-gnu - os: ubuntu-20.04 + os: ubuntu-latest file: surreal-nightly.linux-amd64 opts: --features storage-tikv - arch: aarch64-unknown-linux-gnu - os: ubuntu-20.04 + os: ubuntu-latest file: surreal-nightly.linux-arm64 opts: --features storage-tikv - arch: x86_64-pc-windows-msvc @@ -202,7 +212,7 @@ jobs: deploy: name: Deploy needs: [package] - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - name: Checkout sources @@ -229,7 +239,7 @@ jobs: docker: name: Docker needs: [build] - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - name: Checkout sources diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index d1837264..776d1d25 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -9,12 +9,17 @@ jobs: test: name: Test - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - name: Checkout sources uses: actions/checkout@v3 + - name: Install dependencies + run: | + sudo apt-get -y update + sudo apt-get -y install protobuf-compiler + - name: Install stable toolchain uses: dtolnay/rust-toolchain@stable @@ -23,12 +28,17 @@ jobs: lint: name: Lint - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - name: Checkout sources uses: actions/checkout@v3 + - name: Install dependencies + run: | + sudo apt-get -y update + sudo apt-get -y install protobuf-compiler + - name: Install stable toolchain uses: dtolnay/rust-toolchain@stable with: @@ -58,11 +68,11 @@ jobs: file: surreal-${{ github.ref_name }}.darwin-arm64 opts: --features storage-tikv - arch: x86_64-unknown-linux-gnu - os: ubuntu-20.04 + os: ubuntu-latest file: surreal-${{ github.ref_name }}.linux-amd64 opts: --features storage-tikv - arch: aarch64-unknown-linux-gnu - os: ubuntu-20.04 + os: ubuntu-latest file: surreal-${{ github.ref_name }}.linux-arm64 opts: --features storage-tikv - arch: x86_64-pc-windows-msvc @@ -197,7 +207,7 @@ jobs: deploy: name: Deploy needs: [package] - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - name: Checkout sources @@ -239,7 +249,7 @@ jobs: docker: name: Docker needs: [build] - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - name: Checkout sources diff --git a/Cargo.lock b/Cargo.lock index b01c3cf4..62872675 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -361,6 +361,28 @@ dependencies = [ "syn 2.0.10", ] +[[package]] +name = "async-stream" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad445822218ce64be7a341abfb0b1ea43b5c23aa83902542a4542e78309d8e5e" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4655ae1a7b0cdf149156f780c5bf3f1352bc53cbd9e0a361a7ef7b22947e965" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "async-task" version = "4.4.0" @@ -931,6 +953,16 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52" +[[package]] +name = "crossbeam-channel" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf2b3e8478797446514c91ef04bafcb59faba183e621ad488df88983cc14128c" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.15" @@ -1029,6 +1061,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core 0.9.7", +] + [[package]] name = "derive-new" version = "0.5.9" @@ -1823,6 +1868,18 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -2164,6 +2221,15 @@ dependencies = [ "libc", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + [[package]] name = "matchit" version = "0.7.0" @@ -2329,6 +2395,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-bigint" version = "0.4.3" @@ -2431,12 +2507,98 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e" +dependencies = [ + "opentelemetry_api", + "opentelemetry_sdk", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1c928609d087790fc936a1067bdc310ae702bdf3b090c3f281b713622c8bbde" +dependencies = [ + "async-trait", + "futures 0.3.27", + "futures-util", + "http", + "opentelemetry", + "opentelemetry-proto", + "prost 0.11.8", + "thiserror", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d61a2f56df5574508dd86aaca016c917489e589ece4141df1b5e349af8d66c28" +dependencies = [ + "futures 0.3.27", + "futures-util", + "opentelemetry", + "prost 0.11.8", + "tonic", + "tonic-build", +] + +[[package]] +name = "opentelemetry_api" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22" +dependencies = [ + "fnv", + "futures-channel", + "futures-util", + "indexmap", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113" +dependencies = [ + "async-trait", + "crossbeam-channel", + "dashmap", + "fnv", + "futures-channel", + "futures-executor", + "futures-util", + "once_cell", + "opentelemetry_api", + "percent-encoding", + "rand 0.8.5", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "os_str_bytes" version = "6.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ceedf44fb00f2d1984b0bc98102627ce622e083e49a5bacdb3e514fa4238e267" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking" version = "2.0.0" @@ -2973,6 +3135,15 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax", +] + [[package]] name = "regex-syntax" version = "0.6.29" @@ -3460,6 +3631,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "0.1.1" @@ -3590,6 +3770,9 @@ dependencies = [ "jsonwebtoken", "log", "once_cell", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-proto", "rand 0.8.5", "reqwest", "rmp-serde", @@ -3598,8 +3781,15 @@ dependencies = [ "serde_cbor", "serde_json", "surrealdb", + "temp-env", "thiserror", "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-futures", + "tracing-opentelemetry", + "tracing-subscriber", "urlencoding", "warp", ] @@ -3660,6 +3850,7 @@ dependencies = [ "tokio-stream", "tokio-tungstenite 0.18.0", "tokio-util", + "tracing", "trice", "ulid", "url", @@ -3718,6 +3909,15 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af547b166dd1ea4b472165569fc456cfb6818116f854690b0ff205e636523dab" +[[package]] +name = "temp-env" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee95b343d943e5a0d2221fb73029e8040f3c91d6d06afec86c664682a361681" +dependencies = [ + "parking_lot 0.12.1", +] + [[package]] name = "tempfile" version = "3.4.0" @@ -3935,6 +4135,16 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "1.8.2" @@ -4039,6 +4249,51 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.13.1", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost 0.11.8", + "prost-derive 0.11.8", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + +[[package]] +name = "tonic-build" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build 0.11.8", + "quote", + "syn 1.0.109", +] + [[package]] name = "tower" version = "0.4.13" @@ -4047,9 +4302,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap", "pin-project", "pin-project-lite", + "rand 0.8.5", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -4076,9 +4335,21 @@ dependencies = [ "cfg-if", "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "tracing-core" version = "0.1.30" @@ -4086,6 +4357,60 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21ebb87a95ea13271332df069020513ab70bdb5637ca42d6e492dc3bbbad48de" +dependencies = [ + "once_cell", + "opentelemetry", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -4260,6 +4585,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" @@ -4645,9 +4976,9 @@ checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3" [[package]] name = "zeroize" -version = "1.5.7" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f" +checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" [[package]] name = "zstd" diff --git a/Cargo.toml b/Cargo.toml index bf769e48..1af5ace8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,10 +26,10 @@ codegen-units = 1 [dependencies] argon2 = "0.5.0" -clap = { version = "3.2.23", features = ["env"] } base64 = "0.21.0" bytes = "1.4.0" chrono = { version = "0.4.24", features = ["serde"] } +clap = { version = "3.2.23", features = ["env"] } fern = { version = "0.6.2", features = ["colored"] } futures = "0.3.27" http = "0.2.9" @@ -37,6 +37,8 @@ hyper = "0.14.25" jsonwebtoken = "8.3.0" log = "0.4.17" once_cell = "1.17.1" +opentelemetry = { version = "0.18", features = ["rt-tokio"] } +opentelemetry-otlp = "0.11.0" rand = "0.8.5" reqwest = { version = "0.11.15", features = ["blocking"] } rustyline = "11.0.0" @@ -47,9 +49,19 @@ serde_pack = { version = "1.1.1", package = "rmp-serde" } surrealdb = { path = "lib", features = ["protocol-http", "protocol-ws", "rustls"] } thiserror = "1.0.40" tokio = { version = "1.26.0", features = ["macros", "signal"] } +tracing = "0.1" +tracing-futures = "0.2.5" +tracing-opentelemetry = "0.18.0" +tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } urlencoding = "2.1.2" warp = { version = "0.3.3", features = ["compression", "tls", "websocket"] } +[dev-dependencies] +tonic = "0.8.3" +opentelemetry-proto = {version = "0.1.0", features = ["gen-tonic", "traces", "build-server"] } +tokio-stream = { version = "0.1", features = ["net"] } +temp-env = "0.3.3" + [package.metadata.deb] maintainer-scripts = "pkg/deb/" maintainer = "Tobie Morgan Hitchcock " diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 5953c9cb..a4222277 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -63,9 +63,9 @@ dmp = "0.1.3" echodb = { version = "0.4.0", optional = true } executor = { version = "1.5.0", package = "async-executor" } flume = "0.10.14" +foundationdb = { version = "0.7.0", default-features = false, features = ["embedded-fdb-include"], optional = true } futures = "0.3.27" futures-concurrency = "7.1.0" -foundationdb = { version = "0.7.0", default-features = false, features = ["embedded-fdb-include"], optional = true } fuzzy-matcher = "0.3.7" geo = { version = "0.24.1", features = ["use-serde"] } indexmap = { version = "1.9.3", features = ["serde"] } @@ -96,6 +96,7 @@ thiserror = "1.0.40" tikv = { version = "0.1.0", package = "tikv-client", optional = true } tokio-stream = { version = "0.1.12", optional = true } tokio-util = { version = "0.7.7", optional = true, features = ["compat"] } +tracing = "0.1.37" trice = "0.3.1" ulid = { version = "1.0.0", features = ["serde"] } url = "2.3.1" diff --git a/lib/src/dbs/executor.rs b/lib/src/dbs/executor.rs index dd721992..c2aa245d 100644 --- a/lib/src/dbs/executor.rs +++ b/lib/src/dbs/executor.rs @@ -15,6 +15,7 @@ use crate::sql::statement::Statement; use crate::sql::value::Value; use futures::lock::Mutex; use std::sync::Arc; +use tracing::instrument; use trice::Instant; pub(crate) struct Executor<'a> { @@ -127,6 +128,7 @@ impl<'a> Executor<'a> { opt.db = Some(db.into()); } + #[instrument(name = "executor", skip_all)] pub async fn execute( &mut self, mut ctx: Context<'_>, diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index 9f02a7de..1eb92139 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -13,7 +13,9 @@ use crate::sql::Query; use crate::sql::Value; use channel::Sender; use futures::lock::Mutex; +use std::fmt; use std::sync::Arc; +use tracing::instrument; /// The underlying datastore instance which stores the dataset. #[allow(dead_code)] @@ -35,6 +37,23 @@ pub(super) enum Inner { FDB(super::fdb::Datastore), } +impl fmt::Display for Datastore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.inner { + #[cfg(feature = "kv-mem")] + Inner::Mem(_) => write!(f, "memory"), + #[cfg(feature = "kv-rocksdb")] + Inner::RocksDB(_) => write!(f, "rocksdb"), + #[cfg(feature = "kv-indxdb")] + Inner::IndxDB(_) => write!(f, "indexdb"), + #[cfg(feature = "kv-tikv")] + Inner::TiKV(_) => write!(f, "tikv"), + #[cfg(feature = "kv-fdb")] + Inner::FDB(_) => write!(f, "fdb"), + } + } +} + impl Datastore { /// Creates a new datastore instance /// @@ -230,6 +249,7 @@ impl Datastore { /// Ok(()) /// } /// ``` + #[instrument(skip_all)] pub async fn execute( &self, txt: &str, @@ -279,6 +299,7 @@ impl Datastore { /// Ok(()) /// } /// ``` + #[instrument(skip_all)] pub async fn process( &self, ast: Query, @@ -327,6 +348,7 @@ impl Datastore { /// Ok(()) /// } /// ``` + #[instrument(skip_all)] pub async fn compute( &self, val: Value, @@ -365,6 +387,7 @@ impl Datastore { } /// Performs a full database export as SQL + #[instrument(skip(self, chn))] pub async fn export(&self, ns: String, db: String, chn: Sender>) -> Result<(), Error> { // Start a new transaction let mut txn = self.transaction(false, false).await?; diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index 212c2d46..f75199e1 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -26,6 +26,7 @@ use sql::statements::DefineScopeStatement; use sql::statements::DefineTableStatement; use sql::statements::DefineTokenStatement; use sql::statements::LiveStatement; +use std::fmt; use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; @@ -54,6 +55,23 @@ pub(super) enum Inner { FDB(super::fdb::Transaction), } +impl fmt::Display for Transaction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.inner { + #[cfg(feature = "kv-mem")] + Inner::Mem(_) => write!(f, "memory"), + #[cfg(feature = "kv-rocksdb")] + Inner::RocksDB(_) => write!(f, "rocksdb"), + #[cfg(feature = "kv-indxdb")] + Inner::IndxDB(_) => write!(f, "indexdb"), + #[cfg(feature = "kv-tikv")] + Inner::TiKV(_) => write!(f, "tikv"), + #[cfg(feature = "kv-fdb")] + Inner::FDB(_) => write!(f, "fdb"), + } + } +} + impl Transaction { // -------------------------------------------------- // Integral methods diff --git a/lib/src/sql/parser.rs b/lib/src/sql/parser.rs index e8020eaf..4505fae3 100644 --- a/lib/src/sql/parser.rs +++ b/lib/src/sql/parser.rs @@ -6,18 +6,22 @@ use crate::sql::thing::Thing; use crate::sql::value::Value; use nom::Err; use std::str; +use tracing::instrument; /// Parses a SurrealQL [`Query`] +#[instrument(name = "parser", skip_all, fields(length = input.len()))] pub fn parse(input: &str) -> Result { parse_impl(input, query) } /// Parses a SurrealQL [`Thing`] +#[instrument(name = "parser", skip_all, fields(length = input.len()))] pub fn thing(input: &str) -> Result { parse_impl(input, super::thing::thing) } /// Parses a SurrealQL [`Value`] +#[instrument(name = "parser", skip_all, fields(length = input.len()))] pub fn json(input: &str) -> Result { parse_impl(input, super::value::json) } diff --git a/pkg/nix/spec/aarch64-unknown-linux-gnu.nix b/pkg/nix/spec/aarch64-unknown-linux-gnu.nix index 3a2ffd1c..54b023dd 100644 --- a/pkg/nix/spec/aarch64-unknown-linux-gnu.nix +++ b/pkg/nix/spec/aarch64-unknown-linux-gnu.nix @@ -16,6 +16,9 @@ LIBCLANG_PATH = "${llvmPackages.libclang.lib}/lib"; + PROTOC = "${protobuf}/bin/protoc"; + PROTOC_INCLUDE = "${protobuf}/include"; + OPENSSL_NO_VENDOR = "true"; }; } diff --git a/pkg/nix/spec/x86_64-apple-darwin.nix b/pkg/nix/spec/x86_64-apple-darwin.nix index 53134595..44e01c72 100644 --- a/pkg/nix/spec/x86_64-apple-darwin.nix +++ b/pkg/nix/spec/x86_64-apple-darwin.nix @@ -16,6 +16,9 @@ LIBCLANG_PATH = "${llvmPackages.libclang.lib}/lib"; + PROTOC = "${protobuf}/bin/protoc"; + PROTOC_INCLUDE = "${protobuf}/include"; + OPENSSL_NO_VENDOR = "true"; }; } diff --git a/pkg/nix/spec/x86_64-unknown-linux-gnu.nix b/pkg/nix/spec/x86_64-unknown-linux-gnu.nix index 46118e64..3ede31b9 100644 --- a/pkg/nix/spec/x86_64-unknown-linux-gnu.nix +++ b/pkg/nix/spec/x86_64-unknown-linux-gnu.nix @@ -22,6 +22,9 @@ LIBCLANG_PATH = "${llvmPackages.libclang.lib}/lib"; + PROTOC = "${protobuf}/bin/protoc"; + PROTOC_INCLUDE = "${protobuf}/include"; + CARGO_BUILD_TARGET = target; }; } diff --git a/pkg/nix/spec/x86_64-unknown-linux-musl.nix b/pkg/nix/spec/x86_64-unknown-linux-musl.nix index ded81a05..2ddea618 100644 --- a/pkg/nix/spec/x86_64-unknown-linux-musl.nix +++ b/pkg/nix/spec/x86_64-unknown-linux-musl.nix @@ -14,5 +14,8 @@ OPENSSL_STATIC = "true"; OPENSSL_LIB_DIR = "${pkgsStatic.openssl.out}/lib"; OPENSSL_INCLUDE_DIR = "${pkgsStatic.openssl.dev}/include"; + + PROTOC = "${protobuf}/bin/protoc"; + PROTOC_INCLUDE = "${protobuf}/include"; }; } diff --git a/src/cli/backup.rs b/src/cli/backup.rs index 30e83985..7fd39d1b 100644 --- a/src/cli/backup.rs +++ b/src/cli/backup.rs @@ -10,6 +10,8 @@ use std::io::copy; const TYPE: &str = "application/octet-stream"; pub fn init(matches: &clap::ArgMatches) -> Result<(), Error> { + // Initialize opentelemetry and logging + crate::o11y::builder().with_log_level("error").init(); // Try to parse the specified source file let from = matches.value_of("from").unwrap(); // Try to parse the specified output file diff --git a/src/cli/export.rs b/src/cli/export.rs index d5a4ce03..3a6b1c0c 100644 --- a/src/cli/export.rs +++ b/src/cli/export.rs @@ -7,8 +7,8 @@ use surrealdb::Error as SurrealError; #[tokio::main] pub async fn init(matches: &clap::ArgMatches) -> Result<(), Error> { - // Set the default logging level - crate::cli::log::init(1); + // Initialize opentelemetry and logging + crate::o11y::builder().with_log_level("error").init(); // Try to parse the file argument let file = matches.value_of("file").unwrap(); // Parse all other cli arguments diff --git a/src/cli/import.rs b/src/cli/import.rs index fa3aa7ca..cff48bbe 100644 --- a/src/cli/import.rs +++ b/src/cli/import.rs @@ -7,8 +7,8 @@ use surrealdb::Error as SurrealError; #[tokio::main] pub async fn init(matches: &clap::ArgMatches) -> Result<(), Error> { - // Set the default logging level - crate::cli::log::init(1); + // Initialize opentelemetry and logging + crate::o11y::builder().with_log_level("error").init(); // Try to parse the file argument let file = matches.value_of("file").unwrap(); // Parse all other cli arguments diff --git a/src/cli/isready.rs b/src/cli/isready.rs index f38f75c9..785e5556 100644 --- a/src/cli/isready.rs +++ b/src/cli/isready.rs @@ -3,8 +3,8 @@ use surrealdb::engine::any::connect; #[tokio::main] pub async fn init(matches: &clap::ArgMatches) -> Result<(), Error> { - // Set the default logging level - crate::cli::log::init(0); + // Initialize opentelemetry and logging + crate::o11y::builder().with_log_level("error").init(); // Parse all other cli arguments let endpoint = matches.value_of("conn").unwrap(); // Connect to the database engine diff --git a/src/cli/log.rs b/src/cli/log.rs deleted file mode 100644 index de0ffd9e..00000000 --- a/src/cli/log.rs +++ /dev/null @@ -1,55 +0,0 @@ -use fern::colors::Color; -use fern::colors::ColoredLevelConfig; - -pub fn init(verbosity: usize) { - let levels = ColoredLevelConfig::new() - .error(Color::Red) - .warn(Color::Yellow) - .info(Color::Blue) - .debug(Color::Magenta) - .trace(Color::White); - - let mut logger = fern::Dispatch::new(); - - logger = logger.format(move |out, message, record| { - out.finish(format_args!( - "{b}{time}{r} {l}{kind:<5}{r} {c}{name}{r} {l}{message}{r}", - l = format_args!("\x1B[{}m", levels.get_color(&record.level()).to_fg_str()), - b = format_args!("\x1B[{}m", Color::BrightBlack.to_fg_str()), - c = format_args!("\x1B[{}m", Color::Cyan.to_fg_str()), - r = "\x1B[0m", - time = chrono::Local::now().format("[%Y-%m-%d %H:%M:%S]"), - kind = record.level(), - name = record.target(), - message = message, - )) - }); - - logger = match verbosity { - 4 => logger.level_for("surrealdb::txn", log::LevelFilter::Trace), - _ => logger.level_for("surrealdb::txn", log::LevelFilter::Error), - }; - - logger = match verbosity { - 0 => logger.level_for("surrealdb", log::LevelFilter::Warn), - 1 => logger.level_for("surrealdb", log::LevelFilter::Info), - 2 => logger.level_for("surrealdb", log::LevelFilter::Debug), - _ => logger.level_for("surrealdb", log::LevelFilter::Trace), - }; - - logger = match verbosity { - 0 => logger.level_for("surreal", log::LevelFilter::Warn), - 1 => logger.level_for("surreal", log::LevelFilter::Info), - 2 => logger.level_for("surreal", log::LevelFilter::Debug), - _ => logger.level_for("surreal", log::LevelFilter::Trace), - }; - - logger = match verbosity { - 4 => logger.level(log::LevelFilter::Trace), - _ => logger.level(log::LevelFilter::Error), - }; - - logger = logger.chain(std::io::stderr()); - - logger.apply().unwrap(); -} diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 2f9c38de..303c1860 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -3,7 +3,6 @@ mod config; mod export; mod import; mod isready; -mod log; mod sql; mod start; mod version; @@ -13,6 +12,8 @@ pub use config::CF; use crate::cnf::LOGO; use clap::{Arg, Command}; use std::process::ExitCode; +use tracing::Level; +use tracing_subscriber::EnvFilter; pub const LOG: &str = "surrealdb::cli"; @@ -120,6 +121,26 @@ fn key_valid(v: &str) -> Result<(), String> { } } +fn log_valid(v: &str) -> Result { + match v { + // Check if we should show all log levels + "full" => Ok(Level::TRACE.to_string()), + // Otherwise, let's only show errors + "error" => Ok(Level::ERROR.to_string()), + // Specify the log level for each code area + "warn" | "info" | "debug" | "trace" => { + Ok(format!("error,surreal={v},surrealdb={v},surrealdb::txn=error")) + } + // Let's try to parse the custom log level + _ => match EnvFilter::builder().parse(v) { + // The custom log level parsed successfully + Ok(_) => Ok(v.to_owned()), + // There was an error parsing the custom log level + Err(_) => Err(String::from("Error parsing logging configuration")), + }, + } +} + pub fn init() -> ExitCode { let setup = Command::new("SurrealDB command-line interface and server") .about(INFO) @@ -134,7 +155,7 @@ pub fn init() -> ExitCode { .arg( Arg::new("path") .index(1) - .env("DB_PATH") + .env("SURREAL_PATH") .required(false) .validator(path_valid) .default_value("memory") @@ -143,7 +164,7 @@ pub fn init() -> ExitCode { .arg( Arg::new("user") .short('u') - .env("USER") + .env("SURREAL_USER") .long("user") .forbid_empty_values(true) .default_value("root") @@ -152,7 +173,7 @@ pub fn init() -> ExitCode { .arg( Arg::new("pass") .short('p') - .env("PASS") + .env("SURREAL_PASS") .long("pass") .takes_value(true) .forbid_empty_values(true) @@ -160,7 +181,7 @@ pub fn init() -> ExitCode { ) .arg( Arg::new("addr") - .env("ADDR") + .env("SURREAL_ADDR") .long("addr") .number_of_values(1) .forbid_empty_values(true) @@ -171,7 +192,7 @@ pub fn init() -> ExitCode { .arg( Arg::new("bind") .short('b') - .env("BIND") + .env("SURREAL_BIND") .long("bind") .forbid_empty_values(true) .default_value("0.0.0.0:8000") @@ -180,7 +201,7 @@ pub fn init() -> ExitCode { .arg( Arg::new("key") .short('k') - .env("KEY") + .env("SURREAL_KEY") .long("key") .takes_value(true) .forbid_empty_values(true) @@ -189,7 +210,7 @@ pub fn init() -> ExitCode { ) .arg( Arg::new("kvs-ca") - .env("KVS_CA") + .env("SURREAL_KVS_CA") .long("kvs-ca") .takes_value(true) .forbid_empty_values(true) @@ -197,7 +218,7 @@ pub fn init() -> ExitCode { ) .arg( Arg::new("kvs-crt") - .env("KVS_CRT") + .env("SURREAL_KVS_CRT") .long("kvs-crt") .takes_value(true) .forbid_empty_values(true) @@ -207,7 +228,7 @@ pub fn init() -> ExitCode { ) .arg( Arg::new("kvs-key") - .env("KVS_KEY") + .env("SURREAL_KVS_KEY") .long("kvs-key") .takes_value(true) .forbid_empty_values(true) @@ -217,7 +238,7 @@ pub fn init() -> ExitCode { ) .arg( Arg::new("web-crt") - .env("WEB_CRT") + .env("SURREAL_WEB_CRT") .long("web-crt") .takes_value(true) .forbid_empty_values(true) @@ -225,7 +246,7 @@ pub fn init() -> ExitCode { ) .arg( Arg::new("web-key") - .env("WEB_KEY") + .env("SURREAL_WEB_KEY") .long("web-key") .takes_value(true) .forbid_empty_values(true) @@ -234,7 +255,7 @@ pub fn init() -> ExitCode { .arg( Arg::new("strict") .short('s') - .env("STRICT") + .env("SURREAL_STRICT") .long("strict") .required(false) .takes_value(false) @@ -243,17 +264,17 @@ pub fn init() -> ExitCode { .arg( Arg::new("log") .short('l') - .env("LOG") + .env("SURREAL_LOG") .long("log") .takes_value(true) .default_value("info") .forbid_empty_values(true) - .help("The logging level for the database server") - .value_parser(["warn", "info", "debug", "trace", "full"]), + .value_parser(log_valid) + .help("The logging level for the database server. One of error, warn, info, debug, trace, full."), ) .arg( Arg::new("no-banner") - .env("NO_BANNER") + .env("SURREAL_NO_BANNER") .long("no-banner") .required(false) .takes_value(false) diff --git a/src/cli/sql.rs b/src/cli/sql.rs index 2f887596..bf7ba211 100644 --- a/src/cli/sql.rs +++ b/src/cli/sql.rs @@ -12,8 +12,8 @@ use surrealdb::Response; #[tokio::main] pub async fn init(matches: &clap::ArgMatches) -> Result<(), Error> { - // Set the default logging level - crate::cli::log::init(0); + // Initialize opentelemetry and logging + crate::o11y::builder().with_log_level("warn").init(); // Parse all other cli arguments let username = matches.value_of("user").unwrap(); let password = matches.value_of("pass").unwrap(); diff --git a/src/cli/start.rs b/src/cli/start.rs index a8d6e57c..d4e0f667 100644 --- a/src/cli/start.rs +++ b/src/cli/start.rs @@ -1,5 +1,4 @@ use super::config; -use super::log; use crate::cnf::LOGO; use crate::dbs; use crate::env; @@ -9,22 +8,13 @@ use crate::net; #[tokio::main] pub async fn init(matches: &clap::ArgMatches) -> Result<(), Error> { - // Set the default log level - match matches.get_one::("log").map(String::as_str) { - Some("warn") => log::init(0), - Some("info") => log::init(1), - Some("debug") => log::init(2), - Some("trace") => log::init(3), - Some("full") => log::init(4), - _ => unreachable!(), - }; - + // Initialize opentelemetry and logging + crate::o11y::builder().with_log_level(matches.get_one::("log").unwrap()).init(); // Check if a banner should be outputted if !matches.is_present("no-banner") { // Output SurrealDB logo println!("{LOGO}"); } - // Setup the cli options config::init(matches); // Initiate environment diff --git a/src/main.rs b/src/main.rs index 8c1259b2..84d5d2c6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,6 +23,7 @@ mod env; mod err; mod iam; mod net; +mod o11y; mod rpc; use std::process::ExitCode; diff --git a/src/net/mod.rs b/src/net/mod.rs index d895718d..dc0ae69a 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -62,6 +62,8 @@ pub async fn init() -> Result<(), Error> { let net = net.with(head::cors()); // Log all requests to the console let net = net.with(log::write()); + // Trace requests + let net = net.with(warp::trace::request()); // Get local copy of options let opt = CF.get().unwrap(); diff --git a/src/net/rpc.rs b/src/net/rpc.rs index 84667dd4..fd7000a9 100644 --- a/src/net/rpc.rs +++ b/src/net/rpc.rs @@ -27,6 +27,7 @@ use surrealdb::sql::Strand; use surrealdb::sql::Uuid; use surrealdb::sql::Value; use tokio::sync::RwLock; +use tracing::instrument; use warp::ws::{Message, WebSocket, Ws}; use warp::Filter; @@ -367,6 +368,7 @@ impl Rpc { Ok(Value::None) } + #[instrument(skip_all, name = "rpc use")] async fn yuse(&mut self, ns: Value, db: Value) -> Result { if let Value::Strand(ns) = ns { self.session.ns = Some(ns.0); @@ -377,6 +379,7 @@ impl Rpc { Ok(Value::None) } + #[instrument(skip_all, name = "rpc signup")] async fn signup(&mut self, vars: Object) -> Result { crate::iam::signup::signup(&mut self.session, vars) .await @@ -384,18 +387,20 @@ impl Rpc { .map_err(Into::into) } + #[instrument(skip_all, name = "rpc signin")] async fn signin(&mut self, vars: Object) -> Result { crate::iam::signin::signin(&mut self.session, vars) .await .map(Into::into) .map_err(Into::into) } - + #[instrument(skip_all, name = "rpc invalidate")] async fn invalidate(&mut self) -> Result { crate::iam::clear::clear(&mut self.session).await?; Ok(Value::None) } + #[instrument(skip_all, name = "rpc auth")] async fn authenticate(&mut self, token: Strand) -> Result { crate::iam::verify::token(&mut self.session, token.0).await?; Ok(Value::None) @@ -405,6 +410,7 @@ impl Rpc { // Methods for identification // ------------------------------ + #[instrument(skip_all, name = "rpc info")] async fn info(&self) -> Result { // Get a database reference let kvs = DB.get().unwrap(); @@ -424,6 +430,7 @@ impl Rpc { // Methods for setting variables // ------------------------------ + #[instrument(skip_all, name = "rpc set")] async fn set(&mut self, key: Strand, val: Value) -> Result { match val { // Remove the variable if undefined @@ -434,6 +441,7 @@ impl Rpc { Ok(Value::Null) } + #[instrument(skip_all, name = "rpc unset")] async fn unset(&mut self, key: Strand) -> Result { self.vars.remove(&key.0); Ok(Value::Null) @@ -443,6 +451,7 @@ impl Rpc { // Methods for live queries // ------------------------------ + #[instrument(skip_all, name = "rpc kill")] async fn kill(&self, id: Value) -> Result { // Get a database reference let kvs = DB.get().unwrap(); @@ -463,6 +472,7 @@ impl Rpc { Ok(res) } + #[instrument(skip_all, name = "rpc live")] async fn live(&self, tb: Value) -> Result { // Get a database reference let kvs = DB.get().unwrap(); @@ -487,6 +497,7 @@ impl Rpc { // Methods for selecting // ------------------------------ + #[instrument(skip_all, name = "rpc select")] async fn select(&self, what: Value) -> Result { // Return a single result? let one = what.is_thing(); @@ -516,6 +527,7 @@ impl Rpc { // Methods for creating // ------------------------------ + #[instrument(skip_all, name = "rpc create")] async fn create(&self, what: Value, data: Value) -> Result { // Return a single result? let one = what.is_thing(); @@ -546,6 +558,7 @@ impl Rpc { // Methods for updating // ------------------------------ + #[instrument(skip_all, name = "rpc update")] async fn update(&self, what: Value, data: Value) -> Result { // Return a single result? let one = what.is_thing(); @@ -576,6 +589,7 @@ impl Rpc { // Methods for changing // ------------------------------ + #[instrument(skip_all, name = "rpc change")] async fn change(&self, what: Value, data: Value) -> Result { // Return a single result? let one = what.is_thing(); @@ -606,6 +620,7 @@ impl Rpc { // Methods for modifying // ------------------------------ + #[instrument(skip_all, name = "rpc modify")] async fn modify(&self, what: Value, data: Value) -> Result { // Return a single result? let one = what.is_thing(); @@ -636,6 +651,7 @@ impl Rpc { // Methods for deleting // ------------------------------ + #[instrument(skip_all, name = "rpc delete")] async fn delete(&self, what: Value) -> Result { // Return a single result? let one = what.is_thing(); @@ -665,6 +681,7 @@ impl Rpc { // Methods for querying // ------------------------------ + #[instrument(skip_all, name = "rpc query")] async fn query(&self, sql: Strand) -> Result { // Get a database reference let kvs = DB.get().unwrap(); @@ -678,6 +695,7 @@ impl Rpc { Ok(res) } + #[instrument(skip_all, name = "rpc query_with")] async fn query_with(&self, sql: Strand, mut vars: Object) -> Result { // Get a database reference let kvs = DB.get().unwrap(); diff --git a/src/o11y/logger.rs b/src/o11y/logger.rs new file mode 100644 index 00000000..4b60f159 --- /dev/null +++ b/src/o11y/logger.rs @@ -0,0 +1,16 @@ +use tracing::Subscriber; +use tracing_subscriber::fmt::format::FmtSpan; +use tracing_subscriber::{EnvFilter, Layer}; + +pub fn new(level: String) -> Box + Send + Sync> +where + S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a> + Send + Sync, +{ + tracing_subscriber::fmt::layer() + .compact() + .with_ansi(true) + .with_span_events(FmtSpan::NONE) + .with_writer(std::io::stderr) + .with_filter(EnvFilter::builder().parse(level).unwrap()) + .boxed() +} diff --git a/src/o11y/mod.rs b/src/o11y/mod.rs new file mode 100644 index 00000000..c595ede0 --- /dev/null +++ b/src/o11y/mod.rs @@ -0,0 +1,140 @@ +mod logger; +mod tracers; + +use tracing::Subscriber; +use tracing_subscriber::{prelude::*, util::SubscriberInitExt}; + +#[derive(Default, Debug, Clone)] +pub struct Builder { + log_level: String, +} + +pub fn builder() -> Builder { + Builder::default() +} + +impl Builder { + /// Set the log level on the builder + pub fn with_log_level(mut self, log_level: &str) -> Self { + self.log_level = log_level.to_string(); + self + } + /// Build a dispatcher with the fmt subscriber (logs) and the chosen tracer subscriber + pub fn build(self) -> Box { + Box::new( + tracing_subscriber::registry().with(logger::new(self.log_level)).with(tracers::new()), + ) + } + /// Build a dispatcher and set it as global + pub fn init(self) { + self.build().init() + } +} + +#[cfg(test)] +mod tests { + use opentelemetry::global::shutdown_tracer_provider; + use tracing::{span, Level}; + use tracing_subscriber::util::SubscriberInitExt; + + #[tokio::test(flavor = "multi_thread")] + async fn test_otlp_tracer() { + println!("Starting mock otlp server..."); + let (addr, mut req_rx) = super::tracers::tests::mock_otlp_server().await; + + { + let otlp_endpoint = format!("http://{}", addr); + temp_env::with_vars( + vec![ + ("SURREAL_TRACING_TRACER", Some("otlp")), + ("OTEL_EXPORTER_OTLP_ENDPOINT", Some(otlp_endpoint.as_str())), + ], + || { + let _enter = super::builder().build().set_default(); + + println!("Sending span..."); + + { + let span = span!(Level::INFO, "test-surreal-span"); + let _enter = span.enter(); + info!("test-surreal-event"); + } + + shutdown_tracer_provider(); + }, + ) + } + + println!("Waiting for request..."); + let req = req_rx.recv().await.expect("missing export request"); + let first_span = req + .resource_spans + .first() + .unwrap() + .instrumentation_library_spans + .first() + .unwrap() + .spans + .first() + .unwrap(); + assert_eq!("test-surreal-span", first_span.name); + let first_event = first_span.events.first().unwrap(); + assert_eq!("test-surreal-event", first_event.name); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_tracing_filter() { + println!("Starting mock otlp server..."); + let (addr, mut req_rx) = super::tracers::tests::mock_otlp_server().await; + + { + let otlp_endpoint = format!("http://{}", addr); + temp_env::with_vars( + vec![ + ("SURREAL_TRACING_TRACER", Some("otlp")), + ("SURREAL_TRACING_FILTER", Some("debug")), + ("OTEL_EXPORTER_OTLP_ENDPOINT", Some(otlp_endpoint.as_str())), + ], + || { + let _enter = super::builder().build().set_default(); + + println!("Sending spans..."); + + { + let span = span!(Level::DEBUG, "debug"); + let _enter = span.enter(); + debug!("debug"); + trace!("trace"); + } + + { + let span = span!(Level::TRACE, "trace"); + let _enter = span.enter(); + debug!("debug"); + trace!("trace"); + } + + shutdown_tracer_provider(); + }, + ) + } + + println!("Waiting for request..."); + let req = req_rx.recv().await.expect("missing export request"); + let spans = &req + .resource_spans + .first() + .unwrap() + .instrumentation_library_spans + .first() + .unwrap() + .spans; + + assert_eq!(1, spans.len()); + assert_eq!("debug", spans.first().unwrap().name); + + let events = &spans.first().unwrap().events; + assert_eq!(1, events.len()); + assert_eq!("debug", events.first().unwrap().name); + } +} diff --git a/src/o11y/tracers/mod.rs b/src/o11y/tracers/mod.rs new file mode 100644 index 00000000..d2dc68d5 --- /dev/null +++ b/src/o11y/tracers/mod.rs @@ -0,0 +1,88 @@ +use tracing::Subscriber; +use tracing_subscriber::Layer; + +pub mod otlp; + +const TRACING_TRACER_VAR: &str = "SURREAL_TRACING_TRACER"; + +// Returns a tracer based on the value of the TRACING_TRACER_VAR env var +pub fn new() -> Option + Send + Sync>> +where + S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a> + Send + Sync, +{ + match std::env::var(TRACING_TRACER_VAR).unwrap_or_default().trim().to_ascii_lowercase().as_str() + { + // If no tracer is selected, init with the fmt subscriber only + "noop" | "" => { + debug!("No tracer selected"); + None + } + // Init the registry with the OTLP tracer + "otlp" => { + debug!("Setup the OTLP tracer"); + Some(otlp::new()) + } + tracer => { + panic!("unsupported tracer {}", tracer); + } + } +} + +#[cfg(test)] +pub mod tests { + use futures::StreamExt; + use opentelemetry_proto::tonic::collector::trace::v1::{ + trace_service_server::{TraceService, TraceServiceServer}, + ExportTraceServiceRequest, ExportTraceServiceResponse, + }; + use std::{net::SocketAddr, sync::Mutex}; + use tokio::sync::mpsc; + use tokio_stream::wrappers::TcpListenerStream; + + /// Server that mocks a TraceService and receives traces + struct MockServer { + tx: Mutex>, + } + + impl MockServer { + pub fn new(tx: mpsc::Sender) -> Self { + Self { + tx: Mutex::new(tx), + } + } + } + + #[tonic::async_trait] + impl TraceService for MockServer { + async fn export( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.tx.lock().unwrap().try_send(request.into_inner()).expect("Channel full"); + Ok(tonic::Response::new(ExportTraceServiceResponse {})) + } + } + + pub async fn mock_otlp_server() -> (SocketAddr, mpsc::Receiver) { + let addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let listener = tokio::net::TcpListener::bind(addr).await.expect("failed to bind"); + let addr = listener.local_addr().unwrap(); + let stream = TcpListenerStream::new(listener).map(|s| { + if let Ok(ref s) = s { + println!("Got new conn at {}", s.peer_addr().unwrap()); + } + s + }); + + let (req_tx, req_rx) = mpsc::channel(10); + let service = TraceServiceServer::new(MockServer::new(req_tx)); + tokio::task::spawn(async move { + tonic::transport::Server::builder() + .add_service(service) + .serve_with_incoming(stream) + .await + .expect("Server failed"); + }); + (addr, req_rx) + } +} diff --git a/src/o11y/tracers/otlp.rs b/src/o11y/tracers/otlp.rs new file mode 100644 index 00000000..94bd8705 --- /dev/null +++ b/src/o11y/tracers/otlp.rs @@ -0,0 +1,38 @@ +use opentelemetry::sdk::{trace::Tracer, Resource}; +use opentelemetry::trace::TraceError; +use opentelemetry::KeyValue; +use opentelemetry_otlp::WithExportConfig; +use tracing::{Level, Subscriber}; +use tracing_subscriber::{EnvFilter, Layer}; + +const TRACING_FILTER_VAR: &str = "SURREAL_TRACING_FILTER"; + +pub fn new() -> Box + Send + Sync> +where + S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a> + Send + Sync, +{ + tracing_opentelemetry::layer().with_tracer(tracer().unwrap()).with_filter(filter()).boxed() +} + +fn tracer() -> Result { + let resource = Resource::new(vec![KeyValue::new("service.name", "surrealdb")]); + + opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env()) + .with_trace_config(opentelemetry::sdk::trace::config().with_resource(resource)) + .install_batch(opentelemetry::runtime::Tokio) +} + +/// Create a filter for the OTLP subscriber +/// +/// It creates an EnvFilter based on the TRACING_FILTER_VAR's value +/// +/// TRACING_FILTER_VAR accepts the same syntax as RUST_LOG +fn filter() -> EnvFilter { + EnvFilter::builder() + .with_env_var(TRACING_FILTER_VAR) + .with_default_directive(Level::INFO.into()) + .from_env() + .unwrap() +}