Refactor and reduce allocations in query executor. (#1819)

This commit is contained in:
Finn Bear 2023-04-20 05:30:47 -07:00 committed by GitHub
parent 46c15cfb8e
commit 7e5f4388a2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 67 additions and 64 deletions

View file

@ -59,24 +59,17 @@ impl<'a> Executor<'a> {
async fn commit(&mut self, local: bool) {
if local {
if let Some(txn) = self.txn.as_ref() {
match &self.err {
true => {
let txn = txn.clone();
let mut txn = txn.lock().await;
if txn.cancel().await.is_err() {
self.err = true;
}
self.txn = None;
}
false => {
let txn = txn.clone();
let mut txn = txn.lock().await;
if txn.commit().await.is_err() {
self.err = true;
}
self.txn = None;
}
let txn = txn.clone();
let mut txn = txn.lock().await;
let result = if self.err {
txn.cancel().await
} else {
txn.commit().await
};
if result.is_err() {
self.err = true;
}
self.txn = None;
}
}
}
@ -140,7 +133,7 @@ impl<'a> Executor<'a> {
// Initialise array of responses
let mut out: Vec<Response> = vec![];
// Process all statements in query
for stm in qry.iter() {
for stm in qry.into_iter() {
// Log the statement
debug!(target: LOG, "Executing: {}", stm);
// Reset errors
@ -149,23 +142,27 @@ impl<'a> Executor<'a> {
}
// Get the statement start time
let now = Instant::now();
// Check if this is a RETURN statement
let clr = matches!(stm, Statement::Output(_));
// Process a single statement
let res = match stm {
// Specify runtime options
Statement::Option(stm) => {
Statement::Option(mut stm) => {
// Selected DB?
opt.needs(Level::Db)?;
// Allowed to run?
opt.check(Level::Db)?;
// Convert to uppercase
stm.name.0.make_ascii_uppercase();
// Process the option
match &stm.name.to_uppercase()[..] {
"FIELDS" => opt = opt.fields(stm.what),
"EVENTS" => opt = opt.events(stm.what),
"TABLES" => opt = opt.tables(stm.what),
"IMPORT" => opt = opt.import(stm.what),
"FORCE" => opt = opt.force(stm.what),
opt = match stm.name.0.as_str() {
"FIELDS" => opt.fields(stm.what),
"EVENTS" => opt.events(stm.what),
"TABLES" => opt.tables(stm.what),
"IMPORT" => opt.import(stm.what),
"FORCE" => opt.force(stm.what),
_ => break,
}
};
// Continue
continue;
}
@ -223,7 +220,7 @@ impl<'a> Executor<'a> {
Ok(Value::None)
}
// Process param definition statements
Statement::Set(stm) => {
Statement::Set(mut stm) => {
// Create a transaction
let loc = self.begin(stm.writeable()).await;
// Check the transaction
@ -238,16 +235,19 @@ impl<'a> Executor<'a> {
false => stm.compute(&ctx, &opt, &self.txn(), None).await,
// The user tried to set a protected variable
true => Err(Error::InvalidParam {
name: stm.name.to_owned(),
// Move the parameter name, as we no longer need it
name: std::mem::take(&mut stm.name),
}),
};
// Check the statement
match res {
Ok(val) => {
// Check if writeable
let writeable = stm.writeable();
// Set the parameter
ctx.add_value(stm.name.to_owned(), val);
ctx.add_value(stm.name, val);
// Finalise transaction
match stm.writeable() {
match writeable {
true => self.commit(loc).await,
false => self.cancel(loc).await,
}
@ -297,13 +297,11 @@ impl<'a> Executor<'a> {
None => stm.compute(&ctx, &opt, &self.txn(), None).await,
};
// Finalise transaction
match &res {
Ok(_) => match stm.writeable() {
true => self.commit(loc).await,
false => self.cancel(loc).await,
},
Err(_) => self.cancel(loc).await,
};
if res.is_ok() && stm.writeable() {
self.commit(loc).await;
} else {
self.cancel(loc).await;
}
// Return the result
res
}
@ -311,36 +309,25 @@ impl<'a> Executor<'a> {
}
},
};
// Get the statement end time
let dur = now.elapsed();
// Produce the response
let res = match res {
Ok(v) => Response {
time: dur,
result: Ok(v),
},
Err(e) => {
// Produce the response
let res = Response {
time: dur,
result: Err(e),
};
// Mark the error
let res = Response {
// Get the statement end time
time: now.elapsed(),
// TODO: Replace with `inspect_err` once stable.
result: res.map_err(|e| {
// Mark the error.
self.err = true;
// Return
res
}
e
}),
};
// Output the response
match self.txn {
Some(_) => match stm {
Statement::Output(_) => {
buf.clear();
buf.push(res);
}
_ => buf.push(res),
},
None => out.push(res),
if self.txn.is_some() {
if clr {
buf.clear();
}
buf.push(res);
} else {
out.push(res)
}
}
// Return responses

View file

@ -19,6 +19,14 @@ impl Deref for Query {
}
}
impl IntoIterator for Query {
type Item = Statement;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
impl Display for Query {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(Pretty::from(f), "{}", &self.0)

View file

@ -49,7 +49,15 @@ impl Deref for Statements {
}
}
impl fmt::Display for Statements {
impl IntoIterator for Statements {
type Item = Statement;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
impl Display for Statements {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Display::fmt(
&Fmt::one_line_separated(self.0.iter().map(|v| Fmt::new(v, |v, f| write!(f, "{v};")))),