Improve code and logic in iterator

This commit is contained in:
Tobie Morgan Hitchcock 2022-02-23 13:56:09 +00:00
parent 73879706a6
commit 1eddf94e8d
9 changed files with 120 additions and 74 deletions

View file

@ -13,21 +13,19 @@ use async_recursion::async_recursion;
use nanoid::nanoid; use nanoid::nanoid;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
pub type Channel = UnboundedSender<(Option<Thing>, Value)>;
impl Value { impl Value {
pub async fn channel( pub async fn channel(
self, self,
ctx: Runtime, ctx: Runtime,
opt: Options, opt: Options,
chn: Channel,
txn: Transaction, txn: Transaction,
chn: UnboundedSender<(Option<Thing>, Value)>,
) -> Result<(), Error> { ) -> Result<(), Error> {
match self { match self {
Value::Array(v) => v.process(&ctx, &opt, &chn, &txn).await?, Value::Array(v) => v.process(&ctx, &opt, &txn, &chn).await?,
Value::Model(v) => v.process(&ctx, &opt, &chn, &txn).await?, Value::Model(v) => v.process(&ctx, &opt, &txn, &chn).await?,
Value::Thing(v) => v.process(&ctx, &opt, &chn, &txn).await?, Value::Thing(v) => v.process(&ctx, &opt, &txn, &chn).await?,
Value::Table(v) => v.process(&ctx, &opt, &chn, &txn).await?, Value::Table(v) => v.process(&ctx, &opt, &txn, &chn).await?,
v => chn.send((None, v))?, v => chn.send((None, v))?,
} }
Ok(()) Ok(())
@ -40,15 +38,15 @@ impl Array {
self, self,
ctx: &Runtime, ctx: &Runtime,
opt: &Options, opt: &Options,
chn: &Channel,
txn: &Transaction, txn: &Transaction,
chn: &UnboundedSender<(Option<Thing>, Value)>,
) -> Result<(), Error> { ) -> Result<(), Error> {
for v in self.value.into_iter() { for v in self.value.into_iter() {
match v { match v {
Value::Array(v) => v.process(ctx, opt, chn, txn).await?, Value::Array(v) => v.process(ctx, opt, txn, chn).await?,
Value::Model(v) => v.process(ctx, opt, chn, txn).await?, Value::Model(v) => v.process(ctx, opt, txn, chn).await?,
Value::Thing(v) => v.process(ctx, opt, chn, txn).await?, Value::Thing(v) => v.process(ctx, opt, txn, chn).await?,
Value::Table(v) => v.process(ctx, opt, chn, txn).await?, Value::Table(v) => v.process(ctx, opt, txn, chn).await?,
v => chn.send((None, v))?, v => chn.send((None, v))?,
} }
} }
@ -61,8 +59,8 @@ impl Model {
self, self,
ctx: &Runtime, ctx: &Runtime,
opt: &Options, opt: &Options,
chn: &Channel,
txn: &Transaction, txn: &Transaction,
chn: &UnboundedSender<(Option<Thing>, Value)>,
) -> Result<(), Error> { ) -> Result<(), Error> {
if ctx.is_ok() { if ctx.is_ok() {
if let Some(c) = self.count { if let Some(c) = self.count {
@ -71,7 +69,7 @@ impl Model {
tb: self.table.to_string(), tb: self.table.to_string(),
id: nanoid!(20, &ID_CHARS), id: nanoid!(20, &ID_CHARS),
} }
.process(ctx, opt, chn, txn) .process(ctx, opt, txn, chn)
.await?; .await?;
} }
} }
@ -81,7 +79,7 @@ impl Model {
tb: self.table.to_string(), tb: self.table.to_string(),
id: x.to_string(), id: x.to_string(),
} }
.process(ctx, opt, chn, txn) .process(ctx, opt, txn, chn)
.await?; .await?;
} }
} }
@ -95,8 +93,8 @@ impl Thing {
self, self,
ctx: &Runtime, ctx: &Runtime,
opt: &Options, opt: &Options,
chn: &Channel,
txn: &Transaction, txn: &Transaction,
chn: &UnboundedSender<(Option<Thing>, Value)>,
) -> Result<(), Error> { ) -> Result<(), Error> {
Ok(()) Ok(())
} }
@ -107,8 +105,8 @@ impl Table {
self, self,
ctx: &Runtime, ctx: &Runtime,
opt: &Options, opt: &Options,
chn: &Channel,
txn: &Transaction, txn: &Transaction,
chn: &UnboundedSender<(Option<Thing>, Value)>,
) -> Result<(), Error> { ) -> Result<(), Error> {
Ok(()) Ok(())
} }

View file

@ -12,6 +12,12 @@ use crate::sql::limit::Limit;
use crate::sql::order::Orders; use crate::sql::order::Orders;
use crate::sql::split::Splits; use crate::sql::split::Splits;
use crate::sql::start::Start; use crate::sql::start::Start;
use crate::sql::statements::create::CreateStatement;
use crate::sql::statements::delete::DeleteStatement;
use crate::sql::statements::insert::InsertStatement;
use crate::sql::statements::relate::RelateStatement;
use crate::sql::statements::select::SelectStatement;
use crate::sql::statements::update::UpdateStatement;
use crate::sql::table::Table; use crate::sql::table::Table;
use crate::sql::thing::Thing; use crate::sql::thing::Thing;
use crate::sql::value::Value; use crate::sql::value::Value;
@ -42,11 +48,66 @@ pub struct Iterator<'a> {
pub version: Option<&'a Version>, pub version: Option<&'a Version>,
} }
impl<'a> Iterator<'a> { impl<'a> From<&'a SelectStatement> for Iterator<'a> {
pub fn new() -> Iterator<'a> { fn from(v: &'a SelectStatement) -> Self {
Iterator::default() Iterator {
stmt: Statement::from(v),
split: v.split.as_ref(),
group: v.group.as_ref(),
order: v.order.as_ref(),
limit: v.limit.as_ref(),
start: v.start.as_ref(),
..Iterator::default()
} }
}
}
impl<'a> From<&'a CreateStatement> for Iterator<'a> {
fn from(v: &'a CreateStatement) -> Self {
Iterator {
stmt: Statement::from(v),
..Iterator::default()
}
}
}
impl<'a> From<&'a UpdateStatement> for Iterator<'a> {
fn from(v: &'a UpdateStatement) -> Self {
Iterator {
stmt: Statement::from(v),
..Iterator::default()
}
}
}
impl<'a> From<&'a RelateStatement> for Iterator<'a> {
fn from(v: &'a RelateStatement) -> Self {
Iterator {
stmt: Statement::from(v),
..Iterator::default()
}
}
}
impl<'a> From<&'a DeleteStatement> for Iterator<'a> {
fn from(v: &'a DeleteStatement) -> Self {
Iterator {
stmt: Statement::from(v),
..Iterator::default()
}
}
}
impl<'a> From<&'a InsertStatement> for Iterator<'a> {
fn from(v: &'a InsertStatement) -> Self {
Iterator {
stmt: Statement::from(v),
..Iterator::default()
}
}
}
impl<'a> Iterator<'a> {
// Prepares a value for processing // Prepares a value for processing
pub fn prepare(&mut self, val: Value) { pub fn prepare(&mut self, val: Value) {
self.readies.push(val) self.readies.push(val)
@ -162,18 +223,20 @@ impl<'a> Iterator<'a> {
} }
// Run statements in parallel // Run statements in parallel
true => { true => {
let mut rcv = {
// Use multi producer channel // Use multi producer channel
use tokio::sync::mpsc; use tokio::sync::mpsc;
// Create an unbounded channel // Create an unbounded channel
let (chn, mut rx) = mpsc::unbounded_channel(); let (chn, rcv) = mpsc::unbounded_channel();
// Process all prepared values // Process all prepared values
for v in mem::take(&mut self.readies) { for v in mem::take(&mut self.readies) {
tokio::spawn(v.channel(ctx.clone(), opt.clone(), chn.clone(), txn.clone())); tokio::spawn(v.channel(ctx.clone(), opt.clone(), txn.clone(), chn.clone()));
} }
// Drop the main channel reference //
drop(chn); rcv
};
// Process all processed values // Process all processed values
while let Some((k, v)) = rx.recv().await { while let Some((k, v)) = rcv.recv().await {
self.process(&ctx, opt, txn, k, v).await; self.process(&ctx, opt, txn, k, v).await;
} }
// Everything processed ok // Everything processed ok
@ -182,6 +245,7 @@ impl<'a> Iterator<'a> {
} }
} }
// Process a new record Thing and Value
pub async fn process( pub async fn process(
&mut self, &mut self,
ctx: &Runtime, ctx: &Runtime,
@ -196,7 +260,6 @@ impl<'a> Iterator<'a> {
} }
// Setup a new document // Setup a new document
let mut doc = Document::new(thg, &val); let mut doc = Document::new(thg, &val);
// Process the document // Process the document
let res = match self.stmt { let res = match self.stmt {
Statement::Select(_) => doc.select(ctx, opt, txn, &self.stmt).await, Statement::Select(_) => doc.select(ctx, opt, txn, &self.stmt).await,
@ -207,7 +270,12 @@ impl<'a> Iterator<'a> {
Statement::Insert(_) => doc.insert(ctx, opt, txn, &self.stmt).await, Statement::Insert(_) => doc.insert(ctx, opt, txn, &self.stmt).await,
_ => unreachable!(), _ => unreachable!(),
}; };
// Process the result
self.result(res);
}
// Accept a processed record result
fn result(&mut self, res: Result<Value, Error>) {
// Process the result // Process the result
match res { match res {
Err(Error::IgnoreError) => { Err(Error::IgnoreError) => {
@ -220,7 +288,6 @@ impl<'a> Iterator<'a> {
} }
Ok(v) => self.results.push(v), Ok(v) => self.results.push(v),
} }
// Check if we can exit // Check if we can exit
if self.group.is_none() { if self.group.is_none() {
if self.order.is_none() { if self.order.is_none() {

View file

@ -4,23 +4,28 @@ use crate::dbs::Statement;
use crate::dbs::Transaction; use crate::dbs::Transaction;
use crate::doc::Document; use crate::doc::Document;
use crate::err::Error; use crate::err::Error;
use crate::sql::thing::Thing;
use crate::sql::value::Value; use crate::sql::value::Value;
impl<'a> Document<'a> { impl<'a> Document<'a> {
pub async fn compute( pub async fn compute(
&mut self, ctx: Runtime,
ctx: &Runtime, opt: Options,
opt: &Options, txn: Transaction,
txn: &Transaction, stm: Statement<'_>,
stm: &Statement<'_>, thg: Option<Thing>,
val: Value,
) -> Result<Value, Error> { ) -> Result<Value, Error> {
// Setup a new document
let mut doc = Document::new(thg, &val);
// Process the statement
match stm { match stm {
Statement::Select(_) => self.select(ctx, opt, txn, stm).await, Statement::Select(_) => doc.select(&ctx, &opt, &txn, &stm).await,
Statement::Create(_) => self.create(ctx, opt, txn, stm).await, Statement::Create(_) => doc.create(&ctx, &opt, &txn, &stm).await,
Statement::Update(_) => self.update(ctx, opt, txn, stm).await, Statement::Update(_) => doc.update(&ctx, &opt, &txn, &stm).await,
Statement::Relate(_) => self.relate(ctx, opt, txn, stm).await, Statement::Relate(_) => doc.relate(&ctx, &opt, &txn, &stm).await,
Statement::Delete(_) => self.delete(ctx, opt, txn, stm).await, Statement::Delete(_) => doc.delete(&ctx, &opt, &txn, &stm).await,
Statement::Insert(_) => self.insert(ctx, opt, txn, stm).await, Statement::Insert(_) => doc.insert(&ctx, &opt, &txn, &stm).await,
_ => unreachable!(), _ => unreachable!(),
} }
} }

View file

@ -2,7 +2,6 @@ use crate::dbs::Iterator;
use crate::dbs::Level; use crate::dbs::Level;
use crate::dbs::Options; use crate::dbs::Options;
use crate::dbs::Runtime; use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction; use crate::dbs::Transaction;
use crate::err::Error; use crate::err::Error;
use crate::sql::comment::shouldbespace; use crate::sql::comment::shouldbespace;
@ -36,9 +35,7 @@ impl CreateStatement {
// Allowed to run? // Allowed to run?
opt.check(Level::No)?; opt.check(Level::No)?;
// Create a new iterator // Create a new iterator
let mut i = Iterator::new(); let mut i = Iterator::from(self);
// Pass in current statement
i.stmt = Statement::from(self);
// Ensure futures are stored // Ensure futures are stored
let opt = &opt.futures(false); let opt = &opt.futures(false);
// Loop over the create targets // Loop over the create targets

View file

@ -2,7 +2,6 @@ use crate::dbs::Iterator;
use crate::dbs::Level; use crate::dbs::Level;
use crate::dbs::Options; use crate::dbs::Options;
use crate::dbs::Runtime; use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction; use crate::dbs::Transaction;
use crate::err::Error; use crate::err::Error;
use crate::sql::comment::shouldbespace; use crate::sql::comment::shouldbespace;
@ -37,9 +36,7 @@ impl DeleteStatement {
// Allowed to run? // Allowed to run?
opt.check(Level::No)?; opt.check(Level::No)?;
// Create a new iterator // Create a new iterator
let mut i = Iterator::new(); let mut i = Iterator::from(self);
// Pass in current statement
i.stmt = Statement::from(self);
// Ensure futures are stored // Ensure futures are stored
let opt = &opt.futures(false); let opt = &opt.futures(false);
// Loop over the delete targets // Loop over the delete targets

View file

@ -2,7 +2,6 @@ use crate::dbs::Iterator;
use crate::dbs::Level; use crate::dbs::Level;
use crate::dbs::Options; use crate::dbs::Options;
use crate::dbs::Runtime; use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction; use crate::dbs::Transaction;
use crate::err::Error; use crate::err::Error;
use crate::sql::comment::shouldbespace; use crate::sql::comment::shouldbespace;
@ -40,9 +39,7 @@ impl InsertStatement {
// Allowed to run? // Allowed to run?
opt.check(Level::No)?; opt.check(Level::No)?;
// Create a new iterator // Create a new iterator
let mut i = Iterator::new(); let mut i = Iterator::from(self);
// Pass in current statement
i.stmt = Statement::from(self);
// Ensure futures are stored // Ensure futures are stored
let opt = &opt.futures(false); let opt = &opt.futures(false);
// Parse the expression // Parse the expression

View file

@ -2,7 +2,6 @@ use crate::dbs::Iterator;
use crate::dbs::Level; use crate::dbs::Level;
use crate::dbs::Options; use crate::dbs::Options;
use crate::dbs::Runtime; use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction; use crate::dbs::Transaction;
use crate::err::Error; use crate::err::Error;
use crate::sql::comment::mightbespace; use crate::sql::comment::mightbespace;
@ -43,9 +42,7 @@ impl RelateStatement {
// Allowed to run? // Allowed to run?
opt.check(Level::No)?; opt.check(Level::No)?;
// Create a new iterator // Create a new iterator
let mut i = Iterator::new(); let mut i = Iterator::from(self);
// Pass in current statement
i.stmt = Statement::from(self);
// Ensure futures are stored // Ensure futures are stored
let opt = &opt.futures(false); let opt = &opt.futures(false);
// Loop over the select targets // Loop over the select targets

View file

@ -2,7 +2,6 @@ use crate::dbs::Iterator;
use crate::dbs::Level; use crate::dbs::Level;
use crate::dbs::Options; use crate::dbs::Options;
use crate::dbs::Runtime; use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction; use crate::dbs::Transaction;
use crate::err::Error; use crate::err::Error;
use crate::sql::comment::shouldbespace; use crate::sql::comment::shouldbespace;
@ -65,15 +64,7 @@ impl SelectStatement {
// Allowed to run? // Allowed to run?
opt.check(Level::No)?; opt.check(Level::No)?;
// Create a new iterator // Create a new iterator
let mut i = Iterator::new(); let mut i = Iterator::from(self);
// Pass in current statement
i.stmt = Statement::from(self);
// Pass in statement config
i.split = self.split.as_ref();
i.group = self.group.as_ref();
i.order = self.order.as_ref();
i.limit = self.limit.as_ref();
i.start = self.start.as_ref();
// Ensure futures are processed // Ensure futures are processed
let opt = &opt.futures(true); let opt = &opt.futures(true);
// Loop over the select targets // Loop over the select targets

View file

@ -2,7 +2,6 @@ use crate::dbs::Iterator;
use crate::dbs::Level; use crate::dbs::Level;
use crate::dbs::Options; use crate::dbs::Options;
use crate::dbs::Runtime; use crate::dbs::Runtime;
use crate::dbs::Statement;
use crate::dbs::Transaction; use crate::dbs::Transaction;
use crate::err::Error; use crate::err::Error;
use crate::sql::comment::shouldbespace; use crate::sql::comment::shouldbespace;
@ -38,9 +37,7 @@ impl UpdateStatement {
// Allowed to run? // Allowed to run?
opt.check(Level::No)?; opt.check(Level::No)?;
// Create a new iterator // Create a new iterator
let mut i = Iterator::new(); let mut i = Iterator::from(self);
// Pass in current statement
i.stmt = Statement::from(self);
// Ensure futures are stored // Ensure futures are stored
let opt = &opt.futures(false); let opt = &opt.futures(false);
// Loop over the update targets // Loop over the update targets