diff --git a/src/dbs/auth.rs b/src/dbs/auth.rs new file mode 100644 index 00000000..3552624a --- /dev/null +++ b/src/dbs/auth.rs @@ -0,0 +1,59 @@ +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd)] +pub enum Level { + No, + Kv, + Ns, + Db, + Sc, +} + +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd)] +pub enum Auth { + No, + Kv, + Ns(String), + Db(String, String), + Sc(String, String, String), +} + +impl Default for Auth { + fn default() -> Self { + Auth::No + } +} + +impl Auth { + pub fn check(&self, level: Level) -> bool { + match self { + Auth::No => match level { + Level::No => true, + _ => false, + }, + Auth::Kv => match level { + Level::No => true, + Level::Kv => true, + _ => false, + }, + Auth::Ns(_) => match level { + Level::No => true, + Level::Kv => true, + Level::Ns => true, + _ => false, + }, + Auth::Db(_, _) => match level { + Level::No => true, + Level::Kv => true, + Level::Ns => true, + Level::Db => true, + _ => false, + }, + Auth::Sc(_, _, _) => match level { + Level::No => true, + Level::Kv => true, + Level::Ns => true, + Level::Db => true, + Level::Sc => true, + }, + } + } +} diff --git a/src/dbs/dbs.rs b/src/dbs/dbs.rs index 8c4cb6bb..0afc10a9 100644 --- a/src/dbs/dbs.rs +++ b/src/dbs/dbs.rs @@ -1,29 +1,45 @@ -use crate::ctx::Context; use crate::dbs::executor::Executor; use crate::dbs::response::Responses; +use crate::dbs::session::Session; use crate::err::Error; use crate::sql; use crate::sql::query::Query; +use crate::sql::value::Value; use std::collections::HashMap; -pub type Vars<'a> = Option>; +pub type Variables = Option>; -pub fn execute(txt: &str, vars: Vars) -> Result { - // Parse the SQL query into an AST +pub async fn execute(txt: &str, session: Session, vars: Variables) -> Result { + // Create a new query executor + let mut exe = Executor::new(); + // Create a new execution context + let ctx = session.context(); + // Parse the SQL query text let ast = sql::parse(txt)?; - // Create a new execution context - let ctx = Context::background().freeze(); - // Create a new query executor - let exe = Executor::new(); - // Process all of the queries - exe.execute(&ctx, ast) + // Process all statements + exe.ns = session.ns; + exe.db = session.db; + exe.execute(ctx, ast) } -pub fn process(ast: Query, vars: Vars) -> Result { - // Create a new execution context - let ctx = Context::background().freeze(); +pub async fn process(ast: Query, session: Session, vars: Variables) -> Result { // Create a new query executor - let exe = Executor::new(); - // Process all of the queries - exe.execute(&ctx, ast) + let mut exe = Executor::new(); + // Store session info on context + let ctx = session.context(); + // Process all statements + exe.ns = session.ns; + exe.db = session.db; + exe.execute(ctx, ast) +} + +pub fn export(session: Session) -> Result { + // Create a new query executor + let mut exe = Executor::new(); + // Create a new execution context + let ctx = session.context(); + // Process database export + exe.ns = session.ns; + exe.db = session.db; + exe.export(ctx) } diff --git a/src/dbs/executor.rs b/src/dbs/executor.rs index fbb7d826..718eaf98 100644 --- a/src/dbs/executor.rs +++ b/src/dbs/executor.rs @@ -1,50 +1,163 @@ +use crate::ctx::Context; use crate::dbs::response::{Response, Responses}; +use crate::dbs::Auth; +use crate::dbs::Level; +use crate::dbs::Options; use crate::dbs::Process; use crate::dbs::Runtime; use crate::err::Error; use crate::sql::query::Query; +use crate::sql::statement::Statement; +use crate::sql::value::Value; use std::time::Instant; -#[derive(Clone, Debug, Default, Eq, PartialEq)] +#[derive(Debug, Default)] pub struct Executor { - pub id: String, - pub ns: String, - pub db: String, + pub id: Option, + pub ns: Option, + pub db: Option, + pub txn: Option<()>, + pub err: Option, } impl Executor { pub fn new() -> Executor { Executor { - id: String::from("id"), - ns: String::from("ns"), - db: String::from("db"), + id: None, + ns: None, + db: None, + ..Executor::default() } } - pub fn execute(&self, ctx: &Runtime, qry: Query) -> Result { - let mut r: Vec = vec![]; + pub fn check(&mut self, opt: &Options, level: Level) -> Result<(), Error> { + if opt.auth.check(level) == false { + return Err(Error::QueryPermissionsError); + } + if self.ns.is_none() { + return Err(Error::NsError); + } + if self.db.is_none() { + return Err(Error::DbError); + } + Ok(()) + } + pub fn export(&mut self, ctx: Runtime) -> Result { + todo!() + } + + pub fn execute(&mut self, mut ctx: Runtime, qry: Query) -> Result { + // Initialise array of responses + let mut out: Vec = vec![]; + // Create a new options + let mut opt = Options::new(&Auth::No); + // Process all statements in query for stm in qry.statements().iter() { + // Reset errors + if self.txn.is_none() { + self.err = None; + } // Get the statement start time let now = Instant::now(); // Process a single statement - let res = stm.process(&ctx, self, None); + let res = match stm { + // Specify runtime options + Statement::Option(stm) => { + match &stm.name.name[..] { + "FIELD_QUERIES" => opt = opt.fields(stm.what), + "EVENT_QUERIES" => opt = opt.events(stm.what), + "TABLE_QUERIES" => opt = opt.tables(stm.what), + "IMPORT" => opt = opt.import(stm.what), + _ => break, + } + continue; + } + // Begin a new transaction + Statement::Begin(stm) => { + let res = stm.process(&ctx, &opt, self, None); + self.err = res.err(); + continue; + } + // Cancel a running transaction + Statement::Cancel(stm) => { + let res = stm.process(&ctx, &opt, self, None); + self.err = res.err(); + continue; + } + // Commit a running transaction + Statement::Commit(stm) => { + let res = stm.process(&ctx, &opt, self, None); + self.err = res.err(); + continue; + } + // Process param definition statements + Statement::Set(stm) => { + match stm.process(&ctx, &opt, self, None) { + Ok(val) => { + let mut new = Context::new(&ctx); + let key = stm.name.to_owned(); + new.add_value(key, val); + ctx = new.freeze(); + } + _ => break, + } + Ok(Value::None) + } + // Process all other normal statements + _ => { + // Enable context override + let mut ctx = Context::new(&ctx).freeze(); + // Specify statement timeout + if let Some(timeout) = stm.timeout() { + let mut new = Context::new(&ctx); + new.add_timeout(timeout); + ctx = new.freeze(); + } + // Process statement + let res = stm.process(&ctx, &opt, self, None); + // Catch statement timeout + if let Some(timeout) = stm.timeout() { + if ctx.is_timedout() { + self.err = Some(Error::QueryTimeoutError { + timer: timeout, + }); + } + } + // Continue with result + res + } + }; // Get the statement end time let dur = now.elapsed(); - - r.push(Response { - sql: format!("{}", stm), - time: format!("{:?}", dur), - status: match res { - Ok(_) => String::from("OK"), - Err(_) => String::from("ERR"), - }, - result: match res { - Ok(v) => Some(v), - Err(_) => None, - }, - }) + // Check transaction errors + match &self.err { + Some(e) => out.push(Response { + time: format!("{:?}", dur), + status: String::from("ERR"), + detail: Some(format!("{}", e)), + result: None, + }), + None => { + // Format responses + match res { + Ok(v) => out.push(Response { + time: format!("{:?}", dur), + status: String::from("OK"), + detail: None, + result: v.output(), + }), + Err(e) => out.push(Response { + time: format!("{:?}", dur), + status: String::from("ERR"), + detail: Some(format!("{}", e)), + result: None, + }), + } + } + } } - Ok(Responses(r)) + // Return responses + Ok(Responses(out)) } } diff --git a/src/dbs/mod.rs b/src/dbs/mod.rs index 531c7fb2..313db6ef 100644 --- a/src/dbs/mod.rs +++ b/src/dbs/mod.rs @@ -1,13 +1,19 @@ +mod auth; mod dbs; mod executor; mod iterator; +mod options; mod process; mod response; mod runtime; +mod session; +pub use self::auth::*; pub use self::dbs::*; pub use self::executor::*; pub use self::iterator::*; +pub use self::options::*; pub use self::process::*; pub use self::response::*; pub use self::runtime::*; +pub use self::session::*; diff --git a/src/dbs/options.rs b/src/dbs/options.rs new file mode 100644 index 00000000..6498cbe5 --- /dev/null +++ b/src/dbs/options.rs @@ -0,0 +1,117 @@ +use crate::cnf; +use crate::dbs::Auth; +use crate::err::Error; +use crate::sql::version::Version; + +// An Options is passed around when processing a set of query +// statements. An Options contains specific information for how +// to process each particular statement, including the record +// version to retrieve, whether futures should be processed, and +// whether field/event/table queries should be processed (useful +// when importing data, where these queries might fail). + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct Options<'a> { + pub auth: &'a Auth, + pub dive: usize, // How many subqueries have we gone into? + pub force: bool, // Should we force tables/events to re-run? + pub fields: bool, // Should we process field queries? + pub events: bool, // Should we process event queries? + pub tables: bool, // Should we process table queries? + pub futures: bool, // Should we process function futures? + pub version: Option<&'a Version>, // Current +} + +impl<'a> Default for Options<'a> { + fn default() -> Self { + Options::new(&Auth::No) + } +} + +impl<'a> Options<'a> { + // Create a new Options object + pub fn new(auth: &'a Auth) -> Options<'a> { + Options { + auth, + dive: 0, + force: false, + fields: true, + events: true, + tables: true, + futures: false, + version: None, + } + } + + // Create a new Options object for a subquery + pub fn dive(&self) -> Result, Error> { + if self.dive < cnf::MAX_RECURSIVE_QUERIES { + Ok(Options { + dive: self.dive + 1, + ..*self + }) + } else { + Err(Error::RecursiveSubqueryError { + limit: self.dive, + }) + } + } + + // Create a new Options object for a subquery + pub fn force(&self, v: bool) -> Options<'a> { + Options { + force: v, + ..*self + } + } + + // Create a new Options object for a subquery + pub fn fields(&self, v: bool) -> Options<'a> { + Options { + fields: v, + ..*self + } + } + + // Create a new Options object for a subquery + pub fn events(&self, v: bool) -> Options<'a> { + Options { + events: v, + ..*self + } + } + + // Create a new Options object for a subquery + pub fn tables(&self, v: bool) -> Options<'a> { + Options { + tables: v, + ..*self + } + } + + // Create a new Options object for a subquery + pub fn import(&self, v: bool) -> Options<'a> { + Options { + fields: v, + events: v, + tables: v, + ..*self + } + } + + // Create a new Options object for a subquery + pub fn futures(&self, v: bool) -> Options<'a> { + Options { + futures: v, + ..*self + } + } + + // Create a new Options object for a subquery + pub fn version(&self, v: Option<&'a Version>) -> Options<'a> { + Options { + version: v, + ..*self + } + } +} diff --git a/src/dbs/session.rs b/src/dbs/session.rs new file mode 100644 index 00000000..a7cf9123 --- /dev/null +++ b/src/dbs/session.rs @@ -0,0 +1,49 @@ +use crate::ctx::Context; +use crate::dbs::Auth; +use crate::dbs::Runtime; +use crate::sql::value::Value; + +#[derive(Clone, Debug, Default, Eq, PartialEq)] +pub struct Session { + pub au: Auth, // Authentication info + pub ip: Option, // Session ip address + pub or: Option, // Session origin + pub id: Option, // Session id + pub ns: Option, // Namespace + pub db: Option, // Database + pub sc: Option, // Scope + pub sd: Option, // Scope auth data +} + +impl Session { + pub fn context(&self) -> Runtime { + let mut ctx = Context::background(); + // Add session value + let key = String::from("session"); + let val: Value = self.into(); + ctx.add_value(key, val); + // Add scope value + let key = String::from("scope"); + let val: Value = self.sc.to_owned().into(); + ctx.add_value(key, val); + // Add auth data + let key = String::from("auth"); + let val: Value = self.sd.to_owned().into(); + ctx.add_value(key, val); + // Output context + ctx.freeze() + } +} + +impl Into for &Session { + fn into(self) -> Value { + Value::from(map! { + "ip".to_string() => self.ip.to_owned().into(), + "or".to_string() => self.or.to_owned().into(), + "id".to_string() => self.id.to_owned().into(), + "ns".to_string() => self.ns.to_owned().into(), + "db".to_string() => self.db.to_owned().into(), + "sc".to_string() => self.sc.to_owned().into(), + }) + } +}