Finish implementation of executor transaction logic

This commit is contained in:
Tobie Morgan Hitchcock 2022-01-26 13:57:17 +00:00
parent ca57df132e
commit 835018d5f4
4 changed files with 125 additions and 53 deletions

View file

@ -5,6 +5,7 @@ use crate::dbs::Level;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::err::Error;
use crate::kvs::transaction;
use crate::kvs::Transaction;
use crate::sql::query::Query;
use crate::sql::statement::Statement;
@ -51,6 +52,61 @@ impl<'a> Executor<'a> {
todo!()
}
async fn begin(&mut self) -> bool {
match self.txn {
Some(_) => false,
None => match transaction(true, false).await {
Ok(v) => {
self.txn = Some(Arc::new(Mutex::new(v)));
true
}
Err(e) => {
self.err = Some(e);
false
}
},
}
}
async fn commit(&mut self, local: bool) {
if local {
match &self.txn {
Some(txn) => {
let txn = txn.clone();
let mut txn = txn.lock().await;
if let Err(e) = txn.commit().await {
self.err = Some(e);
}
self.txn = None;
}
None => unreachable!(),
}
}
}
async fn cancel(&mut self, local: bool) {
if local {
match &self.txn {
Some(txn) => {
let txn = txn.clone();
let mut txn = txn.lock().await;
if let Err(e) = txn.cancel().await {
self.err = Some(e);
}
self.txn = None;
}
None => unreachable!(),
}
}
}
async fn finish(&mut self, res: &Result<Value, Error>, local: bool) {
match res {
Ok(_) => self.commit(local).await,
Err(_) => self.cancel(local).await,
}
}
pub async fn execute(&mut self, mut ctx: Runtime, qry: Query) -> Result<Responses, Error> {
// Initialise array of responses
let mut out: Vec<Response> = vec![];
@ -59,7 +115,7 @@ impl<'a> Executor<'a> {
// Process all statements in query
for stm in qry.statements().iter() {
// Log the statement
debug!(target: NAME, "{}", stm);
debug!("{}", stm);
// Reset errors
if self.txn.is_none() {
self.err = None;
@ -102,8 +158,7 @@ impl<'a> Executor<'a> {
// Commit a running transaction
Statement::Use(stm) => {
let res = stm.compute(&ctx, &opt, self, None).await;
self.err = res.err();
continue;
res
}
// Process param definition statements
Statement::Set(stm) => {
@ -119,55 +174,60 @@ impl<'a> Executor<'a> {
Ok(Value::None)
}
// Process all other normal statements
_ => {
// Enable context override
let mut ctx = Context::new(&ctx).freeze();
// Specify statement timeout
if let Some(timeout) = stm.timeout() {
let mut new = Context::new(&ctx);
new.add_timeout(timeout);
ctx = new.freeze();
}
// Process statement
let res = stm.compute(&ctx, &opt, self, None).await;
// Catch statement timeout
if let Some(timeout) = stm.timeout() {
if ctx.is_timedout() {
self.err = Some(Error::QueryTimeoutError {
timer: timeout,
});
_ => match self.err {
// This transaction has failed
Some(_) => Err(Error::QueryExecutionError),
// Compute the statement normally
None => {
// Create a transaction
let loc = self.begin().await;
// Enable context override
let mut ctx = Context::new(&ctx).freeze();
// Specify statement timeout
if let Some(timeout) = stm.timeout() {
let mut new = Context::new(&ctx);
new.add_timeout(timeout);
ctx = new.freeze();
}
// Process the statement
let res = stm.compute(&ctx, &opt, self, None).await;
// Catch statement timeout
let res = match stm.timeout() {
Some(timeout) => match ctx.is_timedout() {
true => Err(Error::QueryTimeoutError {
timer: timeout,
}),
false => res,
},
None => res,
};
// Finalise transaction
self.finish(&res, loc).await;
// Return the result
res
}
// Continue with result
res
}
},
};
// Get the statement end time
let dur = now.elapsed();
// Check transaction errors
match &self.err {
Some(e) => out.push(Response {
// Buffer the returned result
match res {
Ok(v) => out.push(Response {
time: format!("{:?}", dur),
status: String::from("ERR"),
detail: Some(format!("{}", e)),
result: None,
status: String::from("OK"),
detail: None,
result: v.output(),
}),
None => {
// Format responses
match res {
Ok(v) => out.push(Response {
time: format!("{:?}", dur),
status: String::from("OK"),
detail: None,
result: v.output(),
}),
Err(e) => out.push(Response {
time: format!("{:?}", dur),
status: String::from("ERR"),
detail: Some(format!("{}", e)),
result: None,
}),
}
Err(e) => {
// Output the error
out.push(Response {
time: format!("{:?}", dur),
status: String::from("ERR"),
detail: Some(format!("{}", e)),
result: None,
});
// Keep the error
self.err = Some(e);
}
}
}

View file

@ -70,6 +70,9 @@ pub enum Error {
timer: Duration,
},
#[error("Query not executed due to failed transaction")]
QueryExecutionError,
#[error("You don't have permission to perform this query type")]
QueryPermissionsError,

View file

@ -24,14 +24,24 @@ impl CommitStatement {
_doc: Option<&Value>,
) -> Result<Value, Error> {
match &exe.txn {
Some(txn) => {
let txn = txn.clone();
let mut txn = txn.lock().await;
match txn.commit().await {
Ok(_) => Ok(Value::None),
Err(e) => Err(e),
Some(txn) => match &exe.err {
Some(_) => {
let txn = txn.clone();
let mut txn = txn.lock().await;
match txn.cancel().await {
Ok(_) => Ok(Value::None),
Err(e) => Err(e),
}
}
}
None => {
let txn = txn.clone();
let mut txn = txn.lock().await;
match txn.commit().await {
Ok(_) => Ok(Value::None),
Err(e) => Err(e),
}
}
},
None => Ok(Value::None),
}
}

View file

@ -399,7 +399,6 @@ impl Value {
match self {
Value::None => None,
Value::Void => None,
Value::Null => None,
_ => Some(self),
}
}