From 64adb2e913ab3a0ff9579051a09b4775a00841b9 Mon Sep 17 00:00:00 2001 From: Przemyslaw Hugh Kaznowski Date: Tue, 20 Jun 2023 23:50:26 +0100 Subject: [PATCH] Rebased live query changes (#2136) --- lib/src/dbs/cl.rs | 19 +++++ lib/src/dbs/executor.rs | 66 +++++++++++++--- lib/src/dbs/mod.rs | 4 + lib/src/dbs/notification.rs | 39 +++++++++ lib/src/dbs/options.rs | 54 ++++++++++++- lib/src/dbs/response.rs | 12 +++ lib/src/dbs/test.rs | 5 +- lib/src/doc/lives.rs | 53 ++++++++++--- lib/src/err/mod.rs | 12 +++ lib/src/key/cl.rs | 41 ++++++++++ lib/src/key/hb.rs | 45 +++++++++++ lib/src/key/lq.rs | 31 +++++--- lib/src/key/lv.rs | 2 +- lib/src/key/mod.rs | 9 ++- lib/src/kvs/ds.rs | 55 +++++++++++++ lib/src/kvs/tx.rs | 59 +++++++++++++- lib/src/sql/statements/kill.rs | 8 +- lib/src/sql/statements/live.rs | 18 +++-- lib/src/sql/statements/sleep.rs | 6 +- src/net/rpc.rs | 136 ++++++++++++++++++++++++++------ tests/cli.rs | 2 +- 21 files changed, 601 insertions(+), 75 deletions(-) create mode 100644 lib/src/dbs/cl.rs create mode 100644 lib/src/dbs/notification.rs create mode 100644 lib/src/key/cl.rs create mode 100644 lib/src/key/hb.rs diff --git a/lib/src/dbs/cl.rs b/lib/src/dbs/cl.rs new file mode 100644 index 00000000..11a3b043 --- /dev/null +++ b/lib/src/dbs/cl.rs @@ -0,0 +1,19 @@ +use derive::Store; +use serde::{Deserialize, Serialize}; + +// NOTE: This is not a statement, but as per layering, keeping it here till we +// have a better structure. +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Hash, Store)] +pub struct ClusterMembership { + pub name: String, + // TiKV = TiKV TSO Timestamp as u64 + // not TiKV = local nanos as u64 + pub heartbeat: Timestamp, +} +// This struct is meant to represent a timestamp that can be used to partially order +// events in a cluster. It should be derived from a timestamp oracle, such as the +// one available in TiKV via the client `TimestampExt` implementation. +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Hash, Store)] +pub struct Timestamp { + pub value: u64, +} diff --git a/lib/src/dbs/executor.rs b/lib/src/dbs/executor.rs index 4dc3391d..462bd9e4 100644 --- a/lib/src/dbs/executor.rs +++ b/lib/src/dbs/executor.rs @@ -1,11 +1,12 @@ use crate::cnf::PROTECTED_PARAM_NAMES; use crate::ctx::Context; use crate::dbs::response::Response; -use crate::dbs::Auth; use crate::dbs::Level; +use crate::dbs::Notification; use crate::dbs::Options; use crate::dbs::Transaction; use crate::dbs::LOG; +use crate::dbs::{Auth, QueryType}; use crate::err::Error; use crate::kvs::Datastore; use crate::sql::paths::DB; @@ -13,6 +14,7 @@ use crate::sql::paths::NS; use crate::sql::query::Query; use crate::sql::statement::Statement; use crate::sql::value::Value; +use channel::{Receiver, Sender}; use futures::lock::Mutex; use std::sync::Arc; use tracing::instrument; @@ -97,6 +99,7 @@ impl<'a> Executor<'a> { Response { time: v.time, result: Err(Error::QueryCancelled), + query_type: QueryType::Other, } } @@ -113,11 +116,27 @@ impl<'a> Executor<'a> { .unwrap_or(Error::QueryNotExecuted)), Err(e) => Err(e), }, + query_type: QueryType::Other, }, _ => v, } } + /// Consume the live query notifications + async fn clear(&self, _: Sender, rcv: Receiver) { + while rcv.try_recv().is_ok() { + // Ignore notification + } + } + + /// Flush notifications from a buffer channel (live queries) to the committed notification channel. + /// This is because we don't want to broadcast notifications to the user for failed transactions. 
+ async fn flush(&self, chn: Sender, rcv: Receiver) { + while let Ok(v) = rcv.try_recv() { + let _ = chn.send(v).await; + } + } + async fn set_ns(&self, ctx: &mut Context<'_>, opt: &mut Options, ns: &str) { let mut session = ctx.value("session").unwrap_or(&Value::None).clone(); session.put(NS.as_ref(), ns.to_owned().into()); @@ -136,9 +155,15 @@ impl<'a> Executor<'a> { pub async fn execute( &mut self, mut ctx: Context<'_>, - mut opt: Options, + opt: Options, qry: Query, ) -> Result, Error> { + // Take the notification channel + let chn = opt.sender.clone(); + // Create a notification channel + let (send, recv) = channel::unbounded(); + // Swap the notification channel + let mut opt = opt.sender(send); // Initialise buffer of responses let mut buf: Vec = vec![]; // Initialise array of responses @@ -156,7 +181,7 @@ impl<'a> Executor<'a> { // Check if this is a RETURN statement let clr = matches!(stm, Statement::Output(_)); // Process a single statement - let res = match stm { + let res = match stm.clone() { // Specify runtime options Statement::Option(mut stm) => { // Selected DB? @@ -185,17 +210,21 @@ impl<'a> Executor<'a> { // Cancel a running transaction Statement::Cancel(_) => { self.cancel(true).await; + self.clear(chn.clone(), recv.clone()).await; buf = buf.into_iter().map(|v| self.buf_cancel(v)).collect(); out.append(&mut buf); debug_assert!(self.txn.is_none(), "cancel(true) should have unset txn"); + self.txn = None; continue; } // Commit a running transaction Statement::Commit(_) => { let commit_error = self.commit(true).await.err(); buf = buf.into_iter().map(|v| self.buf_commit(v, &commit_error)).collect(); + self.flush(chn.clone(), recv.clone()).await; out.append(&mut buf); debug_assert!(self.txn.is_none(), "commit(true) should have unset txn"); + self.txn = None; continue; } // Switch to a different NS or DB @@ -263,13 +292,22 @@ impl<'a> Executor<'a> { // Finalise transaction, returning nothing unless it couldn't commit if writeable { match self.commit(loc).await { - Err(e) => Err(Error::QueryNotExecutedDetail { - message: e.to_string(), - }), - Ok(_) => Ok(Value::None), + Err(e) => { + // Clear live query notifications + self.clear(chn.clone(), recv.clone()).await; + Err(Error::QueryNotExecutedDetail { + message: e.to_string(), + }) + } + Ok(_) => { + // Flush live query notifications + self.flush(chn.clone(), recv.clone()).await; + Ok(Value::None) + } } } else { self.cancel(loc).await; + self.clear(chn.clone(), recv.clone()).await; Ok(Value::None) } } @@ -327,18 +365,23 @@ impl<'a> Executor<'a> { // Finalise transaction and return the result. 
if res.is_ok() && stm.writeable() { if let Err(e) = self.commit(loc).await { + // Clear live query notification details + self.clear(chn.clone(), recv.clone()).await; // The commit failed Err(Error::QueryNotExecutedDetail { message: e.to_string(), }) } else { + // Flush the live query change notifications + self.flush(chn.clone(), recv.clone()).await; // Successful, committed result res } } else { self.cancel(loc).await; - - // An error + // Clear live query notification details + self.clear(chn.clone(), recv.clone()).await; + // Return an error res } } @@ -356,6 +399,11 @@ impl<'a> Executor<'a> { self.err = true; e }), + query_type: match stm { + Statement::Live(_) => QueryType::Live, + Statement::Kill(_) => QueryType::Kill, + _ => QueryType::Other, + }, }; // Output the response if self.txn.is_some() { diff --git a/lib/src/dbs/mod.rs b/lib/src/dbs/mod.rs index 799478b9..61c66301 100644 --- a/lib/src/dbs/mod.rs +++ b/lib/src/dbs/mod.rs @@ -6,6 +6,7 @@ mod auth; mod executor; mod iterate; mod iterator; +mod notification; mod options; mod response; mod session; @@ -20,6 +21,7 @@ pub use self::session::*; pub(crate) use self::executor::*; pub(crate) use self::iterator::*; +pub(crate) use self::notification::*; pub(crate) use self::statement::*; pub(crate) use self::transaction::*; pub(crate) use self::variables::*; @@ -30,6 +32,8 @@ mod channel; #[cfg(not(target_arch = "wasm32"))] pub use self::channel::*; +pub mod cl; + #[cfg(test)] pub(crate) mod test; diff --git a/lib/src/dbs/notification.rs b/lib/src/dbs/notification.rs new file mode 100644 index 00000000..f75d8060 --- /dev/null +++ b/lib/src/dbs/notification.rs @@ -0,0 +1,39 @@ +use crate::sql::Value; +use serde::{Deserialize, Serialize}; +use std::fmt; +use uuid::Uuid; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct Notification { + pub id: Uuid, + pub action: Action, + pub result: Value, +} + +impl fmt::Display for Notification { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "Notification {{ id: {}, action: {}, result: {} }}", + self.id, self.action, self.result + ) + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "UPPERCASE")] +pub enum Action { + Create, + Update, + Delete, +} + +impl fmt::Display for Action { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Action::Create => write!(f, "CREATE"), + Action::Update => write!(f, "UPDATE"), + Action::Delete => write!(f, "DELETE"), + } + } +} diff --git a/lib/src/dbs/options.rs b/lib/src/dbs/options.rs index 86b5130a..0e2f3a3d 100644 --- a/lib/src/dbs/options.rs +++ b/lib/src/dbs/options.rs @@ -1,8 +1,11 @@ use crate::cnf; use crate::dbs::Auth; use crate::dbs::Level; +use crate::dbs::Notification; use crate::err::Error; +use channel::Sender; use std::sync::Arc; +use uuid::Uuid; /// An Options is passed around when processing a set of query /// statements. An Options contains specific information for how @@ -11,8 +14,10 @@ use std::sync::Arc; /// whether field/event/table queries should be processed (useful /// when importing data, where these queries might fail). -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug)] pub struct Options { + /// Current Node ID + pub id: Arc, /// Currently selected NS pub ns: Option>, /// Currently selected DB @@ -39,18 +44,21 @@ pub struct Options { pub indexes: bool, /// Should we process function futures? 
pub futures: bool, + /// + pub sender: Sender, } impl Default for Options { fn default() -> Self { - Options::new(Auth::No) + Options::new(Arc::new(Uuid::new_v4()), channel::unbounded().0, Arc::new(Auth::No)) } } impl Options { /// Create a new Options object - pub fn new(auth: Auth) -> Options { + pub fn new(id: Arc, send: Sender, auth: Arc) -> Options { Options { + id, ns: None, db: None, dive: 0, @@ -63,10 +71,16 @@ impl Options { tables: true, indexes: true, futures: false, - auth: Arc::new(auth), + sender: send, + auth, } } + /// Get current Node ID + pub fn id(&self) -> &Uuid { + self.id.as_ref() + } + /// Get currently selected NS pub fn ns(&self) -> &str { self.ns.as_ref().unwrap() @@ -85,7 +99,9 @@ impl Options { let dive = self.dive.saturating_add(cost); if dive <= *cnf::MAX_COMPUTATION_DEPTH { Ok(Options { + sender: self.sender.clone(), auth: self.auth.clone(), + id: self.id.clone(), ns: self.ns.clone(), db: self.db.clone(), dive, @@ -99,7 +115,9 @@ impl Options { /// Create a new Options object for a subquery pub fn force(&self, v: bool) -> Options { Options { + sender: self.sender.clone(), auth: self.auth.clone(), + id: self.id.clone(), ns: self.ns.clone(), db: self.db.clone(), force: v, @@ -110,7 +128,9 @@ impl Options { /// Create a new Options object for a subquery pub fn perms(&self, v: bool) -> Options { Options { + sender: self.sender.clone(), auth: self.auth.clone(), + id: self.id.clone(), ns: self.ns.clone(), db: self.db.clone(), perms: v, @@ -121,7 +141,9 @@ impl Options { /// Create a new Options object for a subquery pub fn fields(&self, v: bool) -> Options { Options { + sender: self.sender.clone(), auth: self.auth.clone(), + id: self.id.clone(), ns: self.ns.clone(), db: self.db.clone(), fields: v, @@ -132,7 +154,9 @@ impl Options { /// Create a new Options object for a subquery pub fn events(&self, v: bool) -> Options { Options { + sender: self.sender.clone(), auth: self.auth.clone(), + id: self.id.clone(), ns: self.ns.clone(), db: self.db.clone(), events: v, @@ -143,7 +167,9 @@ impl Options { /// Create a new Options object for a subquery pub fn tables(&self, v: bool) -> Options { Options { + sender: self.sender.clone(), auth: self.auth.clone(), + id: self.id.clone(), ns: self.ns.clone(), db: self.db.clone(), tables: v, @@ -154,7 +180,9 @@ impl Options { /// Create a new Options object for a subquery pub fn indexes(&self, v: bool) -> Options { Options { + sender: self.sender.clone(), auth: self.auth.clone(), + id: self.id.clone(), ns: self.ns.clone(), db: self.db.clone(), indexes: v, @@ -165,7 +193,9 @@ impl Options { /// Create a new Options object for a subquery pub fn import(&self, v: bool) -> Options { Options { + sender: self.sender.clone(), auth: self.auth.clone(), + id: self.id.clone(), ns: self.ns.clone(), db: self.db.clone(), fields: !v, @@ -178,7 +208,9 @@ impl Options { /// Create a new Options object for a subquery pub fn strict(&self, v: bool) -> Options { Options { + sender: self.sender.clone(), auth: self.auth.clone(), + id: self.id.clone(), ns: self.ns.clone(), db: self.db.clone(), strict: v, @@ -189,7 +221,9 @@ impl Options { /// Create a new Options object for a subquery pub fn futures(&self, v: bool) -> Options { Options { + sender: self.sender.clone(), auth: self.auth.clone(), + id: self.id.clone(), ns: self.ns.clone(), db: self.db.clone(), futures: v, @@ -197,6 +231,18 @@ impl Options { } } + /// Create a new Options object for a subquery + pub fn sender(&self, v: Sender) -> Options { + Options { + auth: self.auth.clone(), + id: 
self.id.clone(), + ns: self.ns.clone(), + db: self.db.clone(), + sender: v, + ..*self + } + } + /// Check whether realtime queries are supported pub fn realtime(&self) -> Result<(), Error> { if !self.live { diff --git a/lib/src/dbs/response.rs b/lib/src/dbs/response.rs index 4ccc6902..a7ca01c7 100644 --- a/lib/src/dbs/response.rs +++ b/lib/src/dbs/response.rs @@ -6,11 +6,23 @@ use std::time::Duration; pub(crate) const TOKEN: &str = "$surrealdb::private::sql::Response"; +#[derive(Debug)] +pub enum QueryType { + // Any kind of query + Other, + // Indicates that the response live query id must be tracked + Live, + // Indicates that the live query should be removed from tracking + Kill, +} + /// The return value when running a query set on the database. #[derive(Debug)] pub struct Response { pub time: Duration, pub result: Result, + // Record the query type in case processing the response is necessary (such as tracking live queries). + pub query_type: QueryType, } impl Response { diff --git a/lib/src/dbs/test.rs b/lib/src/dbs/test.rs index 41df9d76..b9280afc 100644 --- a/lib/src/dbs/test.rs +++ b/lib/src/dbs/test.rs @@ -1,12 +1,13 @@ use crate::ctx::Context; -use crate::dbs::Options; +use crate::dbs::{Auth, Options}; use crate::kvs::Datastore; use futures::lock::Mutex; use std::sync::Arc; +use uuid::Uuid; pub async fn mock<'a>() -> (Context<'a>, Options) { let mut ctx = Context::default(); - let opt = Options::default(); + let opt = Options::new(Arc::new(Uuid::new_v4()), channel::unbounded().0, Arc::new(Auth::Kv)); let kvs = Datastore::new("memory").await.unwrap(); let txn = kvs.transaction(true, false).await.unwrap(); let txn = Arc::new(Mutex::new(txn)); diff --git a/lib/src/doc/lives.rs b/lib/src/doc/lives.rs index afbde50d..576b79af 100644 --- a/lib/src/doc/lives.rs +++ b/lib/src/doc/lives.rs @@ -1,15 +1,18 @@ use crate::ctx::Context; +use crate::dbs::Action; +use crate::dbs::Notification; use crate::dbs::Options; use crate::dbs::Statement; use crate::doc::Document; use crate::err::Error; +use crate::sql::Value; impl<'a> Document<'a> { pub async fn lives( &self, ctx: &Context<'_>, opt: &Options, - _stm: &Statement<'_>, + stm: &Statement<'_>, ) -> Result<(), Error> { // Check if forced if !opt.force && !self.changed() { @@ -18,24 +21,56 @@ impl<'a> Document<'a> { // Clone transaction let txn = ctx.clone_transaction()?; // Get the record id - let _ = self.id.as_ref().unwrap(); + let id = self.id.as_ref().unwrap(); // Loop through all index statements for lv in self.lv(opt, &txn).await?.iter() { // Create a new statement - let stm = Statement::from(lv); + let lq = Statement::from(lv); // Check LIVE SELECT where condition - if self.check(ctx, opt, &stm).await.is_err() { + if self.check(ctx, opt, stm).await.is_err() { continue; } // Check what type of data change this is if stm.is_delete() { - // Send a DELETE notification to the WebSocket + // Send a DELETE notification + if opt.id() == &lv.node.0 { + let thing = (*id).clone(); + opt.sender + .send(Notification { + id: lv.id.0, + action: Action::Delete, + result: Value::Thing(thing), + }) + .await?; + } else { + // TODO: Send to storage + } } else if self.is_new() { - // Process the CREATE notification to send - let _ = self.pluck(ctx, opt, &stm).await?; + // Send a CREATE notification + if opt.id() == &lv.node.0 { + opt.sender + .send(Notification { + id: lv.id.0, + action: Action::Create, + result: self.pluck(ctx, opt, &lq).await?, + }) + .await?; + } else { + // TODO: Send to storage + } } else { - // Process the CREATE notification to send 
- let _ = self.pluck(ctx, opt, &stm).await?; + // Send a UPDATE notification + if opt.id() == &lv.node.0 { + opt.sender + .send(Notification { + id: lv.id.0, + action: Action::Update, + result: self.pluck(ctx, opt, &lq).await?, + }) + .await?; + } else { + // TODO: Send to storage + } }; } // Carry on diff --git a/lib/src/err/mod.rs b/lib/src/err/mod.rs index 5807d07e..4de11613 100644 --- a/lib/src/err/mod.rs +++ b/lib/src/err/mod.rs @@ -247,6 +247,18 @@ pub enum Error { value: String, }, + // The cluster node already exists + #[error("The node '{value}' already exists")] + ClAlreadyExists { + value: String, + }, + + // The cluster node does not exist + #[error("The node '{value}' does not exist")] + ClNotFound { + value: String, + }, + /// The requested scope token does not exist #[error("The scope token '{value}' does not exist")] StNotFound { diff --git a/lib/src/key/cl.rs b/lib/src/key/cl.rs new file mode 100644 index 00000000..6b079e19 --- /dev/null +++ b/lib/src/key/cl.rs @@ -0,0 +1,41 @@ +use derive::Key; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +// Represents cluster information. +// In the future, this could also include broadcast addresses and other information. +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)] +pub struct Cl { + __: u8, + _a: u8, + _b: u8, + _c: u8, + pub nd: Uuid, +} + +impl Cl { + pub fn new(nd: Uuid) -> Self { + Self { + __: 0x2f, // / + _a: 0x21, // ! + _b: 0x63, // c + _c: 0x6c, // l + nd, + } + } +} + +#[cfg(test)] +mod tests { + #[test] + fn key() { + use super::*; + #[rustfmt::skip] + let val = Cl::new( + Uuid::default(), + ); + let enc = Cl::encode(&val).unwrap(); + let dec = Cl::decode(&enc).unwrap(); + assert_eq!(val, dec); + } +} diff --git a/lib/src/key/hb.rs b/lib/src/key/hb.rs new file mode 100644 index 00000000..b0960dd6 --- /dev/null +++ b/lib/src/key/hb.rs @@ -0,0 +1,45 @@ +use crate::dbs::cl::Timestamp; +use derive::Key; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)] +pub struct Hb { + __: u8, + _a: u8, + _b: u8, + _c: u8, + _d: u8, + pub hb: Timestamp, + pub nd: Uuid, +} + +impl Hb { + pub fn new(hb: Timestamp, nd: Uuid) -> Self { + Self { + __: 0x2f, // / + _a: 0x21, // ! 
+ _b: 0x68, // h + _c: 0x62, // b + hb, + _d: 0x2f, // / + nd, + } + } +} + +#[cfg(test)] +mod tests { + #[test] + fn key() { + use super::*; + #[rustfmt::skip] + let val = Hb::new( + Timestamp { value: 123 }, + Uuid::default(), + ); + let enc = Hb::encode(&val).unwrap(); + let dec = Hb::decode(&enc).unwrap(); + assert_eq!(val, dec); + } +} diff --git a/lib/src/key/lq.rs b/lib/src/key/lq.rs index 0f229cd9..5b628dce 100644 --- a/lib/src/key/lq.rs +++ b/lib/src/key/lq.rs @@ -1,35 +1,43 @@ -use crate::sql::uuid::Uuid; use derive::Key; use serde::{Deserialize, Serialize}; +use uuid::Uuid; #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)] pub struct Lq<'a> { __: u8, _a: u8, - pub ns: &'a str, _b: u8, - pub db: &'a str, _c: u8, + pub nd: Uuid, _d: u8, + pub ns: &'a str, _e: u8, + pub db: &'a str, + _f: u8, + _g: u8, + _h: u8, pub lq: Uuid, } -pub fn new<'a>(ns: &'a str, db: &'a str, lq: &Uuid) -> Lq<'a> { - Lq::new(ns, db, lq.to_owned()) +pub fn new<'a>(nd: &Uuid, ns: &'a str, db: &'a str, lq: &Uuid) -> Lq<'a> { + Lq::new(nd.to_owned(), ns, db, lq.to_owned()) } impl<'a> Lq<'a> { - pub fn new(ns: &'a str, db: &'a str, lq: Uuid) -> Self { + pub fn new(nd: Uuid, ns: &'a str, db: &'a str, lq: Uuid) -> Self { Self { __: b'/', - _a: b'*', + _a: b'!', + _b: b'n', + _c: b'd', + nd, + _d: b'*', ns, - _b: b'*', + _e: b'*', db, - _c: b'!', - _d: b'l', - _e: b'v', + _f: b'!', + _g: b'l', + _h: b'v', lq, } } @@ -42,6 +50,7 @@ mod tests { use super::*; #[rustfmt::skip] let val = Lq::new( + Uuid::default(), "test", "test", Uuid::default(), diff --git a/lib/src/key/lv.rs b/lib/src/key/lv.rs index 22e68554..470316c8 100644 --- a/lib/src/key/lv.rs +++ b/lib/src/key/lv.rs @@ -1,6 +1,6 @@ -use crate::sql::uuid::Uuid; use derive::Key; use serde::{Deserialize, Serialize}; +use uuid::Uuid; #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)] pub struct Lv<'a> { diff --git a/lib/src/key/mod.rs b/lib/src/key/mod.rs index 537b738f..4410fc0c 100644 --- a/lib/src/key/mod.rs +++ b/lib/src/key/mod.rs @@ -1,6 +1,12 @@ //! 
How the keys are structured in the key value store /// /// KV / +/// +/// ND /!nd{nd} +/// LQ /!nd{nd}*{ns}*{db}!lq{lq} +/// +/// HB /!hb{ts}/{nd} +/// /// NS /!ns{ns} /// /// Namespace /*{ns} @@ -15,7 +21,6 @@ /// PA /*{ns}*{db}!pa{pa} /// SC /*{ns}*{db}!sc{sc} /// TB /*{ns}*{db}!tb{tb} -/// LQ /*{ns}*{db}!lq{lq} /// /// Scope /*{ns}*{db}±{sc} /// ST /*{ns}*{db}±{sc}!st{tk} @@ -56,6 +61,7 @@ pub mod bp; // Stores BTree nodes for postings pub mod bs; // Stores FullText index states pub mod bt; // Stores BTree nodes for terms pub mod bu; // Stores terms for term_ids +pub mod cl; // Stores cluster membership information pub mod database; // Stores the key prefix for all keys under a database pub mod db; // Stores a DEFINE DATABASE config definition pub mod dl; // Stores a DEFINE LOGIN ON DATABASE config definition @@ -65,6 +71,7 @@ pub mod fc; // Stores a DEFINE FUNCTION config definition pub mod fd; // Stores a DEFINE FIELD config definition pub mod ft; // Stores a DEFINE TABLE AS config definition pub mod graph; // Stores a graph edge pointer +pub mod hb; // Stores a heartbeat per registered cluster node pub mod index; // Stores an index entry pub mod ix; // Stores a DEFINE INDEX config definition pub mod kv; // Stores the key prefix for all keys diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index fa080a1c..ef869e62 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -2,6 +2,7 @@ use super::tx::Transaction; use crate::ctx::Context; use crate::dbs::Attach; use crate::dbs::Executor; +use crate::dbs::Notification; use crate::dbs::Options; use crate::dbs::Response; use crate::dbs::Session; @@ -11,17 +12,22 @@ use crate::kvs::LOG; use crate::sql; use crate::sql::Query; use crate::sql::Value; +use channel::Receiver; use channel::Sender; use futures::lock::Mutex; use std::fmt; use std::sync::Arc; use std::time::Duration; use tracing::instrument; +use uuid::Uuid; /// The underlying datastore instance which stores the dataset. 
#[allow(dead_code)] pub struct Datastore { + pub(super) id: Arc, pub(super) inner: Inner, + pub(super) send: Sender, + pub(super) recv: Receiver, query_timeout: Option, } @@ -204,8 +210,13 @@ impl Datastore { Err(Error::Ds("Unable to load the specified datastore".into())) } }; + // Create a live query notification channel + let (send, recv) = channel::bounded(100); inner.map(|inner| Self { + id: Arc::new(Uuid::new_v4()), inner, + send, + recv, query_timeout: None, }) } @@ -216,6 +227,24 @@ impl Datastore { self } + // Adds entries to the KV store indicating membership information + pub async fn register_membership(&self) -> Result<(), Error> { + let mut tx = self.transaction(true, false).await?; + tx.set_cl(sql::Uuid::from(*self.id.as_ref())).await?; + tx.set_hb(sql::Uuid::from(*self.id.as_ref())).await?; + tx.commit().await?; + Ok(()) + } + + // Creates a heartbeat entry for the member indicating to the cluster + // that the node is alive + pub async fn heartbeat(&self) -> Result<(), Error> { + let mut tx = self.transaction(true, false).await?; + tx.set_hb(sql::Uuid::from(*self.id.as_ref())).await?; + tx.commit().await?; + Ok(()) + } + /// Create a new transaction on this datastore /// /// ```rust,no_run @@ -343,6 +372,8 @@ impl Datastore { let ctx = sess.context(ctx); // Store the query variables let ctx = vars.attach(ctx)?; + // Setup the notification channel + opt.sender = self.send.clone(); // Setup the auth options opt.auth = sess.au.clone(); // Setup the live options @@ -400,6 +431,8 @@ impl Datastore { let ctx = sess.context(ctx); // Store the query variables let ctx = vars.attach(ctx)?; + // Setup the notification channel + opt.sender = self.send.clone(); // Setup the auth options opt.auth = sess.au.clone(); // Set current NS and DB @@ -418,6 +451,28 @@ impl Datastore { Ok(res) } + /// Subscribe to live notifications + /// + /// ```rust,no_run + /// use surrealdb::kvs::Datastore; + /// use surrealdb::err::Error; + /// use surrealdb::dbs::Session; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Error> { + /// let ds = Datastore::new("memory").await?; + /// let ses = Session::for_kv(); + /// while let Ok(v) = ds.notifications().recv().await { + /// println!("Received notification: {v}"); + /// } + /// Ok(()) + /// } + /// ``` + #[instrument(skip_all)] + pub fn notifications(&self) -> Receiver { + self.recv.clone() + } + /// Performs a full database export as SQL #[instrument(skip(self, chn))] pub async fn export(&self, ns: String, db: String, chn: Sender>) -> Result<(), Error> { diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index 18dff7fd..396e8c32 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -2,6 +2,8 @@ use super::kv::Add; use super::kv::Convert; use super::Key; use super::Val; +use crate::dbs::cl::ClusterMembership; +use crate::dbs::cl::Timestamp; use crate::err::Error; use crate::key::thing; use crate::kvs::cache::Cache; @@ -11,7 +13,7 @@ use crate::sql::paths::EDGE; use crate::sql::paths::IN; use crate::sql::paths::OUT; use crate::sql::thing::Thing; -use crate::sql::Value; +use crate::sql::{Uuid, Value}; use channel::Sender; use sql::permission::Permissions; use sql::statements::DefineAnalyzerStatement; @@ -31,6 +33,7 @@ use std::fmt; use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; #[cfg(debug_assertions)] const LOG: &str = "surrealdb::txn"; @@ -789,6 +792,60 @@ impl Transaction { Ok(()) } + // Register cluster membership + // NOTE: Setting cluster membership sets the heartbeat + // 
Remember to set the heartbeat as well + pub async fn set_cl(&mut self, id: Uuid) -> Result<(), Error> { + let key = crate::key::cl::Cl::new(id.0); + match self.get_cl(id.clone()).await? { + Some(_) => Err(Error::ClAlreadyExists { + value: id.0.to_string(), + }), + None => { + let value = ClusterMembership { + name: id.0.to_string(), + heartbeat: self.clock(), + }; + self.put(key, value).await?; + Ok(()) + } + } + } + + // Retrieve cluster information + pub async fn get_cl(&mut self, id: Uuid) -> Result, Error> { + let key = crate::key::cl::Cl::new(id.0); + let val = self.get(key).await?; + match val { + Some(v) => Ok(Some::(v.into())), + None => Ok(None), + } + } + + fn clock(&self) -> Timestamp { + // Use a timestamp oracle if available + let now: u128 = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis(); + Timestamp { + value: now as u64, + } + } + + // Set heartbeat + pub async fn set_hb(&mut self, id: Uuid) -> Result<(), Error> { + let now = self.clock(); + let key = crate::key::hb::Hb::new(now.clone(), id.0); + // We do not need to do a read, we always want to overwrite + self.put( + key, + ClusterMembership { + name: id.0.to_string(), + heartbeat: now, + }, + ) + .await?; + Ok(()) + } + /// Retrieve all namespace definitions in a datastore. pub async fn all_ns(&mut self) -> Result, Error> { let key = crate::key::ns::prefix(); diff --git a/lib/src/sql/statements/kill.rs b/lib/src/sql/statements/kill.rs index 5f183da8..8c672ba7 100644 --- a/lib/src/sql/statements/kill.rs +++ b/lib/src/sql/statements/kill.rs @@ -29,14 +29,14 @@ impl KillStatement { let txn = ctx.clone_transaction()?; // Claim transaction let mut run = txn.lock().await; - // Create the live query key - let key = crate::key::lq::new(opt.ns(), opt.db(), &self.id); + // Fetch the live query key + let key = crate::key::lq::new(opt.id(), opt.ns(), opt.db(), &self.id); // Fetch the live query key if it exists match run.get(key).await? { Some(val) => match std::str::from_utf8(&val) { Ok(tb) => { - // Delete the live query - let key = crate::key::lq::new(opt.ns(), opt.db(), &self.id); + // Delete the node live query + let key = crate::key::lq::new(opt.id(), opt.ns(), opt.db(), &self.id); run.del(key).await?; // Delete the table live query let key = crate::key::lv::new(opt.ns(), opt.db(), tb, &self.id); diff --git a/lib/src/sql/statements/live.rs b/lib/src/sql/statements/live.rs index 2d0ed0b6..215fb2f2 100644 --- a/lib/src/sql/statements/live.rs +++ b/lib/src/sql/statements/live.rs @@ -9,8 +9,8 @@ use crate::sql::fetch::{fetch, Fetchs}; use crate::sql::field::{fields, Fields}; use crate::sql::param::param; use crate::sql::table::table; -use crate::sql::uuid::Uuid; use crate::sql::value::Value; +use crate::sql::Uuid; use derive::Store; use nom::branch::alt; use nom::bytes::complete::tag_no_case; @@ -23,6 +23,7 @@ use std::fmt; #[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store, Hash)] pub struct LiveStatement { pub id: Uuid, + pub node: Uuid, pub expr: Fields, pub what: Value, pub cond: Option, @@ -45,12 +46,16 @@ impl LiveStatement { // Process the live query table match self.what.compute(ctx, opt).await? 
{ Value::Table(tb) => { - // Insert the live query - let key = crate::key::lq::new(opt.ns(), opt.db(), &self.id); + // Clone the current statement + let mut stm = self.clone(); + // Store the current Node ID + stm.node = Uuid(*opt.id); + // Insert the node live query + let key = crate::key::lq::new(opt.id(), opt.ns(), opt.db(), &self.id); run.putc(key, tb.as_str(), None).await?; // Insert the table live query let key = crate::key::lv::new(opt.ns(), opt.db(), &tb, &self.id); - run.putc(key, self.clone(), None).await?; + run.putc(key, stm, None).await?; } v => { return Err(Error::LiveStatement { @@ -59,7 +64,7 @@ impl LiveStatement { } }; // Return the query id - Ok(self.id.clone().into()) + Ok(Value::Uuid(self.id.clone())) } } @@ -89,7 +94,8 @@ pub fn live(i: &str) -> IResult<&str, LiveStatement> { Ok(( i, LiveStatement { - id: Uuid::new(), + id: Uuid::new_v4(), + node: Uuid::default(), expr, what, cond, diff --git a/lib/src/sql/statements/sleep.rs b/lib/src/sql/statements/sleep.rs index 95a1f2c9..f9d23b92 100644 --- a/lib/src/sql/statements/sleep.rs +++ b/lib/src/sql/statements/sleep.rs @@ -58,11 +58,12 @@ pub fn sleep(i: &str) -> IResult<&str, SleepStatement> { #[cfg(test)] mod tests { - use super::*; use crate::dbs::test::mock; use crate::dbs::Auth; + use std::sync::Arc; use std::time::SystemTime; + use uuid::Uuid; #[test] fn test_sleep_statement_sec() { @@ -86,7 +87,8 @@ mod tests { async fn test_sleep_compute() { let sql = "SLEEP 500ms"; let time = SystemTime::now(); - let opt = Options::new(Auth::Kv); + let opt = + Options::new(Arc::new(Uuid::new_v4()), channel::unbounded().0, Arc::new(Auth::Kv)); let (ctx, _) = mock().await; let (_, stm) = sleep(sql).unwrap(); let value = stm.compute(&ctx, &opt).await.unwrap(); diff --git a/src/net/rpc.rs b/src/net/rpc.rs index 378d3e8f..3ad5a43f 100644 --- a/src/net/rpc.rs +++ b/src/net/rpc.rs @@ -14,13 +14,13 @@ use crate::rpc::res::Failure; use crate::rpc::res::Output; use futures::{SinkExt, StreamExt}; use once_cell::sync::Lazy; -use serde::Serialize; + use std::collections::BTreeMap; use std::collections::HashMap; use std::sync::Arc; use surrealdb::channel; use surrealdb::channel::Sender; -use surrealdb::dbs::Session; +use surrealdb::dbs::{QueryType, Response, Session}; use surrealdb::opt::auth::Root; use surrealdb::sql::Array; use surrealdb::sql::Object; @@ -33,8 +33,11 @@ use warp::ws::{Message, WebSocket, Ws}; use warp::Filter; type WebSockets = RwLock>>; +// Mapping of LiveQueryID to WebSocketID +type LiveQueries = RwLock>; static WEBSOCKETS: Lazy = Lazy::new(WebSockets::default); +static LIVE_QUERIES: Lazy = Lazy::new(LiveQueries::default); #[allow(opaque_hidden_inferred_bound)] pub fn config() -> impl Filter + Clone { @@ -119,6 +122,44 @@ impl Rpc { } } }); + // Send notifications to the client + let moved_rpc = rpc.clone(); + tokio::task::spawn(async move { + let rpc = moved_rpc; + while let Ok(v) = DB.get().unwrap().notifications().recv().await { + trace!(target: LOG, "Received notification: {:?}", v); + // Find which websocket the notification belongs to + match LIVE_QUERIES.read().await.get(&v.id) { + Some(ws_id) => { + // Send the notification to the client + let msg_text = res::success(None, v.clone()); + let ws_write = WEBSOCKETS.write().await; + match ws_write.get(ws_id) { + None => { + error!( + target: LOG, + "Tracked WebSocket {:?} not found for lq: {:?}", ws_id, &v.id + ); + } + Some(ws_sender) => { + msg_text + .send(rpc.read().await.format.clone(), ws_sender.clone()) + .await; + trace!( + target: LOG, + "Sent notification to 
WebSocket {:?} for lq: {:?}", + ws_id, + &v.id + ); + } + } + } + None => { + error!(target: LOG, "Unknown websocket for live query: {:?}", v.id); + } + } + } + }); // Get messages from the client while let Some(msg) = wrx.next().await { match msg { @@ -172,6 +213,18 @@ impl Rpc { trace!(target: LOG, "WebSocket {} disconnected", id); // Remove this WebSocket from the list of WebSockets WEBSOCKETS.write().await.remove(&id); + // Remove all live queries + let mut locked_lq_map = LIVE_QUERIES.write().await; + let mut live_query_to_gc: Vec = vec![]; + for (key, value) in locked_lq_map.iter() { + if value == &id { + trace!(target: LOG, "Removing live query: {}", key); + live_query_to_gc.push(*key); + } + } + for key in live_query_to_gc { + locked_lq_map.remove(&key); + } } /// Call RPC methods from the WebSocket @@ -463,44 +516,40 @@ impl Rpc { #[instrument(skip_all, name = "rpc kill", fields(websocket=self.uuid.to_string()))] async fn kill(&self, id: Value) -> Result { - // Get a database reference - let kvs = DB.get().unwrap(); - // Get local copy of options - let opt = CF.get().unwrap(); // Specify the SQL query string let sql = "KILL $id"; // Specify the query parameters - let var = Some(map! { + let var = map! { String::from("id") => id, => &self.vars - }); + }; // Execute the query on the database - let mut res = kvs.execute(sql, &self.session, var, opt.strict).await?; + let mut res = self.query_with(Strand::from(sql), Object::from(var)).await?; // Extract the first query result - let res = res.remove(0).result?; - // Return the result to the client - Ok(res) + let response = res.remove(0); + match response.result { + Ok(v) => Ok(v), + Err(e) => Err(Error::from(e)), + } } #[instrument(skip_all, name = "rpc live", fields(websocket=self.uuid.to_string()))] async fn live(&self, tb: Value) -> Result { - // Get a database reference - let kvs = DB.get().unwrap(); - // Get local copy of options - let opt = CF.get().unwrap(); // Specify the SQL query string let sql = "LIVE SELECT * FROM $tb"; // Specify the query parameters - let var = Some(map! { + let var = map! 
{ String::from("tb") => tb.could_be_table(), => &self.vars - }); + }; // Execute the query on the database - let mut res = kvs.execute(sql, &self.session, var, opt.strict).await?; + let mut res = self.query_with(Strand::from(sql), Object::from(var)).await?; // Extract the first query result - let res = res.remove(0).result?; - // Return the result to the client - Ok(res) + let response = res.remove(0); + match response.result { + Ok(v) => Ok(v), + Err(e) => Err(Error::from(e)), + } } // ------------------------------ @@ -687,12 +736,43 @@ impl Rpc { Ok(res) } + async fn handle_live_query_results(&self, res: &Response) { + match &res.query_type { + QueryType::Live => { + if let Ok(Value::Uuid(lqid)) = &res.result { + // Match on Uuid type + LIVE_QUERIES.write().await.insert(lqid.0, self.uuid); + trace!( + target: LOG, + "Registered live query {} on websocket {}", + lqid, + self.uuid + ); + } + } + QueryType::Kill => { + if let Ok(Value::Uuid(lqid)) = &res.result { + let ws_id = LIVE_QUERIES.write().await.remove(&lqid.0); + if let Some(ws_id) = ws_id { + trace!( + target: LOG, + "Unregistered live query {} on websocket {}", + lqid, + ws_id + ); + } + } + } + _ => {} + } + } + // ------------------------------ // Methods for querying // ------------------------------ #[instrument(skip_all, name = "rpc query", fields(websocket=self.uuid.to_string()))] - async fn query(&self, sql: Strand) -> Result { + async fn query(&self, sql: Strand) -> Result, Error> { // Get a database reference let kvs = DB.get().unwrap(); // Get local copy of options @@ -701,12 +781,16 @@ impl Rpc { let var = Some(self.vars.clone()); // Execute the query on the database let res = kvs.execute(&sql, &self.session, var, opt.strict).await?; + // Post-process hooks for web layer + for response in &res { + self.handle_live_query_results(response).await; + } // Return the result to the client Ok(res) } #[instrument(skip_all, name = "rpc query_with", fields(websocket=self.uuid.to_string()))] - async fn query_with(&self, sql: Strand, mut vars: Object) -> Result { + async fn query_with(&self, sql: Strand, mut vars: Object) -> Result, Error> { // Get a database reference let kvs = DB.get().unwrap(); // Get local copy of options @@ -715,6 +799,10 @@ impl Rpc { let var = Some(mrg! { vars.0, &self.vars }); // Execute the query on the database let res = kvs.execute(&sql, &self.session, var, opt.strict).await?; + // Post-process hooks for web layer + for response in &res { + self.handle_live_query_results(response).await; + } // Return the result to the client Ok(res) } diff --git a/tests/cli.rs b/tests/cli.rs index 5a29b684..ee5d4c36 100644 --- a/tests/cli.rs +++ b/tests/cli.rs @@ -117,7 +117,7 @@ mod cli_integration { let _server = run(&start_args); - std::thread::sleep(std::time::Duration::from_millis(500)); + std::thread::sleep(std::time::Duration::from_millis(5000)); assert!(run(&format!("isready --conn http://{addr}")).output().is_ok());