Don't store selected NS and DB on executor

This commit is contained in:
Tobie Morgan Hitchcock 2022-02-06 21:06:52 +00:00
parent 49a62aaba5
commit be7f22e35b
67 changed files with 293 additions and 372 deletions

View file

@ -1,4 +1,5 @@
use crate::dbs::executor::Executor;
use crate::dbs::options::Options;
use crate::dbs::response::Responses;
use crate::dbs::session::Session;
use crate::dbs::variables::Attach;
@ -7,8 +8,11 @@ use crate::err::Error;
use crate::sql;
use crate::sql::query::Query;
use hyper::body::Sender;
use std::sync::Arc;
pub async fn execute(txt: &str, session: Session, vars: Variables) -> Result<Responses, Error> {
// Create a new query options
let mut opt = Options::default();
// Create a new query executor
let mut exe = Executor::new();
// Create a new execution context
@ -18,12 +22,14 @@ pub async fn execute(txt: &str, session: Session, vars: Variables) -> Result<Res
// Parse the SQL query text
let ast = sql::parse(txt)?;
// Process all statements
exe.ns = session.ns;
exe.db = session.db;
exe.execute(ctx, ast).await
opt.ns = session.ns.map(Arc::new);
opt.db = session.db.map(Arc::new);
exe.execute(ctx, opt, ast).await
}
pub async fn process(ast: Query, session: Session, vars: Variables) -> Result<Responses, Error> {
// Create a new query options
let mut opt = Options::default();
// Create a new query executor
let mut exe = Executor::new();
// Store session info on context
@ -31,18 +37,20 @@ pub async fn process(ast: Query, session: Session, vars: Variables) -> Result<Re
// Attach the defined variables
let ctx = vars.attach(ctx);
// Process all statements
exe.ns = session.ns;
exe.db = session.db;
exe.execute(ctx, ast).await
opt.ns = session.ns.map(Arc::new);
opt.db = session.db.map(Arc::new);
exe.execute(ctx, opt, ast).await
}
pub async fn export(session: Session, sender: Sender) -> Result<(), Error> {
// Create a new query options
let mut opt = Options::default();
// Create a new query executor
let mut exe = Executor::new();
// Create a new execution context
let ctx = session.context();
// Process database export
exe.ns = session.ns;
exe.db = session.db;
exe.export(ctx, sender).await
opt.ns = session.ns.map(Arc::new);
opt.db = session.db.map(Arc::new);
exe.export(ctx, opt, sender).await
}

View file

@ -1,7 +1,6 @@
use crate::ctx::Context;
use crate::dbs::response::{Response, Responses, Status};
use crate::dbs::Auth;
use crate::dbs::Level;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::err::Error;
@ -16,38 +15,19 @@ use std::time::Instant;
#[derive(Default)]
pub struct Executor<'a> {
pub id: Option<String>,
pub ns: Option<String>,
pub db: Option<String>,
pub txn: Option<Arc<Mutex<Transaction<'a>>>>,
pub err: Option<Error>,
txn: Option<Arc<Mutex<Transaction<'a>>>>,
err: Option<Error>,
}
impl<'a> Executor<'a> {
pub fn new() -> Executor<'a> {
Executor {
id: None,
ns: None,
db: None,
..Executor::default()
}
}
pub fn check(&self, opt: &Options, level: Level) -> Result<(), Error> {
if opt.auth.check(level) == false {
return Err(Error::QueryPermissionsError);
}
if self.ns.is_none() {
return Err(Error::NsError);
}
if self.db.is_none() {
return Err(Error::DbError);
}
Ok(())
}
async fn begin(&mut self) -> bool {
match self.txn {
match &self.txn {
Some(_) => false,
None => match transaction(true, false).await {
Ok(v) => {
@ -65,15 +45,25 @@ impl<'a> Executor<'a> {
async fn commit(&mut self, local: bool) {
if local {
match &self.txn {
Some(txn) => {
let txn = txn.clone();
let mut txn = txn.lock().await;
if let Err(e) = txn.commit().await {
self.err = Some(e);
Some(txn) => match &self.err {
Some(_) => {
let txn = txn.clone();
let mut txn = txn.lock().await;
if let Err(e) = txn.cancel().await {
self.err = Some(e);
}
self.txn = None;
}
self.txn = None;
}
None => unreachable!(),
None => {
let txn = txn.clone();
let mut txn = txn.lock().await;
if let Err(e) = txn.commit().await {
self.err = Some(e);
}
self.txn = None;
}
},
None => (),
}
}
}
@ -94,13 +84,6 @@ impl<'a> Executor<'a> {
}
}
async fn finish(&mut self, res: &Result<Value, Error>, local: bool) {
match res {
Ok(_) => self.commit(local).await,
Err(_) => self.cancel(local).await,
}
}
fn buf_cancel(&self, v: Response) -> Response {
Response {
sql: v.sql,
@ -127,13 +110,16 @@ impl<'a> Executor<'a> {
}
}
pub async fn execute(&mut self, mut ctx: Runtime, qry: Query) -> Result<Responses, Error> {
pub async fn execute(
&mut self,
mut ctx: Runtime,
mut opt: Options,
qry: Query,
) -> Result<Responses, Error> {
// Initialise buffer of responses
let mut buf: Vec<Response> = vec![];
// Initialise array of responses
let mut out: Vec<Response> = vec![];
// Create a new options
let mut opt = Options::new(&Auth::No);
// Process all statements in query
for stm in qry.statements().iter() {
// Log the statement
@ -159,39 +145,56 @@ impl<'a> Executor<'a> {
continue;
}
// Begin a new transaction
Statement::Begin(stm) => {
let res = stm.compute(&ctx, &opt, self, None).await;
if res.is_err() {
self.err = res.err()
};
Statement::Begin(_) => {
self.begin().await;
continue;
}
// Cancel a running transaction
Statement::Cancel(stm) => {
let res = stm.compute(&ctx, &opt, self, None).await;
if res.is_err() {
self.err = res.err()
};
Statement::Cancel(_) => {
self.cancel(true).await;
buf = buf.into_iter().map(|v| self.buf_cancel(v)).collect();
out.append(&mut buf);
self.txn = None;
continue;
}
// Commit a running transaction
Statement::Commit(stm) => {
let res = stm.compute(&ctx, &opt, self, None).await;
if res.is_err() {
self.err = res.err()
};
Statement::Commit(_) => {
self.commit(true).await;
buf = buf.into_iter().map(|v| self.buf_commit(v)).collect();
out.append(&mut buf);
self.txn = None;
continue;
}
// Commit a running transaction
// Switch to a different NS or DB
Statement::Use(stm) => {
let res = stm.compute(&ctx, &opt, self, None).await;
res
if let Some(ref ns) = stm.ns {
match &*opt.auth {
Auth::No => opt.ns = Some(Arc::new(ns.to_owned())),
Auth::Kv => opt.ns = Some(Arc::new(ns.to_owned())),
Auth::Ns(v) if v == ns => opt.ns = Some(Arc::new(ns.to_owned())),
_ => {
opt.ns = None;
return Err(Error::NsAuthenticationError {
ns: ns.to_owned(),
});
}
}
}
if let Some(ref db) = stm.db {
match &*opt.auth {
Auth::No => opt.db = Some(Arc::new(db.to_owned())),
Auth::Kv => opt.db = Some(Arc::new(db.to_owned())),
Auth::Ns(_) => opt.db = Some(Arc::new(db.to_owned())),
Auth::Db(_, v) if v == db => opt.db = Some(Arc::new(db.to_owned())),
_ => {
opt.db = None;
return Err(Error::DbAuthenticationError {
db: db.to_owned(),
});
}
}
}
Ok(Value::None)
}
// Process param definition statements
Statement::Set(stm) => {
@ -235,7 +238,10 @@ impl<'a> Executor<'a> {
None => res,
};
// Finalise transaction
self.finish(&res, loc).await;
match &res {
Ok(_) => self.commit(loc).await,
Err(_) => self.cancel(loc).await,
};
// Return the result
res
}

View file

@ -1,4 +1,5 @@
use crate::dbs::Executor;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::err::Error;
use crate::kvs::transaction;
@ -12,7 +13,12 @@ macro_rules! output {
}
impl<'a> Executor<'a> {
pub async fn export(&mut self, ctx: Runtime, mut chn: Sender) -> Result<(), Error> {
pub async fn export(
&mut self,
ctx: Runtime,
opt: Options,
mut chn: Sender,
) -> Result<(), Error> {
// Start a new transaction
let txn = transaction(false, false).await?;
// Output OPTIONS

View file

@ -15,7 +15,7 @@ impl Value {
pub async fn iterate(
self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
ite: &mut Iterator<'_>,
) -> Result<(), Error> {
@ -35,7 +35,7 @@ impl Array {
pub async fn iterate(
self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
ite: &mut Iterator<'_>,
) -> Result<(), Error> {
@ -56,7 +56,7 @@ impl Model {
pub async fn iterate(
self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
ite: &mut Iterator<'_>,
) -> Result<(), Error> {
@ -90,7 +90,7 @@ impl Thing {
pub async fn iterate(
self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
ite: &mut Iterator<'_>,
) -> Result<(), Error> {
@ -102,7 +102,7 @@ impl Table {
pub async fn iterate(
self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
ite: &mut Iterator<'_>,
) -> Result<(), Error> {

View file

@ -63,10 +63,11 @@ impl<'a> Iterator<'a> {
}))
}
// Process the records and output
pub async fn output(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
) -> Result<Value, Error> {
// Log the statement
@ -96,35 +97,35 @@ impl<'a> Iterator<'a> {
}
#[inline]
fn output_split(&mut self, ctx: &Runtime, opt: &Options<'_>, exe: &Executor) {
fn output_split(&mut self, ctx: &Runtime, opt: &Options, exe: &Executor) {
if self.split.is_some() {
// Ignore
}
}
#[inline]
fn output_group(&mut self, ctx: &Runtime, opt: &Options<'_>, exe: &Executor) {
fn output_group(&mut self, ctx: &Runtime, opt: &Options, exe: &Executor) {
if self.group.is_some() {
// Ignore
}
}
#[inline]
fn output_order(&mut self, ctx: &Runtime, opt: &Options<'_>, exe: &Executor) {
fn output_order(&mut self, ctx: &Runtime, opt: &Options, exe: &Executor) {
if self.order.is_some() {
// Ignore
}
}
#[inline]
fn output_start(&mut self, ctx: &Runtime, opt: &Options<'_>, exe: &Executor) {
fn output_start(&mut self, ctx: &Runtime, opt: &Options, exe: &Executor) {
if let Some(v) = self.start {
self.results = mem::take(&mut self.results).into_iter().skip(v.0).collect();
}
}
#[inline]
fn output_limit(&mut self, ctx: &Runtime, opt: &Options<'_>, exe: &Executor) {
fn output_limit(&mut self, ctx: &Runtime, opt: &Options, exe: &Executor) {
if let Some(v) = self.limit {
self.results = mem::take(&mut self.results).into_iter().take(v.0).collect();
}
@ -133,7 +134,7 @@ impl<'a> Iterator<'a> {
async fn iterate(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
) -> Result<(), Error> {
match self.parallel {
@ -167,7 +168,7 @@ impl<'a> Iterator<'a> {
pub async fn process(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
thg: Option<Thing>,
val: Value,

View file

@ -1,7 +1,8 @@
use crate::cnf;
use crate::dbs::Auth;
use crate::dbs::Level;
use crate::err::Error;
use crate::sql::version::Version;
use std::sync::Arc;
// An Options is passed around when processing a set of query
// statements. An Options contains specific information for how
@ -11,29 +12,41 @@ use crate::sql::version::Version;
// when importing data, where these queries might fail).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Options<'a> {
pub auth: &'a Auth,
pub dive: usize, // How many subqueries have we gone into?
pub debug: bool, // Should we debug query response SQL?
pub force: bool, // Should we force tables/events to re-run?
pub fields: bool, // Should we process field queries?
pub events: bool, // Should we process event queries?
pub tables: bool, // Should we process table queries?
pub futures: bool, // Should we process function futures?
pub version: Option<&'a Version>, // Current
pub struct Options {
// Currently selected NS
pub ns: Option<Arc<String>>,
// Currently selected DB
pub db: Option<Arc<String>>,
// Connection authentication data
pub auth: Arc<Auth>,
// How many subqueries have we gone into?
pub dive: usize,
// Should we debug query response SQL?
pub debug: bool,
// Should we force tables/events to re-run?
pub force: bool,
// Should we process field queries?
pub fields: bool,
// Should we process event queries?
pub events: bool,
// Should we process table queries?
pub tables: bool,
// Should we process function futures?
pub futures: bool,
}
impl<'a> Default for Options<'a> {
impl Default for Options {
fn default() -> Self {
Options::new(&Auth::No)
Options::new(Auth::No)
}
}
impl<'a> Options<'a> {
impl Options {
// Create a new Options object
pub fn new(auth: &'a Auth) -> Options<'a> {
pub fn new(auth: Auth) -> Options {
Options {
auth,
ns: None,
db: None,
dive: 0,
debug: false,
force: false,
@ -41,14 +54,27 @@ impl<'a> Options<'a> {
events: true,
tables: true,
futures: false,
version: None,
auth: Arc::new(auth),
}
}
// Get currently selected NS
pub fn ns(&self) -> &String {
self.ns.as_ref().unwrap()
}
// Get currently selected DB
pub fn db(&self) -> &String {
self.db.as_ref().unwrap()
}
// Create a new Options object for a subquery
pub fn dive(&self) -> Result<Options<'a>, Error> {
pub fn dive(&self) -> Result<Options, Error> {
if self.dive < cnf::MAX_RECURSIVE_QUERIES {
Ok(Options {
auth: self.auth.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
dive: self.dive + 1,
..*self
})
@ -60,48 +86,66 @@ impl<'a> Options<'a> {
}
// Create a new Options object for a subquery
pub fn debug(&self, v: bool) -> Options<'a> {
pub fn debug(&self, v: bool) -> Options {
Options {
auth: self.auth.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
debug: v,
..*self
}
}
// Create a new Options object for a subquery
pub fn force(&self, v: bool) -> Options<'a> {
pub fn force(&self, v: bool) -> Options {
Options {
auth: self.auth.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
force: v,
..*self
}
}
// Create a new Options object for a subquery
pub fn fields(&self, v: bool) -> Options<'a> {
pub fn fields(&self, v: bool) -> Options {
Options {
auth: self.auth.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
fields: v,
..*self
}
}
// Create a new Options object for a subquery
pub fn events(&self, v: bool) -> Options<'a> {
pub fn events(&self, v: bool) -> Options {
Options {
auth: self.auth.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
events: v,
..*self
}
}
// Create a new Options object for a subquery
pub fn tables(&self, v: bool) -> Options<'a> {
pub fn tables(&self, v: bool) -> Options {
Options {
auth: self.auth.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
tables: v,
..*self
}
}
// Create a new Options object for a subquery
pub fn import(&self, v: bool) -> Options<'a> {
pub fn import(&self, v: bool) -> Options {
Options {
auth: self.auth.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
fields: v,
events: v,
tables: v,
@ -110,18 +154,27 @@ impl<'a> Options<'a> {
}
// Create a new Options object for a subquery
pub fn futures(&self, v: bool) -> Options<'a> {
pub fn futures(&self, v: bool) -> Options {
Options {
auth: self.auth.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
futures: v,
..*self
}
}
// Create a new Options object for a subquery
pub fn version(&self, v: Option<&'a Version>) -> Options<'a> {
Options {
version: v,
..*self
// Check whether the authentication permissions are ok
pub fn check(&self, level: Level) -> Result<(), Error> {
if self.auth.check(level) == false {
return Err(Error::QueryPermissionsError);
}
if self.ns.is_none() {
return Err(Error::NsError);
}
if self.db.is_none() {
return Err(Error::DbError);
}
Ok(())
}
}

View file

@ -3,7 +3,7 @@ use crate::dbs::executor::Executor;
use crate::dbs::Options;
use crate::dbs::Runtime;
pub fn mock<'a>() -> (Runtime, Options<'a>, Executor<'a>) {
pub fn mock<'a>() -> (Runtime, Options, Executor<'a>) {
let ctx = Context::default().freeze();
let opt = Options::default();
let exe = Executor::new();

View file

@ -9,7 +9,7 @@ impl Document {
pub async fn admit(
&self,
_ctx: &Runtime,
_opt: &Options<'_>,
_opt: &Options,
_exe: &Executor<'_>,
stm: &Statement<'_>,
) -> Result<(), Error> {

View file

@ -9,7 +9,7 @@ impl Document {
pub async fn allow(
&self,
_ctx: &Runtime,
_opt: &Options<'_>,
_opt: &Options,
_exe: &Executor<'_>,
_stm: &Statement<'_>,
) -> Result<(), Error> {

View file

@ -9,7 +9,7 @@ impl Document {
pub async fn check(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
stm: &Statement<'_>,
) -> Result<(), Error> {

View file

@ -10,7 +10,7 @@ impl Document {
pub async fn compute(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
stm: &Statement<'_>,
) -> Result<Value, Error> {

View file

@ -10,7 +10,7 @@ impl Document {
pub async fn create(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
stm: &Statement<'_>,
) -> Result<Value, Error> {

View file

@ -10,7 +10,7 @@ impl Document {
pub async fn delete(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
stm: &Statement<'_>,
) -> Result<Value, Error> {

View file

@ -9,7 +9,7 @@ impl Document {
pub async fn erase(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_stm: &Statement<'_>,
) -> Result<(), Error> {

View file

@ -9,7 +9,7 @@ impl Document {
pub async fn event(
&self,
_ctx: &Runtime,
_opt: &Options<'_>,
_opt: &Options,
_exe: &Executor<'_>,
_stm: &Statement<'_>,
) -> Result<(), Error> {

View file

@ -9,7 +9,7 @@ impl Document {
pub async fn index(
&self,
_ctx: &Runtime,
_opt: &Options<'_>,
_opt: &Options,
_exe: &Executor<'_>,
_stm: &Statement<'_>,
) -> Result<(), Error> {

View file

@ -10,7 +10,7 @@ impl Document {
pub async fn insert(
&mut self,
_ctx: &Runtime,
_opt: &Options<'_>,
_opt: &Options,
_exe: &Executor<'_>,
_stm: &Statement<'_>,
) -> Result<Value, Error> {

View file

@ -9,7 +9,7 @@ impl Document {
pub async fn lives(
&self,
_ctx: &Runtime,
_opt: &Options<'_>,
_opt: &Options,
_exe: &Executor<'_>,
_stm: &Statement<'_>,
) -> Result<(), Error> {

View file

@ -12,7 +12,7 @@ impl Document {
pub async fn merge(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
stm: &Statement<'_>,
) -> Result<(), Error> {

View file

@ -13,7 +13,7 @@ impl Document {
pub async fn pluck(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
stm: &Statement<'_>,
) -> Result<Value, Error> {

View file

@ -10,7 +10,7 @@ impl Document {
pub async fn relate(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
stm: &Statement<'_>,
) -> Result<Value, Error> {

View file

@ -10,7 +10,7 @@ impl Document {
pub async fn select(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
stm: &Statement<'_>,
) -> Result<Value, Error> {

View file

@ -9,7 +9,7 @@ impl Document {
pub async fn store(
&self,
_ctx: &Runtime,
_opt: &Options<'_>,
_opt: &Options,
_exe: &Executor<'_>,
_stm: &Statement<'_>,
) -> Result<(), Error> {

View file

@ -9,7 +9,7 @@ impl Document {
pub async fn table(
&self,
_ctx: &Runtime,
_opt: &Options<'_>,
_opt: &Options,
_exe: &Executor<'_>,
_stm: &Statement<'_>,
) -> Result<(), Error> {

View file

@ -10,7 +10,7 @@ impl Document {
pub async fn update(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
stm: &Statement<'_>,
) -> Result<Value, Error> {

View file

@ -116,7 +116,7 @@ impl Array {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {

View file

@ -31,7 +31,7 @@ impl Expression {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {

View file

@ -34,7 +34,7 @@ impl Function {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {

View file

@ -71,7 +71,7 @@ impl Idiom {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {

View file

@ -95,7 +95,7 @@ impl Object {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {

View file

@ -29,7 +29,7 @@ impl Param {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {

View file

@ -108,7 +108,7 @@ impl Statement {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {

View file

@ -1,42 +1,15 @@
use crate::dbs::Executor;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::err::Error;
use crate::kvs::transaction;
use crate::sql::comment::shouldbespace;
use crate::sql::error::IResult;
use crate::sql::value::Value;
use futures::lock::Mutex;
use nom::branch::alt;
use nom::bytes::complete::tag_no_case;
use nom::combinator::opt;
use nom::sequence::tuple;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct BeginStatement;
impl BeginStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
_opt: &Options<'_>,
exe: &mut Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
match &exe.txn {
Some(_) => Ok(Value::None),
None => {
let txn = transaction(true, false).await?;
exe.txn = Some(Arc::new(Mutex::new(txn)));
Ok(Value::None)
}
}
}
}
impl fmt::Display for BeginStatement {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "BEGIN TRANSACTION")

View file

@ -1,10 +1,5 @@
use crate::dbs::Executor;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::err::Error;
use crate::sql::comment::shouldbespace;
use crate::sql::error::IResult;
use crate::sql::value::Value;
use nom::branch::alt;
use nom::bytes::complete::tag_no_case;
use nom::combinator::opt;
@ -15,28 +10,6 @@ use std::fmt;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct CancelStatement;
impl CancelStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
_opt: &Options<'_>,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
match &exe.txn {
Some(txn) => {
let txn = txn.clone();
let mut txn = txn.lock().await;
match txn.cancel().await {
Ok(_) => Ok(Value::None),
Err(e) => Err(e),
}
}
None => Ok(Value::None),
}
}
}
impl fmt::Display for CancelStatement {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "CANCEL TRANSACTION")

View file

@ -1,10 +1,5 @@
use crate::dbs::Executor;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::err::Error;
use crate::sql::comment::shouldbespace;
use crate::sql::error::IResult;
use crate::sql::value::Value;
use nom::branch::alt;
use nom::bytes::complete::tag_no_case;
use nom::combinator::opt;
@ -15,38 +10,6 @@ use std::fmt;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct CommitStatement;
impl CommitStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
_opt: &Options<'_>,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
match &exe.txn {
Some(txn) => match &exe.err {
Some(_) => {
let txn = txn.clone();
let mut txn = txn.lock().await;
match txn.cancel().await {
Ok(_) => Ok(Value::None),
Err(e) => Err(e),
}
}
None => {
let txn = txn.clone();
let mut txn = txn.lock().await;
match txn.commit().await {
Ok(_) => Ok(Value::None),
Err(e) => Err(e),
}
}
},
None => Ok(Value::None),
}
}
}
impl fmt::Display for CommitStatement {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "COMMIT TRANSACTION")

View file

@ -32,12 +32,12 @@ impl CreateStatement {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::No)?;
opt.check(Level::No)?;
// Create a new iterator
let mut i = Iterator::new();
// Pass in current statement

View file

@ -42,7 +42,7 @@ impl DefineStatement {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {
@ -103,12 +103,12 @@ impl DefineNamespaceStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Kv)?;
opt.check(Level::Kv)?;
// Continue
todo!()
}
@ -147,12 +147,12 @@ impl DefineDatabaseStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Ns)?;
opt.check(Level::Ns)?;
// Continue
todo!()
}
@ -196,14 +196,14 @@ impl DefineLoginStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
match self.base {
Base::Ns => exe.check(opt, Level::Kv)?,
Base::Db => exe.check(opt, Level::Ns)?,
Base::Ns => opt.check(Level::Kv)?,
Base::Db => opt.check(Level::Ns)?,
_ => unreachable!(),
}
// Continue
@ -294,14 +294,14 @@ impl DefineTokenStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
match self.base {
Base::Ns => exe.check(opt, Level::Kv)?,
Base::Db => exe.check(opt, Level::Ns)?,
Base::Ns => opt.check(Level::Kv)?,
Base::Db => opt.check(Level::Ns)?,
_ => unreachable!(),
}
// Continue
@ -369,12 +369,12 @@ impl DefineScopeStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Db)?;
opt.check(Level::Db)?;
// Continue
todo!()
}
@ -492,12 +492,12 @@ impl DefineTableStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Db)?;
opt.check(Level::Db)?;
// Continue
todo!()
}
@ -623,12 +623,12 @@ impl DefineEventStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Db)?;
opt.check(Level::Db)?;
// Continue
todo!()
}
@ -697,12 +697,12 @@ impl DefineFieldStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Db)?;
opt.check(Level::Db)?;
// Continue
todo!()
}
@ -836,12 +836,12 @@ impl DefineIndexStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Db)?;
opt.check(Level::Db)?;
// Continue
todo!()
}

View file

@ -33,12 +33,12 @@ impl DeleteStatement {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::No)?;
opt.check(Level::No)?;
// Create a new iterator
let mut i = Iterator::new();
// Pass in current statement

View file

@ -22,7 +22,7 @@ impl IfelseStatement {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {

View file

@ -24,16 +24,16 @@ impl InfoStatement {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
match self {
InfoStatement::Namespace => exe.check(opt, Level::Ns)?,
InfoStatement::Database => exe.check(opt, Level::Db)?,
InfoStatement::Scope(_) => exe.check(opt, Level::Db)?,
InfoStatement::Table(_) => exe.check(opt, Level::Db)?,
InfoStatement::Namespace => opt.check(Level::Ns)?,
InfoStatement::Database => opt.check(Level::Db)?,
InfoStatement::Scope(_) => opt.check(Level::Db)?,
InfoStatement::Table(_) => opt.check(Level::Db)?,
}
// Continue
todo!()

View file

@ -36,12 +36,12 @@ impl InsertStatement {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::No)?;
opt.check(Level::No)?;
// Create a new iterator
let mut i = Iterator::new();
// Pass in current statement

View file

@ -19,7 +19,7 @@ impl KillStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
_opt: &Options<'_>,
_opt: &Options,
_exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {

View file

@ -29,7 +29,7 @@ impl LiveStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
_opt: &Options<'_>,
_opt: &Options,
_exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {

View file

@ -1,13 +1,7 @@
use crate::dbs::Executor;
use crate::dbs::Level;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::err::Error;
use crate::sql::comment::mightbespace;
use crate::sql::comment::shouldbespace;
use crate::sql::error::IResult;
use crate::sql::ident::{ident, Ident};
use crate::sql::value::Value;
use nom::branch::alt;
use nom::bytes::complete::tag;
use nom::bytes::complete::tag_no_case;
@ -22,21 +16,6 @@ pub struct OptionStatement {
pub what: bool,
}
impl OptionStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Db)?;
// Return nothing
Ok(Value::None)
}
}
impl fmt::Display for OptionStatement {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.what {

View file

@ -18,7 +18,7 @@ impl OutputStatement {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {

View file

@ -39,12 +39,12 @@ impl RelateStatement {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::No)?;
opt.check(Level::No)?;
// Create a new iterator
let mut i = Iterator::new();
// Pass in current statement

View file

@ -32,7 +32,7 @@ impl RemoveStatement {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {
@ -93,12 +93,12 @@ impl RemoveNamespaceStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Kv)?;
opt.check(Level::Kv)?;
// Continue
todo!()
}
@ -137,12 +137,12 @@ impl RemoveDatabaseStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Ns)?;
opt.check(Level::Ns)?;
// Continue
todo!()
}
@ -182,14 +182,14 @@ impl RemoveLoginStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
match self.base {
Base::Ns => exe.check(opt, Level::Kv)?,
Base::Db => exe.check(opt, Level::Ns)?,
Base::Ns => opt.check(Level::Kv)?,
Base::Db => opt.check(Level::Ns)?,
_ => unreachable!(),
}
// Continue
@ -236,14 +236,14 @@ impl RemoveTokenStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
match self.base {
Base::Ns => exe.check(opt, Level::Kv)?,
Base::Db => exe.check(opt, Level::Ns)?,
Base::Ns => opt.check(Level::Kv)?,
Base::Db => opt.check(Level::Ns)?,
_ => unreachable!(),
}
// Continue
@ -289,12 +289,12 @@ impl RemoveScopeStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Db)?;
opt.check(Level::Db)?;
// Continue
todo!()
}
@ -333,12 +333,12 @@ impl RemoveTableStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Db)?;
opt.check(Level::Db)?;
// Continue
todo!()
}
@ -378,12 +378,12 @@ impl RemoveEventStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Db)?;
opt.check(Level::Db)?;
// Continue
todo!()
}
@ -429,12 +429,12 @@ impl RemoveFieldStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Db)?;
opt.check(Level::Db)?;
// Continue
todo!()
}
@ -480,12 +480,12 @@ impl RemoveIndexStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::Db)?;
opt.check(Level::Db)?;
// Continue
todo!()
}

View file

@ -67,12 +67,12 @@ impl SelectStatement {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::No)?;
opt.check(Level::No)?;
// Create a new iterator
let mut i = Iterator::new();
// Pass in current statement
@ -85,8 +85,6 @@ impl SelectStatement {
i.start = self.start.as_ref();
// Ensure futures are processed
let opt = &opt.futures(true);
// Specify the document version
let opt = &opt.version(self.version.as_ref());
// Loop over the select targets
for w in self.what.0.iter() {
let v = w.compute(ctx, opt, exe, doc).await?;

View file

@ -23,7 +23,7 @@ impl SetStatement {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {

View file

@ -35,12 +35,12 @@ impl UpdateStatement {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {
// Allowed to run?
exe.check(opt, Level::No)?;
opt.check(Level::No)?;
// Create a new iterator
let mut i = Iterator::new();
// Pass in current statement

View file

@ -20,45 +20,6 @@ pub struct UseStatement {
pub db: Option<String>,
}
impl UseStatement {
pub async fn compute(
&self,
_ctx: &Runtime,
opt: &Options<'_>,
exe: &mut Executor<'_>,
_doc: Option<&Value>,
) -> Result<Value, Error> {
if let Some(ns) = &self.ns {
match opt.auth {
Auth::No => exe.ns = Some(ns.to_owned()),
Auth::Kv => exe.ns = Some(ns.to_owned()),
Auth::Ns(v) if v == ns => exe.ns = Some(ns.to_owned()),
_ => {
exe.ns = None;
return Err(Error::NsAuthenticationError {
ns: ns.to_owned(),
});
}
}
}
if let Some(db) = &self.db {
match opt.auth {
Auth::No => exe.db = Some(db.to_owned()),
Auth::Kv => exe.db = Some(db.to_owned()),
Auth::Ns(_) => exe.db = Some(db.to_owned()),
Auth::Db(_, v) if v == db => exe.db = Some(db.to_owned()),
_ => {
exe.db = None;
return Err(Error::DbAuthenticationError {
db: db.to_owned(),
});
}
}
}
Ok(Value::None)
}
}
impl fmt::Display for UseStatement {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "USE")?;

View file

@ -42,7 +42,7 @@ impl Subquery {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&Value>,
) -> Result<Value, Error> {

View file

@ -10,7 +10,7 @@ impl Value {
pub async fn array(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
path: &Idiom,
) -> Result<(), Error> {

View file

@ -9,7 +9,7 @@ impl Value {
pub async fn clear(
&mut self,
_ctx: &Runtime,
_opt: &Options<'_>,
_opt: &Options,
_exe: &Executor<'_>,
) -> Result<(), Error> {
*self = Value::from(Object::default());

View file

@ -10,7 +10,7 @@ impl Value {
pub async fn decrement(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
path: &Idiom,
val: Value,

View file

@ -24,7 +24,7 @@ impl Value {
pub async fn def(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
val: Option<&Thing>,
) -> Result<(), Error> {

View file

@ -15,7 +15,7 @@ impl Value {
pub async fn del(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
path: &Idiom,
) -> Result<(), Error> {

View file

@ -10,7 +10,7 @@ impl Value {
pub async fn first(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
) -> Result<Self, Error> {
self.get(ctx, opt, exe, &Idiom::from(vec![Part::First])).await

View file

@ -15,7 +15,7 @@ impl Value {
pub async fn get(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
path: &Idiom,
) -> Result<Self, Error> {

View file

@ -10,7 +10,7 @@ impl Value {
pub async fn increment(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
path: &Idiom,
val: Value,

View file

@ -10,7 +10,7 @@ impl Value {
pub async fn last(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
) -> Result<Self, Error> {
self.get(ctx, opt, exe, &Idiom::from(vec![Part::Last])).await

View file

@ -9,7 +9,7 @@ impl Value {
pub async fn merge(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
val: &Object,
) -> Result<(), Error> {

View file

@ -10,7 +10,7 @@ impl Value {
pub async fn object(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
path: &Idiom,
) -> Result<(), Error> {

View file

@ -10,7 +10,7 @@ impl Value {
pub async fn patch(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
val: &Array,
) -> Result<(), Error> {

View file

@ -9,7 +9,7 @@ impl Value {
pub async fn replace(
&mut self,
_ctx: &Runtime,
_opt: &Options<'_>,
_opt: &Options,
_exe: &Executor<'_>,
val: &Object,
) -> Result<(), Error> {

View file

@ -14,7 +14,7 @@ impl Value {
pub async fn set(
&mut self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
path: &Idiom,
val: Value,

View file

@ -793,7 +793,7 @@ impl Value {
pub async fn compute(
&self,
ctx: &Runtime,
opt: &Options<'_>,
opt: &Options,
exe: &Executor<'_>,
doc: Option<&'async_recursion Value>,
) -> Result<Value, Error> {