diff --git a/src/dbs/executor.rs b/src/dbs/executor.rs index 8b05d252..f9e60504 100644 --- a/src/dbs/executor.rs +++ b/src/dbs/executor.rs @@ -5,6 +5,7 @@ use crate::dbs::Level; use crate::dbs::Options; use crate::dbs::Runtime; use crate::err::Error; +use crate::kvs::transaction; use crate::kvs::Transaction; use crate::sql::query::Query; use crate::sql::statement::Statement; @@ -51,6 +52,61 @@ impl<'a> Executor<'a> { todo!() } + async fn begin(&mut self) -> bool { + match self.txn { + Some(_) => false, + None => match transaction(true, false).await { + Ok(v) => { + self.txn = Some(Arc::new(Mutex::new(v))); + true + } + Err(e) => { + self.err = Some(e); + false + } + }, + } + } + + async fn commit(&mut self, local: bool) { + if local { + match &self.txn { + Some(txn) => { + let txn = txn.clone(); + let mut txn = txn.lock().await; + if let Err(e) = txn.commit().await { + self.err = Some(e); + } + self.txn = None; + } + None => unreachable!(), + } + } + } + + async fn cancel(&mut self, local: bool) { + if local { + match &self.txn { + Some(txn) => { + let txn = txn.clone(); + let mut txn = txn.lock().await; + if let Err(e) = txn.cancel().await { + self.err = Some(e); + } + self.txn = None; + } + None => unreachable!(), + } + } + } + + async fn finish(&mut self, res: &Result, local: bool) { + match res { + Ok(_) => self.commit(local).await, + Err(_) => self.cancel(local).await, + } + } + pub async fn execute(&mut self, mut ctx: Runtime, qry: Query) -> Result { // Initialise array of responses let mut out: Vec = vec![]; @@ -59,7 +115,7 @@ impl<'a> Executor<'a> { // Process all statements in query for stm in qry.statements().iter() { // Log the statement - debug!(target: NAME, "{}", stm); + debug!("{}", stm); // Reset errors if self.txn.is_none() { self.err = None; @@ -102,8 +158,7 @@ impl<'a> Executor<'a> { // Commit a running transaction Statement::Use(stm) => { let res = stm.compute(&ctx, &opt, self, None).await; - self.err = res.err(); - continue; + res } // Process param definition statements Statement::Set(stm) => { @@ -119,55 +174,60 @@ impl<'a> Executor<'a> { Ok(Value::None) } // Process all other normal statements - _ => { - // Enable context override - let mut ctx = Context::new(&ctx).freeze(); - // Specify statement timeout - if let Some(timeout) = stm.timeout() { - let mut new = Context::new(&ctx); - new.add_timeout(timeout); - ctx = new.freeze(); - } - // Process statement - let res = stm.compute(&ctx, &opt, self, None).await; - // Catch statement timeout - if let Some(timeout) = stm.timeout() { - if ctx.is_timedout() { - self.err = Some(Error::QueryTimeoutError { - timer: timeout, - }); + _ => match self.err { + // This transaction has failed + Some(_) => Err(Error::QueryExecutionError), + // Compute the statement normally + None => { + // Create a transaction + let loc = self.begin().await; + // Enable context override + let mut ctx = Context::new(&ctx).freeze(); + // Specify statement timeout + if let Some(timeout) = stm.timeout() { + let mut new = Context::new(&ctx); + new.add_timeout(timeout); + ctx = new.freeze(); } + // Process the statement + let res = stm.compute(&ctx, &opt, self, None).await; + // Catch statement timeout + let res = match stm.timeout() { + Some(timeout) => match ctx.is_timedout() { + true => Err(Error::QueryTimeoutError { + timer: timeout, + }), + false => res, + }, + None => res, + }; + // Finalise transaction + self.finish(&res, loc).await; + // Return the result + res } - // Continue with result - res - } + }, }; // Get the statement end time let dur = now.elapsed(); - // Check transaction errors - match &self.err { - Some(e) => out.push(Response { + // Buffer the returned result + match res { + Ok(v) => out.push(Response { time: format!("{:?}", dur), - status: String::from("ERR"), - detail: Some(format!("{}", e)), - result: None, + status: String::from("OK"), + detail: None, + result: v.output(), }), - None => { - // Format responses - match res { - Ok(v) => out.push(Response { - time: format!("{:?}", dur), - status: String::from("OK"), - detail: None, - result: v.output(), - }), - Err(e) => out.push(Response { - time: format!("{:?}", dur), - status: String::from("ERR"), - detail: Some(format!("{}", e)), - result: None, - }), - } + Err(e) => { + // Output the error + out.push(Response { + time: format!("{:?}", dur), + status: String::from("ERR"), + detail: Some(format!("{}", e)), + result: None, + }); + // Keep the error + self.err = Some(e); } } } diff --git a/src/err/mod.rs b/src/err/mod.rs index 5fdc65e9..b704dbea 100644 --- a/src/err/mod.rs +++ b/src/err/mod.rs @@ -70,6 +70,9 @@ pub enum Error { timer: Duration, }, + #[error("Query not executed due to failed transaction")] + QueryExecutionError, + #[error("You don't have permission to perform this query type")] QueryPermissionsError, diff --git a/src/sql/statements/commit.rs b/src/sql/statements/commit.rs index 83350b10..c9ba83a4 100644 --- a/src/sql/statements/commit.rs +++ b/src/sql/statements/commit.rs @@ -24,14 +24,24 @@ impl CommitStatement { _doc: Option<&Value>, ) -> Result { match &exe.txn { - Some(txn) => { - let txn = txn.clone(); - let mut txn = txn.lock().await; - match txn.commit().await { - Ok(_) => Ok(Value::None), - Err(e) => Err(e), + Some(txn) => match &exe.err { + Some(_) => { + let txn = txn.clone(); + let mut txn = txn.lock().await; + match txn.cancel().await { + Ok(_) => Ok(Value::None), + Err(e) => Err(e), + } } - } + None => { + let txn = txn.clone(); + let mut txn = txn.lock().await; + match txn.commit().await { + Ok(_) => Ok(Value::None), + Err(e) => Err(e), + } + } + }, None => Ok(Value::None), } } diff --git a/src/sql/value/value.rs b/src/sql/value/value.rs index 9f7df31c..99c35185 100644 --- a/src/sql/value/value.rs +++ b/src/sql/value/value.rs @@ -399,7 +399,6 @@ impl Value { match self { Value::None => None, Value::Void => None, - Value::Null => None, _ => Some(self), } }