Add initial implementation for parallel execution
This commit is contained in:
parent
36d114be55
commit
a5c1d95a64
3 changed files with 125 additions and 8 deletions
115
src/dbs/channel.rs
Normal file
115
src/dbs/channel.rs
Normal file
|
@ -0,0 +1,115 @@
|
|||
use crate::cnf::ID_CHARS;
|
||||
use crate::dbs::Options;
|
||||
use crate::dbs::Runtime;
|
||||
use crate::dbs::Transaction;
|
||||
use crate::err::Error;
|
||||
use crate::key::thing;
|
||||
use crate::sql::array::Array;
|
||||
use crate::sql::model::Model;
|
||||
use crate::sql::table::Table;
|
||||
use crate::sql::thing::Thing;
|
||||
use crate::sql::value::Value;
|
||||
use async_recursion::async_recursion;
|
||||
use nanoid::nanoid;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
pub type Channel = UnboundedSender<(Option<Thing>, Value)>;
|
||||
|
||||
impl Value {
|
||||
pub async fn channel(
|
||||
self,
|
||||
ctx: Runtime,
|
||||
opt: Options,
|
||||
chn: Channel,
|
||||
txn: Transaction,
|
||||
) -> Result<(), Error> {
|
||||
match self {
|
||||
Value::Array(v) => v.process(&ctx, &opt, &chn, &txn).await?,
|
||||
Value::Model(v) => v.process(&ctx, &opt, &chn, &txn).await?,
|
||||
Value::Thing(v) => v.process(&ctx, &opt, &chn, &txn).await?,
|
||||
Value::Table(v) => v.process(&ctx, &opt, &chn, &txn).await?,
|
||||
v => chn.send((None, v))?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Array {
|
||||
#[async_recursion]
|
||||
pub async fn process(
|
||||
self,
|
||||
ctx: &Runtime,
|
||||
opt: &Options,
|
||||
chn: &Channel,
|
||||
txn: &Transaction,
|
||||
) -> Result<(), Error> {
|
||||
for v in self.value.into_iter() {
|
||||
match v {
|
||||
Value::Array(v) => v.process(ctx, opt, chn, txn).await?,
|
||||
Value::Model(v) => v.process(ctx, opt, chn, txn).await?,
|
||||
Value::Thing(v) => v.process(ctx, opt, chn, txn).await?,
|
||||
Value::Table(v) => v.process(ctx, opt, chn, txn).await?,
|
||||
v => chn.send((None, v))?,
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Model {
|
||||
pub async fn process(
|
||||
self,
|
||||
ctx: &Runtime,
|
||||
opt: &Options,
|
||||
chn: &Channel,
|
||||
txn: &Transaction,
|
||||
) -> Result<(), Error> {
|
||||
if ctx.is_ok() {
|
||||
if let Some(c) = self.count {
|
||||
for _ in 0..c {
|
||||
Thing {
|
||||
tb: self.table.to_string(),
|
||||
id: nanoid!(20, &ID_CHARS),
|
||||
}
|
||||
.process(ctx, opt, chn, txn)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
if let Some(r) = self.range {
|
||||
for x in r.0..r.1 {
|
||||
Thing {
|
||||
tb: self.table.to_string(),
|
||||
id: x.to_string(),
|
||||
}
|
||||
.process(ctx, opt, chn, txn)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Thing {
|
||||
pub async fn process(
|
||||
self,
|
||||
ctx: &Runtime,
|
||||
opt: &Options,
|
||||
chn: &Channel,
|
||||
txn: &Transaction,
|
||||
) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Table {
|
||||
pub async fn process(
|
||||
self,
|
||||
ctx: &Runtime,
|
||||
opt: &Options,
|
||||
chn: &Channel,
|
||||
txn: &Transaction,
|
||||
) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -18,10 +18,6 @@ use crate::sql::value::Value;
|
|||
use crate::sql::version::Version;
|
||||
use nanoid::nanoid;
|
||||
use std::mem;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
pub type Channel = UnboundedSender<Value>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Iterator<'a> {
|
||||
|
@ -141,14 +137,18 @@ impl<'a> Iterator<'a> {
|
|||
match self.parallel {
|
||||
// Run statements in parallel
|
||||
true => {
|
||||
// Use multi producer channel
|
||||
use tokio::sync::mpsc;
|
||||
// Create an unbounded channel
|
||||
let (_, mut rx) = mpsc::unbounded_channel();
|
||||
let (chn, mut rx) = mpsc::unbounded_channel();
|
||||
// Process all prepared values
|
||||
for _ in mem::take(&mut self.readies) {
|
||||
todo!();
|
||||
for v in mem::take(&mut self.readies) {
|
||||
tokio::spawn(v.channel(ctx.clone(), opt.clone(), chn.clone(), txn.clone()));
|
||||
}
|
||||
// Drop the main channel reference
|
||||
drop(chn);
|
||||
// Process all processed values
|
||||
while let Some(v) = rx.recv().await {
|
||||
while let Some((k, v)) = rx.recv().await {
|
||||
self.process(&ctx, opt, txn, k, v).await;
|
||||
}
|
||||
// Everything processed ok
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
mod auth;
|
||||
mod channel;
|
||||
mod dbs;
|
||||
mod executor;
|
||||
mod export;
|
||||
|
@ -13,6 +14,7 @@ mod transaction;
|
|||
mod variables;
|
||||
|
||||
pub use self::auth::*;
|
||||
pub use self::channel::*;
|
||||
pub use self::dbs::*;
|
||||
pub use self::executor::*;
|
||||
pub use self::iterator::*;
|
||||
|
|
Loading…
Reference in a new issue