Buffer responses in a transaction

When running in a transaction, results are now buffered, and are output on COMMIT or CANCEL. If any error has occured, then all of the responses will be marked as failed.
This commit is contained in:
Tobie Morgan Hitchcock 2022-01-27 08:15:20 +00:00
parent 374644b9bd
commit 92e24e2201
2 changed files with 70 additions and 14 deletions

View file

@ -1,5 +1,5 @@
use crate::ctx::Context; use crate::ctx::Context;
use crate::dbs::response::{Response, Responses}; use crate::dbs::response::{Response, Responses, Status};
use crate::dbs::Auth; use crate::dbs::Auth;
use crate::dbs::Level; use crate::dbs::Level;
use crate::dbs::Options; use crate::dbs::Options;
@ -107,7 +107,33 @@ impl<'a> Executor<'a> {
} }
} }
pub fn buf_cancel(&self, v: Response) -> Response {
Response {
time: v.time,
status: Status::Err,
detail: Some(format!("Transaction cancelled")),
result: None,
}
}
pub fn buf_commit(&self, v: Response) -> Response {
match &self.err {
Some(_) => Response {
time: v.time,
status: Status::Err,
detail: match v.status {
Status::Ok => Some(format!("{}", Error::QueryExecutionError)),
Status::Err => v.detail,
},
result: None,
},
_ => v,
}
}
pub async fn execute(&mut self, mut ctx: Runtime, qry: Query) -> Result<Responses, Error> { pub async fn execute(&mut self, mut ctx: Runtime, qry: Query) -> Result<Responses, Error> {
// Initialise buffer of responses
let mut buf: Vec<Response> = vec![];
// Initialise array of responses // Initialise array of responses
let mut out: Vec<Response> = vec![]; let mut out: Vec<Response> = vec![];
// Create a new options // Create a new options
@ -138,20 +164,30 @@ impl<'a> Executor<'a> {
// Begin a new transaction // Begin a new transaction
Statement::Begin(stm) => { Statement::Begin(stm) => {
let res = stm.compute(&ctx, &opt, self, None).await; let res = stm.compute(&ctx, &opt, self, None).await;
self.err = res.err(); if res.is_err() {
self.err = res.err()
};
continue; continue;
} }
// Cancel a running transaction // Cancel a running transaction
Statement::Cancel(stm) => { Statement::Cancel(stm) => {
let res = stm.compute(&ctx, &opt, self, None).await; let res = stm.compute(&ctx, &opt, self, None).await;
self.err = res.err(); if res.is_err() {
self.err = res.err()
};
buf = buf.into_iter().map(|v| self.buf_cancel(v)).collect();
out.append(&mut buf);
self.txn = None; self.txn = None;
continue; continue;
} }
// Commit a running transaction // Commit a running transaction
Statement::Commit(stm) => { Statement::Commit(stm) => {
let res = stm.compute(&ctx, &opt, self, None).await; let res = stm.compute(&ctx, &opt, self, None).await;
self.err = res.err(); if res.is_err() {
self.err = res.err()
};
buf = buf.into_iter().map(|v| self.buf_commit(v)).collect();
out.append(&mut buf);
self.txn = None; self.txn = None;
continue; continue;
} }
@ -210,25 +246,38 @@ impl<'a> Executor<'a> {
}; };
// Get the statement end time // Get the statement end time
let dur = now.elapsed(); let dur = now.elapsed();
// Buffer the returned result // Produce the response
match res { let res = match res {
Ok(v) => out.push(Response { Ok(v) => Response {
time: format!("{:?}", dur), time: format!("{:?}", dur),
status: String::from("OK"), status: Status::Ok,
detail: None, detail: None,
result: v.output(), result: v.output(),
}), },
Err(e) => { Err(e) => {
// Output the error // Produce the response
out.push(Response { let res = Response {
time: format!("{:?}", dur), time: format!("{:?}", dur),
status: String::from("ERR"), status: Status::Err,
detail: Some(format!("{}", e)), detail: Some(format!("{}", e)),
result: None, result: None,
}); };
// Keep the error // Keep the error
self.err = Some(e); self.err = Some(e);
// Return
res
} }
};
// Output the response
match self.txn {
Some(_) => match stm {
Statement::Output(_) => {
buf.clear();
buf.push(res);
}
_ => buf.push(res),
},
None => out.push(res),
} }
} }
// Return responses // Return responses

View file

@ -1,13 +1,20 @@
use crate::sql::value::Value; use crate::sql::value::Value;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum Status {
Ok,
Err,
}
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Responses(pub Vec<Response>); pub struct Responses(pub Vec<Response>);
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct Response { pub struct Response {
pub time: String, pub time: String,
pub status: String, pub status: Status,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub detail: Option<String>, pub detail: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]