diff --git a/lib/src/dbs/channel.rs b/lib/src/dbs/channel.rs index b1001bc1..4a14cac3 100644 --- a/lib/src/dbs/channel.rs +++ b/lib/src/dbs/channel.rs @@ -1,5 +1,6 @@ use crate::dbs::Options; use crate::dbs::Runtime; +use crate::dbs::Statement; use crate::dbs::Transaction; use crate::err::Error; use crate::key::thing; @@ -17,15 +18,16 @@ impl Value { self, ctx: Runtime, opt: Options, + stm: Statement, txn: Transaction, chn: Sender<(Option, Value)>, ) -> Result<(), Error> { if ctx.is_ok() { match self { - Value::Array(v) => v.process(&ctx, &opt, &txn, &chn).await?, - Value::Model(v) => v.process(&ctx, &opt, &txn, &chn).await?, - Value::Thing(v) => v.process(&ctx, &opt, &txn, &chn).await?, - Value::Table(v) => v.process(&ctx, &opt, &txn, &chn).await?, + Value::Array(v) => v.process(&ctx, &opt, &stm, &txn, &chn).await?, + Value::Model(v) => v.process(&ctx, &opt, &stm, &txn, &chn).await?, + Value::Thing(v) => v.process(&ctx, &opt, &stm, &txn, &chn).await?, + Value::Table(v) => v.process(&ctx, &opt, &stm, &txn, &chn).await?, v => chn.send((None, v)).await?, } } @@ -39,16 +41,17 @@ impl Array { self, ctx: &Runtime, opt: &Options, + stm: &Statement, txn: &Transaction, chn: &Sender<(Option, Value)>, ) -> Result<(), Error> { for v in self.value.into_iter() { if ctx.is_ok() { match v { - Value::Array(v) => v.process(ctx, opt, txn, chn).await?, - Value::Model(v) => v.process(ctx, opt, txn, chn).await?, - Value::Thing(v) => v.process(ctx, opt, txn, chn).await?, - Value::Table(v) => v.process(ctx, opt, txn, chn).await?, + Value::Array(v) => v.process(ctx, opt, stm, txn, chn).await?, + Value::Model(v) => v.process(ctx, opt, stm, txn, chn).await?, + Value::Thing(v) => v.process(ctx, opt, stm, txn, chn).await?, + Value::Table(v) => v.process(ctx, opt, stm, txn, chn).await?, v => chn.send((None, v)).await?, } } @@ -62,6 +65,7 @@ impl Model { self, ctx: &Runtime, opt: &Options, + stm: &Statement, txn: &Transaction, chn: &Sender<(Option, Value)>, ) -> Result<(), Error> { @@ -72,7 +76,7 @@ impl Model { tb: self.table.to_string(), id: Id::rand(), } - .process(ctx, opt, txn, chn) + .process(ctx, opt, stm, txn, chn) .await?; } } @@ -82,7 +86,7 @@ impl Model { tb: self.table.to_string(), id: Id::from(x), } - .process(ctx, opt, txn, chn) + .process(ctx, opt, stm, txn, chn) .await?; } } @@ -96,6 +100,7 @@ impl Thing { self, ctx: &Runtime, opt: &Options, + stm: &Statement, txn: &Transaction, chn: &Sender<(Option, Value)>, ) -> Result<(), Error> { @@ -117,6 +122,7 @@ impl Table { self, ctx: &Runtime, opt: &Options, + stm: &Statement, txn: &Transaction, chn: &Sender<(Option, Value)>, ) -> Result<(), Error> { diff --git a/lib/src/dbs/iterator.rs b/lib/src/dbs/iterator.rs index 5b04cada..bc30bebb 100644 --- a/lib/src/dbs/iterator.rs +++ b/lib/src/dbs/iterator.rs @@ -213,6 +213,8 @@ impl Iterator { // Run statements in parallel true => { let mut rcv = { + // Get current statement + let stm = &self.stm; // Create an unbounded channel let (chn, rx) = tokio::sync::mpsc::channel(MAX_CONCURRENT_TASKS); // Process all prepared values @@ -221,6 +223,7 @@ impl Iterator { tokio::spawn(v.channel( ctx.clone(), opt.clone(), + stm.clone(), txn.clone(), chn.clone(), ));