2022-02-14 10:13:50 +00:00
|
|
|
use crate::cnf::ID_CHARS;
|
2022-02-06 01:14:56 +00:00
|
|
|
use crate::ctx::Canceller;
|
|
|
|
use crate::ctx::Context;
|
|
|
|
use crate::dbs::Options;
|
2021-03-31 12:10:13 +00:00
|
|
|
use crate::dbs::Runtime;
|
2022-02-06 01:14:56 +00:00
|
|
|
use crate::dbs::Statement;
|
2022-02-15 01:00:30 +00:00
|
|
|
use crate::dbs::Transaction;
|
2022-02-06 01:14:56 +00:00
|
|
|
use crate::doc::Document;
|
2021-03-29 15:43:37 +00:00
|
|
|
use crate::err::Error;
|
2022-01-13 07:31:21 +00:00
|
|
|
use crate::sql::group::Groups;
|
|
|
|
use crate::sql::limit::Limit;
|
|
|
|
use crate::sql::order::Orders;
|
|
|
|
use crate::sql::split::Splits;
|
|
|
|
use crate::sql::start::Start;
|
2022-02-23 13:56:09 +00:00
|
|
|
use crate::sql::statements::create::CreateStatement;
|
|
|
|
use crate::sql::statements::delete::DeleteStatement;
|
|
|
|
use crate::sql::statements::insert::InsertStatement;
|
|
|
|
use crate::sql::statements::relate::RelateStatement;
|
|
|
|
use crate::sql::statements::select::SelectStatement;
|
|
|
|
use crate::sql::statements::update::UpdateStatement;
|
2022-01-13 07:31:21 +00:00
|
|
|
use crate::sql::table::Table;
|
|
|
|
use crate::sql::thing::Thing;
|
|
|
|
use crate::sql::value::Value;
|
|
|
|
use crate::sql::version::Version;
|
2022-02-14 10:13:50 +00:00
|
|
|
use nanoid::nanoid;
|
2022-01-13 07:31:21 +00:00
|
|
|
use std::mem;
|
2022-02-06 01:14:56 +00:00
|
|
|
|
2022-01-13 07:31:21 +00:00
|
|
|
#[derive(Default)]
|
|
|
|
pub struct Iterator<'a> {
|
2022-02-06 01:14:56 +00:00
|
|
|
// Iterator status
|
|
|
|
run: Canceller,
|
|
|
|
// Iterator runtime error
|
|
|
|
error: Option<Error>,
|
|
|
|
// Iterator input values
|
|
|
|
readies: Vec<Value>,
|
|
|
|
// Iterator output results
|
|
|
|
results: Vec<Value>,
|
|
|
|
// Iterate options
|
|
|
|
pub parallel: bool,
|
|
|
|
// Underlying statement
|
|
|
|
pub stmt: Statement<'a>,
|
2022-01-13 07:31:21 +00:00
|
|
|
// Iterator options
|
|
|
|
pub split: Option<&'a Splits>,
|
|
|
|
pub group: Option<&'a Groups>,
|
|
|
|
pub order: Option<&'a Orders>,
|
|
|
|
pub limit: Option<&'a Limit>,
|
|
|
|
pub start: Option<&'a Start>,
|
|
|
|
pub version: Option<&'a Version>,
|
|
|
|
}
|
|
|
|
|
2022-02-23 13:56:09 +00:00
|
|
|
impl<'a> From<&'a SelectStatement> for Iterator<'a> {
|
|
|
|
fn from(v: &'a SelectStatement) -> Self {
|
|
|
|
Iterator {
|
|
|
|
stmt: Statement::from(v),
|
|
|
|
split: v.split.as_ref(),
|
|
|
|
group: v.group.as_ref(),
|
|
|
|
order: v.order.as_ref(),
|
|
|
|
limit: v.limit.as_ref(),
|
|
|
|
start: v.start.as_ref(),
|
2022-02-26 16:53:38 +00:00
|
|
|
parallel: v.parallel,
|
2022-02-23 13:56:09 +00:00
|
|
|
..Iterator::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> From<&'a CreateStatement> for Iterator<'a> {
|
|
|
|
fn from(v: &'a CreateStatement) -> Self {
|
|
|
|
Iterator {
|
|
|
|
stmt: Statement::from(v),
|
2022-02-26 16:53:38 +00:00
|
|
|
parallel: v.parallel,
|
2022-02-23 13:56:09 +00:00
|
|
|
..Iterator::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> From<&'a UpdateStatement> for Iterator<'a> {
|
|
|
|
fn from(v: &'a UpdateStatement) -> Self {
|
|
|
|
Iterator {
|
|
|
|
stmt: Statement::from(v),
|
2022-02-26 16:53:38 +00:00
|
|
|
parallel: v.parallel,
|
2022-02-23 13:56:09 +00:00
|
|
|
..Iterator::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> From<&'a RelateStatement> for Iterator<'a> {
|
|
|
|
fn from(v: &'a RelateStatement) -> Self {
|
|
|
|
Iterator {
|
|
|
|
stmt: Statement::from(v),
|
2022-02-26 16:53:38 +00:00
|
|
|
parallel: v.parallel,
|
2022-02-23 13:56:09 +00:00
|
|
|
..Iterator::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> From<&'a DeleteStatement> for Iterator<'a> {
|
|
|
|
fn from(v: &'a DeleteStatement) -> Self {
|
|
|
|
Iterator {
|
|
|
|
stmt: Statement::from(v),
|
2022-02-26 16:53:38 +00:00
|
|
|
parallel: v.parallel,
|
2022-02-23 13:56:09 +00:00
|
|
|
..Iterator::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> From<&'a InsertStatement> for Iterator<'a> {
|
|
|
|
fn from(v: &'a InsertStatement) -> Self {
|
|
|
|
Iterator {
|
|
|
|
stmt: Statement::from(v),
|
2022-02-26 16:53:38 +00:00
|
|
|
parallel: v.parallel,
|
2022-02-23 13:56:09 +00:00
|
|
|
..Iterator::default()
|
|
|
|
}
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
2022-02-23 13:56:09 +00:00
|
|
|
}
|
2022-01-13 07:31:21 +00:00
|
|
|
|
2022-02-23 13:56:09 +00:00
|
|
|
impl<'a> Iterator<'a> {
|
2022-02-06 01:14:56 +00:00
|
|
|
// Prepares a value for processing
|
|
|
|
pub fn prepare(&mut self, val: Value) {
|
|
|
|
self.readies.push(val)
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
|
|
|
|
2022-02-06 01:14:56 +00:00
|
|
|
// Create a new record for processing
|
|
|
|
pub fn produce(&mut self, val: Table) {
|
|
|
|
self.prepare(Value::Thing(Thing {
|
|
|
|
tb: val.name.to_string(),
|
2022-02-14 10:13:50 +00:00
|
|
|
id: nanoid!(20, &ID_CHARS),
|
2022-02-06 01:14:56 +00:00
|
|
|
}))
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
|
|
|
|
2022-02-06 21:06:52 +00:00
|
|
|
// Process the records and output
|
2022-02-06 01:14:56 +00:00
|
|
|
pub async fn output(
|
|
|
|
&mut self,
|
|
|
|
ctx: &Runtime,
|
2022-02-06 21:06:52 +00:00
|
|
|
opt: &Options,
|
2022-02-15 03:33:16 +00:00
|
|
|
txn: &Transaction,
|
2022-02-06 01:14:56 +00:00
|
|
|
) -> Result<Value, Error> {
|
|
|
|
// Log the statement
|
2022-02-23 13:29:29 +00:00
|
|
|
trace!("Iterating: {}", self.stmt);
|
2022-02-06 01:14:56 +00:00
|
|
|
// Enable context override
|
|
|
|
let mut ctx = Context::new(&ctx);
|
|
|
|
self.run = ctx.add_cancel();
|
|
|
|
let ctx = ctx.freeze();
|
|
|
|
// Process prepared values
|
2022-02-15 01:00:30 +00:00
|
|
|
self.iterate(&ctx, opt, txn).await?;
|
2022-02-06 01:14:56 +00:00
|
|
|
// Return any document errors
|
|
|
|
if let Some(e) = self.error.take() {
|
|
|
|
return Err(e);
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process any SPLIT clause
|
2022-02-15 01:00:30 +00:00
|
|
|
self.output_split(&ctx, opt, txn);
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process any GROUP clause
|
2022-02-15 01:00:30 +00:00
|
|
|
self.output_group(&ctx, opt, txn);
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process any ORDER clause
|
2022-02-15 01:00:30 +00:00
|
|
|
self.output_order(&ctx, opt, txn);
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process any START clause
|
2022-02-15 01:00:30 +00:00
|
|
|
self.output_start(&ctx, opt, txn);
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process any LIMIT clause
|
2022-02-15 01:00:30 +00:00
|
|
|
self.output_limit(&ctx, opt, txn);
|
2022-02-06 01:14:56 +00:00
|
|
|
// Output the results
|
|
|
|
Ok(mem::take(&mut self.results).into())
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
|
|
|
|
2022-02-06 01:14:56 +00:00
|
|
|
#[inline]
|
2022-02-15 01:00:30 +00:00
|
|
|
fn output_split(&mut self, ctx: &Runtime, opt: &Options, txn: &Transaction) {
|
2022-01-13 07:31:21 +00:00
|
|
|
if self.split.is_some() {
|
|
|
|
// Ignore
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-06 01:14:56 +00:00
|
|
|
#[inline]
|
2022-02-15 01:00:30 +00:00
|
|
|
fn output_group(&mut self, ctx: &Runtime, opt: &Options, txn: &Transaction) {
|
2022-01-13 07:31:21 +00:00
|
|
|
if self.group.is_some() {
|
|
|
|
// Ignore
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-06 01:14:56 +00:00
|
|
|
#[inline]
|
2022-02-15 01:00:30 +00:00
|
|
|
fn output_order(&mut self, ctx: &Runtime, opt: &Options, txn: &Transaction) {
|
2022-01-13 07:31:21 +00:00
|
|
|
if self.order.is_some() {
|
|
|
|
// Ignore
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-06 01:14:56 +00:00
|
|
|
#[inline]
|
2022-02-15 01:00:30 +00:00
|
|
|
fn output_start(&mut self, ctx: &Runtime, opt: &Options, txn: &Transaction) {
|
2022-01-13 07:31:21 +00:00
|
|
|
if let Some(v) = self.start {
|
2022-02-06 01:14:56 +00:00
|
|
|
self.results = mem::take(&mut self.results).into_iter().skip(v.0).collect();
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-06 01:14:56 +00:00
|
|
|
#[inline]
|
2022-02-15 01:00:30 +00:00
|
|
|
fn output_limit(&mut self, ctx: &Runtime, opt: &Options, txn: &Transaction) {
|
2022-01-13 07:31:21 +00:00
|
|
|
if let Some(v) = self.limit {
|
2022-02-06 01:14:56 +00:00
|
|
|
self.results = mem::take(&mut self.results).into_iter().take(v.0).collect();
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-22 14:16:50 +00:00
|
|
|
#[cfg(not(feature = "parallel"))]
|
|
|
|
async fn iterate(
|
|
|
|
&mut self,
|
|
|
|
ctx: &Runtime,
|
|
|
|
opt: &Options,
|
|
|
|
txn: &Transaction,
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
// Process all prepared values
|
|
|
|
for v in mem::take(&mut self.readies) {
|
|
|
|
v.iterate(ctx, opt, txn, self).await?;
|
|
|
|
}
|
|
|
|
// Everything processed ok
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Processes the prepared values, sequentially or in parallel depending on
/// `self.parallel`. This variant is compiled when the `parallel` feature is
/// enabled.
///
/// # Errors
///
/// In sequential mode, stops at and returns the first error raised while
/// iterating a value. The parallel path itself always returns `Ok(())`;
/// record errors are stored by `process` instead.
#[cfg(feature = "parallel")]
async fn iterate(
    &mut self,
    ctx: &Runtime,
    opt: &Options,
    txn: &Transaction,
) -> Result<(), Error> {
    // Sequential mode: handle each prepared value in turn
    if !self.parallel {
        for value in mem::take(&mut self.readies) {
            value.iterate(ctx, opt, txn, self).await?;
        }
        return Ok(());
    }
    // Parallel mode: every prepared value gets its own spawned task that
    // reports back over a shared channel
    let mut receiver = {
        // Use a multi producer channel
        use tokio::sync::mpsc;
        // Create an unbounded channel
        let (sender, receiver) = mpsc::unbounded_channel();
        // Spawn one task per prepared value, each with its own sender clone
        for value in mem::take(&mut self.readies) {
            tokio::spawn(value.channel(ctx.clone(), opt.clone(), txn.clone(), sender.clone()));
        }
        // The original sender is dropped at the end of this scope, so the
        // receive loop below ends once every clone has been dropped
        receiver
    };
    // Handle each record as it arrives
    while let Some((thing, value)) = receiver.recv().await {
        self.process(ctx, opt, txn, thing, value).await;
    }
    // Everything was processed
    Ok(())
}
|
|
|
|
|
2022-02-23 13:56:09 +00:00
|
|
|
// Process a new record Thing and Value
|
2022-02-06 01:14:56 +00:00
|
|
|
pub async fn process(
|
|
|
|
&mut self,
|
|
|
|
ctx: &Runtime,
|
2022-02-06 21:06:52 +00:00
|
|
|
opt: &Options,
|
2022-02-15 03:33:16 +00:00
|
|
|
txn: &Transaction,
|
2022-02-06 01:14:56 +00:00
|
|
|
thg: Option<Thing>,
|
|
|
|
val: Value,
|
|
|
|
) {
|
|
|
|
// Check current context
|
|
|
|
if ctx.is_done() {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
// Setup a new document
|
2022-02-13 19:03:00 +00:00
|
|
|
let mut doc = Document::new(thg, &val);
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process the document
|
|
|
|
let res = match self.stmt {
|
2022-02-15 01:00:30 +00:00
|
|
|
Statement::Select(_) => doc.select(ctx, opt, txn, &self.stmt).await,
|
|
|
|
Statement::Create(_) => doc.create(ctx, opt, txn, &self.stmt).await,
|
|
|
|
Statement::Update(_) => doc.update(ctx, opt, txn, &self.stmt).await,
|
|
|
|
Statement::Relate(_) => doc.relate(ctx, opt, txn, &self.stmt).await,
|
|
|
|
Statement::Delete(_) => doc.delete(ctx, opt, txn, &self.stmt).await,
|
|
|
|
Statement::Insert(_) => doc.insert(ctx, opt, txn, &self.stmt).await,
|
2022-02-06 01:14:56 +00:00
|
|
|
_ => unreachable!(),
|
|
|
|
};
|
2022-02-23 13:56:09 +00:00
|
|
|
// Process the result
|
|
|
|
self.result(res);
|
|
|
|
}
|
2022-02-06 01:14:56 +00:00
|
|
|
|
2022-02-23 13:56:09 +00:00
|
|
|
// Accept a processed record result
|
|
|
|
fn result(&mut self, res: Result<Value, Error>) {
|
2022-02-06 01:14:56 +00:00
|
|
|
// Process the result
|
|
|
|
match res {
|
|
|
|
Err(Error::IgnoreError) => {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
self.error = Some(e);
|
|
|
|
self.run.cancel();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
Ok(v) => self.results.push(v),
|
|
|
|
}
|
|
|
|
// Check if we can exit
|
|
|
|
if self.group.is_none() {
|
|
|
|
if self.order.is_none() {
|
|
|
|
if let Some(l) = self.limit {
|
|
|
|
if let Some(s) = self.start {
|
|
|
|
if self.results.len() == l.0 + s.0 {
|
|
|
|
self.run.cancel()
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if self.results.len() == l.0 {
|
|
|
|
self.run.cancel()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-01-13 07:31:21 +00:00
|
|
|
}
|
2021-03-29 15:43:37 +00:00
|
|
|
}
|
|
|
|
}
|