Enable parallel iteration of records

This commit is contained in:
Tobie Morgan Hitchcock 2022-02-26 23:30:19 +00:00
parent 16da3e9b3b
commit 38c7ae206e
34 changed files with 293 additions and 194 deletions

View file

@ -37,7 +37,7 @@ rand = "0.8.5"
regex = "1.5.4"
msgpack = { version = "1.0.0", package = "rmp-serde" }
scrypt = "0.9.0"
serde = { version = "1.0.136", features = ["derive"] }
serde = { version = "1.0.136", features = ["derive", "rc"] }
sha-1 = "0.10.0"
sha2 = "0.10.2"
slug = "0.1.4"

View file

@ -1,3 +1,6 @@
// Specifies how many concurrent jobs can be buffered in the worker channel.
pub const MAX_CONCURRENT_TASKS: usize = 64;
// Specifies how many subqueries will be processed recursively before the query fails.
pub const MAX_RECURSIVE_QUERIES: usize = 16;

View file

@ -11,7 +11,7 @@ use crate::sql::thing::Thing;
use crate::sql::value::Value;
use async_recursion::async_recursion;
use nanoid::nanoid;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::Sender;
impl Value {
pub async fn channel(
@ -19,7 +19,7 @@ impl Value {
ctx: Runtime,
opt: Options,
txn: Transaction,
chn: UnboundedSender<(Option<Thing>, Value)>,
chn: Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
if ctx.is_ok() {
match self {
@ -27,7 +27,7 @@ impl Value {
Value::Model(v) => v.process(&ctx, &opt, &txn, &chn).await?,
Value::Thing(v) => v.process(&ctx, &opt, &txn, &chn).await?,
Value::Table(v) => v.process(&ctx, &opt, &txn, &chn).await?,
v => chn.send((None, v))?,
v => chn.send((None, v)).await?,
}
}
Ok(())
@ -41,7 +41,7 @@ impl Array {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
chn: &UnboundedSender<(Option<Thing>, Value)>,
chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
for v in self.value.into_iter() {
if ctx.is_ok() {
@ -50,7 +50,7 @@ impl Array {
Value::Model(v) => v.process(ctx, opt, txn, chn).await?,
Value::Thing(v) => v.process(ctx, opt, txn, chn).await?,
Value::Table(v) => v.process(ctx, opt, txn, chn).await?,
v => chn.send((None, v))?,
v => chn.send((None, v)).await?,
}
}
}
@ -64,7 +64,7 @@ impl Model {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
chn: &UnboundedSender<(Option<Thing>, Value)>,
chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
if ctx.is_ok() {
if let Some(c) = self.count {
@ -98,7 +98,7 @@ impl Thing {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
chn: &UnboundedSender<(Option<Thing>, Value)>,
chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
Ok(())
}
@ -110,7 +110,7 @@ impl Table {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
chn: &UnboundedSender<(Option<Thing>, Value)>,
chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
Ok(())
}

View file

@ -21,7 +21,7 @@ impl Value {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
ite: &mut Iterator<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
if ctx.is_ok() {
match self {
@ -44,7 +44,7 @@ impl Array {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
ite: &mut Iterator<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
for v in self.value.into_iter() {
if ctx.is_ok() {
@ -67,7 +67,7 @@ impl Model {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
ite: &mut Iterator<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
if ctx.is_ok() {
if let Some(c) = self.count {
@ -101,7 +101,7 @@ impl Thing {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
ite: &mut Iterator<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
Ok(())
}
@ -113,7 +113,7 @@ impl Table {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
ite: &mut Iterator<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
Ok(())
}

View file

@ -1,4 +1,5 @@
use crate::cnf::ID_CHARS;
use crate::cnf::MAX_CONCURRENT_TASKS;
use crate::ctx::Canceller;
use crate::ctx::Context;
use crate::dbs::Options;
@ -7,11 +8,6 @@ use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::doc::Document;
use crate::err::Error;
use crate::sql::group::Groups;
use crate::sql::limit::Limit;
use crate::sql::order::Orders;
use crate::sql::split::Splits;
use crate::sql::start::Start;
use crate::sql::statements::create::CreateStatement;
use crate::sql::statements::delete::DeleteStatement;
use crate::sql::statements::insert::InsertStatement;
@ -21,99 +17,87 @@ use crate::sql::statements::update::UpdateStatement;
use crate::sql::table::Table;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use crate::sql::version::Version;
use nanoid::nanoid;
use std::mem;
use std::sync::Arc;
#[derive(Default)]
pub struct Iterator<'a> {
pub struct Iterator {
// Iterator status
run: Canceller,
// Iterator statement
stm: Statement,
// Iterator run option
parallel: bool,
// Iterator runtime error
error: Option<Error>,
// Iterator input values
readies: Vec<Value>,
// Iterator output results
results: Vec<Value>,
// Iterate options
pub parallel: bool,
// Underlying statement
pub stmt: Statement<'a>,
// Iterator options
pub split: Option<&'a Splits>,
pub group: Option<&'a Groups>,
pub order: Option<&'a Orders>,
pub limit: Option<&'a Limit>,
pub start: Option<&'a Start>,
pub version: Option<&'a Version>,
}
impl<'a> From<&'a SelectStatement> for Iterator<'a> {
fn from(v: &'a SelectStatement) -> Self {
impl From<Arc<SelectStatement>> for Iterator {
fn from(v: Arc<SelectStatement>) -> Self {
Iterator {
stmt: Statement::from(v),
split: v.split.as_ref(),
group: v.group.as_ref(),
order: v.order.as_ref(),
limit: v.limit.as_ref(),
start: v.start.as_ref(),
parallel: v.parallel,
stm: Statement::from(v),
..Iterator::default()
}
}
}
impl<'a> From<&'a CreateStatement> for Iterator<'a> {
fn from(v: &'a CreateStatement) -> Self {
impl From<Arc<CreateStatement>> for Iterator {
fn from(v: Arc<CreateStatement>) -> Self {
Iterator {
stmt: Statement::from(v),
parallel: v.parallel,
stm: Statement::from(v),
..Iterator::default()
}
}
}
impl<'a> From<&'a UpdateStatement> for Iterator<'a> {
fn from(v: &'a UpdateStatement) -> Self {
impl From<Arc<UpdateStatement>> for Iterator {
fn from(v: Arc<UpdateStatement>) -> Self {
Iterator {
stmt: Statement::from(v),
parallel: v.parallel,
stm: Statement::from(v),
..Iterator::default()
}
}
}
impl<'a> From<&'a RelateStatement> for Iterator<'a> {
fn from(v: &'a RelateStatement) -> Self {
impl From<Arc<RelateStatement>> for Iterator {
fn from(v: Arc<RelateStatement>) -> Self {
Iterator {
stmt: Statement::from(v),
parallel: v.parallel,
stm: Statement::from(v),
..Iterator::default()
}
}
}
impl<'a> From<&'a DeleteStatement> for Iterator<'a> {
fn from(v: &'a DeleteStatement) -> Self {
impl From<Arc<DeleteStatement>> for Iterator {
fn from(v: Arc<DeleteStatement>) -> Self {
Iterator {
stmt: Statement::from(v),
parallel: v.parallel,
stm: Statement::from(v),
..Iterator::default()
}
}
}
impl<'a> From<&'a InsertStatement> for Iterator<'a> {
fn from(v: &'a InsertStatement) -> Self {
impl From<Arc<InsertStatement>> for Iterator {
fn from(v: Arc<InsertStatement>) -> Self {
Iterator {
stmt: Statement::from(v),
parallel: v.parallel,
stm: Statement::from(v),
..Iterator::default()
}
}
}
impl<'a> Iterator<'a> {
impl Iterator {
// Prepares a value for processing
pub fn prepare(&mut self, val: Value) {
self.readies.push(val)
@ -135,7 +119,7 @@ impl<'a> Iterator<'a> {
txn: &Transaction,
) -> Result<Value, Error> {
// Log the statement
trace!("Iterating: {}", self.stmt);
trace!("Iterating: {}", self.stm);
// Enable context override
let mut ctx = Context::new(&ctx);
self.run = ctx.add_cancel();
@ -162,35 +146,35 @@ impl<'a> Iterator<'a> {
#[inline]
fn output_split(&mut self, ctx: &Runtime, opt: &Options, txn: &Transaction) {
if self.split.is_some() {
if self.stm.split().is_some() {
// Ignore
}
}
#[inline]
fn output_group(&mut self, ctx: &Runtime, opt: &Options, txn: &Transaction) {
if self.group.is_some() {
if self.stm.group().is_some() {
// Ignore
}
}
#[inline]
fn output_order(&mut self, ctx: &Runtime, opt: &Options, txn: &Transaction) {
if self.order.is_some() {
if self.stm.order().is_some() {
// Ignore
}
}
#[inline]
fn output_start(&mut self, ctx: &Runtime, opt: &Options, txn: &Transaction) {
if let Some(v) = self.start {
if let Some(v) = self.stm.start() {
self.results = mem::take(&mut self.results).into_iter().skip(v.0).collect();
}
}
#[inline]
fn output_limit(&mut self, ctx: &Runtime, opt: &Options, txn: &Transaction) {
if let Some(v) = self.limit {
if let Some(v) = self.stm.limit() {
self.results = mem::take(&mut self.results).into_iter().take(v.0).collect();
}
}
@ -230,20 +214,52 @@ impl<'a> Iterator<'a> {
// Run statements in parallel
true => {
let mut rcv = {
// Use multi producer channel
use tokio::sync::mpsc;
// Create an unbounded channel
let (chn, rcv) = mpsc::unbounded_channel();
let (chn, rx) = tokio::sync::mpsc::channel(MAX_CONCURRENT_TASKS);
// Process all prepared values
for v in mem::take(&mut self.readies) {
tokio::spawn(v.channel(ctx.clone(), opt.clone(), txn.clone(), chn.clone()));
if ctx.is_ok() {
tokio::spawn(v.channel(
ctx.clone(),
opt.clone(),
txn.clone(),
chn.clone(),
));
}
}
//
rcv
// Return the receiver
rx
};
let mut rcv = {
// Clone the send values
let ctx = ctx.clone();
let opt = opt.clone();
let txn = txn.clone();
let stm = self.stm.clone();
// Create an unbounded channel
let (chn, rx) = tokio::sync::mpsc::channel(MAX_CONCURRENT_TASKS);
// Process all received values
tokio::spawn(async move {
while let Some((k, v)) = rcv.recv().await {
if ctx.is_ok() {
tokio::spawn(Document::compute(
ctx.clone(),
opt.clone(),
txn.clone(),
chn.clone(),
stm.clone(),
k,
v,
));
}
}
});
// Return the receiver
rx
};
// Process all processed values
while let Some((k, v)) = rcv.recv().await {
self.process(&ctx, opt, txn, k, v).await;
while let Some(r) = rcv.recv().await {
self.result(r);
}
// Everything processed ok
Ok(())
@ -267,13 +283,13 @@ impl<'a> Iterator<'a> {
// Setup a new document
let mut doc = Document::new(thg, &val);
// Process the document
let res = match self.stmt {
Statement::Select(_) => doc.select(ctx, opt, txn, &self.stmt).await,
Statement::Create(_) => doc.create(ctx, opt, txn, &self.stmt).await,
Statement::Update(_) => doc.update(ctx, opt, txn, &self.stmt).await,
Statement::Relate(_) => doc.relate(ctx, opt, txn, &self.stmt).await,
Statement::Delete(_) => doc.delete(ctx, opt, txn, &self.stmt).await,
Statement::Insert(_) => doc.insert(ctx, opt, txn, &self.stmt).await,
let res = match self.stm {
Statement::Select(_) => doc.select(ctx, opt, txn, &self.stm).await,
Statement::Create(_) => doc.create(ctx, opt, txn, &self.stm).await,
Statement::Update(_) => doc.update(ctx, opt, txn, &self.stm).await,
Statement::Relate(_) => doc.relate(ctx, opt, txn, &self.stm).await,
Statement::Delete(_) => doc.delete(ctx, opt, txn, &self.stm).await,
Statement::Insert(_) => doc.insert(ctx, opt, txn, &self.stm).await,
_ => unreachable!(),
};
// Process the result
@ -295,10 +311,10 @@ impl<'a> Iterator<'a> {
Ok(v) => self.results.push(v),
}
// Check if we can exit
if self.group.is_none() {
if self.order.is_none() {
if let Some(l) = self.limit {
if let Some(s) = self.start {
if self.stm.group().is_none() {
if self.stm.order().is_none() {
if let Some(l) = self.stm.limit() {
if let Some(s) = self.stm.start() {
if self.results.len() == l.0 + s.0 {
self.run.cancel()
}

View file

@ -1,65 +1,72 @@
use crate::sql::group::Groups;
use crate::sql::limit::Limit;
use crate::sql::order::Orders;
use crate::sql::split::Splits;
use crate::sql::start::Start;
use crate::sql::statements::create::CreateStatement;
use crate::sql::statements::delete::DeleteStatement;
use crate::sql::statements::insert::InsertStatement;
use crate::sql::statements::relate::RelateStatement;
use crate::sql::statements::select::SelectStatement;
use crate::sql::statements::update::UpdateStatement;
use crate::sql::version::Version;
use std::fmt;
use std::sync::Arc;
#[derive(Debug)]
pub enum Statement<'a> {
#[derive(Clone, Debug)]
pub enum Statement {
None,
Select(&'a SelectStatement),
Create(&'a CreateStatement),
Update(&'a UpdateStatement),
Relate(&'a RelateStatement),
Delete(&'a DeleteStatement),
Insert(&'a InsertStatement),
Select(Arc<SelectStatement>),
Create(Arc<CreateStatement>),
Update(Arc<UpdateStatement>),
Relate(Arc<RelateStatement>),
Delete(Arc<DeleteStatement>),
Insert(Arc<InsertStatement>),
}
impl Default for Statement<'_> {
impl Default for Statement {
fn default() -> Self {
Statement::None
}
}
impl<'a> From<&'a SelectStatement> for Statement<'a> {
fn from(v: &'a SelectStatement) -> Self {
impl From<Arc<SelectStatement>> for Statement {
fn from(v: Arc<SelectStatement>) -> Self {
Statement::Select(v)
}
}
impl<'a> From<&'a CreateStatement> for Statement<'a> {
fn from(v: &'a CreateStatement) -> Self {
impl From<Arc<CreateStatement>> for Statement {
fn from(v: Arc<CreateStatement>) -> Self {
Statement::Create(v)
}
}
impl<'a> From<&'a UpdateStatement> for Statement<'a> {
fn from(v: &'a UpdateStatement) -> Self {
impl From<Arc<UpdateStatement>> for Statement {
fn from(v: Arc<UpdateStatement>) -> Self {
Statement::Update(v)
}
}
impl<'a> From<&'a RelateStatement> for Statement<'a> {
fn from(v: &'a RelateStatement) -> Self {
impl From<Arc<RelateStatement>> for Statement {
fn from(v: Arc<RelateStatement>) -> Self {
Statement::Relate(v)
}
}
impl<'a> From<&'a DeleteStatement> for Statement<'a> {
fn from(v: &'a DeleteStatement) -> Self {
impl From<Arc<DeleteStatement>> for Statement {
fn from(v: Arc<DeleteStatement>) -> Self {
Statement::Delete(v)
}
}
impl<'a> From<&'a InsertStatement> for Statement<'a> {
fn from(v: &'a InsertStatement) -> Self {
impl From<Arc<InsertStatement>> for Statement {
fn from(v: Arc<InsertStatement>) -> Self {
Statement::Insert(v)
}
}
impl<'a> fmt::Display for Statement<'a> {
impl fmt::Display for Statement {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Statement::Select(v) => write!(f, "{}", v),
@ -72,3 +79,48 @@ impl<'a> fmt::Display for Statement<'a> {
}
}
}
impl Statement {
// Returns any SPLIT clause if specified
pub fn split<'a>(self: &'a Statement) -> Option<&'a Splits> {
match self {
Statement::Select(v) => v.split.as_ref(),
_ => None,
}
}
// Returns any GROUP clause if specified
pub fn group<'a>(self: &'a Statement) -> Option<&'a Groups> {
match self {
Statement::Select(v) => v.group.as_ref(),
_ => None,
}
}
// Returns any ORDER clause if specified
pub fn order<'a>(self: &'a Statement) -> Option<&'a Orders> {
match self {
Statement::Select(v) => v.order.as_ref(),
_ => None,
}
}
// Returns any START clause if specified
pub fn start<'a>(self: &'a Statement) -> Option<&'a Start> {
match self {
Statement::Select(v) => v.start.as_ref(),
_ => None,
}
}
// Returns any LIMIT clause if specified
pub fn limit<'a>(self: &'a Statement) -> Option<&'a Limit> {
match self {
Statement::Select(v) => v.limit.as_ref(),
_ => None,
}
}
// Returns any VERSION clause if specified
pub fn version<'a>(self: &'a Statement) -> Option<&'a Version> {
match self {
Statement::Select(v) => v.version.as_ref(),
_ => None,
}
}
}

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
stm: &Statement<'_>,
stm: &Statement,
) -> Result<(), Error> {
match self.id {
Some(_) => Ok(()),

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement<'_>,
_stm: &Statement,
) -> Result<(), Error> {
Ok(())
}

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
stm: &Statement,
) -> Result<(), Error> {
// Extract statement clause
let cond = match stm {

View file

@ -6,20 +6,22 @@ use crate::doc::Document;
use crate::err::Error;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use tokio::sync::mpsc::Sender;
impl<'a> Document<'a> {
pub async fn compute(
ctx: Runtime,
opt: Options,
txn: Transaction,
stm: Statement<'_>,
chn: Sender<Result<Value, Error>>,
stm: Statement,
thg: Option<Thing>,
val: Value,
) -> Result<Value, Error> {
) -> Result<(), Error> {
// Setup a new document
let mut doc = Document::new(thg, &val);
// Process the statement
match stm {
let res = match stm {
Statement::Select(_) => doc.select(&ctx, &opt, &txn, &stm).await,
Statement::Create(_) => doc.create(&ctx, &opt, &txn, &stm).await,
Statement::Update(_) => doc.update(&ctx, &opt, &txn, &stm).await,
@ -27,6 +29,10 @@ impl<'a> Document<'a> {
Statement::Delete(_) => doc.delete(&ctx, &opt, &txn, &stm).await,
Statement::Insert(_) => doc.insert(&ctx, &opt, &txn, &stm).await,
_ => unreachable!(),
}
};
// Send back the result
let _ = chn.send(res).await;
// Everything went ok
Ok(())
}
}

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
stm: &Statement,
) -> Result<Value, Error> {
// Check value type
self.admit(ctx, opt, txn, stm).await?;

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
stm: &Statement,
) -> Result<Value, Error> {
// Check value type
self.admit(ctx, opt, txn, stm).await?;

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement<'_>,
_stm: &Statement,
) -> Result<(), Error> {
match self.id.is_some() && self.current.is_none() {
true => Err(Error::IgnoreError),

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
_stm: &Statement<'_>,
_stm: &Statement,
) -> Result<(), Error> {
self.current.to_mut().clear(ctx, opt, txn).await
}

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement<'_>,
_stm: &Statement,
) -> Result<(), Error> {
Ok(())
}

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement<'_>,
_stm: &Statement,
) -> Result<(), Error> {
Ok(())
}

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement<'_>,
_stm: &Statement,
) -> Result<Value, Error> {
todo!()
}

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement<'_>,
_stm: &Statement,
) -> Result<(), Error> {
Ok(())
}

View file

@ -14,7 +14,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
stm: &Statement,
) -> Result<(), Error> {
// Get the ID reference
let id = self.id.as_ref();

View file

@ -15,7 +15,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
stm: &Statement,
) -> Result<Value, Error> {
// Extract statement clause
let expr = match stm {

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
stm: &Statement,
) -> Result<Value, Error> {
// Check value type
self.admit(ctx, opt, txn, stm).await?;

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
stm: &Statement,
) -> Result<Value, Error> {
// Check if record exists
self.empty(ctx, opt, txn, stm).await?;

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
opt: &Options,
txn: &Transaction,
_stm: &Statement<'_>,
_stm: &Statement,
) -> Result<(), Error> {
Ok(())
}

View file

@ -11,7 +11,7 @@ impl<'a> Document<'a> {
_ctx: &Runtime,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement<'_>,
_stm: &Statement,
) -> Result<(), Error> {
Ok(())
}

View file

@ -12,7 +12,7 @@ impl<'a> Document<'a> {
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
stm: &Statement,
) -> Result<Value, Error> {
// Check value type
self.admit(ctx, opt, txn, stm).await?;

View file

@ -32,6 +32,7 @@ use nom::multi::separated_list1;
use nom::sequence::delimited;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
@ -51,25 +52,25 @@ pub fn statements(i: &str) -> IResult<&str, Statements> {
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Statement {
Set(SetStatement),
Use(UseStatement),
Info(InfoStatement),
Live(LiveStatement),
Kill(KillStatement),
Begin(BeginStatement),
Cancel(CancelStatement),
Commit(CommitStatement),
Output(OutputStatement),
Ifelse(IfelseStatement),
Select(SelectStatement),
Create(CreateStatement),
Update(UpdateStatement),
Relate(RelateStatement),
Delete(DeleteStatement),
Insert(InsertStatement),
Define(DefineStatement),
Remove(RemoveStatement),
Option(OptionStatement),
Set(Arc<SetStatement>),
Use(Arc<UseStatement>),
Info(Arc<InfoStatement>),
Live(Arc<LiveStatement>),
Kill(Arc<KillStatement>),
Begin(Arc<BeginStatement>),
Cancel(Arc<CancelStatement>),
Commit(Arc<CommitStatement>),
Output(Arc<OutputStatement>),
Ifelse(Arc<IfelseStatement>),
Select(Arc<SelectStatement>),
Create(Arc<CreateStatement>),
Update(Arc<UpdateStatement>),
Relate(Arc<RelateStatement>),
Delete(Arc<DeleteStatement>),
Insert(Arc<InsertStatement>),
Define(Arc<DefineStatement>),
Remove(Arc<RemoveStatement>),
Option(Arc<OptionStatement>),
}
impl Statement {
@ -162,25 +163,25 @@ pub fn statement(i: &str) -> IResult<&str, Statement> {
delimited(
mightbespace,
alt((
map(set, |v| Statement::Set(v)),
map(yuse, |v| Statement::Use(v)),
map(info, |v| Statement::Info(v)),
map(live, |v| Statement::Live(v)),
map(kill, |v| Statement::Kill(v)),
map(begin, |v| Statement::Begin(v)),
map(cancel, |v| Statement::Cancel(v)),
map(commit, |v| Statement::Commit(v)),
map(output, |v| Statement::Output(v)),
map(ifelse, |v| Statement::Ifelse(v)),
map(select, |v| Statement::Select(v)),
map(create, |v| Statement::Create(v)),
map(update, |v| Statement::Update(v)),
map(relate, |v| Statement::Relate(v)),
map(delete, |v| Statement::Delete(v)),
map(insert, |v| Statement::Insert(v)),
map(define, |v| Statement::Define(v)),
map(remove, |v| Statement::Remove(v)),
map(option, |v| Statement::Option(v)),
map(set, |v| Statement::Set(Arc::new(v))),
map(yuse, |v| Statement::Use(Arc::new(v))),
map(info, |v| Statement::Info(Arc::new(v))),
map(live, |v| Statement::Live(Arc::new(v))),
map(kill, |v| Statement::Kill(Arc::new(v))),
map(begin, |v| Statement::Begin(Arc::new(v))),
map(cancel, |v| Statement::Cancel(Arc::new(v))),
map(commit, |v| Statement::Commit(Arc::new(v))),
map(output, |v| Statement::Output(Arc::new(v))),
map(ifelse, |v| Statement::Ifelse(Arc::new(v))),
map(select, |v| Statement::Select(Arc::new(v))),
map(create, |v| Statement::Create(Arc::new(v))),
map(update, |v| Statement::Update(Arc::new(v))),
map(relate, |v| Statement::Relate(Arc::new(v))),
map(delete, |v| Statement::Delete(Arc::new(v))),
map(insert, |v| Statement::Insert(Arc::new(v))),
map(define, |v| Statement::Define(Arc::new(v))),
map(remove, |v| Statement::Remove(Arc::new(v))),
map(option, |v| Statement::Option(Arc::new(v))),
)),
mightbespace,
)(i)

View file

@ -16,6 +16,7 @@ use nom::combinator::opt;
use nom::sequence::preceded;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct CreateStatement {
@ -28,7 +29,7 @@ pub struct CreateStatement {
impl CreateStatement {
pub async fn compute(
&self,
self: &Arc<Self>,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
@ -36,8 +37,10 @@ impl CreateStatement {
) -> Result<Value, Error> {
// Allowed to run?
opt.check(Level::No)?;
// Clone the statement
let s = Arc::clone(self);
// Create a new iterator
let mut i = Iterator::from(self);
let mut i = Iterator::from(s);
// Ensure futures are stored
let opt = &opt.futures(false);
// Loop over the create targets

View file

@ -17,6 +17,7 @@ use nom::sequence::preceded;
use nom::sequence::tuple;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct DeleteStatement {
@ -29,7 +30,7 @@ pub struct DeleteStatement {
impl DeleteStatement {
pub async fn compute(
&self,
self: &Arc<Self>,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
@ -37,8 +38,10 @@ impl DeleteStatement {
) -> Result<Value, Error> {
// Allowed to run?
opt.check(Level::No)?;
// Clone the statement
let s = Arc::clone(self);
// Create a new iterator
let mut i = Iterator::from(self);
let mut i = Iterator::from(s);
// Ensure futures are stored
let opt = &opt.futures(false);
// Loop over the delete targets

View file

@ -18,6 +18,7 @@ use nom::combinator::opt;
use nom::sequence::preceded;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct InsertStatement {
@ -32,7 +33,7 @@ pub struct InsertStatement {
impl InsertStatement {
pub async fn compute(
&self,
self: &Arc<Self>,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
@ -40,8 +41,10 @@ impl InsertStatement {
) -> Result<Value, Error> {
// Allowed to run?
opt.check(Level::No)?;
// Clone the statement
let s = Arc::clone(self);
// Create a new iterator
let mut i = Iterator::from(self);
let mut i = Iterator::from(s);
// Ensure futures are stored
let opt = &opt.futures(false);
// Parse the expression

View file

@ -20,6 +20,7 @@ use nom::combinator::opt;
use nom::sequence::preceded;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct RelateStatement {
@ -35,7 +36,7 @@ pub struct RelateStatement {
impl RelateStatement {
pub async fn compute(
&self,
self: &Arc<Self>,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
@ -43,8 +44,10 @@ impl RelateStatement {
) -> Result<Value, Error> {
// Allowed to run?
opt.check(Level::No)?;
// Clone the statement
let s = Arc::clone(self);
// Create a new iterator
let mut i = Iterator::from(self);
let mut i = Iterator::from(s);
// Ensure futures are stored
let opt = &opt.futures(false);
// Loop over the select targets

View file

@ -23,6 +23,7 @@ use nom::combinator::opt;
use nom::sequence::preceded;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct SelectStatement {
@ -57,7 +58,7 @@ impl SelectStatement {
impl SelectStatement {
pub async fn compute(
&self,
self: &Arc<Self>,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
@ -65,8 +66,10 @@ impl SelectStatement {
) -> Result<Value, Error> {
// Allowed to run?
opt.check(Level::No)?;
// Clone the statement
let s = Arc::clone(self);
// Create a new iterator
let mut i = Iterator::from(self);
let mut i = Iterator::from(s);
// Ensure futures are processed
let opt = &opt.futures(true);
// Loop over the select targets

View file

@ -17,6 +17,7 @@ use nom::combinator::opt;
use nom::sequence::preceded;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store)]
pub struct UpdateStatement {
@ -30,7 +31,7 @@ pub struct UpdateStatement {
impl UpdateStatement {
pub async fn compute(
&self,
self: &Arc<Self>,
ctx: &Runtime,
opt: &Options,
txn: &Transaction,
@ -38,8 +39,10 @@ impl UpdateStatement {
) -> Result<Value, Error> {
// Allowed to run?
opt.check(Level::No)?;
// Clone the statement
let s = Arc::clone(self);
// Create a new iterator
let mut i = Iterator::from(self);
let mut i = Iterator::from(s);
// Ensure futures are stored
let opt = &opt.futures(false);
// Loop over the update targets

View file

@ -18,17 +18,18 @@ use nom::combinator::map;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt;
use std::sync::Arc;
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Subquery {
Value(Value),
Select(SelectStatement),
Create(CreateStatement),
Update(UpdateStatement),
Delete(DeleteStatement),
Relate(RelateStatement),
Insert(InsertStatement),
Ifelse(IfelseStatement),
Select(Arc<SelectStatement>),
Create(Arc<CreateStatement>),
Update(Arc<UpdateStatement>),
Delete(Arc<DeleteStatement>),
Relate(Arc<RelateStatement>),
Insert(Arc<InsertStatement>),
}
impl PartialOrd for Subquery {
@ -62,7 +63,7 @@ impl Subquery {
// Prepare context
let ctx = ctx.freeze();
// Process subquery
let res = v.compute(&ctx, &opt, txn, doc).await?;
let res = Arc::clone(v).compute(&ctx, &opt, txn, doc).await?;
// Process result
match v.limit() {
1 => match v.expr.single() {
@ -88,7 +89,7 @@ impl Subquery {
// Prepare context
let ctx = ctx.freeze();
// Process subquery
match v.compute(&ctx, &opt, txn, doc).await? {
match Arc::clone(v).compute(&ctx, &opt, txn, doc).await? {
Value::Array(mut v) => match v.len() {
1 => Ok(v.value.remove(0)),
_ => Ok(v.into()),
@ -109,7 +110,7 @@ impl Subquery {
// Prepare context
let ctx = ctx.freeze();
// Process subquery
match v.compute(&ctx, &opt, txn, doc).await? {
match Arc::clone(v).compute(&ctx, &opt, txn, doc).await? {
Value::Array(mut v) => match v.len() {
1 => Ok(v.value.remove(0)),
_ => Ok(v.into()),
@ -130,7 +131,7 @@ impl Subquery {
// Prepare context
let ctx = ctx.freeze();
// Process subquery
match v.compute(&ctx, &opt, txn, doc).await? {
match Arc::clone(v).compute(&ctx, &opt, txn, doc).await? {
Value::Array(mut v) => match v.len() {
1 => Ok(v.value.remove(0)),
_ => Ok(v.into()),
@ -151,7 +152,7 @@ impl Subquery {
// Prepare context
let ctx = ctx.freeze();
// Process subquery
match v.compute(&ctx, &opt, txn, doc).await? {
match Arc::clone(v).compute(&ctx, &opt, txn, doc).await? {
Value::Array(mut v) => match v.len() {
1 => Ok(v.value.remove(0)),
_ => Ok(v.into()),
@ -172,7 +173,7 @@ impl Subquery {
// Prepare context
let ctx = ctx.freeze();
// Process subquery
match v.compute(&ctx, &opt, txn, doc).await? {
match Arc::clone(v).compute(&ctx, &opt, txn, doc).await? {
Value::Array(mut v) => match v.len() {
1 => Ok(v.value.remove(0)),
_ => Ok(v.into()),
@ -211,12 +212,12 @@ fn subquery_ifelse(i: &str) -> IResult<&str, Subquery> {
fn subquery_others(i: &str) -> IResult<&str, Subquery> {
let (i, _) = tag("(")(i)?;
let (i, v) = alt((
map(select, |v| Subquery::Select(v)),
map(create, |v| Subquery::Create(v)),
map(update, |v| Subquery::Update(v)),
map(delete, |v| Subquery::Delete(v)),
map(relate, |v| Subquery::Relate(v)),
map(insert, |v| Subquery::Insert(v)),
map(select, |v| Subquery::Select(Arc::new(v))),
map(create, |v| Subquery::Create(Arc::new(v))),
map(update, |v| Subquery::Update(Arc::new(v))),
map(delete, |v| Subquery::Delete(Arc::new(v))),
map(relate, |v| Subquery::Relate(Arc::new(v))),
map(insert, |v| Subquery::Insert(Arc::new(v))),
map(value, |v| Subquery::Value(v)),
))(i)?;
let (i, _) = tag(")")(i)?;

View file

@ -9,6 +9,7 @@ use crate::sql::statements::select::SelectStatement;
use crate::sql::value::{Value, Values};
use async_recursion::async_recursion;
use futures::future::try_join_all;
use std::sync::Arc;
impl Value {
#[cfg_attr(feature = "parallel", async_recursion)]
@ -76,7 +77,8 @@ impl Value {
what: Values(vec![Value::Thing(v.clone())]),
..SelectStatement::default()
};
stm.compute(ctx, opt, txn, None)
Arc::new(stm)
.compute(ctx, opt, txn, None)
.await?
.first(ctx, opt, txn)
.await?