From 7e5f4388a2286725cb3b0173138e5b8e1dd75331 Mon Sep 17 00:00:00 2001 From: Finn Bear Date: Thu, 20 Apr 2023 05:30:47 -0700 Subject: [PATCH] Refactor and reduce allocations in query executor. (#1819) --- lib/src/dbs/executor.rs | 113 +++++++++++++++++---------------------- lib/src/sql/query.rs | 8 +++ lib/src/sql/statement.rs | 10 +++- 3 files changed, 67 insertions(+), 64 deletions(-) diff --git a/lib/src/dbs/executor.rs b/lib/src/dbs/executor.rs index c2aa245d..e911cfec 100644 --- a/lib/src/dbs/executor.rs +++ b/lib/src/dbs/executor.rs @@ -59,24 +59,17 @@ impl<'a> Executor<'a> { async fn commit(&mut self, local: bool) { if local { if let Some(txn) = self.txn.as_ref() { - match &self.err { - true => { - let txn = txn.clone(); - let mut txn = txn.lock().await; - if txn.cancel().await.is_err() { - self.err = true; - } - self.txn = None; - } - false => { - let txn = txn.clone(); - let mut txn = txn.lock().await; - if txn.commit().await.is_err() { - self.err = true; - } - self.txn = None; - } + let txn = txn.clone(); + let mut txn = txn.lock().await; + let result = if self.err { + txn.cancel().await + } else { + txn.commit().await + }; + if result.is_err() { + self.err = true; } + self.txn = None; } } } @@ -140,7 +133,7 @@ impl<'a> Executor<'a> { // Initialise array of responses let mut out: Vec = vec![]; // Process all statements in query - for stm in qry.iter() { + for stm in qry.into_iter() { // Log the statement debug!(target: LOG, "Executing: {}", stm); // Reset errors @@ -149,23 +142,27 @@ impl<'a> Executor<'a> { } // Get the statement start time let now = Instant::now(); + // Check if this is a RETURN statement + let clr = matches!(stm, Statement::Output(_)); // Process a single statement let res = match stm { // Specify runtime options - Statement::Option(stm) => { + Statement::Option(mut stm) => { // Selected DB? opt.needs(Level::Db)?; // Allowed to run? opt.check(Level::Db)?; + // Convert to uppercase + stm.name.0.make_ascii_uppercase(); // Process the option - match &stm.name.to_uppercase()[..] { - "FIELDS" => opt = opt.fields(stm.what), - "EVENTS" => opt = opt.events(stm.what), - "TABLES" => opt = opt.tables(stm.what), - "IMPORT" => opt = opt.import(stm.what), - "FORCE" => opt = opt.force(stm.what), + opt = match stm.name.0.as_str() { + "FIELDS" => opt.fields(stm.what), + "EVENTS" => opt.events(stm.what), + "TABLES" => opt.tables(stm.what), + "IMPORT" => opt.import(stm.what), + "FORCE" => opt.force(stm.what), _ => break, - } + }; // Continue continue; } @@ -223,7 +220,7 @@ impl<'a> Executor<'a> { Ok(Value::None) } // Process param definition statements - Statement::Set(stm) => { + Statement::Set(mut stm) => { // Create a transaction let loc = self.begin(stm.writeable()).await; // Check the transaction @@ -238,16 +235,19 @@ impl<'a> Executor<'a> { false => stm.compute(&ctx, &opt, &self.txn(), None).await, // The user tried to set a protected variable true => Err(Error::InvalidParam { - name: stm.name.to_owned(), + // Move the parameter name, as we no longer need it + name: std::mem::take(&mut stm.name), }), }; // Check the statement match res { Ok(val) => { + // Check if writeable + let writeable = stm.writeable(); // Set the parameter - ctx.add_value(stm.name.to_owned(), val); + ctx.add_value(stm.name, val); // Finalise transaction - match stm.writeable() { + match writeable { true => self.commit(loc).await, false => self.cancel(loc).await, } @@ -297,13 +297,11 @@ impl<'a> Executor<'a> { None => stm.compute(&ctx, &opt, &self.txn(), None).await, }; // Finalise transaction - match &res { - Ok(_) => match stm.writeable() { - true => self.commit(loc).await, - false => self.cancel(loc).await, - }, - Err(_) => self.cancel(loc).await, - }; + if res.is_ok() && stm.writeable() { + self.commit(loc).await; + } else { + self.cancel(loc).await; + } // Return the result res } @@ -311,36 +309,25 @@ impl<'a> Executor<'a> { } }, }; - // Get the statement end time - let dur = now.elapsed(); // Produce the response - let res = match res { - Ok(v) => Response { - time: dur, - result: Ok(v), - }, - Err(e) => { - // Produce the response - let res = Response { - time: dur, - result: Err(e), - }; - // Mark the error + let res = Response { + // Get the statement end time + time: now.elapsed(), + // TODO: Replace with `inspect_err` once stable. + result: res.map_err(|e| { + // Mark the error. self.err = true; - // Return - res - } + e + }), }; // Output the response - match self.txn { - Some(_) => match stm { - Statement::Output(_) => { - buf.clear(); - buf.push(res); - } - _ => buf.push(res), - }, - None => out.push(res), + if self.txn.is_some() { + if clr { + buf.clear(); + } + buf.push(res); + } else { + out.push(res) } } // Return responses diff --git a/lib/src/sql/query.rs b/lib/src/sql/query.rs index 2c3c6b7b..5ad36da3 100644 --- a/lib/src/sql/query.rs +++ b/lib/src/sql/query.rs @@ -19,6 +19,14 @@ impl Deref for Query { } } +impl IntoIterator for Query { + type Item = Statement; + type IntoIter = std::vec::IntoIter; + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + impl Display for Query { fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!(Pretty::from(f), "{}", &self.0) diff --git a/lib/src/sql/statement.rs b/lib/src/sql/statement.rs index 16abed3d..a2a0ae1c 100644 --- a/lib/src/sql/statement.rs +++ b/lib/src/sql/statement.rs @@ -49,7 +49,15 @@ impl Deref for Statements { } } -impl fmt::Display for Statements { +impl IntoIterator for Statements { + type Item = Statement; + type IntoIter = std::vec::IntoIter; + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl Display for Statements { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { Display::fmt( &Fmt::one_line_separated(self.0.iter().map(|v| Fmt::new(v, |v, f| write!(f, "{v};")))),