SurrealDB started

This commit is contained in:
Borodinov Ilya 2024-12-09 15:42:28 +03:00
parent 54e47e68f7
commit f7e39fa7b6
Signed by: noth
GPG key ID: 75503B2EF596D1BD
14 changed files with 533 additions and 1961 deletions

3
.gitignore vendored
View file

@ -2,3 +2,6 @@
.devenv .devenv
.direnv .direnv
/node_modules /node_modules
history.txt
/run
Cargo.lock

1940
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,9 +1,5 @@
[workspace] [workspace]
members = [ members = [".", "magic", "vespid", "vespid/macros"]
".", "magic",
"vespid",
"vespid/macros"
]
[workspace.dependencies] [workspace.dependencies]
vespid.path = "vespid" vespid.path = "vespid"
@ -32,3 +28,12 @@ git2 = "0.19.0"
icondata.workspace = true icondata.workspace = true
tower-http = { version = "0.6.2", features = ["full"] } tower-http = { version = "0.6.2", features = ["full"] }
tailwind_fuse.workspace = true tailwind_fuse.workspace = true
opentelemetry = "0.27.1"
opentelemetry-otlp = "0.27.0"
opentelemetry_sdk = { version = "0.27.1", features = ["rt-tokio"] }
opentelemetry-semantic-conventions = "0.27.0"
surrealdb = "2.1.2"
tracing-opentelemetry = "0.28.0"
serde = "1.0.215"
itertools = "0.13.0"
once_cell = "1.20.2"

70
compose.yaml Normal file
View file

@ -0,0 +1,70 @@
name: crusto
networks:
crusto: {}
services:
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "4317"
- "16686:16686"
networks:
- crusto
grafana:
image: grafana/grafana:latest
networks:
- crusto
ports:
- "3001:3000"
environment:
- GF_AUTH_ANONYMOUS_ENABLED=true
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
- GF_PATHS_DATA=/var/lib/grafana
volumes:
- ./infra/grafana/datasource-jaeger.yml:/etc/grafana/provisioning/datasources/jaeger.yml
- ./infra/grafana/datasource-prometheus.yml:/etc/grafana/provisioning/datasources/prometheus.yml
- ./infra/grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./run/grafana:/var/lib/grafana
prometheus:
image: prom/prometheus:latest
networks:
- crusto
ports:
- "9090:9090"
volumes:
- ./infra/prometheus.yml:/etc/prometheus/prometheus.yml
otel-collector:
image: otel/opentelemetry-collector-contrib
networks:
- crusto
volumes:
- ./infra/otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml
ports:
- 1888:1888 # pprof extension
- 8888:8888 # Prometheus metrics exposed by the Collector
- 8889:8889 # Prometheus exporter metrics
- 13133:13133 # health_check extension
- 4317:4317 # OTLP gRPC receiver
- 4318:4318 # OTLP http receiver
- 55679:55679 # zpages extension
depends_on:
- jaeger
surrealdb:
image: surrealdb/surrealdb:latest-dev
networks:
- crusto
environment:
SURREAL_TELEMETRY_PROVIDER: otlp
OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4317
SURREAL_LOG: debug
ports:
- protocol: tcp
published: 9867
target: 8000
user: "0"
volumes:
- ./run/data.db:/data.db
command: start surrealkv:/data.db -u root -p root
depends_on:
- jaeger
- otel-collector

View file

@ -0,0 +1,7 @@
apiVersion: 1
datasources:
- name: Jaeger
type: jaeger
access: proxy
url: http://jaeger:16686

View file

@ -0,0 +1,10 @@
# Prometheus: prometheus:9090
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
url: http://prometheus:9090
isDefault: true

View file

@ -0,0 +1,39 @@
# 1. Receive otlp
# 2. Log logs to stdout
# 3. Send traces to jaeger at jaeger:4317 (using otlp)
# 4. Export metrics to prometheus using port 8889
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
batch:
exporters:
debug:
otlp/jaeger:
endpoint: jaeger:4317
tls:
insecure: true
prometheus:
endpoint: 0.0.0.0:8889
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [otlp/jaeger]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [prometheus]
logs:
receivers: [otlp]
processors: [batch]
exporters: [debug]

5
infra/prometheus.yml Normal file
View file

@ -0,0 +1,5 @@
scrape_configs:
- job_name: "otel-collector"
scrape_interval: 10s
static_configs:
- targets: ["otel-collector:8889"]

6
src/db.rs Normal file
View file

@ -0,0 +1,6 @@
use once_cell::sync::Lazy;
use surrealdb::{engine::remote::ws, Surreal};
pub mod flake;
pub static DB: Lazy<Surreal<ws::Client>> = Lazy::new(Surreal::init);

262
src/db/flake.rs Normal file
View file

@ -0,0 +1,262 @@
use std::{
borrow::Cow,
fmt::{Debug, Display},
future::IntoFuture,
hash::Hash,
marker::PhantomData,
sync::Arc,
};
use eyre::eyre;
use itertools::Itertools;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use surrealdb::sql::{Id, Thing};
use tracing::{trace_span, Instrument};
use vespid::Oco;
use super::DB;
pub trait Refable: DeserializeOwned {
const TABLE: &'static str;
}
#[macro_export]
macro_rules! refable {
(@base $name:ident, $table:literal) => {
impl $crate::db::Refable for $name {
const TABLE: &'static str = $table;
}
impl $crate::db::AsFlake<$name> for $name {
fn as_flake(&self) -> std::borrow::Cow<'_, str> {
std::borrow::Cow::Borrowed(self.id.flake())
}
}
impl core::ops::Deref for $name {
type Target = $crate::db::Flake<$name>;
fn deref(&self) -> &Self::Target {
&self.id
}
}
};
($name:ident, $table:literal) => {
$crate::refable!(@base $name, $table);
$crate::db_helpers!($name);
};
}
#[macro_export]
macro_rules! db_helpers {
($name:ident) => {
#[allow(unused)]
impl $name {
#[tracing::instrument(level = "trace", skip(id), fields(id = AsRef::<str>::as_ref(&id), otel.name = concat!(stringify!($name), "::get"), otel.kind = "client", peer.service = "surrealdb"))]
pub async fn get(id: impl AsRef<str>) -> crate::Result<$name> {
use $crate::db::{Refable, DB};
let data: Option<$name> = DB.select(($name::TABLE, id.as_ref())).await?;
data.ok_or_else(|| eyre!("{}:{} not found", Self::TABLE, id.as_ref()).into())
}
#[tracing::instrument(level = "trace", fields(otel.name = concat!(stringify!($name), "::all"), otel.kind = "client", peer.service = "surrealdb"))]
pub async fn all() -> crate::Result<Vec<$name>> {
use $crate::db::{Refable, DB};
let data: Vec<$name> = DB.select($name::TABLE).await?;
Ok(data)
}
#[tracing::instrument(level = "trace", skip_all, fields(otel.name = concat!(stringify!($name), "::delete"), otel.kind = "client", id = self.id.flake(), peer.service = "surrealdb"))]
pub async fn delete(&self) -> crate::Result<()> {
use $crate::db::{Refable, DB};
let _: Option<serde::de::IgnoredAny> =
DB.delete(($name::TABLE, self.id.flake())).await?;
Ok(())
}
}
};
}
pub use refable;
impl Refable for () {
const TABLE: &'static str = "";
}
pub struct BorrowedFlake<'a, T: Refable>(pub &'a str, PhantomData<T>);
impl<'a, T: Refable> BorrowedFlake<'a, T> {
pub fn new(s: &'a str) -> Self {
Self(s, PhantomData)
}
}
impl<'a, T: Refable> AsFlake<T> for BorrowedFlake<'a, T> {
fn as_flake(&self) -> Cow<'_, str> {
Cow::Borrowed(self.0)
}
}
pub trait AsFlake<T: Refable> {
fn as_flake(&self) -> Cow<'_, str>;
}
impl<T: Refable> AsFlake<T> for Flake<T> {
fn as_flake(&self) -> Cow<'_, str> {
Cow::Borrowed(&self.0)
}
}
impl<T: Refable, F: AsFlake<T>> AsFlake<T> for &F {
fn as_flake(&self) -> Cow<'_, str> {
(**self).as_flake()
}
}
pub struct Flake<T: ?Sized>(pub Oco<'static, str>, PhantomData<T>);
impl<T: Refable + ?Sized> Debug for Flake<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Flake({}:{})", T::TABLE, self.flake())
}
}
impl<T: ?Sized> Hash for Flake<T> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.flake().hash(state)
}
}
impl<T: ?Sized> PartialEq for Flake<T> {
fn eq(&self, other: &Self) -> bool {
self.flake() == other.flake()
}
}
impl<T: ?Sized> Eq for Flake<T> {}
impl<T: ?Sized> Clone for Flake<T> {
fn clone(&self) -> Self {
Self(self.0.clone(), PhantomData)
}
}
impl<T: ?Sized> Flake<T> {
pub fn flake(&self) -> &str {
&self.0
}
}
impl<T: Refable + ?Sized> Flake<T> {
pub fn borrowed(s: &str) -> BorrowedFlake<T> {
BorrowedFlake::new(s)
}
pub fn new(s: impl Into<Oco<'static, str>>) -> Self {
Self(s.into(), PhantomData)
}
pub fn into_thing(self) -> Thing {
Thing::from((T::TABLE, self.flake()))
}
pub fn from_thing_format(thing: &str) -> Self {
let mut parts = thing.split(':');
let flake_or_table = parts.next().unwrap();
Self::new(Oco::Counted(Arc::from(
parts.next().unwrap_or(flake_or_table),
)))
}
pub fn from_data(data: &T) -> Self
where
T: AsFlake<T>,
{
Self::new(Oco::Counted(Arc::from(data.as_flake().as_ref())))
}
pub async fn fetch(&self) -> eyre::Result<T> {
let data: Option<T> = DB
.select::<Option<T>>((T::TABLE, self.flake()))
.into_future()
.instrument(trace_span!(
"Flake::fetch",
otel.kind = "client",
otel.name = format!(
"{}::get",
std::any::type_name::<T>().trim_start_matches("chat::")
),
peer.service = "surrealdb",
id = self.flake()
))
.await?;
data.ok_or_else(|| eyre!("{}:{} not found", T::TABLE, self.flake()))
}
pub fn cast<U: Refable>(self) -> Flake<U> {
Flake(self.0, PhantomData)
}
}
impl<T: Refable + ?Sized> Display for Flake<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{}", T::TABLE, self.flake())
}
}
impl<T: Refable> AsRef<str> for Flake<T> {
fn as_ref(&self) -> &str {
self.flake()
}
}
impl<'de, T: Refable> Deserialize<'de> for Flake<T> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct V<T: Refable>(PhantomData<T>);
impl<'de, T: Refable> serde::de::Visitor<'de> for V<T> {
type Value = Flake<T>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
writeln!(formatter, "a string or Thing")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Flake::from_thing_format(v))
}
fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'de>,
{
use serde::Deserialize;
Ok(Flake(
Oco::Owned(
Thing::deserialize(serde::de::value::MapAccessDeserializer::new(map))?
.id
.to_raw(),
),
PhantomData,
))
}
}
deserializer.deserialize_any(V::<T>(PhantomData))
}
}
impl<T: Refable> Serialize for Flake<T> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.flake().serialize(serializer)
//Thing::from((String::from(T::TABLE), Id::String(self.flake().to_string())))
// .serialize(serializer)
}
}

View file

@ -6,11 +6,14 @@ extern crate tracing;
use std::sync::{atomic::AtomicU64, Arc}; use std::sync::{atomic::AtomicU64, Arc};
use axum::{response::Html, routing::get}; use axum::{response::Html, routing::get};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt;
use magic::prelude::*; use magic::prelude::*;
use surrealdb::{engine::remote::ws::Ws, opt::auth::Root};
use vespid::axum::render; use vespid::axum::render;
mod db;
use db::DB;
mod tracing_stuff;
#[component] #[component]
fn Shell(children: String) -> String { fn Shell(children: String) -> String {
info!("Index"); info!("Index");
@ -49,16 +52,19 @@ async fn index() -> Html<String> {
#[tokio::main] #[tokio::main]
async fn main() -> eyre::Result<()> { async fn main() -> eyre::Result<()> {
color_eyre::install()?; color_eyre::install()?;
let registry = tracing_subscriber::registry().with( let _otel_guard = tracing_stuff::init_tracing_subscriber();
tracing_subscriber::EnvFilter::builder() let url = std::env::var("CRUSTO_DB_URL").unwrap_or_else(|_| "localhost:9867".to_string());
.with_default_directive(LevelFilter::INFO.into()) debug!(%url, "Database connecting");
.from_env_lossy(), DB.connect::<Ws>(url).await?;
); debug!("Database connected");
tracing::subscriber::set_global_default( DB.signin(Root {
registry username: "root",
.with(tracing_error::ErrorLayer::default()) password: "root",
.with(tracing_subscriber::fmt::layer()), })
)?; .await?;
DB.use_ns("crusto").await?;
DB.use_db("crusto").await?;
info!("Database ready");
let amount_of_refreshes = Arc::new(AtomicU64::new(0)); let amount_of_refreshes = Arc::new(AtomicU64::new(0));

98
src/tracing_stuff.rs Normal file
View file

@ -0,0 +1,98 @@
use opentelemetry::{global, trace::TracerProvider as _, KeyValue};
use opentelemetry_sdk::{
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
runtime,
trace::{RandomIdGenerator, Sampler, Tracer, TracerProvider},
Resource,
};
use opentelemetry_semantic_conventions::{
resource::{SERVICE_NAME, SERVICE_VERSION},
SCHEMA_URL,
};
use tracing::level_filters::LevelFilter;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
fn resource() -> Resource {
Resource::from_schema_url(
[
KeyValue::new(SERVICE_NAME, "crusto"),
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
],
SCHEMA_URL,
)
}
fn init_meter_provider() -> SdkMeterProvider {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.unwrap();
let reader = PeriodicReader::builder(exporter, runtime::Tokio)
.with_interval(std::time::Duration::from_secs(30))
.build();
let meter_provider = MeterProviderBuilder::default()
.with_resource(resource())
.with_reader(reader)
.build();
global::set_meter_provider(meter_provider.clone());
meter_provider
}
fn init_tracer() -> Tracer {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.build()
.unwrap();
let provider = TracerProvider::builder()
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
1.0,
))))
.with_id_generator(RandomIdGenerator::default())
.with_resource(resource())
.with_batch_exporter(exporter, runtime::Tokio)
.build();
provider.tracer("crusto")
}
pub fn init_tracing_subscriber() -> OtelGuard {
let meter_provider = init_meter_provider();
let tracer = init_tracer();
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
)
//.with(funnylog::tracing::TracingLayer::new(
// funnylog::terminal::TerminalConfig::default().to_stdout().ignore_error(),
//))
.with(tracing_error::ErrorLayer::default())
.with(tracing_subscriber::fmt::layer())
.with(MetricsLayer::new(meter_provider.clone()))
.with(OpenTelemetryLayer::new(tracer))
.init();
OtelGuard { meter_provider }
}
pub struct OtelGuard {
meter_provider: SdkMeterProvider,
}
impl Drop for OtelGuard {
fn drop(&mut self) {
if let Err(err) = self.meter_provider.shutdown() {
eprintln!("meter_provider.shutdown: {err:?}");
}
opentelemetry::global::shutdown_tracer_provider();
}
}

View file

@ -1,6 +1,7 @@
use std::collections::HashSet; use std::collections::HashSet;
use proc_macro::TokenStream; use proc_macro::TokenStream;
use proc_macro2::Span;
use proc_macro2_diagnostics::Diagnostic; use proc_macro2_diagnostics::Diagnostic;
use quote::{quote, quote_spanned, ToTokens}; use quote::{quote, quote_spanned, ToTokens};
use rstml::{ use rstml::{
@ -155,7 +156,7 @@ fn walk_nodes<'a>(nodes: &'a Vec<Node>) -> WalkNodesOutput<'a> {
out.values.push(block.to_token_stream()); out.values.push(block.to_token_stream());
} }
NodeAttribute::Attribute(attribute) => { NodeAttribute::Attribute(attribute) => {
let (static_format, value) = walk_attribute(attribute); let (static_format, _span, value) = walk_attribute(attribute);
out.static_format.push_str(&static_format); out.static_format.push_str(&static_format);
if let Some(value) = value { if let Some(value) = value {
out.values.push(value); out.values.push(value);
@ -226,7 +227,7 @@ fn walk_nodes<'a>(nodes: &'a Vec<Node>) -> WalkNodesOutput<'a> {
out out
} }
fn walk_attribute(attribute: &KeyedAttribute) -> (String, Option<proc_macro2::TokenStream>) { fn walk_attribute(attribute: &KeyedAttribute) -> (String, Span, Option<proc_macro2::TokenStream>) {
let mut static_format = String::new(); let mut static_format = String::new();
let mut format_value = None; let mut format_value = None;
let key = match attribute.key.to_string().as_str() { let key = match attribute.key.to_string().as_str() {
@ -275,7 +276,7 @@ fn walk_attribute(attribute: &KeyedAttribute) -> (String, Option<proc_macro2::To
None => {} None => {}
} }
(static_format, format_value) (static_format, attribute.key.span(), format_value)
} }
fn is_component_tag_name(name: &str) -> bool { fn is_component_tag_name(name: &str) -> bool {

View file

@ -1,3 +1,3 @@
#!/usr/bin/env bash #!/usr/bin/env bash
cargo watch -s "bun tailwind:build; cargo run" cargo watch -w src -w magic -w vespid -w Cargo.toml -s "bun tailwind:build; cargo run"