diff --git a/lib/src/dbs/channel.rs b/lib/src/dbs/channel.rs index 308b1c25..70a20c68 100644 --- a/lib/src/dbs/channel.rs +++ b/lib/src/dbs/channel.rs @@ -13,21 +13,19 @@ use async_recursion::async_recursion; use nanoid::nanoid; use tokio::sync::mpsc::UnboundedSender; -pub type Channel = UnboundedSender<(Option, Value)>; - impl Value { pub async fn channel( self, ctx: Runtime, opt: Options, - chn: Channel, txn: Transaction, + chn: UnboundedSender<(Option, Value)>, ) -> Result<(), Error> { match self { - Value::Array(v) => v.process(&ctx, &opt, &chn, &txn).await?, - Value::Model(v) => v.process(&ctx, &opt, &chn, &txn).await?, - Value::Thing(v) => v.process(&ctx, &opt, &chn, &txn).await?, - Value::Table(v) => v.process(&ctx, &opt, &chn, &txn).await?, + Value::Array(v) => v.process(&ctx, &opt, &txn, &chn).await?, + Value::Model(v) => v.process(&ctx, &opt, &txn, &chn).await?, + Value::Thing(v) => v.process(&ctx, &opt, &txn, &chn).await?, + Value::Table(v) => v.process(&ctx, &opt, &txn, &chn).await?, v => chn.send((None, v))?, } Ok(()) @@ -40,15 +38,15 @@ impl Array { self, ctx: &Runtime, opt: &Options, - chn: &Channel, txn: &Transaction, + chn: &UnboundedSender<(Option, Value)>, ) -> Result<(), Error> { for v in self.value.into_iter() { match v { - Value::Array(v) => v.process(ctx, opt, chn, txn).await?, - Value::Model(v) => v.process(ctx, opt, chn, txn).await?, - Value::Thing(v) => v.process(ctx, opt, chn, txn).await?, - Value::Table(v) => v.process(ctx, opt, chn, txn).await?, + Value::Array(v) => v.process(ctx, opt, txn, chn).await?, + Value::Model(v) => v.process(ctx, opt, txn, chn).await?, + Value::Thing(v) => v.process(ctx, opt, txn, chn).await?, + Value::Table(v) => v.process(ctx, opt, txn, chn).await?, v => chn.send((None, v))?, } } @@ -61,8 +59,8 @@ impl Model { self, ctx: &Runtime, opt: &Options, - chn: &Channel, txn: &Transaction, + chn: &UnboundedSender<(Option, Value)>, ) -> Result<(), Error> { if ctx.is_ok() { if let Some(c) = self.count { @@ -71,7 +69,7 @@ impl Model { tb: self.table.to_string(), id: nanoid!(20, &ID_CHARS), } - .process(ctx, opt, chn, txn) + .process(ctx, opt, txn, chn) .await?; } } @@ -81,7 +79,7 @@ impl Model { tb: self.table.to_string(), id: x.to_string(), } - .process(ctx, opt, chn, txn) + .process(ctx, opt, txn, chn) .await?; } } @@ -95,8 +93,8 @@ impl Thing { self, ctx: &Runtime, opt: &Options, - chn: &Channel, txn: &Transaction, + chn: &UnboundedSender<(Option, Value)>, ) -> Result<(), Error> { Ok(()) } @@ -107,8 +105,8 @@ impl Table { self, ctx: &Runtime, opt: &Options, - chn: &Channel, txn: &Transaction, + chn: &UnboundedSender<(Option, Value)>, ) -> Result<(), Error> { Ok(()) } diff --git a/lib/src/dbs/iterator.rs b/lib/src/dbs/iterator.rs index 9d5bd73a..e7c54870 100644 --- a/lib/src/dbs/iterator.rs +++ b/lib/src/dbs/iterator.rs @@ -12,6 +12,12 @@ use crate::sql::limit::Limit; use crate::sql::order::Orders; use crate::sql::split::Splits; use crate::sql::start::Start; +use crate::sql::statements::create::CreateStatement; +use crate::sql::statements::delete::DeleteStatement; +use crate::sql::statements::insert::InsertStatement; +use crate::sql::statements::relate::RelateStatement; +use crate::sql::statements::select::SelectStatement; +use crate::sql::statements::update::UpdateStatement; use crate::sql::table::Table; use crate::sql::thing::Thing; use crate::sql::value::Value; @@ -42,11 +48,66 @@ pub struct Iterator<'a> { pub version: Option<&'a Version>, } -impl<'a> Iterator<'a> { - pub fn new() -> Iterator<'a> { - Iterator::default() +impl<'a> From<&'a SelectStatement> for Iterator<'a> { + fn from(v: &'a SelectStatement) -> Self { + Iterator { + stmt: Statement::from(v), + split: v.split.as_ref(), + group: v.group.as_ref(), + order: v.order.as_ref(), + limit: v.limit.as_ref(), + start: v.start.as_ref(), + ..Iterator::default() + } } +} +impl<'a> From<&'a CreateStatement> for Iterator<'a> { + fn from(v: &'a CreateStatement) -> Self { + Iterator { + stmt: Statement::from(v), + ..Iterator::default() + } + } +} + +impl<'a> From<&'a UpdateStatement> for Iterator<'a> { + fn from(v: &'a UpdateStatement) -> Self { + Iterator { + stmt: Statement::from(v), + ..Iterator::default() + } + } +} + +impl<'a> From<&'a RelateStatement> for Iterator<'a> { + fn from(v: &'a RelateStatement) -> Self { + Iterator { + stmt: Statement::from(v), + ..Iterator::default() + } + } +} + +impl<'a> From<&'a DeleteStatement> for Iterator<'a> { + fn from(v: &'a DeleteStatement) -> Self { + Iterator { + stmt: Statement::from(v), + ..Iterator::default() + } + } +} + +impl<'a> From<&'a InsertStatement> for Iterator<'a> { + fn from(v: &'a InsertStatement) -> Self { + Iterator { + stmt: Statement::from(v), + ..Iterator::default() + } + } +} + +impl<'a> Iterator<'a> { // Prepares a value for processing pub fn prepare(&mut self, val: Value) { self.readies.push(val) @@ -162,18 +223,20 @@ impl<'a> Iterator<'a> { } // Run statements in parallel true => { - // Use multi producer channel - use tokio::sync::mpsc; - // Create an unbounded channel - let (chn, mut rx) = mpsc::unbounded_channel(); - // Process all prepared values - for v in mem::take(&mut self.readies) { - tokio::spawn(v.channel(ctx.clone(), opt.clone(), chn.clone(), txn.clone())); - } - // Drop the main channel reference - drop(chn); + let mut rcv = { + // Use multi producer channel + use tokio::sync::mpsc; + // Create an unbounded channel + let (chn, rcv) = mpsc::unbounded_channel(); + // Process all prepared values + for v in mem::take(&mut self.readies) { + tokio::spawn(v.channel(ctx.clone(), opt.clone(), txn.clone(), chn.clone())); + } + // + rcv + }; // Process all processed values - while let Some((k, v)) = rx.recv().await { + while let Some((k, v)) = rcv.recv().await { self.process(&ctx, opt, txn, k, v).await; } // Everything processed ok @@ -182,6 +245,7 @@ impl<'a> Iterator<'a> { } } + // Process a new record Thing and Value pub async fn process( &mut self, ctx: &Runtime, @@ -196,7 +260,6 @@ impl<'a> Iterator<'a> { } // Setup a new document let mut doc = Document::new(thg, &val); - // Process the document let res = match self.stmt { Statement::Select(_) => doc.select(ctx, opt, txn, &self.stmt).await, @@ -207,7 +270,12 @@ impl<'a> Iterator<'a> { Statement::Insert(_) => doc.insert(ctx, opt, txn, &self.stmt).await, _ => unreachable!(), }; + // Process the result + self.result(res); + } + // Accept a processed record result + fn result(&mut self, res: Result) { // Process the result match res { Err(Error::IgnoreError) => { @@ -220,7 +288,6 @@ impl<'a> Iterator<'a> { } Ok(v) => self.results.push(v), } - // Check if we can exit if self.group.is_none() { if self.order.is_none() { diff --git a/lib/src/doc/compute.rs b/lib/src/doc/compute.rs index 821c990e..01e0deea 100644 --- a/lib/src/doc/compute.rs +++ b/lib/src/doc/compute.rs @@ -4,23 +4,28 @@ use crate::dbs::Statement; use crate::dbs::Transaction; use crate::doc::Document; use crate::err::Error; +use crate::sql::thing::Thing; use crate::sql::value::Value; impl<'a> Document<'a> { pub async fn compute( - &mut self, - ctx: &Runtime, - opt: &Options, - txn: &Transaction, - stm: &Statement<'_>, + ctx: Runtime, + opt: Options, + txn: Transaction, + stm: Statement<'_>, + thg: Option, + val: Value, ) -> Result { + // Setup a new document + let mut doc = Document::new(thg, &val); + // Process the statement match stm { - Statement::Select(_) => self.select(ctx, opt, txn, stm).await, - Statement::Create(_) => self.create(ctx, opt, txn, stm).await, - Statement::Update(_) => self.update(ctx, opt, txn, stm).await, - Statement::Relate(_) => self.relate(ctx, opt, txn, stm).await, - Statement::Delete(_) => self.delete(ctx, opt, txn, stm).await, - Statement::Insert(_) => self.insert(ctx, opt, txn, stm).await, + Statement::Select(_) => doc.select(&ctx, &opt, &txn, &stm).await, + Statement::Create(_) => doc.create(&ctx, &opt, &txn, &stm).await, + Statement::Update(_) => doc.update(&ctx, &opt, &txn, &stm).await, + Statement::Relate(_) => doc.relate(&ctx, &opt, &txn, &stm).await, + Statement::Delete(_) => doc.delete(&ctx, &opt, &txn, &stm).await, + Statement::Insert(_) => doc.insert(&ctx, &opt, &txn, &stm).await, _ => unreachable!(), } } diff --git a/lib/src/sql/statements/create.rs b/lib/src/sql/statements/create.rs index 36254791..8aeab6a0 100644 --- a/lib/src/sql/statements/create.rs +++ b/lib/src/sql/statements/create.rs @@ -2,7 +2,6 @@ use crate::dbs::Iterator; use crate::dbs::Level; use crate::dbs::Options; use crate::dbs::Runtime; -use crate::dbs::Statement; use crate::dbs::Transaction; use crate::err::Error; use crate::sql::comment::shouldbespace; @@ -36,9 +35,7 @@ impl CreateStatement { // Allowed to run? opt.check(Level::No)?; // Create a new iterator - let mut i = Iterator::new(); - // Pass in current statement - i.stmt = Statement::from(self); + let mut i = Iterator::from(self); // Ensure futures are stored let opt = &opt.futures(false); // Loop over the create targets diff --git a/lib/src/sql/statements/delete.rs b/lib/src/sql/statements/delete.rs index be4bb01e..686ca14d 100644 --- a/lib/src/sql/statements/delete.rs +++ b/lib/src/sql/statements/delete.rs @@ -2,7 +2,6 @@ use crate::dbs::Iterator; use crate::dbs::Level; use crate::dbs::Options; use crate::dbs::Runtime; -use crate::dbs::Statement; use crate::dbs::Transaction; use crate::err::Error; use crate::sql::comment::shouldbespace; @@ -37,9 +36,7 @@ impl DeleteStatement { // Allowed to run? opt.check(Level::No)?; // Create a new iterator - let mut i = Iterator::new(); - // Pass in current statement - i.stmt = Statement::from(self); + let mut i = Iterator::from(self); // Ensure futures are stored let opt = &opt.futures(false); // Loop over the delete targets diff --git a/lib/src/sql/statements/insert.rs b/lib/src/sql/statements/insert.rs index 79d28b54..90e146d1 100644 --- a/lib/src/sql/statements/insert.rs +++ b/lib/src/sql/statements/insert.rs @@ -2,7 +2,6 @@ use crate::dbs::Iterator; use crate::dbs::Level; use crate::dbs::Options; use crate::dbs::Runtime; -use crate::dbs::Statement; use crate::dbs::Transaction; use crate::err::Error; use crate::sql::comment::shouldbespace; @@ -40,9 +39,7 @@ impl InsertStatement { // Allowed to run? opt.check(Level::No)?; // Create a new iterator - let mut i = Iterator::new(); - // Pass in current statement - i.stmt = Statement::from(self); + let mut i = Iterator::from(self); // Ensure futures are stored let opt = &opt.futures(false); // Parse the expression diff --git a/lib/src/sql/statements/relate.rs b/lib/src/sql/statements/relate.rs index 01c06a34..fae1ae00 100644 --- a/lib/src/sql/statements/relate.rs +++ b/lib/src/sql/statements/relate.rs @@ -2,7 +2,6 @@ use crate::dbs::Iterator; use crate::dbs::Level; use crate::dbs::Options; use crate::dbs::Runtime; -use crate::dbs::Statement; use crate::dbs::Transaction; use crate::err::Error; use crate::sql::comment::mightbespace; @@ -43,9 +42,7 @@ impl RelateStatement { // Allowed to run? opt.check(Level::No)?; // Create a new iterator - let mut i = Iterator::new(); - // Pass in current statement - i.stmt = Statement::from(self); + let mut i = Iterator::from(self); // Ensure futures are stored let opt = &opt.futures(false); // Loop over the select targets diff --git a/lib/src/sql/statements/select.rs b/lib/src/sql/statements/select.rs index 9685f570..f3535620 100644 --- a/lib/src/sql/statements/select.rs +++ b/lib/src/sql/statements/select.rs @@ -2,7 +2,6 @@ use crate::dbs::Iterator; use crate::dbs::Level; use crate::dbs::Options; use crate::dbs::Runtime; -use crate::dbs::Statement; use crate::dbs::Transaction; use crate::err::Error; use crate::sql::comment::shouldbespace; @@ -65,15 +64,7 @@ impl SelectStatement { // Allowed to run? opt.check(Level::No)?; // Create a new iterator - let mut i = Iterator::new(); - // Pass in current statement - i.stmt = Statement::from(self); - // Pass in statement config - i.split = self.split.as_ref(); - i.group = self.group.as_ref(); - i.order = self.order.as_ref(); - i.limit = self.limit.as_ref(); - i.start = self.start.as_ref(); + let mut i = Iterator::from(self); // Ensure futures are processed let opt = &opt.futures(true); // Loop over the select targets diff --git a/lib/src/sql/statements/update.rs b/lib/src/sql/statements/update.rs index 7ac3f1f7..ce2af8fd 100644 --- a/lib/src/sql/statements/update.rs +++ b/lib/src/sql/statements/update.rs @@ -2,7 +2,6 @@ use crate::dbs::Iterator; use crate::dbs::Level; use crate::dbs::Options; use crate::dbs::Runtime; -use crate::dbs::Statement; use crate::dbs::Transaction; use crate::err::Error; use crate::sql::comment::shouldbespace; @@ -38,9 +37,7 @@ impl UpdateStatement { // Allowed to run? opt.check(Level::No)?; // Create a new iterator - let mut i = Iterator::new(); - // Pass in current statement - i.stmt = Statement::from(self); + let mut i = Iterator::from(self); // Ensure futures are stored let opt = &opt.futures(false); // Loop over the update targets