diff --git a/src/dbs/channel.rs b/src/dbs/channel.rs new file mode 100644 index 00000000..308b1c25 --- /dev/null +++ b/src/dbs/channel.rs @@ -0,0 +1,115 @@ +use crate::cnf::ID_CHARS; +use crate::dbs::Options; +use crate::dbs::Runtime; +use crate::dbs::Transaction; +use crate::err::Error; +use crate::key::thing; +use crate::sql::array::Array; +use crate::sql::model::Model; +use crate::sql::table::Table; +use crate::sql::thing::Thing; +use crate::sql::value::Value; +use async_recursion::async_recursion; +use nanoid::nanoid; +use tokio::sync::mpsc::UnboundedSender; + +pub type Channel = UnboundedSender<(Option, Value)>; + +impl Value { + pub async fn channel( + self, + ctx: Runtime, + opt: Options, + chn: Channel, + txn: Transaction, + ) -> Result<(), Error> { + match self { + Value::Array(v) => v.process(&ctx, &opt, &chn, &txn).await?, + Value::Model(v) => v.process(&ctx, &opt, &chn, &txn).await?, + Value::Thing(v) => v.process(&ctx, &opt, &chn, &txn).await?, + Value::Table(v) => v.process(&ctx, &opt, &chn, &txn).await?, + v => chn.send((None, v))?, + } + Ok(()) + } +} + +impl Array { + #[async_recursion] + pub async fn process( + self, + ctx: &Runtime, + opt: &Options, + chn: &Channel, + txn: &Transaction, + ) -> Result<(), Error> { + for v in self.value.into_iter() { + match v { + Value::Array(v) => v.process(ctx, opt, chn, txn).await?, + Value::Model(v) => v.process(ctx, opt, chn, txn).await?, + Value::Thing(v) => v.process(ctx, opt, chn, txn).await?, + Value::Table(v) => v.process(ctx, opt, chn, txn).await?, + v => chn.send((None, v))?, + } + } + Ok(()) + } +} + +impl Model { + pub async fn process( + self, + ctx: &Runtime, + opt: &Options, + chn: &Channel, + txn: &Transaction, + ) -> Result<(), Error> { + if ctx.is_ok() { + if let Some(c) = self.count { + for _ in 0..c { + Thing { + tb: self.table.to_string(), + id: nanoid!(20, &ID_CHARS), + } + .process(ctx, opt, chn, txn) + .await?; + } + } + if let Some(r) = self.range { + for x in r.0..r.1 { + Thing { + tb: self.table.to_string(), + id: x.to_string(), + } + .process(ctx, opt, chn, txn) + .await?; + } + } + } + Ok(()) + } +} + +impl Thing { + pub async fn process( + self, + ctx: &Runtime, + opt: &Options, + chn: &Channel, + txn: &Transaction, + ) -> Result<(), Error> { + Ok(()) + } +} + +impl Table { + pub async fn process( + self, + ctx: &Runtime, + opt: &Options, + chn: &Channel, + txn: &Transaction, + ) -> Result<(), Error> { + Ok(()) + } +} diff --git a/src/dbs/iterator.rs b/src/dbs/iterator.rs index 6d0b47f4..162a5e68 100644 --- a/src/dbs/iterator.rs +++ b/src/dbs/iterator.rs @@ -18,10 +18,6 @@ use crate::sql::value::Value; use crate::sql::version::Version; use nanoid::nanoid; use std::mem; -use tokio::sync::mpsc; -use tokio::sync::mpsc::UnboundedSender; - -pub type Channel = UnboundedSender; #[derive(Default)] pub struct Iterator<'a> { @@ -141,14 +137,18 @@ impl<'a> Iterator<'a> { match self.parallel { // Run statements in parallel true => { + // Use multi producer channel + use tokio::sync::mpsc; // Create an unbounded channel - let (_, mut rx) = mpsc::unbounded_channel(); + let (chn, mut rx) = mpsc::unbounded_channel(); // Process all prepared values - for _ in mem::take(&mut self.readies) { - todo!(); + for v in mem::take(&mut self.readies) { + tokio::spawn(v.channel(ctx.clone(), opt.clone(), chn.clone(), txn.clone())); } + // Drop the main channel reference + drop(chn); // Process all processed values - while let Some(v) = rx.recv().await { + while let Some((k, v)) = rx.recv().await { self.process(&ctx, opt, txn, k, v).await; } // Everything processed ok diff --git a/src/dbs/mod.rs b/src/dbs/mod.rs index 3632289d..17e6e13e 100644 --- a/src/dbs/mod.rs +++ b/src/dbs/mod.rs @@ -1,4 +1,5 @@ mod auth; +mod channel; mod dbs; mod executor; mod export; @@ -13,6 +14,7 @@ mod transaction; mod variables; pub use self::auth::*; +pub use self::channel::*; pub use self::dbs::*; pub use self::executor::*; pub use self::iterator::*;