Don’t use static futures for parallel query execution

Instead of using static futures when processing a query in parallel, we now use a new executor model, which allows us to process futures which make use of references. As a result, we can remove the need to store each statement in an Arc.
This commit is contained in:
Tobie Morgan Hitchcock 2022-05-13 21:46:56 +01:00
parent e4619be89a
commit 6ff2a78c88
37 changed files with 383 additions and 357 deletions

81
Cargo.lock generated
View file

@ -82,6 +82,17 @@ dependencies = [
"stable_deref_trait",
]
[[package]]
name = "async-channel"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-compression"
version = "0.3.12"
@ -96,6 +107,20 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-executor"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "871f9bb5e0a22eeb7e8cf16641feb87c9dc67032ccf8ff49e772eb9941d3a965"
dependencies = [
"async-task",
"concurrent-queue",
"fastrand",
"futures-lite",
"once_cell",
"slab",
]
[[package]]
name = "async-recursion"
version = "1.0.0"
@ -107,6 +132,12 @@ dependencies = [
"syn",
]
[[package]]
name = "async-task"
version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9"
[[package]]
name = "async-trait"
version = "0.1.53"
@ -278,6 +309,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cache-padded"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c"
[[package]]
name = "cc"
version = "1.0.73"
@ -378,6 +415,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
[[package]]
name = "console_error_panic_hook"
version = "0.1.7"
@ -506,6 +552,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "event-listener"
version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"
[[package]]
name = "fail"
version = "0.4.0"
@ -646,6 +698,21 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
[[package]]
name = "futures-lite"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]]
name = "futures-macro"
version = "0.3.21"
@ -1485,6 +1552,12 @@ version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]]
name = "parking_lot"
version = "0.11.2"
@ -2372,6 +2445,8 @@ name = "surrealdb"
version = "0.1.0"
dependencies = [
"argon2",
"async-channel",
"async-executor",
"async-recursion",
"bigdecimal",
"bytes",
@ -2898,6 +2973,12 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "walkdir"
version = "2.3.2"

View file

@ -18,10 +18,12 @@ argon2 = "0.4.0"
async-recursion = "1.0.0"
bigdecimal = { version = "0.3.0", features = ["serde", "string-only"] }
bytes = "1.1.0"
channel = { version = "1.6.1", package = "async-channel" }
chrono = { version = "0.4.19", features = ["serde"] }
derive = { version = "0.1.2", package = "surrealdb-derive" }
dmp = "0.1.1"
echodb = { version = "0.3.0", optional = true }
executor = { version = "1.4.1", package = "async-executor" }
futures = "0.3.21"
fuzzy-matcher = "0.3.7"
geo = { version = "0.20.1", features = ["use-serde"] }
@ -38,7 +40,7 @@ rand = "0.8.5"
regex = "1.5.5"
msgpack = { version = "1.1.0", package = "rmp-serde" }
scrypt = "0.10.0"
serde = { version = "1.0.137", features = ["derive", "rc"] }
serde = { version = "1.0.137", features = ["derive"] }
storekey = "0.2.0"
sha-1 = "0.10.0"
sha2 = "0.10.2"

View file

@ -11,23 +11,23 @@ use crate::sql::table::Table;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use async_recursion::async_recursion;
use tokio::sync::mpsc::Sender;
use channel::Sender;
impl Value {
pub(crate) async fn channel(
self,
ctx: Runtime,
opt: Options,
stm: Statement,
txn: Transaction,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
chn: Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
if ctx.is_ok() {
match self {
Value::Array(v) => v.process(&ctx, &opt, &stm, &txn, &chn).await?,
Value::Model(v) => v.process(&ctx, &opt, &stm, &txn, &chn).await?,
Value::Thing(v) => v.process(&ctx, &opt, &stm, &txn, &chn).await?,
Value::Table(v) => v.process(&ctx, &opt, &stm, &txn, &chn).await?,
Value::Array(v) => v.process(ctx, opt, txn, stm, &chn).await?,
Value::Model(v) => v.process(ctx, opt, txn, stm, &chn).await?,
Value::Thing(v) => v.process(ctx, opt, txn, stm, &chn).await?,
Value::Table(v) => v.process(ctx, opt, txn, stm, &chn).await?,
v => chn.send((None, v)).await?,
}
}
@ -41,17 +41,17 @@ impl Array {
self,
ctx: &Runtime,
opt: &Options,
stm: &Statement,
txn: &Transaction,
stm: &Statement<'_>,
chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
for v in self {
if ctx.is_ok() {
match v {
Value::Array(v) => v.process(ctx, opt, stm, txn, chn).await?,
Value::Model(v) => v.process(ctx, opt, stm, txn, chn).await?,
Value::Thing(v) => v.process(ctx, opt, stm, txn, chn).await?,
Value::Table(v) => v.process(ctx, opt, stm, txn, chn).await?,
Value::Array(v) => v.process(ctx, opt, txn, stm, chn).await?,
Value::Model(v) => v.process(ctx, opt, txn, stm, chn).await?,
Value::Thing(v) => v.process(ctx, opt, txn, stm, chn).await?,
Value::Table(v) => v.process(ctx, opt, txn, stm, chn).await?,
v => chn.send((None, v)).await?,
}
}
@ -65,8 +65,8 @@ impl Model {
self,
ctx: &Runtime,
opt: &Options,
stm: &Statement,
txn: &Transaction,
stm: &Statement<'_>,
chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
if ctx.is_ok() {
@ -77,7 +77,7 @@ impl Model {
tb: tb.to_string(),
id: Id::rand(),
}
.process(ctx, opt, stm, txn, chn)
.process(ctx, opt, txn, stm, chn)
.await?;
}
}
@ -87,7 +87,7 @@ impl Model {
tb: tb.to_string(),
id: Id::from(x),
}
.process(ctx, opt, stm, txn, chn)
.process(ctx, opt, txn, stm, chn)
.await?;
}
}
@ -102,8 +102,8 @@ impl Thing {
self,
ctx: &Runtime,
opt: &Options,
_stm: &Statement,
txn: &Transaction,
_stm: &Statement<'_>,
chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
if ctx.is_ok() {
@ -124,8 +124,8 @@ impl Table {
self,
ctx: &Runtime,
opt: &Options,
_stm: &Statement,
txn: &Transaction,
_stm: &Statement<'_>,
chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
if ctx.is_ok() {

View file

@ -1,6 +1,7 @@
use crate::dbs::Iterator;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::err::Error;
use crate::key::thing;
@ -20,15 +21,16 @@ impl Value {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
if ctx.is_ok() {
match self {
Value::Array(v) => v.iterate(ctx, opt, txn, ite).await?,
Value::Model(v) => v.iterate(ctx, opt, txn, ite).await?,
Value::Thing(v) => v.iterate(ctx, opt, txn, ite).await?,
Value::Table(v) => v.iterate(ctx, opt, txn, ite).await?,
v => ite.process(ctx, opt, txn, None, v).await,
Value::Array(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Model(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Thing(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Table(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
v => ite.process(ctx, opt, txn, stm, None, v).await,
}
}
Ok(())
@ -43,16 +45,17 @@ impl Array {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
for v in self.into_iter() {
if ctx.is_ok() {
match v {
Value::Array(v) => v.iterate(ctx, opt, txn, ite).await?,
Value::Model(v) => v.iterate(ctx, opt, txn, ite).await?,
Value::Thing(v) => v.iterate(ctx, opt, txn, ite).await?,
Value::Table(v) => v.iterate(ctx, opt, txn, ite).await?,
v => ite.process(ctx, opt, txn, None, v).await,
Value::Array(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Model(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Thing(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Table(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
v => ite.process(ctx, opt, txn, stm, None, v).await,
}
}
}
@ -66,6 +69,7 @@ impl Model {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
if ctx.is_ok() {
@ -76,7 +80,7 @@ impl Model {
tb: tb.to_string(),
id: Id::rand(),
}
.iterate(ctx, opt, txn, ite)
.iterate(ctx, opt, txn, stm, ite)
.await?;
}
}
@ -86,7 +90,7 @@ impl Model {
tb: tb.to_string(),
id: Id::from(x),
}
.iterate(ctx, opt, txn, ite)
.iterate(ctx, opt, txn, stm, ite)
.await?;
}
}
@ -102,6 +106,7 @@ impl Thing {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
if ctx.is_ok() {
@ -111,7 +116,7 @@ impl Thing {
Some(v) => Value::from(v),
None => Value::None,
};
ite.process(ctx, opt, txn, Some(self), val).await;
ite.process(ctx, opt, txn, stm, Some(self), val).await;
}
Ok(())
}
@ -123,6 +128,7 @@ impl Table {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
if ctx.is_ok() {
@ -163,7 +169,7 @@ impl Table {
let v: crate::sql::value::Value = (&v).into();
let t = Thing::from((k.tb, k.id));
// Process the record
ite.process(ctx, opt, txn, Some(t), v).await;
ite.process(ctx, opt, txn, stm, Some(t), v).await;
}
}
continue;

View file

@ -11,28 +11,19 @@ use crate::sql::array::Array;
use crate::sql::field::Field;
use crate::sql::id::Id;
use crate::sql::part::Part;
use crate::sql::statements::create::CreateStatement;
use crate::sql::statements::delete::DeleteStatement;
use crate::sql::statements::insert::InsertStatement;
use crate::sql::statements::relate::RelateStatement;
use crate::sql::statements::select::SelectStatement;
use crate::sql::statements::update::UpdateStatement;
use crate::sql::table::Table;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use executor::Executor;
use futures::join;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::mem;
use std::sync::Arc;
#[derive(Default)]
pub struct Iterator {
// Iterator status
run: Canceller,
// Iterator statement
stm: Statement,
// Iterator run option
parallel: bool,
// Iterator runtime error
error: Option<Error>,
// Iterator input values
@ -41,67 +32,12 @@ pub struct Iterator {
results: Vec<Value>,
}
impl From<Arc<SelectStatement>> for Iterator {
fn from(v: Arc<SelectStatement>) -> Self {
Iterator {
parallel: v.parallel,
stm: Statement::from(v),
..Iterator::default()
}
}
}
impl From<Arc<CreateStatement>> for Iterator {
fn from(v: Arc<CreateStatement>) -> Self {
Iterator {
parallel: v.parallel,
stm: Statement::from(v),
..Iterator::default()
}
}
}
impl From<Arc<UpdateStatement>> for Iterator {
fn from(v: Arc<UpdateStatement>) -> Self {
Iterator {
parallel: v.parallel,
stm: Statement::from(v),
..Iterator::default()
}
}
}
impl From<Arc<RelateStatement>> for Iterator {
fn from(v: Arc<RelateStatement>) -> Self {
Iterator {
parallel: v.parallel,
stm: Statement::from(v),
..Iterator::default()
}
}
}
impl From<Arc<DeleteStatement>> for Iterator {
fn from(v: Arc<DeleteStatement>) -> Self {
Iterator {
parallel: v.parallel,
stm: Statement::from(v),
..Iterator::default()
}
}
}
impl From<Arc<InsertStatement>> for Iterator {
fn from(v: Arc<InsertStatement>) -> Self {
Iterator {
parallel: v.parallel,
stm: Statement::from(v),
..Iterator::default()
}
}
}
impl Iterator {
// Creates a new iterator
pub fn new() -> Self {
Self::default()
}
// Prepares a value for processing
pub fn prepare(&mut self, val: Value) {
self.readies.push(val)
@ -121,31 +57,32 @@ impl Iterator {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Log the statement
trace!("Iterating: {}", self.stm);
trace!("Iterating: {}", stm);
// Enable context override
let mut ctx = Context::new(ctx);
self.run = ctx.add_cancel();
let ctx = ctx.freeze();
// Process prepared values
self.iterate(&ctx, opt, txn).await?;
self.iterate(&ctx, opt, txn, stm).await?;
// Return any document errors
if let Some(e) = self.error.take() {
return Err(e);
}
// Process any SPLIT clause
self.output_split(&ctx, opt, txn).await?;
self.output_split(&ctx, opt, txn, stm).await?;
// Process any GROUP clause
self.output_group(&ctx, opt, txn).await?;
self.output_group(&ctx, opt, txn, stm).await?;
// Process any ORDER clause
self.output_order(&ctx, opt, txn).await?;
self.output_order(&ctx, opt, txn, stm).await?;
// Process any START clause
self.output_start(&ctx, opt, txn).await?;
self.output_start(&ctx, opt, txn, stm).await?;
// Process any LIMIT clause
self.output_limit(&ctx, opt, txn).await?;
self.output_limit(&ctx, opt, txn, stm).await?;
// Process any FETCH clause
self.output_fetch(&ctx, opt, txn).await?;
self.output_fetch(&ctx, opt, txn, stm).await?;
// Output the results
Ok(mem::take(&mut self.results).into())
}
@ -156,8 +93,9 @@ impl Iterator {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(splits) = self.stm.split() {
if let Some(splits) = stm.split() {
// Loop over each split clause
for split in splits.iter() {
// Get the query result
@ -199,9 +137,10 @@ impl Iterator {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(fields) = self.stm.expr() {
if let Some(groups) = self.stm.group() {
if let Some(fields) = stm.expr() {
if let Some(groups) = stm.group() {
// Create the new grouped collection
let mut grp: BTreeMap<Array, Array> = BTreeMap::new();
// Get the query result
@ -281,8 +220,9 @@ impl Iterator {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(orders) = self.stm.order() {
if let Some(orders) = stm.order() {
// Sort the full result set
self.results.sort_by(|a, b| {
// Loop over each order clause
@ -319,8 +259,9 @@ impl Iterator {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(v) = self.stm.start() {
if let Some(v) = stm.start() {
self.results = mem::take(&mut self.results).into_iter().skip(v.0).collect();
}
Ok(())
@ -332,8 +273,9 @@ impl Iterator {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(v) = self.stm.limit() {
if let Some(v) = stm.limit() {
self.results = mem::take(&mut self.results).into_iter().take(v.0).collect();
}
Ok(())
@ -345,8 +287,9 @@ impl Iterator {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(fetchs) = self.stm.fetch() {
if let Some(fetchs) = stm.fetch() {
for fetch in &fetchs.0 {
// Loop over each value
for obj in &mut self.results {
@ -383,10 +326,11 @@ impl Iterator {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Process all prepared values
for v in mem::take(&mut self.readies) {
v.iterate(ctx, opt, txn, self).await?;
v.iterate(ctx, opt, txn, stm, self).await?;
}
// Everything processed ok
Ok(())
@ -398,70 +342,67 @@ impl Iterator {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
match self.parallel {
match stm.parallel() {
// Run statements sequentially
false => {
// Process all prepared values
for v in mem::take(&mut self.readies) {
v.iterate(ctx, opt, txn, self).await?;
v.iterate(ctx, opt, txn, stm, self).await?;
}
// Everything processed ok
Ok(())
}
// Run statements in parallel
true => {
let mut rcv = {
// Get current statement
let stm = &self.stm;
// Create an unbounded channel
let (chn, rx) = tokio::sync::mpsc::channel(MAX_CONCURRENT_TASKS);
// Create a new executor
let exe = Executor::new();
// Take all of the iterator values
let vals = mem::take(&mut self.readies);
// Create a channel to shutdown
let (end, exit) = channel::bounded::<()>(1);
// Create an unbounded channel
let (chn, docs) = channel::bounded(MAX_CONCURRENT_TASKS);
// Create an async closure for prepared values
let adocs = async {
// Process all prepared values
for v in mem::take(&mut self.readies) {
if ctx.is_ok() {
tokio::spawn(v.channel(
ctx.clone(),
opt.clone(),
stm.clone(),
txn.clone(),
chn.clone(),
));
}
for v in vals {
exe.spawn(v.channel(ctx, opt, txn, stm, chn.clone()))
// Ensure we detach the spawned task
.detach();
}
// Return the receiver
rx
// Drop the uncloned channel instance
drop(chn);
};
let mut rcv = {
// Clone the send values
let ctx = ctx.clone();
let opt = opt.clone();
let txn = txn.clone();
let stm = self.stm.clone();
// Create an unbounded channel
let (chn, rx) = tokio::sync::mpsc::channel(MAX_CONCURRENT_TASKS);
// Create an unbounded channel
let (chn, vals) = channel::bounded(MAX_CONCURRENT_TASKS);
// Create an async closure for received values
let avals = async {
// Process all received values
tokio::spawn(async move {
while let Some((k, v)) = rcv.recv().await {
if ctx.is_ok() {
tokio::spawn(Document::compute(
ctx.clone(),
opt.clone(),
txn.clone(),
chn.clone(),
stm.clone(),
k,
v,
));
}
}
});
// Return the receiver
rx
while let Ok((k, v)) = docs.recv().await {
exe.spawn(Document::compute(ctx, opt, txn, stm, chn.clone(), k, v))
// Ensure we detach the spawned task
.detach();
}
// Drop the uncloned channel instance
drop(chn);
};
// Process all processed values
while let Some(r) = rcv.recv().await {
self.result(r);
}
// Create an async closure to process results
let aproc = async {
// Process all processed values
while let Ok(r) = vals.recv().await {
self.result(r, stm);
}
// Shutdown the executor
let _ = end.send(()).await;
};
// Run all executor tasks
let fut = exe.run(exit.recv());
// Wait for all closures
let res = join!(adocs, avals, aproc, fut);
// Consume executor error
let _ = res.3;
// Everything processed ok
Ok(())
}
@ -474,6 +415,7 @@ impl Iterator {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
thg: Option<Thing>,
val: Value,
) {
@ -484,21 +426,20 @@ impl Iterator {
// Setup a new document
let mut doc = Document::new(thg, &val);
// Process the document
let res = match self.stm {
Statement::Select(_) => doc.select(ctx, opt, txn, &self.stm).await,
Statement::Create(_) => doc.create(ctx, opt, txn, &self.stm).await,
Statement::Update(_) => doc.update(ctx, opt, txn, &self.stm).await,
Statement::Relate(_) => doc.relate(ctx, opt, txn, &self.stm).await,
Statement::Delete(_) => doc.delete(ctx, opt, txn, &self.stm).await,
Statement::Insert(_) => doc.insert(ctx, opt, txn, &self.stm).await,
_ => unreachable!(),
let res = match stm {
Statement::Select(_) => doc.select(ctx, opt, txn, stm).await,
Statement::Create(_) => doc.create(ctx, opt, txn, stm).await,
Statement::Update(_) => doc.update(ctx, opt, txn, stm).await,
Statement::Relate(_) => doc.relate(ctx, opt, txn, stm).await,
Statement::Delete(_) => doc.delete(ctx, opt, txn, stm).await,
Statement::Insert(_) => doc.insert(ctx, opt, txn, stm).await,
};
// Process the result
self.result(res);
self.result(res, stm);
}
// Accept a processed record result
fn result(&mut self, res: Result<Value, Error>) {
fn result(&mut self, res: Result<Value, Error>, stm: &Statement<'_>) {
// Process the result
match res {
Err(Error::Ignore) => {
@ -512,9 +453,9 @@ impl Iterator {
Ok(v) => self.results.push(v),
}
// Check if we can exit
if self.stm.group().is_none() && self.stm.order().is_none() {
if let Some(l) = self.stm.limit() {
if let Some(s) = self.stm.start() {
if stm.group().is_none() && stm.order().is_none() {
if let Some(l) = stm.limit() {
if let Some(s) = stm.start() {
if self.results.len() == l.0 + s.0 {
self.run.cancel()
}

View file

@ -16,62 +16,54 @@ use crate::sql::statements::select::SelectStatement;
use crate::sql::statements::update::UpdateStatement;
use crate::sql::version::Version;
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug)]
pub enum Statement {
None,
Select(Arc<SelectStatement>),
Create(Arc<CreateStatement>),
Update(Arc<UpdateStatement>),
Relate(Arc<RelateStatement>),
Delete(Arc<DeleteStatement>),
Insert(Arc<InsertStatement>),
pub enum Statement<'a> {
Select(&'a SelectStatement),
Create(&'a CreateStatement),
Update(&'a UpdateStatement),
Relate(&'a RelateStatement),
Delete(&'a DeleteStatement),
Insert(&'a InsertStatement),
}
impl Default for Statement {
fn default() -> Self {
Statement::None
}
}
impl From<Arc<SelectStatement>> for Statement {
fn from(v: Arc<SelectStatement>) -> Self {
impl<'a> From<&'a SelectStatement> for Statement<'a> {
fn from(v: &'a SelectStatement) -> Self {
Statement::Select(v)
}
}
impl From<Arc<CreateStatement>> for Statement {
fn from(v: Arc<CreateStatement>) -> Self {
impl<'a> From<&'a CreateStatement> for Statement<'a> {
fn from(v: &'a CreateStatement) -> Self {
Statement::Create(v)
}
}
impl From<Arc<UpdateStatement>> for Statement {
fn from(v: Arc<UpdateStatement>) -> Self {
impl<'a> From<&'a UpdateStatement> for Statement<'a> {
fn from(v: &'a UpdateStatement) -> Self {
Statement::Update(v)
}
}
impl From<Arc<RelateStatement>> for Statement {
fn from(v: Arc<RelateStatement>) -> Self {
impl<'a> From<&'a RelateStatement> for Statement<'a> {
fn from(v: &'a RelateStatement) -> Self {
Statement::Relate(v)
}
}
impl From<Arc<DeleteStatement>> for Statement {
fn from(v: Arc<DeleteStatement>) -> Self {
impl<'a> From<&'a DeleteStatement> for Statement<'a> {
fn from(v: &'a DeleteStatement) -> Self {
Statement::Delete(v)
}
}
impl From<Arc<InsertStatement>> for Statement {
fn from(v: Arc<InsertStatement>) -> Self {
impl<'a> From<&'a InsertStatement> for Statement<'a> {
fn from(v: &'a InsertStatement) -> Self {
Statement::Insert(v)
}
}
impl fmt::Display for Statement {
impl<'a> fmt::Display for Statement<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Statement::Select(v) => write!(f, "{}", v),
@ -80,12 +72,11 @@ impl fmt::Display for Statement {
Statement::Relate(v) => write!(f, "{}", v),
Statement::Delete(v) => write!(f, "{}", v),
Statement::Insert(v) => write!(f, "{}", v),
_ => unreachable!(),
}
}
}
impl Statement {
impl<'a> Statement<'a> {
// Check the type of statement
#[inline]
pub fn is_select(&self) -> bool {
@ -186,4 +177,16 @@ impl Statement {
_ => None,
}
}
// Returns any RETURN clause if specified
#[inline]
pub fn parallel(&self) -> bool {
match self {
Statement::Select(v) => v.parallel,
Statement::Create(v) => v.parallel,
Statement::Update(v) => v.parallel,
Statement::Relate(v) => v.parallel,
Statement::Delete(v) => v.parallel,
Statement::Insert(v) => v.parallel,
}
}
}

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
stm: &Statement,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Check that we are altering a record
if self.id.is_none() {

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Check permission clause
if opt.perms && opt.auth.perms() && self.id.is_some() {

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Check where condition
if let Some(cond) = stm.conds() {

View file

@ -6,15 +6,15 @@ use crate::doc::Document;
use crate::err::Error;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use tokio::sync::mpsc::Sender;
use channel::Sender;
impl<'a> Document<'a> {
pub(crate) async fn compute(
ctx: Runtime,
opt: Options,
txn: Transaction,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
chn: Sender<Result<Value, Error>>,
stm: Statement,
thg: Option<Thing>,
val: Value,
) -> Result<(), Error> {
@ -22,13 +22,12 @@ impl<'a> Document<'a> {
let mut doc = Document::new(thg, &val);
// Process the statement
let res = match stm {
Statement::Select(_) => doc.select(&ctx, &opt, &txn, &stm).await,
Statement::Create(_) => doc.create(&ctx, &opt, &txn, &stm).await,
Statement::Update(_) => doc.update(&ctx, &opt, &txn, &stm).await,
Statement::Relate(_) => doc.relate(&ctx, &opt, &txn, &stm).await,
Statement::Delete(_) => doc.delete(&ctx, &opt, &txn, &stm).await,
Statement::Insert(_) => doc.insert(&ctx, &opt, &txn, &stm).await,
_ => unreachable!(),
Statement::Select(_) => doc.select(ctx, opt, txn, stm).await,
Statement::Create(_) => doc.create(ctx, opt, txn, stm).await,
Statement::Update(_) => doc.update(ctx, opt, txn, stm).await,
Statement::Relate(_) => doc.relate(ctx, opt, txn, stm).await,
Statement::Delete(_) => doc.delete(ctx, opt, txn, stm).await,
Statement::Insert(_) => doc.insert(ctx, opt, txn, stm).await,
};
// Send back the result
let _ = chn.send(res).await;

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check value type
self.admit(ctx, opt, txn, stm).await?;

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check value type
self.admit(ctx, opt, txn, stm).await?;

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Check if this record exists
if self.id.is_some() {

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
_stm: &Statement,
_stm: &Statement<'_>,
) -> Result<(), Error> {
self.current.to_mut().clear(ctx, opt, txn).await
}

View file

@ -14,7 +14,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
_stm: &Statement,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Check events
if !opt.events {

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Check if this record exists
if let Some(id) = &self.id {

View file

@ -14,7 +14,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
_stm: &Statement,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Loop through all field statements
for fd in self.fd(opt, txn).await?.iter() {

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
_stm: &Statement,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Check if forced
if !opt.force && !self.changed() {

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement,
_stm: &Statement<'_>,
) -> Result<Value, Error> {
todo!()
}

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement,
_stm: &Statement<'_>,
) -> Result<(), Error> {
Ok(())
}

View file

@ -14,7 +14,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Get the record id
let rid = self.id.as_ref().unwrap();

View file

@ -17,7 +17,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Ensure futures are run
let opt = &opt.futures(true);

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
opt: &Options,
txn: &Transaction,
_stm: &Statement,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Check if forced
if !opt.force && !self.changed() {

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check value type
self.admit(ctx, opt, txn, stm).await?;

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check if record exists
self.empty(ctx, opt, txn, stm).await?;

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
opt: &Options,
txn: &Transaction,
_stm: &Statement,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Check if forced
if !opt.force && !self.changed() {

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement,
_stm: &Statement<'_>,
) -> Result<(), Error> {
Ok(())
}

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check value type
self.admit(ctx, opt, txn, stm).await?;

View file

@ -7,15 +7,6 @@ use storekey::decode::Error as DecodeError;
use storekey::encode::Error as EncodeError;
use thiserror::Error;
#[cfg(feature = "kv-tikv")]
use tikv::Error as TiKVError;
#[cfg(feature = "kv-echodb")]
use echodb::err::Error as EchoDBError;
#[cfg(feature = "kv-indxdb")]
use indxdb::err::Error as IndxDBError;
#[cfg(feature = "parallel")]
use tokio::sync::mpsc::error::SendError as TokioError;
@ -241,22 +232,22 @@ impl From<Error> for String {
}
#[cfg(feature = "kv-echodb")]
impl From<EchoDBError> for Error {
fn from(e: EchoDBError) -> Error {
impl From<echodb::err::Error> for Error {
fn from(e: echodb::err::Error) -> Error {
Error::Tx(e.to_string())
}
}
#[cfg(feature = "kv-indxdb")]
impl From<IndxDBError> for Error {
fn from(e: IndxDBError) -> Error {
impl From<indxdb::err::Error> for Error {
fn from(e: indxdb::err::Error) -> Error {
Error::Tx(e.to_string())
}
}
#[cfg(feature = "kv-tikv")]
impl From<TiKVError> for Error {
fn from(e: TiKVError) -> Error {
impl From<tikv::Error> for Error {
fn from(e: tikv::Error) -> Error {
Error::Tx(e.to_string())
}
}
@ -268,9 +259,14 @@ impl From<TokioError<bytes::Bytes>> for Error {
}
}
#[cfg(feature = "parallel")]
impl From<TokioError<(Option<Thing>, Value)>> for Error {
fn from(e: TokioError<(Option<Thing>, Value)>) -> Error {
impl From<channel::RecvError> for Error {
fn from(e: channel::RecvError) -> Error {
Error::Channel(e.to_string())
}
}
impl From<channel::SendError<(Option<Thing>, Value)>> for Error {
fn from(e: channel::SendError<(Option<Thing>, Value)>) -> Error {
Error::Channel(e.to_string())
}
}

View file

@ -33,7 +33,6 @@ use nom::sequence::delimited;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
@ -60,25 +59,25 @@ pub fn statements(i: &str) -> IResult<&str, Statements> {
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Statement {
Set(Arc<SetStatement>),
Use(Arc<UseStatement>),
Info(Arc<InfoStatement>),
Live(Arc<LiveStatement>),
Kill(Arc<KillStatement>),
Begin(Arc<BeginStatement>),
Cancel(Arc<CancelStatement>),
Commit(Arc<CommitStatement>),
Output(Arc<OutputStatement>),
Ifelse(Arc<IfelseStatement>),
Select(Arc<SelectStatement>),
Create(Arc<CreateStatement>),
Update(Arc<UpdateStatement>),
Relate(Arc<RelateStatement>),
Delete(Arc<DeleteStatement>),
Insert(Arc<InsertStatement>),
Define(Arc<DefineStatement>),
Remove(Arc<RemoveStatement>),
Option(Arc<OptionStatement>),
Set(SetStatement),
Use(UseStatement),
Info(InfoStatement),
Live(LiveStatement),
Kill(KillStatement),
Begin(BeginStatement),
Cancel(CancelStatement),
Commit(CommitStatement),
Output(OutputStatement),
Ifelse(IfelseStatement),
Select(SelectStatement),
Create(CreateStatement),
Update(UpdateStatement),
Relate(RelateStatement),
Delete(DeleteStatement),
Insert(InsertStatement),
Define(DefineStatement),
Remove(RemoveStatement),
Option(OptionStatement),
}
impl Statement {
@ -153,25 +152,25 @@ pub fn statement(i: &str) -> IResult<&str, Statement> {
delimited(
mightbespace,
alt((
map(set, |v| Statement::Set(Arc::new(v))),
map(yuse, |v| Statement::Use(Arc::new(v))),
map(info, |v| Statement::Info(Arc::new(v))),
map(live, |v| Statement::Live(Arc::new(v))),
map(kill, |v| Statement::Kill(Arc::new(v))),
map(begin, |v| Statement::Begin(Arc::new(v))),
map(cancel, |v| Statement::Cancel(Arc::new(v))),
map(commit, |v| Statement::Commit(Arc::new(v))),
map(output, |v| Statement::Output(Arc::new(v))),
map(ifelse, |v| Statement::Ifelse(Arc::new(v))),
map(select, |v| Statement::Select(Arc::new(v))),
map(create, |v| Statement::Create(Arc::new(v))),
map(update, |v| Statement::Update(Arc::new(v))),
map(relate, |v| Statement::Relate(Arc::new(v))),
map(delete, |v| Statement::Delete(Arc::new(v))),
map(insert, |v| Statement::Insert(Arc::new(v))),
map(define, |v| Statement::Define(Arc::new(v))),
map(remove, |v| Statement::Remove(Arc::new(v))),
map(option, |v| Statement::Option(Arc::new(v))),
map(set, Statement::Set),
map(yuse, Statement::Use),
map(info, Statement::Info),
map(live, Statement::Live),
map(kill, Statement::Kill),
map(begin, Statement::Begin),
map(cancel, Statement::Cancel),
map(commit, Statement::Commit),
map(output, Statement::Output),
map(ifelse, Statement::Ifelse),
map(select, Statement::Select),
map(create, Statement::Create),
map(update, Statement::Update),
map(relate, Statement::Relate),
map(delete, Statement::Delete),
map(insert, Statement::Insert),
map(define, Statement::Define),
map(remove, Statement::Remove),
map(option, Statement::Option),
)),
mightbespace,
)(i)

View file

@ -2,6 +2,7 @@ use crate::dbs::Iterator;
use crate::dbs::Level;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::err::Error;
use crate::sql::comment::shouldbespace;
@ -16,7 +17,6 @@ use nom::combinator::opt;
use nom::sequence::preceded;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct CreateStatement {
@ -29,7 +29,7 @@ pub struct CreateStatement {
impl CreateStatement {
pub(crate) async fn compute(
self: &Arc<Self>,
&self,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
@ -37,10 +37,8 @@ impl CreateStatement {
) -> Result<Value, Error> {
// Allowed to run?
opt.check(Level::No)?;
// Clone the statement
let s = Arc::clone(self);
// Create a new iterator
let mut i = Iterator::from(s);
let mut i = Iterator::new();
// Ensure futures are stored
let opt = &opt.futures(false);
// Loop over the create targets
@ -58,8 +56,10 @@ impl CreateStatement {
}
};
}
// Assign the statement
let stm = Statement::from(self);
// Output the results
i.output(ctx, opt, txn).await
i.output(ctx, opt, txn, &stm).await
}
}

View file

@ -2,6 +2,7 @@ use crate::dbs::Iterator;
use crate::dbs::Level;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::err::Error;
use crate::sql::comment::shouldbespace;
@ -17,7 +18,6 @@ use nom::sequence::preceded;
use nom::sequence::tuple;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct DeleteStatement {
@ -30,7 +30,7 @@ pub struct DeleteStatement {
impl DeleteStatement {
pub(crate) async fn compute(
self: &Arc<Self>,
&self,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
@ -38,10 +38,8 @@ impl DeleteStatement {
) -> Result<Value, Error> {
// Allowed to run?
opt.check(Level::No)?;
// Clone the statement
let s = Arc::clone(self);
// Create a new iterator
let mut i = Iterator::from(s);
let mut i = Iterator::new();
// Ensure futures are stored
let opt = &opt.futures(false);
// Loop over the delete targets
@ -59,8 +57,10 @@ impl DeleteStatement {
}
};
}
// Assign the statement
let stm = Statement::from(self);
// Output the results
i.output(ctx, opt, txn).await
i.output(ctx, opt, txn, &stm).await
}
}

View file

@ -2,6 +2,7 @@ use crate::dbs::Iterator;
use crate::dbs::Level;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::err::Error;
use crate::sql::comment::shouldbespace;
@ -18,7 +19,6 @@ use nom::combinator::opt;
use nom::sequence::preceded;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct InsertStatement {
@ -33,7 +33,7 @@ pub struct InsertStatement {
impl InsertStatement {
pub(crate) async fn compute(
self: &Arc<Self>,
&self,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
@ -41,10 +41,8 @@ impl InsertStatement {
) -> Result<Value, Error> {
// Allowed to run?
opt.check(Level::No)?;
// Clone the statement
let s = Arc::clone(self);
// Create a new iterator
let mut i = Iterator::from(s);
let mut i = Iterator::new();
// Ensure futures are stored
let opt = &opt.futures(false);
// Parse the expression
@ -66,8 +64,10 @@ impl InsertStatement {
}
_ => unreachable!(),
}
// Assign the statement
let stm = Statement::from(self);
// Output the results
i.output(ctx, opt, txn).await
i.output(ctx, opt, txn, &stm).await
}
}

View file

@ -2,6 +2,7 @@ use crate::dbs::Iterator;
use crate::dbs::Level;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::err::Error;
use crate::sql::comment::mightbespace;
@ -20,7 +21,6 @@ use nom::combinator::opt;
use nom::sequence::preceded;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct RelateStatement {
@ -36,7 +36,7 @@ pub struct RelateStatement {
impl RelateStatement {
pub(crate) async fn compute(
self: &Arc<Self>,
&self,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
@ -44,10 +44,8 @@ impl RelateStatement {
) -> Result<Value, Error> {
// Allowed to run?
opt.check(Level::No)?;
// Clone the statement
let s = Arc::clone(self);
// Create a new iterator
let mut i = Iterator::from(s);
let mut i = Iterator::new();
// Ensure futures are stored
let opt = &opt.futures(false);
// Loop over the select targets
@ -65,8 +63,10 @@ impl RelateStatement {
}
};
}
// Assign the statement
let stm = Statement::from(self);
// Output the results
i.output(ctx, opt, txn).await
i.output(ctx, opt, txn, &stm).await
}
}

View file

@ -2,6 +2,7 @@ use crate::dbs::Iterator;
use crate::dbs::Level;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::err::Error;
use crate::sql::comment::shouldbespace;
@ -23,7 +24,6 @@ use nom::combinator::opt;
use nom::sequence::preceded;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct SelectStatement {
@ -58,7 +58,7 @@ impl SelectStatement {
impl SelectStatement {
pub(crate) async fn compute(
self: &Arc<Self>,
&self,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
@ -66,10 +66,8 @@ impl SelectStatement {
) -> Result<Value, Error> {
// Allowed to run?
opt.check(Level::No)?;
// Clone the statement
let s = Arc::clone(self);
// Create a new iterator
let mut i = Iterator::from(s);
let mut i = Iterator::new();
// Ensure futures are processed
let opt = &opt.futures(true);
// Loop over the select targets
@ -83,8 +81,10 @@ impl SelectStatement {
v => i.prepare(v),
};
}
// Assign the statement
let stm = Statement::from(self);
// Output the results
i.output(ctx, opt, txn).await
i.output(ctx, opt, txn, &stm).await
}
}

View file

@ -2,6 +2,7 @@ use crate::dbs::Iterator;
use crate::dbs::Level;
use crate::dbs::Options;
use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::err::Error;
use crate::sql::comment::shouldbespace;
@ -17,7 +18,6 @@ use nom::combinator::opt;
use nom::sequence::preceded;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct UpdateStatement {
@ -31,7 +31,7 @@ pub struct UpdateStatement {
impl UpdateStatement {
pub(crate) async fn compute(
self: &Arc<Self>,
&self,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
@ -39,10 +39,8 @@ impl UpdateStatement {
) -> Result<Value, Error> {
// Allowed to run?
opt.check(Level::No)?;
// Clone the statement
let s = Arc::clone(self);
// Create a new iterator
let mut i = Iterator::from(s);
let mut i = Iterator::new();
// Ensure futures are stored
let opt = &opt.futures(false);
// Loop over the update targets
@ -60,8 +58,10 @@ impl UpdateStatement {
}
};
}
// Assign the statement
let stm = Statement::from(self);
// Output the results
i.output(ctx, opt, txn).await
i.output(ctx, opt, txn, &stm).await
}
}

View file

@ -18,18 +18,17 @@ use nom::combinator::map;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Subquery {
Value(Value),
Ifelse(IfelseStatement),
Select(Arc<SelectStatement>),
Create(Arc<CreateStatement>),
Update(Arc<UpdateStatement>),
Delete(Arc<DeleteStatement>),
Relate(Arc<RelateStatement>),
Insert(Arc<InsertStatement>),
Select(SelectStatement),
Create(CreateStatement),
Update(UpdateStatement),
Delete(DeleteStatement),
Relate(RelateStatement),
Insert(InsertStatement),
}
impl PartialOrd for Subquery {
@ -63,7 +62,7 @@ impl Subquery {
// Prepare context
let ctx = ctx.freeze();
// Process subquery
let res = Arc::clone(v).compute(&ctx, &opt, txn, doc).await?;
let res = v.compute(&ctx, &opt, txn, doc).await?;
// Process result
match v.limit() {
1 => match v.expr.single() {
@ -89,7 +88,7 @@ impl Subquery {
// Prepare context
let ctx = ctx.freeze();
// Process subquery
match Arc::clone(v).compute(&ctx, &opt, txn, doc).await? {
match v.compute(&ctx, &opt, txn, doc).await? {
Value::Array(mut v) => match v.len() {
1 => Ok(v.remove(0)),
_ => Ok(v.into()),
@ -110,7 +109,7 @@ impl Subquery {
// Prepare context
let ctx = ctx.freeze();
// Process subquery
match Arc::clone(v).compute(&ctx, &opt, txn, doc).await? {
match v.compute(&ctx, &opt, txn, doc).await? {
Value::Array(mut v) => match v.len() {
1 => Ok(v.remove(0)),
_ => Ok(v.into()),
@ -131,7 +130,7 @@ impl Subquery {
// Prepare context
let ctx = ctx.freeze();
// Process subquery
match Arc::clone(v).compute(&ctx, &opt, txn, doc).await? {
match v.compute(&ctx, &opt, txn, doc).await? {
Value::Array(mut v) => match v.len() {
1 => Ok(v.remove(0)),
_ => Ok(v.into()),
@ -152,7 +151,7 @@ impl Subquery {
// Prepare context
let ctx = ctx.freeze();
// Process subquery
match Arc::clone(v).compute(&ctx, &opt, txn, doc).await? {
match v.compute(&ctx, &opt, txn, doc).await? {
Value::Array(mut v) => match v.len() {
1 => Ok(v.remove(0)),
_ => Ok(v.into()),
@ -173,7 +172,7 @@ impl Subquery {
// Prepare context
let ctx = ctx.freeze();
// Process subquery
match Arc::clone(v).compute(&ctx, &opt, txn, doc).await? {
match v.compute(&ctx, &opt, txn, doc).await? {
Value::Array(mut v) => match v.len() {
1 => Ok(v.remove(0)),
_ => Ok(v.into()),
@ -212,12 +211,12 @@ fn subquery_ifelse(i: &str) -> IResult<&str, Subquery> {
fn subquery_others(i: &str) -> IResult<&str, Subquery> {
let (i, _) = char('(')(i)?;
let (i, v) = alt((
map(select, |v| Subquery::Select(Arc::new(v))),
map(create, |v| Subquery::Create(Arc::new(v))),
map(update, |v| Subquery::Update(Arc::new(v))),
map(delete, |v| Subquery::Delete(Arc::new(v))),
map(relate, |v| Subquery::Relate(Arc::new(v))),
map(insert, |v| Subquery::Insert(Arc::new(v))),
map(select, Subquery::Select),
map(create, Subquery::Create),
map(update, Subquery::Update),
map(delete, Subquery::Delete),
map(relate, Subquery::Relate),
map(insert, Subquery::Insert),
map(value, Subquery::Value),
))(i)?;
let (i, _) = char(')')(i)?;