2022-02-26 23:30:19 +00:00
|
|
|
use crate::cnf::MAX_CONCURRENT_TASKS;
|
2022-02-06 01:14:56 +00:00
|
|
|
use crate::ctx::Canceller;
|
|
|
|
use crate::ctx::Context;
|
|
|
|
use crate::dbs::Options;
|
2021-03-31 12:10:13 +00:00
|
|
|
use crate::dbs::Runtime;
|
2022-02-06 01:14:56 +00:00
|
|
|
use crate::dbs::Statement;
|
2022-02-15 01:00:30 +00:00
|
|
|
use crate::dbs::Transaction;
|
2022-02-06 01:14:56 +00:00
|
|
|
use crate::doc::Document;
|
2021-03-29 15:43:37 +00:00
|
|
|
use crate::err::Error;
|
2022-03-25 18:43:22 +00:00
|
|
|
use crate::sql::array::Array;
|
|
|
|
use crate::sql::field::Field;
|
2022-03-18 07:24:36 +00:00
|
|
|
use crate::sql::id::Id;
|
2022-03-23 14:01:28 +00:00
|
|
|
use crate::sql::part::Part;
|
2022-02-23 13:56:09 +00:00
|
|
|
use crate::sql::statements::create::CreateStatement;
|
|
|
|
use crate::sql::statements::delete::DeleteStatement;
|
|
|
|
use crate::sql::statements::insert::InsertStatement;
|
|
|
|
use crate::sql::statements::relate::RelateStatement;
|
|
|
|
use crate::sql::statements::select::SelectStatement;
|
|
|
|
use crate::sql::statements::update::UpdateStatement;
|
2022-01-13 07:31:21 +00:00
|
|
|
use crate::sql::table::Table;
|
|
|
|
use crate::sql::thing::Thing;
|
|
|
|
use crate::sql::value::Value;
|
2022-03-23 11:56:39 +00:00
|
|
|
use std::cmp::Ordering;
|
2022-03-25 18:43:22 +00:00
|
|
|
use std::collections::BTreeMap;
|
2022-01-13 07:31:21 +00:00
|
|
|
use std::mem;
|
2022-02-26 23:30:19 +00:00
|
|
|
use std::sync::Arc;
|
2022-02-06 01:14:56 +00:00
|
|
|
|
2022-01-13 07:31:21 +00:00
|
|
|
#[derive(Default)]
|
2022-02-26 23:30:19 +00:00
|
|
|
pub struct Iterator {
|
2022-02-06 01:14:56 +00:00
|
|
|
// Iterator status
|
|
|
|
run: Canceller,
|
2022-02-26 23:30:19 +00:00
|
|
|
// Iterator statement
|
|
|
|
stm: Statement,
|
|
|
|
// Iterator run option
|
|
|
|
parallel: bool,
|
2022-02-06 01:14:56 +00:00
|
|
|
// Iterator runtime error
|
|
|
|
error: Option<Error>,
|
|
|
|
// Iterator input values
|
|
|
|
readies: Vec<Value>,
|
|
|
|
// Iterator output results
|
|
|
|
results: Vec<Value>,
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
|
|
|
|
2022-02-26 23:30:19 +00:00
|
|
|
impl From<Arc<SelectStatement>> for Iterator {
|
|
|
|
fn from(v: Arc<SelectStatement>) -> Self {
|
2022-02-23 13:56:09 +00:00
|
|
|
Iterator {
|
2022-02-26 16:53:38 +00:00
|
|
|
parallel: v.parallel,
|
2022-02-26 23:30:19 +00:00
|
|
|
stm: Statement::from(v),
|
2022-02-23 13:56:09 +00:00
|
|
|
..Iterator::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-26 23:30:19 +00:00
|
|
|
impl From<Arc<CreateStatement>> for Iterator {
|
|
|
|
fn from(v: Arc<CreateStatement>) -> Self {
|
2022-02-23 13:56:09 +00:00
|
|
|
Iterator {
|
2022-02-26 16:53:38 +00:00
|
|
|
parallel: v.parallel,
|
2022-02-26 23:30:19 +00:00
|
|
|
stm: Statement::from(v),
|
2022-02-23 13:56:09 +00:00
|
|
|
..Iterator::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-26 23:30:19 +00:00
|
|
|
impl From<Arc<UpdateStatement>> for Iterator {
|
|
|
|
fn from(v: Arc<UpdateStatement>) -> Self {
|
2022-02-23 13:56:09 +00:00
|
|
|
Iterator {
|
2022-02-26 16:53:38 +00:00
|
|
|
parallel: v.parallel,
|
2022-02-26 23:30:19 +00:00
|
|
|
stm: Statement::from(v),
|
2022-02-23 13:56:09 +00:00
|
|
|
..Iterator::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-26 23:30:19 +00:00
|
|
|
impl From<Arc<RelateStatement>> for Iterator {
|
|
|
|
fn from(v: Arc<RelateStatement>) -> Self {
|
2022-02-23 13:56:09 +00:00
|
|
|
Iterator {
|
2022-02-26 16:53:38 +00:00
|
|
|
parallel: v.parallel,
|
2022-02-26 23:30:19 +00:00
|
|
|
stm: Statement::from(v),
|
2022-02-23 13:56:09 +00:00
|
|
|
..Iterator::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-26 23:30:19 +00:00
|
|
|
impl From<Arc<DeleteStatement>> for Iterator {
|
|
|
|
fn from(v: Arc<DeleteStatement>) -> Self {
|
2022-02-23 13:56:09 +00:00
|
|
|
Iterator {
|
2022-02-26 16:53:38 +00:00
|
|
|
parallel: v.parallel,
|
2022-02-26 23:30:19 +00:00
|
|
|
stm: Statement::from(v),
|
2022-02-23 13:56:09 +00:00
|
|
|
..Iterator::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-26 23:30:19 +00:00
|
|
|
impl From<Arc<InsertStatement>> for Iterator {
|
|
|
|
fn from(v: Arc<InsertStatement>) -> Self {
|
2022-02-23 13:56:09 +00:00
|
|
|
Iterator {
|
2022-02-26 16:53:38 +00:00
|
|
|
parallel: v.parallel,
|
2022-02-26 23:30:19 +00:00
|
|
|
stm: Statement::from(v),
|
2022-02-23 13:56:09 +00:00
|
|
|
..Iterator::default()
|
|
|
|
}
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
2022-02-23 13:56:09 +00:00
|
|
|
}
|
2022-01-13 07:31:21 +00:00
|
|
|
|
2022-02-26 23:30:19 +00:00
|
|
|
impl Iterator {
|
2022-02-06 01:14:56 +00:00
|
|
|
// Prepares a value for processing
|
|
|
|
pub fn prepare(&mut self, val: Value) {
|
|
|
|
self.readies.push(val)
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
|
|
|
|
2022-02-06 01:14:56 +00:00
|
|
|
// Create a new record for processing
|
|
|
|
pub fn produce(&mut self, val: Table) {
|
|
|
|
self.prepare(Value::Thing(Thing {
|
2022-03-04 16:01:32 +00:00
|
|
|
tb: val.name,
|
2022-03-18 07:24:36 +00:00
|
|
|
id: Id::rand(),
|
2022-02-06 01:14:56 +00:00
|
|
|
}))
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
|
|
|
|
2022-02-06 21:06:52 +00:00
|
|
|
// Process the records and output
|
2022-02-06 01:14:56 +00:00
|
|
|
pub async fn output(
|
|
|
|
&mut self,
|
|
|
|
ctx: &Runtime,
|
2022-02-06 21:06:52 +00:00
|
|
|
opt: &Options,
|
2022-02-15 03:33:16 +00:00
|
|
|
txn: &Transaction,
|
2022-02-06 01:14:56 +00:00
|
|
|
) -> Result<Value, Error> {
|
|
|
|
// Log the statement
|
2022-02-26 23:30:19 +00:00
|
|
|
trace!("Iterating: {}", self.stm);
|
2022-02-06 01:14:56 +00:00
|
|
|
// Enable context override
|
2022-03-04 16:01:32 +00:00
|
|
|
let mut ctx = Context::new(ctx);
|
2022-02-06 01:14:56 +00:00
|
|
|
self.run = ctx.add_cancel();
|
|
|
|
let ctx = ctx.freeze();
|
|
|
|
// Process prepared values
|
2022-02-15 01:00:30 +00:00
|
|
|
self.iterate(&ctx, opt, txn).await?;
|
2022-02-06 01:14:56 +00:00
|
|
|
// Return any document errors
|
|
|
|
if let Some(e) = self.error.take() {
|
|
|
|
return Err(e);
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process any SPLIT clause
|
2022-03-21 21:23:47 +00:00
|
|
|
self.output_split(&ctx, opt, txn).await?;
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process any GROUP clause
|
2022-03-21 21:23:47 +00:00
|
|
|
self.output_group(&ctx, opt, txn).await?;
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process any ORDER clause
|
2022-03-21 21:23:47 +00:00
|
|
|
self.output_order(&ctx, opt, txn).await?;
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process any START clause
|
2022-03-23 14:01:28 +00:00
|
|
|
self.output_start(&ctx, opt, txn).await?;
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process any LIMIT clause
|
2022-03-23 14:01:28 +00:00
|
|
|
self.output_limit(&ctx, opt, txn).await?;
|
|
|
|
// Process any FETCH clause
|
|
|
|
self.output_fetch(&ctx, opt, txn).await?;
|
2022-02-06 01:14:56 +00:00
|
|
|
// Output the results
|
|
|
|
Ok(mem::take(&mut self.results).into())
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
|
|
|
|
2022-02-06 01:14:56 +00:00
|
|
|
#[inline]
|
2022-03-21 21:23:47 +00:00
|
|
|
async fn output_split(
|
|
|
|
&mut self,
|
|
|
|
ctx: &Runtime,
|
|
|
|
opt: &Options,
|
|
|
|
txn: &Transaction,
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
if let Some(splits) = self.stm.split() {
|
2022-03-25 18:43:22 +00:00
|
|
|
// Loop over each split clause
|
|
|
|
for split in splits.iter() {
|
2022-03-21 21:23:47 +00:00
|
|
|
// Get the query result
|
|
|
|
let res = mem::take(&mut self.results);
|
|
|
|
// Loop over each value
|
|
|
|
for obj in &res {
|
|
|
|
// Get the value at the path
|
2022-03-23 11:51:36 +00:00
|
|
|
let val = obj.pick(&split.split);
|
2022-03-21 21:23:47 +00:00
|
|
|
// Set the value at the path
|
|
|
|
match val {
|
|
|
|
Value::Array(v) => {
|
|
|
|
for val in v.value {
|
|
|
|
// Make a copy of object
|
|
|
|
let mut obj = obj.clone();
|
|
|
|
// Set the value at the path
|
|
|
|
obj.set(ctx, opt, txn, &split.split, val).await?;
|
|
|
|
// Add the object to the results
|
|
|
|
self.results.push(obj);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_ => {
|
|
|
|
// Make a copy of object
|
|
|
|
let mut obj = obj.clone();
|
|
|
|
// Set the value at the path
|
|
|
|
obj.set(ctx, opt, txn, &split.split, val).await?;
|
|
|
|
// Add the object to the results
|
|
|
|
self.results.push(obj);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
2022-03-21 21:23:47 +00:00
|
|
|
Ok(())
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
|
|
|
|
2022-02-06 01:14:56 +00:00
|
|
|
#[inline]
|
2022-03-21 21:23:47 +00:00
|
|
|
async fn output_group(
|
|
|
|
&mut self,
|
2022-03-25 18:43:22 +00:00
|
|
|
ctx: &Runtime,
|
|
|
|
opt: &Options,
|
|
|
|
txn: &Transaction,
|
2022-03-21 21:23:47 +00:00
|
|
|
) -> Result<(), Error> {
|
2022-03-25 18:43:22 +00:00
|
|
|
if let Some(fields) = self.stm.expr() {
|
|
|
|
if let Some(groups) = self.stm.group() {
|
|
|
|
// Create the new grouped collection
|
|
|
|
let mut grp: BTreeMap<Array, Array> = BTreeMap::new();
|
|
|
|
// Get the query result
|
|
|
|
let res = mem::take(&mut self.results);
|
|
|
|
// Loop over each value
|
|
|
|
for obj in res {
|
|
|
|
// Create a new column set
|
|
|
|
let mut arr = Array::with_capacity(groups.len());
|
|
|
|
// Loop over each group clause
|
|
|
|
for group in groups.iter() {
|
|
|
|
// Get the value at the path
|
|
|
|
let val = obj.pick(&group.group);
|
|
|
|
// Set the value at the path
|
|
|
|
arr.value.push(val);
|
|
|
|
}
|
|
|
|
// Add to grouped collection
|
|
|
|
match grp.get_mut(&arr) {
|
|
|
|
Some(v) => v.value.push(obj),
|
|
|
|
None => {
|
|
|
|
grp.insert(arr, Array::from(obj));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Loop over each grouped collection
|
|
|
|
for (_, vals) in grp {
|
|
|
|
// Create a new value
|
|
|
|
let mut obj = Value::base();
|
|
|
|
// Save the collected values
|
|
|
|
let vals = Value::from(vals);
|
|
|
|
// Loop over each group clause
|
|
|
|
for field in fields.other() {
|
|
|
|
// Process it if it is a normal field
|
|
|
|
if let Field::Alone(v) = field {
|
|
|
|
match v {
|
|
|
|
Value::Function(f) if f.is_aggregate() => {
|
|
|
|
let x = vals
|
2022-03-25 21:14:48 +00:00
|
|
|
.all()
|
2022-03-25 18:43:22 +00:00
|
|
|
.get(ctx, opt, txn, v.to_idiom().as_ref())
|
|
|
|
.await?;
|
|
|
|
let x = f.aggregate(x).compute(ctx, opt, txn, None).await?;
|
|
|
|
obj.set(ctx, opt, txn, v.to_idiom().as_ref(), x).await?;
|
|
|
|
}
|
|
|
|
_ => {
|
2022-03-25 21:14:48 +00:00
|
|
|
let x = vals.first();
|
2022-03-25 18:43:22 +00:00
|
|
|
let x = v.compute(ctx, opt, txn, Some(&x)).await?;
|
|
|
|
obj.set(ctx, opt, txn, v.to_idiom().as_ref(), x).await?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Process it if it is a aliased field
|
|
|
|
if let Field::Alias(v, i) = field {
|
|
|
|
match v {
|
|
|
|
Value::Function(f) if f.is_aggregate() => {
|
2022-03-25 21:14:48 +00:00
|
|
|
let x = vals.all().get(ctx, opt, txn, i).await?;
|
2022-03-25 18:43:22 +00:00
|
|
|
let x = f.aggregate(x).compute(ctx, opt, txn, None).await?;
|
|
|
|
obj.set(ctx, opt, txn, i, x).await?;
|
|
|
|
}
|
|
|
|
_ => {
|
2022-03-25 21:14:48 +00:00
|
|
|
let x = vals.first();
|
2022-03-25 18:43:22 +00:00
|
|
|
let x = v.compute(ctx, opt, txn, Some(&x)).await?;
|
|
|
|
obj.set(ctx, opt, txn, i, x).await?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Add the object to the results
|
|
|
|
self.results.push(obj);
|
|
|
|
}
|
|
|
|
}
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
2022-03-21 21:23:47 +00:00
|
|
|
Ok(())
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
|
|
|
|
2022-02-06 01:14:56 +00:00
|
|
|
#[inline]
|
2022-03-21 21:23:47 +00:00
|
|
|
async fn output_order(
|
|
|
|
&mut self,
|
|
|
|
_ctx: &Runtime,
|
|
|
|
_opt: &Options,
|
|
|
|
_txn: &Transaction,
|
|
|
|
) -> Result<(), Error> {
|
2022-03-23 11:56:39 +00:00
|
|
|
if let Some(orders) = self.stm.order() {
|
2022-03-25 18:43:22 +00:00
|
|
|
// Sort the full result set
|
2022-03-23 11:56:39 +00:00
|
|
|
self.results.sort_by(|a, b| {
|
2022-03-25 18:43:22 +00:00
|
|
|
// Loop over each order clause
|
|
|
|
for order in orders.iter() {
|
|
|
|
// Reverse the ordering if DESC
|
2022-04-27 01:30:03 +00:00
|
|
|
let o = match order.random {
|
|
|
|
true => {
|
|
|
|
let a = rand::random::<f64>();
|
|
|
|
let b = rand::random::<f64>();
|
|
|
|
a.partial_cmp(&b)
|
|
|
|
}
|
|
|
|
false => match order.direction {
|
2022-04-27 15:21:51 +00:00
|
|
|
true => a.compare(b, &order.order, order.collate, order.numeric),
|
|
|
|
false => b.compare(a, &order.order, order.collate, order.numeric),
|
2022-04-27 01:30:03 +00:00
|
|
|
},
|
2022-03-23 11:56:39 +00:00
|
|
|
};
|
2022-04-27 01:30:03 +00:00
|
|
|
//
|
2022-03-23 11:56:39 +00:00
|
|
|
match o {
|
|
|
|
Some(Ordering::Greater) => return Ordering::Greater,
|
|
|
|
Some(Ordering::Equal) => continue,
|
|
|
|
Some(Ordering::Less) => return Ordering::Less,
|
|
|
|
None => continue,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ordering::Equal
|
|
|
|
})
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
2022-03-21 21:23:47 +00:00
|
|
|
Ok(())
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
|
|
|
|
2022-03-23 14:01:28 +00:00
|
|
|
#[inline]
|
|
|
|
async fn output_start(
|
|
|
|
&mut self,
|
|
|
|
_ctx: &Runtime,
|
|
|
|
_opt: &Options,
|
|
|
|
_txn: &Transaction,
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
if let Some(v) = self.stm.start() {
|
|
|
|
self.results = mem::take(&mut self.results).into_iter().skip(v.0).collect();
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
async fn output_limit(
|
|
|
|
&mut self,
|
|
|
|
_ctx: &Runtime,
|
|
|
|
_opt: &Options,
|
|
|
|
_txn: &Transaction,
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
if let Some(v) = self.stm.limit() {
|
|
|
|
self.results = mem::take(&mut self.results).into_iter().take(v.0).collect();
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
async fn output_fetch(
|
|
|
|
&mut self,
|
|
|
|
ctx: &Runtime,
|
|
|
|
opt: &Options,
|
|
|
|
txn: &Transaction,
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
if let Some(fetchs) = self.stm.fetch() {
|
|
|
|
for fetch in &fetchs.0 {
|
|
|
|
// Loop over each value
|
|
|
|
for obj in &mut self.results {
|
|
|
|
// Get the value at the path
|
|
|
|
let val = obj.get(ctx, opt, txn, &fetch.fetch).await?;
|
|
|
|
// Set the value at the path
|
|
|
|
match val {
|
|
|
|
Value::Array(v) => {
|
|
|
|
// Fetch all remote records
|
|
|
|
let val = Value::Array(v).get(ctx, opt, txn, &[Part::All]).await?;
|
|
|
|
// Set the value at the path
|
|
|
|
obj.set(ctx, opt, txn, &fetch.fetch, val).await?;
|
|
|
|
}
|
|
|
|
Value::Thing(v) => {
|
|
|
|
// Fetch all remote records
|
|
|
|
let val = Value::Thing(v).get(ctx, opt, txn, &[Part::All]).await?;
|
|
|
|
// Set the value at the path
|
|
|
|
obj.set(ctx, opt, txn, &fetch.fetch, val).await?;
|
|
|
|
}
|
|
|
|
_ => {
|
|
|
|
// Set the value at the path
|
|
|
|
obj.set(ctx, opt, txn, &fetch.fetch, val).await?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-02-22 14:16:50 +00:00
|
|
|
#[cfg(not(feature = "parallel"))]
|
|
|
|
async fn iterate(
|
|
|
|
&mut self,
|
|
|
|
ctx: &Runtime,
|
|
|
|
opt: &Options,
|
|
|
|
txn: &Transaction,
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
// Process all prepared values
|
|
|
|
for v in mem::take(&mut self.readies) {
|
|
|
|
v.iterate(ctx, opt, txn, self).await?;
|
|
|
|
}
|
|
|
|
// Everything processed ok
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(feature = "parallel")]
|
2022-02-06 01:14:56 +00:00
|
|
|
async fn iterate(
|
|
|
|
&mut self,
|
|
|
|
ctx: &Runtime,
|
2022-02-06 21:06:52 +00:00
|
|
|
opt: &Options,
|
2022-02-15 03:33:16 +00:00
|
|
|
txn: &Transaction,
|
2022-02-06 01:14:56 +00:00
|
|
|
) -> Result<(), Error> {
|
|
|
|
match self.parallel {
|
2022-02-22 14:16:50 +00:00
|
|
|
// Run statements sequentially
|
|
|
|
false => {
|
|
|
|
// Process all prepared values
|
|
|
|
for v in mem::take(&mut self.readies) {
|
|
|
|
v.iterate(ctx, opt, txn, self).await?;
|
|
|
|
}
|
|
|
|
// Everything processed ok
|
|
|
|
Ok(())
|
|
|
|
}
|
2022-02-06 01:14:56 +00:00
|
|
|
// Run statements in parallel
|
|
|
|
true => {
|
2022-02-23 13:56:09 +00:00
|
|
|
let mut rcv = {
|
2022-03-18 19:51:13 +00:00
|
|
|
// Get current statement
|
|
|
|
let stm = &self.stm;
|
2022-02-23 13:56:09 +00:00
|
|
|
// Create an unbounded channel
|
2022-02-26 23:30:19 +00:00
|
|
|
let (chn, rx) = tokio::sync::mpsc::channel(MAX_CONCURRENT_TASKS);
|
2022-02-23 13:56:09 +00:00
|
|
|
// Process all prepared values
|
|
|
|
for v in mem::take(&mut self.readies) {
|
2022-02-26 23:30:19 +00:00
|
|
|
if ctx.is_ok() {
|
|
|
|
tokio::spawn(v.channel(
|
|
|
|
ctx.clone(),
|
|
|
|
opt.clone(),
|
2022-03-18 19:51:13 +00:00
|
|
|
stm.clone(),
|
2022-02-26 23:30:19 +00:00
|
|
|
txn.clone(),
|
|
|
|
chn.clone(),
|
|
|
|
));
|
|
|
|
}
|
2022-02-23 13:56:09 +00:00
|
|
|
}
|
2022-02-26 23:30:19 +00:00
|
|
|
// Return the receiver
|
|
|
|
rx
|
|
|
|
};
|
|
|
|
let mut rcv = {
|
|
|
|
// Clone the send values
|
|
|
|
let ctx = ctx.clone();
|
|
|
|
let opt = opt.clone();
|
|
|
|
let txn = txn.clone();
|
|
|
|
let stm = self.stm.clone();
|
|
|
|
// Create an unbounded channel
|
|
|
|
let (chn, rx) = tokio::sync::mpsc::channel(MAX_CONCURRENT_TASKS);
|
|
|
|
// Process all received values
|
|
|
|
tokio::spawn(async move {
|
|
|
|
while let Some((k, v)) = rcv.recv().await {
|
|
|
|
if ctx.is_ok() {
|
|
|
|
tokio::spawn(Document::compute(
|
|
|
|
ctx.clone(),
|
|
|
|
opt.clone(),
|
|
|
|
txn.clone(),
|
|
|
|
chn.clone(),
|
|
|
|
stm.clone(),
|
|
|
|
k,
|
|
|
|
v,
|
|
|
|
));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
// Return the receiver
|
|
|
|
rx
|
2022-02-23 13:56:09 +00:00
|
|
|
};
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process all processed values
|
2022-02-26 23:30:19 +00:00
|
|
|
while let Some(r) = rcv.recv().await {
|
|
|
|
self.result(r);
|
2022-02-06 01:14:56 +00:00
|
|
|
}
|
|
|
|
// Everything processed ok
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-23 13:56:09 +00:00
|
|
|
// Process a new record Thing and Value
|
2022-02-06 01:14:56 +00:00
|
|
|
pub async fn process(
|
|
|
|
&mut self,
|
|
|
|
ctx: &Runtime,
|
2022-02-06 21:06:52 +00:00
|
|
|
opt: &Options,
|
2022-02-15 03:33:16 +00:00
|
|
|
txn: &Transaction,
|
2022-02-06 01:14:56 +00:00
|
|
|
thg: Option<Thing>,
|
|
|
|
val: Value,
|
|
|
|
) {
|
|
|
|
// Check current context
|
|
|
|
if ctx.is_done() {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
// Setup a new document
|
2022-02-13 19:03:00 +00:00
|
|
|
let mut doc = Document::new(thg, &val);
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process the document
|
2022-02-26 23:30:19 +00:00
|
|
|
let res = match self.stm {
|
|
|
|
Statement::Select(_) => doc.select(ctx, opt, txn, &self.stm).await,
|
|
|
|
Statement::Create(_) => doc.create(ctx, opt, txn, &self.stm).await,
|
|
|
|
Statement::Update(_) => doc.update(ctx, opt, txn, &self.stm).await,
|
|
|
|
Statement::Relate(_) => doc.relate(ctx, opt, txn, &self.stm).await,
|
|
|
|
Statement::Delete(_) => doc.delete(ctx, opt, txn, &self.stm).await,
|
|
|
|
Statement::Insert(_) => doc.insert(ctx, opt, txn, &self.stm).await,
|
2022-02-06 01:14:56 +00:00
|
|
|
_ => unreachable!(),
|
|
|
|
};
|
2022-02-23 13:56:09 +00:00
|
|
|
// Process the result
|
|
|
|
self.result(res);
|
|
|
|
}
|
2022-02-06 01:14:56 +00:00
|
|
|
|
2022-02-23 13:56:09 +00:00
|
|
|
// Accept a processed record result
|
|
|
|
fn result(&mut self, res: Result<Value, Error>) {
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process the result
|
|
|
|
match res {
|
2022-03-06 10:58:59 +00:00
|
|
|
Err(Error::Ignore) => {
|
2022-02-06 01:14:56 +00:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
self.error = Some(e);
|
|
|
|
self.run.cancel();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
Ok(v) => self.results.push(v),
|
|
|
|
}
|
|
|
|
// Check if we can exit
|
2022-03-04 16:01:32 +00:00
|
|
|
if self.stm.group().is_none() && self.stm.order().is_none() {
|
|
|
|
if let Some(l) = self.stm.limit() {
|
|
|
|
if let Some(s) = self.stm.start() {
|
|
|
|
if self.results.len() == l.0 + s.0 {
|
|
|
|
self.run.cancel()
|
2022-02-06 01:14:56 +00:00
|
|
|
}
|
2022-03-04 16:01:32 +00:00
|
|
|
} else if self.results.len() == l.0 {
|
|
|
|
self.run.cancel()
|
2022-02-06 01:14:56 +00:00
|
|
|
}
|
|
|
|
}
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
2021-03-29 15:43:37 +00:00
|
|
|
}
|
|
|
|
}
|