From 569182ee7bb0acbc8ab1fd436c39ffb571a183cc Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Mon, 27 Jun 2022 17:01:39 +0100 Subject: [PATCH] Implement LIVE and KILL statements --- lib/src/dbs/options.rs | 10 +++++++ lib/src/dbs/session.rs | 6 ++-- lib/src/err/mod.rs | 16 ++++++++++ lib/src/key/lq.rs | 53 ++++++++++++++++++++++++++++++++++ lib/src/key/lv.rs | 11 +++---- lib/src/key/mod.rs | 2 ++ lib/src/kvs/ds.rs | 15 ++++++++-- lib/src/sql/statements/kill.rs | 47 ++++++++++++++++++++++++++---- lib/src/sql/statements/live.rs | 48 +++++++++++++++++++++++++----- 9 files changed, 184 insertions(+), 24 deletions(-) create mode 100644 lib/src/key/lq.rs diff --git a/lib/src/dbs/options.rs b/lib/src/dbs/options.rs index 564fb3d7..a0b6d370 100644 --- a/lib/src/dbs/options.rs +++ b/lib/src/dbs/options.rs @@ -21,6 +21,8 @@ pub struct Options { pub auth: Arc, // How many subqueries have we gone into? pub dive: usize, + // Whether live queries are allowed? + pub live: bool, // Should we debug query response SQL? pub debug: bool, // Should we force tables/events to re-run? @@ -50,6 +52,7 @@ impl Options { ns: None, db: None, dive: 0, + live: false, perms: true, debug: false, force: false, @@ -176,6 +179,13 @@ impl Options { } } + pub fn realtime(&self) -> Result<(), Error> { + if !self.live { + return Err(Error::RealtimeDisabled); + } + Ok(()) + } + // Check whether the authentication permissions are ok pub fn check(&self, level: Level) -> Result<(), Error> { if !self.auth.check(level) { diff --git a/lib/src/dbs/session.rs b/lib/src/dbs/session.rs index 842d4ca7..dc9470e6 100644 --- a/lib/src/dbs/session.rs +++ b/lib/src/dbs/session.rs @@ -8,15 +8,17 @@ use std::sync::Arc; pub struct Session { /// The current [`Auth`] information pub au: Arc, + /// Whether realtime queries are supported + pub rt: bool, /// The current connection IP address pub ip: Option, /// The current connection origin pub or: Option, /// The current connection ID pub id: Option, - /// THe currently selected namespace + /// The currently selected namespace pub ns: Option, - /// THe currently selected database + /// The currently selected database pub db: Option, /// The currently selected authentication scope pub sc: Option, diff --git a/lib/src/err/mod.rs b/lib/src/err/mod.rs index 3c0535df..53424142 100644 --- a/lib/src/err/mod.rs +++ b/lib/src/err/mod.rs @@ -136,6 +136,10 @@ pub enum Error { #[error("The table does not exist")] TbNotFound, + /// Unable to perform the realtime query + #[error("Unable to perform the realtime query")] + RealtimeDisabled, + /// Too many recursive subqueries have been processed #[error("Too many recursive subqueries have been processed")] TooManySubqueries, @@ -170,6 +174,18 @@ pub enum Error { value: String, }, + /// Can not execute LIVE query using the specified value + #[error("Can not execute LIVE query using value '{value}'")] + LiveStatement { + value: String, + }, + + /// Can not execute KILL query using the specified id + #[error("Can not execute KILL query using id '{value}'")] + KillStatement { + value: String, + }, + /// The permissions do not allow this query to be run on this table #[error("You don't have permission to run this query on the `{table}` table")] TablePermissions { diff --git a/lib/src/key/lq.rs b/lib/src/key/lq.rs new file mode 100644 index 00000000..7d9a79df --- /dev/null +++ b/lib/src/key/lq.rs @@ -0,0 +1,53 @@ +use crate::sql::uuid::Uuid; +use derive::Key; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)] +pub struct Lq { + __: u8, + _a: u8, + pub ns: String, + _b: u8, + pub db: String, + _c: u8, + _d: u8, + _e: u8, + pub lq: Uuid, +} + +pub fn new(ns: &str, db: &str, lq: &Uuid) -> Lq { + Lq::new(ns.to_string(), db.to_string(), lq.to_owned()) +} + +impl Lq { + pub fn new(ns: String, db: String, lq: Uuid) -> Lq { + Lq { + __: 0x2f, // / + _a: 0x2a, // * + ns, + _b: 0x2a, // * + db, + _c: 0x21, // ! + _d: 0x6c, // l + _e: 0x71, // v + lq, + } + } +} + +#[cfg(test)] +mod tests { + #[test] + fn key() { + use super::*; + #[rustfmt::skip] + let val = Lq::new( + "test".to_string(), + "test".to_string(), + "00000000-0000-0000-0000-000000000000".into(), + ); + let enc = Lq::encode(&val).unwrap(); + let dec = Lq::decode(&enc).unwrap(); + assert_eq!(val, dec); + } +} diff --git a/lib/src/key/lv.rs b/lib/src/key/lv.rs index 8732d7bc..8efe604a 100644 --- a/lib/src/key/lv.rs +++ b/lib/src/key/lv.rs @@ -1,3 +1,4 @@ +use crate::sql::uuid::Uuid; use derive::Key; use serde::{Deserialize, Serialize}; @@ -13,11 +14,11 @@ pub struct Lv { _d: u8, _e: u8, _f: u8, - pub lv: String, + pub lv: Uuid, } -pub fn new(ns: &str, db: &str, tb: &str, lv: &str) -> Lv { - Lv::new(ns.to_string(), db.to_string(), tb.to_string(), lv.to_string()) +pub fn new(ns: &str, db: &str, tb: &str, lv: &Uuid) -> Lv { + Lv::new(ns.to_string(), db.to_string(), tb.to_string(), lv.to_owned()) } pub fn prefix(ns: &str, db: &str, tb: &str) -> Vec { @@ -33,7 +34,7 @@ pub fn suffix(ns: &str, db: &str, tb: &str) -> Vec { } impl Lv { - pub fn new(ns: String, db: String, tb: String, lv: String) -> Lv { + pub fn new(ns: String, db: String, tb: String, lv: Uuid) -> Lv { Lv { __: 0x2f, // / _a: 0x2a, // * @@ -60,7 +61,7 @@ mod tests { "test".to_string(), "test".to_string(), "test".to_string(), - "test".to_string(), + "00000000-0000-0000-0000-000000000000".into(), ); let enc = Lv::encode(&val).unwrap(); let dec = Lv::decode(&enc).unwrap(); diff --git a/lib/src/key/mod.rs b/lib/src/key/mod.rs index 0b28bf43..6d8c8b7d 100644 --- a/lib/src/key/mod.rs +++ b/lib/src/key/mod.rs @@ -12,6 +12,7 @@ /// SC /*{ns}*{db}!sc{sc} /// ST /*{ns}*{db}!st{sc}!tk{tk} /// TB /*{ns}*{db}!tb{tb} +/// LQ /*{ns}*{db}!lq{lq} /// /// Table /*{ns}*{db}*{tb} /// FT /*{ns}*{db}*{tb}!ft{ft} @@ -37,6 +38,7 @@ pub mod graph; pub mod index; pub mod ix; pub mod kv; +pub mod lq; pub mod lv; pub mod namespace; pub mod nl; diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index 7701acbc..04a59a72 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -163,10 +163,14 @@ impl Datastore { let ctx = vars.attach(ctx); // Parse the SQL query text let ast = sql::parse(txt)?; - // Process all statements + // Setup the auth options opt.auth = sess.au.clone(); + // Setup the live options + opt.live = sess.rt; + // Set current NS and DB opt.ns = sess.ns(); opt.db = sess.db(); + // Process all statements exe.execute(ctx, opt, ast).await } @@ -187,10 +191,14 @@ impl Datastore { let ctx = sess.context(ctx); // Store the query variables let ctx = vars.attach(ctx); - // Process all statements + // Setup the auth options opt.auth = sess.au.clone(); + // Setup the live options + opt.live = sess.rt; + // Set current NS and DB opt.ns = sess.ns(); opt.db = sess.db(); + // Process all statements exe.execute(ctx, opt, ast).await } @@ -213,8 +221,9 @@ impl Datastore { let ctx = sess.context(ctx); // Store the query variables let ctx = vars.attach(ctx); - // Setup the query options + // Setup the auth options opt.auth = sess.au.clone(); + // Set current NS and DB opt.ns = sess.ns(); opt.db = sess.db(); // Compute the value diff --git a/lib/src/sql/statements/kill.rs b/lib/src/sql/statements/kill.rs index a7ec22c5..0846e5f9 100644 --- a/lib/src/sql/statements/kill.rs +++ b/lib/src/sql/statements/kill.rs @@ -1,10 +1,11 @@ use crate::ctx::Context; +use crate::dbs::Level; use crate::dbs::Options; use crate::dbs::Transaction; use crate::err::Error; use crate::sql::comment::shouldbespace; use crate::sql::error::IResult; -use crate::sql::ident::{ident, Ident}; +use crate::sql::uuid::{uuid, Uuid}; use crate::sql::value::Value; use derive::Store; use nom::bytes::complete::tag_no_case; @@ -13,18 +14,52 @@ use std::fmt; #[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)] pub struct KillStatement { - pub id: Ident, + pub id: Uuid, } impl KillStatement { pub(crate) async fn compute( &self, _ctx: &Context<'_>, - _opt: &Options, - _txn: &Transaction, + opt: &Options, + txn: &Transaction, _doc: Option<&Value>, ) -> Result { - todo!() + // Allowed to run? + opt.realtime()?; + // Allowed to run? + opt.check(Level::No)?; + // Clone transaction + let run = txn.clone(); + // Claim transaction + let mut run = run.lock().await; + // Create the live query key + let key = crate::key::lq::new(opt.ns(), opt.db(), &self.id); + // Fetch the live query key if it exists + match run.get(key).await? { + Some(val) => match std::str::from_utf8(&val) { + Ok(tb) => { + // Delete the live query + let key = crate::key::lq::new(opt.ns(), opt.db(), &self.id); + run.del(key).await?; + // Delete the table live query + let key = crate::key::lv::new(opt.ns(), opt.db(), tb, &self.id); + run.del(key).await?; + } + _ => { + return Err(Error::KillStatement { + value: self.id.to_string(), + }) + } + }, + None => { + return Err(Error::KillStatement { + value: self.id.to_string(), + }) + } + } + // Return the query id + Ok(Value::None) } } @@ -37,7 +72,7 @@ impl fmt::Display for KillStatement { pub fn kill(i: &str) -> IResult<&str, KillStatement> { let (i, _) = tag_no_case("KILL")(i)?; let (i, _) = shouldbespace(i)?; - let (i, v) = ident(i)?; + let (i, v) = uuid(i)?; Ok(( i, KillStatement { diff --git a/lib/src/sql/statements/live.rs b/lib/src/sql/statements/live.rs index 75679076..9de22924 100644 --- a/lib/src/sql/statements/live.rs +++ b/lib/src/sql/statements/live.rs @@ -1,4 +1,5 @@ use crate::ctx::Context; +use crate::dbs::Level; use crate::dbs::Options; use crate::dbs::Transaction; use crate::err::Error; @@ -7,10 +8,14 @@ use crate::sql::cond::{cond, Cond}; use crate::sql::error::IResult; use crate::sql::fetch::{fetch, Fetchs}; use crate::sql::field::{fields, Fields}; +use crate::sql::param::param; +use crate::sql::table::table; +use crate::sql::uuid::Uuid; use crate::sql::value::Value; -use crate::sql::value::{whats, Values}; use derive::Store; +use nom::branch::alt; use nom::bytes::complete::tag_no_case; +use nom::combinator::map; use nom::combinator::opt; use nom::sequence::preceded; use serde::{Deserialize, Serialize}; @@ -18,8 +23,9 @@ use std::fmt; #[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)] pub struct LiveStatement { + pub id: Uuid, pub expr: Fields, - pub what: Values, + pub what: Value, pub cond: Option, pub fetch: Option, } @@ -27,12 +33,37 @@ pub struct LiveStatement { impl LiveStatement { pub(crate) async fn compute( &self, - _ctx: &Context<'_>, - _opt: &Options, - _txn: &Transaction, - _doc: Option<&Value>, + ctx: &Context<'_>, + opt: &Options, + txn: &Transaction, + doc: Option<&Value>, ) -> Result { - todo!() + // Allowed to run? + opt.realtime()?; + // Allowed to run? + opt.check(Level::No)?; + // Clone transaction + let run = txn.clone(); + // Claim transaction + let mut run = run.lock().await; + // Process the live query table + match self.what.compute(ctx, opt, txn, doc).await? { + Value::Table(tb) => { + // Insert the live query + let key = crate::key::lq::new(opt.ns(), opt.db(), &self.id); + run.putc(key, tb.as_str(), None).await?; + // Insert the table live query + let key = crate::key::lv::new(opt.ns(), opt.db(), &tb, &self.id); + run.putc(key, self.clone(), None).await?; + } + v => { + return Err(Error::LiveStatement { + value: v.to_string(), + }) + } + }; + // Return the query id + Ok(self.id.clone().into()) } } @@ -56,12 +87,13 @@ pub fn live(i: &str) -> IResult<&str, LiveStatement> { let (i, _) = shouldbespace(i)?; let (i, _) = tag_no_case("FROM")(i)?; let (i, _) = shouldbespace(i)?; - let (i, what) = whats(i)?; + let (i, what) = alt((map(param, Value::from), map(table, Value::from)))(i)?; let (i, cond) = opt(preceded(shouldbespace, cond))(i)?; let (i, fetch) = opt(preceded(shouldbespace, fetch))(i)?; Ok(( i, LiveStatement { + id: Uuid::new(), expr, what, cond,