Implement LIVE and KILL statements

This commit is contained in:
Tobie Morgan Hitchcock 2022-06-27 17:01:39 +01:00
parent 219f4a54ed
commit 569182ee7b
9 changed files with 184 additions and 24 deletions

View file

@ -21,6 +21,8 @@ pub struct Options {
pub auth: Arc<Auth>, pub auth: Arc<Auth>,
// How many subqueries have we gone into? // How many subqueries have we gone into?
pub dive: usize, pub dive: usize,
// Whether live queries are allowed?
pub live: bool,
// Should we debug query response SQL? // Should we debug query response SQL?
pub debug: bool, pub debug: bool,
// Should we force tables/events to re-run? // Should we force tables/events to re-run?
@ -50,6 +52,7 @@ impl Options {
ns: None, ns: None,
db: None, db: None,
dive: 0, dive: 0,
live: false,
perms: true, perms: true,
debug: false, debug: false,
force: false, force: false,
@ -176,6 +179,13 @@ impl Options {
} }
} }
pub fn realtime(&self) -> Result<(), Error> {
if !self.live {
return Err(Error::RealtimeDisabled);
}
Ok(())
}
// Check whether the authentication permissions are ok // Check whether the authentication permissions are ok
pub fn check(&self, level: Level) -> Result<(), Error> { pub fn check(&self, level: Level) -> Result<(), Error> {
if !self.auth.check(level) { if !self.auth.check(level) {

View file

@ -8,15 +8,17 @@ use std::sync::Arc;
pub struct Session { pub struct Session {
/// The current [`Auth`] information /// The current [`Auth`] information
pub au: Arc<Auth>, pub au: Arc<Auth>,
/// Whether realtime queries are supported
pub rt: bool,
/// The current connection IP address /// The current connection IP address
pub ip: Option<String>, pub ip: Option<String>,
/// The current connection origin /// The current connection origin
pub or: Option<String>, pub or: Option<String>,
/// The current connection ID /// The current connection ID
pub id: Option<String>, pub id: Option<String>,
/// THe currently selected namespace /// The currently selected namespace
pub ns: Option<String>, pub ns: Option<String>,
/// THe currently selected database /// The currently selected database
pub db: Option<String>, pub db: Option<String>,
/// The currently selected authentication scope /// The currently selected authentication scope
pub sc: Option<String>, pub sc: Option<String>,

View file

@ -136,6 +136,10 @@ pub enum Error {
#[error("The table does not exist")] #[error("The table does not exist")]
TbNotFound, TbNotFound,
/// Unable to perform the realtime query
#[error("Unable to perform the realtime query")]
RealtimeDisabled,
/// Too many recursive subqueries have been processed /// Too many recursive subqueries have been processed
#[error("Too many recursive subqueries have been processed")] #[error("Too many recursive subqueries have been processed")]
TooManySubqueries, TooManySubqueries,
@ -170,6 +174,18 @@ pub enum Error {
value: String, value: String,
}, },
/// Can not execute LIVE query using the specified value
#[error("Can not execute LIVE query using value '{value}'")]
LiveStatement {
value: String,
},
/// Can not execute KILL query using the specified id
#[error("Can not execute KILL query using id '{value}'")]
KillStatement {
value: String,
},
/// The permissions do not allow this query to be run on this table /// The permissions do not allow this query to be run on this table
#[error("You don't have permission to run this query on the `{table}` table")] #[error("You don't have permission to run this query on the `{table}` table")]
TablePermissions { TablePermissions {

53
lib/src/key/lq.rs Normal file
View file

@ -0,0 +1,53 @@
use crate::sql::uuid::Uuid;
use derive::Key;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Lq {
__: u8,
_a: u8,
pub ns: String,
_b: u8,
pub db: String,
_c: u8,
_d: u8,
_e: u8,
pub lq: Uuid,
}
pub fn new(ns: &str, db: &str, lq: &Uuid) -> Lq {
Lq::new(ns.to_string(), db.to_string(), lq.to_owned())
}
impl Lq {
pub fn new(ns: String, db: String, lq: Uuid) -> Lq {
Lq {
__: 0x2f, // /
_a: 0x2a, // *
ns,
_b: 0x2a, // *
db,
_c: 0x21, // !
_d: 0x6c, // l
_e: 0x71, // v
lq,
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Lq::new(
"test".to_string(),
"test".to_string(),
"00000000-0000-0000-0000-000000000000".into(),
);
let enc = Lq::encode(&val).unwrap();
let dec = Lq::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

View file

@ -1,3 +1,4 @@
use crate::sql::uuid::Uuid;
use derive::Key; use derive::Key;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -13,11 +14,11 @@ pub struct Lv {
_d: u8, _d: u8,
_e: u8, _e: u8,
_f: u8, _f: u8,
pub lv: String, pub lv: Uuid,
} }
pub fn new(ns: &str, db: &str, tb: &str, lv: &str) -> Lv { pub fn new(ns: &str, db: &str, tb: &str, lv: &Uuid) -> Lv {
Lv::new(ns.to_string(), db.to_string(), tb.to_string(), lv.to_string()) Lv::new(ns.to_string(), db.to_string(), tb.to_string(), lv.to_owned())
} }
pub fn prefix(ns: &str, db: &str, tb: &str) -> Vec<u8> { pub fn prefix(ns: &str, db: &str, tb: &str) -> Vec<u8> {
@ -33,7 +34,7 @@ pub fn suffix(ns: &str, db: &str, tb: &str) -> Vec<u8> {
} }
impl Lv { impl Lv {
pub fn new(ns: String, db: String, tb: String, lv: String) -> Lv { pub fn new(ns: String, db: String, tb: String, lv: Uuid) -> Lv {
Lv { Lv {
__: 0x2f, // / __: 0x2f, // /
_a: 0x2a, // * _a: 0x2a, // *
@ -60,7 +61,7 @@ mod tests {
"test".to_string(), "test".to_string(),
"test".to_string(), "test".to_string(),
"test".to_string(), "test".to_string(),
"test".to_string(), "00000000-0000-0000-0000-000000000000".into(),
); );
let enc = Lv::encode(&val).unwrap(); let enc = Lv::encode(&val).unwrap();
let dec = Lv::decode(&enc).unwrap(); let dec = Lv::decode(&enc).unwrap();

View file

@ -12,6 +12,7 @@
/// SC /*{ns}*{db}!sc{sc} /// SC /*{ns}*{db}!sc{sc}
/// ST /*{ns}*{db}!st{sc}!tk{tk} /// ST /*{ns}*{db}!st{sc}!tk{tk}
/// TB /*{ns}*{db}!tb{tb} /// TB /*{ns}*{db}!tb{tb}
/// LQ /*{ns}*{db}!lq{lq}
/// ///
/// Table /*{ns}*{db}*{tb} /// Table /*{ns}*{db}*{tb}
/// FT /*{ns}*{db}*{tb}!ft{ft} /// FT /*{ns}*{db}*{tb}!ft{ft}
@ -37,6 +38,7 @@ pub mod graph;
pub mod index; pub mod index;
pub mod ix; pub mod ix;
pub mod kv; pub mod kv;
pub mod lq;
pub mod lv; pub mod lv;
pub mod namespace; pub mod namespace;
pub mod nl; pub mod nl;

View file

@ -163,10 +163,14 @@ impl Datastore {
let ctx = vars.attach(ctx); let ctx = vars.attach(ctx);
// Parse the SQL query text // Parse the SQL query text
let ast = sql::parse(txt)?; let ast = sql::parse(txt)?;
// Process all statements // Setup the auth options
opt.auth = sess.au.clone(); opt.auth = sess.au.clone();
// Setup the live options
opt.live = sess.rt;
// Set current NS and DB
opt.ns = sess.ns(); opt.ns = sess.ns();
opt.db = sess.db(); opt.db = sess.db();
// Process all statements
exe.execute(ctx, opt, ast).await exe.execute(ctx, opt, ast).await
} }
@ -187,10 +191,14 @@ impl Datastore {
let ctx = sess.context(ctx); let ctx = sess.context(ctx);
// Store the query variables // Store the query variables
let ctx = vars.attach(ctx); let ctx = vars.attach(ctx);
// Process all statements // Setup the auth options
opt.auth = sess.au.clone(); opt.auth = sess.au.clone();
// Setup the live options
opt.live = sess.rt;
// Set current NS and DB
opt.ns = sess.ns(); opt.ns = sess.ns();
opt.db = sess.db(); opt.db = sess.db();
// Process all statements
exe.execute(ctx, opt, ast).await exe.execute(ctx, opt, ast).await
} }
@ -213,8 +221,9 @@ impl Datastore {
let ctx = sess.context(ctx); let ctx = sess.context(ctx);
// Store the query variables // Store the query variables
let ctx = vars.attach(ctx); let ctx = vars.attach(ctx);
// Setup the query options // Setup the auth options
opt.auth = sess.au.clone(); opt.auth = sess.au.clone();
// Set current NS and DB
opt.ns = sess.ns(); opt.ns = sess.ns();
opt.db = sess.db(); opt.db = sess.db();
// Compute the value // Compute the value

View file

@ -1,10 +1,11 @@
use crate::ctx::Context; use crate::ctx::Context;
use crate::dbs::Level;
use crate::dbs::Options; use crate::dbs::Options;
use crate::dbs::Transaction; use crate::dbs::Transaction;
use crate::err::Error; use crate::err::Error;
use crate::sql::comment::shouldbespace; use crate::sql::comment::shouldbespace;
use crate::sql::error::IResult; use crate::sql::error::IResult;
use crate::sql::ident::{ident, Ident}; use crate::sql::uuid::{uuid, Uuid};
use crate::sql::value::Value; use crate::sql::value::Value;
use derive::Store; use derive::Store;
use nom::bytes::complete::tag_no_case; use nom::bytes::complete::tag_no_case;
@ -13,18 +14,52 @@ use std::fmt;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)] #[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct KillStatement { pub struct KillStatement {
pub id: Ident, pub id: Uuid,
} }
impl KillStatement { impl KillStatement {
pub(crate) async fn compute( pub(crate) async fn compute(
&self, &self,
_ctx: &Context<'_>, _ctx: &Context<'_>,
_opt: &Options, opt: &Options,
_txn: &Transaction, txn: &Transaction,
_doc: Option<&Value>, _doc: Option<&Value>,
) -> Result<Value, Error> { ) -> Result<Value, Error> {
todo!() // Allowed to run?
opt.realtime()?;
// Allowed to run?
opt.check(Level::No)?;
// Clone transaction
let run = txn.clone();
// Claim transaction
let mut run = run.lock().await;
// Create the live query key
let key = crate::key::lq::new(opt.ns(), opt.db(), &self.id);
// Fetch the live query key if it exists
match run.get(key).await? {
Some(val) => match std::str::from_utf8(&val) {
Ok(tb) => {
// Delete the live query
let key = crate::key::lq::new(opt.ns(), opt.db(), &self.id);
run.del(key).await?;
// Delete the table live query
let key = crate::key::lv::new(opt.ns(), opt.db(), tb, &self.id);
run.del(key).await?;
}
_ => {
return Err(Error::KillStatement {
value: self.id.to_string(),
})
}
},
None => {
return Err(Error::KillStatement {
value: self.id.to_string(),
})
}
}
// Return the query id
Ok(Value::None)
} }
} }
@ -37,7 +72,7 @@ impl fmt::Display for KillStatement {
pub fn kill(i: &str) -> IResult<&str, KillStatement> { pub fn kill(i: &str) -> IResult<&str, KillStatement> {
let (i, _) = tag_no_case("KILL")(i)?; let (i, _) = tag_no_case("KILL")(i)?;
let (i, _) = shouldbespace(i)?; let (i, _) = shouldbespace(i)?;
let (i, v) = ident(i)?; let (i, v) = uuid(i)?;
Ok(( Ok((
i, i,
KillStatement { KillStatement {

View file

@ -1,4 +1,5 @@
use crate::ctx::Context; use crate::ctx::Context;
use crate::dbs::Level;
use crate::dbs::Options; use crate::dbs::Options;
use crate::dbs::Transaction; use crate::dbs::Transaction;
use crate::err::Error; use crate::err::Error;
@ -7,10 +8,14 @@ use crate::sql::cond::{cond, Cond};
use crate::sql::error::IResult; use crate::sql::error::IResult;
use crate::sql::fetch::{fetch, Fetchs}; use crate::sql::fetch::{fetch, Fetchs};
use crate::sql::field::{fields, Fields}; use crate::sql::field::{fields, Fields};
use crate::sql::param::param;
use crate::sql::table::table;
use crate::sql::uuid::Uuid;
use crate::sql::value::Value; use crate::sql::value::Value;
use crate::sql::value::{whats, Values};
use derive::Store; use derive::Store;
use nom::branch::alt;
use nom::bytes::complete::tag_no_case; use nom::bytes::complete::tag_no_case;
use nom::combinator::map;
use nom::combinator::opt; use nom::combinator::opt;
use nom::sequence::preceded; use nom::sequence::preceded;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -18,8 +23,9 @@ use std::fmt;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)] #[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct LiveStatement { pub struct LiveStatement {
pub id: Uuid,
pub expr: Fields, pub expr: Fields,
pub what: Values, pub what: Value,
pub cond: Option<Cond>, pub cond: Option<Cond>,
pub fetch: Option<Fetchs>, pub fetch: Option<Fetchs>,
} }
@ -27,12 +33,37 @@ pub struct LiveStatement {
impl LiveStatement { impl LiveStatement {
pub(crate) async fn compute( pub(crate) async fn compute(
&self, &self,
_ctx: &Context<'_>, ctx: &Context<'_>,
_opt: &Options, opt: &Options,
_txn: &Transaction, txn: &Transaction,
_doc: Option<&Value>, doc: Option<&Value>,
) -> Result<Value, Error> { ) -> Result<Value, Error> {
todo!() // Allowed to run?
opt.realtime()?;
// Allowed to run?
opt.check(Level::No)?;
// Clone transaction
let run = txn.clone();
// Claim transaction
let mut run = run.lock().await;
// Process the live query table
match self.what.compute(ctx, opt, txn, doc).await? {
Value::Table(tb) => {
// Insert the live query
let key = crate::key::lq::new(opt.ns(), opt.db(), &self.id);
run.putc(key, tb.as_str(), None).await?;
// Insert the table live query
let key = crate::key::lv::new(opt.ns(), opt.db(), &tb, &self.id);
run.putc(key, self.clone(), None).await?;
}
v => {
return Err(Error::LiveStatement {
value: v.to_string(),
})
}
};
// Return the query id
Ok(self.id.clone().into())
} }
} }
@ -56,12 +87,13 @@ pub fn live(i: &str) -> IResult<&str, LiveStatement> {
let (i, _) = shouldbespace(i)?; let (i, _) = shouldbespace(i)?;
let (i, _) = tag_no_case("FROM")(i)?; let (i, _) = tag_no_case("FROM")(i)?;
let (i, _) = shouldbespace(i)?; let (i, _) = shouldbespace(i)?;
let (i, what) = whats(i)?; let (i, what) = alt((map(param, Value::from), map(table, Value::from)))(i)?;
let (i, cond) = opt(preceded(shouldbespace, cond))(i)?; let (i, cond) = opt(preceded(shouldbespace, cond))(i)?;
let (i, fetch) = opt(preceded(shouldbespace, fetch))(i)?; let (i, fetch) = opt(preceded(shouldbespace, fetch))(i)?;
Ok(( Ok((
i, i,
LiveStatement { LiveStatement {
id: Uuid::new(),
expr, expr,
what, what,
cond, cond,