Pass the current statement to the value iterator functions

This commit is contained in:
Tobie Morgan Hitchcock 2022-03-18 19:51:13 +00:00
parent 5b1d727c25
commit f8b747374f
2 changed files with 19 additions and 10 deletions

View file

@ -1,5 +1,6 @@
use crate::dbs::Options; use crate::dbs::Options;
use crate::dbs::Runtime; use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction; use crate::dbs::Transaction;
use crate::err::Error; use crate::err::Error;
use crate::key::thing; use crate::key::thing;
@ -17,15 +18,16 @@ impl Value {
self, self,
ctx: Runtime, ctx: Runtime,
opt: Options, opt: Options,
stm: Statement,
txn: Transaction, txn: Transaction,
chn: Sender<(Option<Thing>, Value)>, chn: Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> { ) -> Result<(), Error> {
if ctx.is_ok() { if ctx.is_ok() {
match self { match self {
Value::Array(v) => v.process(&ctx, &opt, &txn, &chn).await?, Value::Array(v) => v.process(&ctx, &opt, &stm, &txn, &chn).await?,
Value::Model(v) => v.process(&ctx, &opt, &txn, &chn).await?, Value::Model(v) => v.process(&ctx, &opt, &stm, &txn, &chn).await?,
Value::Thing(v) => v.process(&ctx, &opt, &txn, &chn).await?, Value::Thing(v) => v.process(&ctx, &opt, &stm, &txn, &chn).await?,
Value::Table(v) => v.process(&ctx, &opt, &txn, &chn).await?, Value::Table(v) => v.process(&ctx, &opt, &stm, &txn, &chn).await?,
v => chn.send((None, v)).await?, v => chn.send((None, v)).await?,
} }
} }
@ -39,16 +41,17 @@ impl Array {
self, self,
ctx: &Runtime, ctx: &Runtime,
opt: &Options, opt: &Options,
stm: &Statement,
txn: &Transaction, txn: &Transaction,
chn: &Sender<(Option<Thing>, Value)>, chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> { ) -> Result<(), Error> {
for v in self.value.into_iter() { for v in self.value.into_iter() {
if ctx.is_ok() { if ctx.is_ok() {
match v { match v {
Value::Array(v) => v.process(ctx, opt, txn, chn).await?, Value::Array(v) => v.process(ctx, opt, stm, txn, chn).await?,
Value::Model(v) => v.process(ctx, opt, txn, chn).await?, Value::Model(v) => v.process(ctx, opt, stm, txn, chn).await?,
Value::Thing(v) => v.process(ctx, opt, txn, chn).await?, Value::Thing(v) => v.process(ctx, opt, stm, txn, chn).await?,
Value::Table(v) => v.process(ctx, opt, txn, chn).await?, Value::Table(v) => v.process(ctx, opt, stm, txn, chn).await?,
v => chn.send((None, v)).await?, v => chn.send((None, v)).await?,
} }
} }
@ -62,6 +65,7 @@ impl Model {
self, self,
ctx: &Runtime, ctx: &Runtime,
opt: &Options, opt: &Options,
stm: &Statement,
txn: &Transaction, txn: &Transaction,
chn: &Sender<(Option<Thing>, Value)>, chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -72,7 +76,7 @@ impl Model {
tb: self.table.to_string(), tb: self.table.to_string(),
id: Id::rand(), id: Id::rand(),
} }
.process(ctx, opt, txn, chn) .process(ctx, opt, stm, txn, chn)
.await?; .await?;
} }
} }
@ -82,7 +86,7 @@ impl Model {
tb: self.table.to_string(), tb: self.table.to_string(),
id: Id::from(x), id: Id::from(x),
} }
.process(ctx, opt, txn, chn) .process(ctx, opt, stm, txn, chn)
.await?; .await?;
} }
} }
@ -96,6 +100,7 @@ impl Thing {
self, self,
ctx: &Runtime, ctx: &Runtime,
opt: &Options, opt: &Options,
stm: &Statement,
txn: &Transaction, txn: &Transaction,
chn: &Sender<(Option<Thing>, Value)>, chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -117,6 +122,7 @@ impl Table {
self, self,
ctx: &Runtime, ctx: &Runtime,
opt: &Options, opt: &Options,
stm: &Statement,
txn: &Transaction, txn: &Transaction,
chn: &Sender<(Option<Thing>, Value)>, chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> { ) -> Result<(), Error> {

View file

@ -213,6 +213,8 @@ impl Iterator {
// Run statements in parallel // Run statements in parallel
true => { true => {
let mut rcv = { let mut rcv = {
// Get current statement
let stm = &self.stm;
// Create an unbounded channel // Create an unbounded channel
let (chn, rx) = tokio::sync::mpsc::channel(MAX_CONCURRENT_TASKS); let (chn, rx) = tokio::sync::mpsc::channel(MAX_CONCURRENT_TASKS);
// Process all prepared values // Process all prepared values
@ -221,6 +223,7 @@ impl Iterator {
tokio::spawn(v.channel( tokio::spawn(v.channel(
ctx.clone(), ctx.clone(),
opt.clone(), opt.clone(),
stm.clone(),
txn.clone(), txn.clone(),
chn.clone(), chn.clone(),
)); ));