use crate::ctx::Canceller; use crate::ctx::Context; use crate::dbs::Options; use crate::dbs::Statement; use crate::dbs::Transaction; use crate::doc::Document; use crate::err::Error; use crate::sql::array::Array; use crate::sql::field::Field; use crate::sql::part::Part; use crate::sql::table::Table; use crate::sql::thing::Thing; use crate::sql::value::Value; use std::cmp::Ordering; use std::collections::BTreeMap; use std::mem; pub enum Iterable { Value(Value), Table(Table), Thing(Thing), Mergeable(Thing, Value), Relatable(Thing, Thing, Thing), } pub enum Operable { Value(Value), Mergeable(Value, Value), Relatable(Thing, Value, Thing), } pub enum Workable { Normal, Insert(Value), Relate(Thing, Thing), } #[derive(Default)] pub struct Iterator { // Iterator status run: Canceller, // Iterator runtime error error: Option, // Iterator output results results: Vec, // Iterator input values entries: Vec, } impl Iterator { // Creates a new iterator pub fn new() -> Self { Self::default() } // Prepares a value for processing pub fn ingest(&mut self, val: Iterable) { self.entries.push(val) } // Process the records and output pub async fn output( &mut self, ctx: &Context<'_>, opt: &Options, txn: &Transaction, stm: &Statement<'_>, ) -> Result { // Log the statement trace!("Iterating: {}", stm); // Enable context override let mut ctx = Context::new(ctx); self.run = ctx.add_cancel(); // Process prepared values self.iterate(&ctx, opt, txn, stm).await?; // Return any document errors if let Some(e) = self.error.take() { return Err(e); } // Process any SPLIT clause self.output_split(&ctx, opt, txn, stm).await?; // Process any GROUP clause self.output_group(&ctx, opt, txn, stm).await?; // Process any ORDER clause self.output_order(&ctx, opt, txn, stm).await?; // Process any START clause self.output_start(&ctx, opt, txn, stm).await?; // Process any LIMIT clause self.output_limit(&ctx, opt, txn, stm).await?; // Process any FETCH clause self.output_fetch(&ctx, opt, txn, stm).await?; // Output the results Ok(mem::take(&mut self.results).into()) } #[inline] async fn output_split( &mut self, ctx: &Context<'_>, opt: &Options, txn: &Transaction, stm: &Statement<'_>, ) -> Result<(), Error> { if let Some(splits) = stm.split() { // Loop over each split clause for split in splits.iter() { // Get the query result let res = mem::take(&mut self.results); // Loop over each value for obj in &res { // Get the value at the path let val = obj.pick(split); // Set the value at the path match val { Value::Array(v) => { for val in v { // Make a copy of object let mut obj = obj.clone(); // Set the value at the path obj.set(ctx, opt, txn, split, val).await?; // Add the object to the results self.results.push(obj); } } _ => { // Make a copy of object let mut obj = obj.clone(); // Set the value at the path obj.set(ctx, opt, txn, split, val).await?; // Add the object to the results self.results.push(obj); } } } } } Ok(()) } #[inline] async fn output_group( &mut self, ctx: &Context<'_>, opt: &Options, txn: &Transaction, stm: &Statement<'_>, ) -> Result<(), Error> { if let Some(fields) = stm.expr() { if let Some(groups) = stm.group() { // Create the new grouped collection let mut grp: BTreeMap = BTreeMap::new(); // Get the query result let res = mem::take(&mut self.results); // Loop over each value for obj in res { // Create a new column set let mut arr = Array::with_capacity(groups.len()); // Loop over each group clause for group in groups.iter() { // Get the value at the path let val = obj.pick(group); // Set the value at the path arr.push(val); } // Add to grouped collection match grp.get_mut(&arr) { Some(v) => v.push(obj), None => { grp.insert(arr, Array::from(obj)); } } } // Loop over each grouped collection for (_, vals) in grp { // Create a new value let mut obj = Value::base(); // Save the collected values let vals = Value::from(vals); // Loop over each group clause for field in fields.other() { // Process it if it is a normal field if let Field::Alone(v) = field { match v { Value::Function(f) if f.is_aggregate() => { let x = vals .all() .get(ctx, opt, txn, v.to_idiom().as_ref()) .await?; let x = f.aggregate(x).compute(ctx, opt, txn, None).await?; obj.set(ctx, opt, txn, v.to_idiom().as_ref(), x).await?; } _ => { let x = vals.first(); let x = v.compute(ctx, opt, txn, Some(&x)).await?; obj.set(ctx, opt, txn, v.to_idiom().as_ref(), x).await?; } } } // Process it if it is a aliased field if let Field::Alias(v, i) = field { match v { Value::Function(f) if f.is_aggregate() => { let x = vals.all().get(ctx, opt, txn, i).await?; let x = f.aggregate(x).compute(ctx, opt, txn, None).await?; obj.set(ctx, opt, txn, i, x).await?; } _ => { let x = vals.first(); let x = v.compute(ctx, opt, txn, Some(&x)).await?; obj.set(ctx, opt, txn, i, x).await?; } } } } // Add the object to the results self.results.push(obj); } } } Ok(()) } #[inline] async fn output_order( &mut self, _ctx: &Context<'_>, _opt: &Options, _txn: &Transaction, stm: &Statement<'_>, ) -> Result<(), Error> { if let Some(orders) = stm.order() { // Sort the full result set self.results.sort_by(|a, b| { // Loop over each order clause for order in orders.iter() { // Reverse the ordering if DESC let o = match order.random { true => { let a = rand::random::(); let b = rand::random::(); a.partial_cmp(&b) } false => match order.direction { true => a.compare(b, order, order.collate, order.numeric), false => b.compare(a, order, order.collate, order.numeric), }, }; // match o { Some(Ordering::Greater) => return Ordering::Greater, Some(Ordering::Equal) => continue, Some(Ordering::Less) => return Ordering::Less, None => continue, } } Ordering::Equal }) } Ok(()) } #[inline] async fn output_start( &mut self, _ctx: &Context<'_>, _opt: &Options, _txn: &Transaction, stm: &Statement<'_>, ) -> Result<(), Error> { if let Some(v) = stm.start() { self.results = mem::take(&mut self.results).into_iter().skip(v.0).collect(); } Ok(()) } #[inline] async fn output_limit( &mut self, _ctx: &Context<'_>, _opt: &Options, _txn: &Transaction, stm: &Statement<'_>, ) -> Result<(), Error> { if let Some(v) = stm.limit() { self.results = mem::take(&mut self.results).into_iter().take(v.0).collect(); } Ok(()) } #[inline] async fn output_fetch( &mut self, ctx: &Context<'_>, opt: &Options, txn: &Transaction, stm: &Statement<'_>, ) -> Result<(), Error> { if let Some(fetchs) = stm.fetch() { for fetch in &fetchs.0 { // Loop over each value for obj in &mut self.results { // Get the value at the path let val = obj.get(ctx, opt, txn, fetch).await?; // Set the value at the path match val { Value::Array(v) => { // Fetch all remote records let val = Value::Array(v).get(ctx, opt, txn, &[Part::All]).await?; // Set the value at the path obj.set(ctx, opt, txn, fetch, val).await?; } Value::Thing(v) => { // Fetch all remote records let val = Value::Thing(v).get(ctx, opt, txn, &[Part::All]).await?; // Set the value at the path obj.set(ctx, opt, txn, fetch, val).await?; } _ => { // Set the value at the path obj.set(ctx, opt, txn, fetch, val).await?; } } } } } Ok(()) } #[cfg(not(feature = "parallel"))] async fn iterate( &mut self, ctx: &Context<'_>, opt: &Options, txn: &Transaction, stm: &Statement<'_>, ) -> Result<(), Error> { // Process all prepared values for v in mem::take(&mut self.entries) { v.iterate(ctx, opt, txn, stm, self).await?; } // Everything processed ok Ok(()) } #[cfg(feature = "parallel")] async fn iterate( &mut self, ctx: &Context<'_>, opt: &Options, txn: &Transaction, stm: &Statement<'_>, ) -> Result<(), Error> { match stm.parallel() { // Run statements sequentially false => { // Process all prepared values for v in mem::take(&mut self.entries) { v.iterate(ctx, opt, txn, stm, self).await?; } // Everything processed ok Ok(()) } // Run statements in parallel true => { // Create a new executor let exe = executor::Executor::new(); // Take all of the iterator values let vals = mem::take(&mut self.entries); // Create a channel to shutdown let (end, exit) = channel::bounded::<()>(1); // Create an unbounded channel let (chn, docs) = channel::bounded(crate::cnf::MAX_CONCURRENT_TASKS); // Create an async closure for prepared values let adocs = async { // Process all prepared values for v in vals { exe.spawn(v.channel(ctx, opt, txn, stm, chn.clone())) // Ensure we detach the spawned task .detach(); } // Drop the uncloned channel instance drop(chn); }; // Create an unbounded channel let (chn, vals) = channel::bounded(crate::cnf::MAX_CONCURRENT_TASKS); // Create an async closure for received values let avals = async { // Process all received values while let Ok((k, v)) = docs.recv().await { exe.spawn(Document::compute(ctx, opt, txn, stm, chn.clone(), k, v)) // Ensure we detach the spawned task .detach(); } // Drop the uncloned channel instance drop(chn); }; // Create an async closure to process results let aproc = async { // Process all processed values while let Ok(r) = vals.recv().await { self.result(r, stm); } // Shutdown the executor let _ = end.send(()).await; }; // Run all executor tasks let fut = exe.run(exit.recv()); // Wait for all closures let res = futures::join!(adocs, avals, aproc, fut); // Consume executor error let _ = res.3; // Everything processed ok Ok(()) } } } // Process a new record Thing and Value pub async fn process( &mut self, ctx: &Context<'_>, opt: &Options, txn: &Transaction, stm: &Statement<'_>, thg: Option, val: Operable, ) { // Check current context if ctx.is_done() { return; } // Setup a new workable let val = match val { Operable::Value(v) => (v, Workable::Normal), Operable::Mergeable(v, o) => (v, Workable::Insert(o)), Operable::Relatable(f, v, w) => (v, Workable::Relate(f, w)), }; // Setup a new document let mut doc = Document::new(thg, &val.0, val.1); // Process the document let res = match stm { Statement::Select(_) => doc.select(ctx, opt, txn, stm).await, Statement::Create(_) => doc.create(ctx, opt, txn, stm).await, Statement::Update(_) => doc.update(ctx, opt, txn, stm).await, Statement::Relate(_) => doc.relate(ctx, opt, txn, stm).await, Statement::Delete(_) => doc.delete(ctx, opt, txn, stm).await, Statement::Insert(_) => doc.insert(ctx, opt, txn, stm).await, }; // Process the result self.result(res, stm); } // Accept a processed record result fn result(&mut self, res: Result, stm: &Statement<'_>) { // Process the result match res { Err(Error::Ignore) => { return; } Err(e) => { self.error = Some(e); self.run.cancel(); return; } Ok(v) => self.results.push(v), } // Check if we can exit if stm.group().is_none() && stm.order().is_none() { if let Some(l) = stm.limit() { if let Some(s) = stm.start() { if self.results.len() == l.0 + s.0 { self.run.cancel() } } else if self.results.len() == l.0 { self.run.cancel() } } } } }