Implement INSERT and RELATE statements

This commit is contained in:
Tobie Morgan Hitchcock 2022-05-30 16:32:26 +01:00
parent b37b027b60
commit 01d21e1157
24 changed files with 741 additions and 517 deletions

View file

@ -1,204 +1,121 @@
use crate::ctx::Context;
use crate::dbs::Iterable;
use crate::dbs::Operable;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::err::Error;
use crate::key::thing;
use crate::sql::array::Array;
use crate::sql::id::Id;
use crate::sql::model::Model;
use crate::sql::object::Object;
use crate::sql::table::Table;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use async_recursion::async_recursion;
use channel::Sender;
impl Value {
#[cfg_attr(feature = "parallel", async_recursion)]
#[cfg_attr(not(feature = "parallel"), async_recursion(?Send))]
impl Iterable {
pub(crate) async fn channel(
self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
chn: Sender<(Option<Thing>, Value)>,
_stm: &Statement<'_>,
chn: Sender<(Option<Thing>, Operable)>,
) -> Result<(), Error> {
if ctx.is_ok() {
match self {
Value::Object(v) => v.process(ctx, opt, txn, stm, &chn).await?,
Value::Array(v) => v.process(ctx, opt, txn, stm, &chn).await?,
Value::Model(v) => v.process(ctx, opt, txn, stm, &chn).await?,
Value::Thing(v) => v.process(ctx, opt, txn, stm, &chn).await?,
Value::Table(v) => v.process(ctx, opt, txn, stm, &chn).await?,
v => chn.send((None, v)).await?,
}
}
Ok(())
}
}
impl Array {
#[cfg_attr(feature = "parallel", async_recursion)]
#[cfg_attr(not(feature = "parallel"), async_recursion(?Send))]
pub(crate) async fn process(
self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
for v in self {
if ctx.is_ok() {
match v {
Value::Object(v) => v.process(ctx, opt, txn, stm, chn).await?,
Value::Array(v) => v.process(ctx, opt, txn, stm, chn).await?,
Value::Model(v) => v.process(ctx, opt, txn, stm, chn).await?,
Value::Thing(v) => v.process(ctx, opt, txn, stm, chn).await?,
Value::Table(v) => v.process(ctx, opt, txn, stm, chn).await?,
v => chn.send((None, v)).await?,
Iterable::Value(v) => {
// Pass the value through
let val = Operable::Value(v);
// Process the document record
chn.send((None, val)).await?;
}
}
}
Ok(())
}
}
impl Object {
#[cfg_attr(feature = "parallel", async_recursion)]
#[cfg_attr(not(feature = "parallel"), async_recursion(?Send))]
pub(crate) async fn process(
self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
if ctx.is_ok() {
if let Some(Value::Thing(id)) = self.get("id") {
id.clone().process(ctx, opt, txn, stm, chn).await?;
}
}
Ok(())
}
}
impl Model {
pub(crate) async fn process(
self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
if ctx.is_ok() {
match self {
Model::Count(tb, c) => {
for _ in 0..c {
Thing {
tb: tb.to_string(),
id: Id::rand(),
}
.process(ctx, opt, txn, stm, chn)
.await?;
}
Iterable::Thing(v) => {
// Fetch the data from the store
let key = thing::new(opt.ns(), opt.db(), &v.tb, &v.id);
let val = txn.clone().lock().await.get(key).await?;
// Parse the data from the store
let val = Operable::Value(match val {
Some(v) => Value::from(v),
None => Value::None,
});
// Process the document record
chn.send((Some(v), val)).await?;
}
Model::Range(tb, b, e) => {
for x in b..=e {
Thing {
tb: tb.to_string(),
id: Id::from(x),
}
.process(ctx, opt, txn, stm, chn)
.await?;
}
}
}
}
Ok(())
}
}
impl Thing {
pub(crate) async fn process(
self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
_stm: &Statement<'_>,
chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
if ctx.is_ok() {
let key = thing::new(opt.ns(), opt.db(), &self.tb, &self.id);
let val = txn.clone().lock().await.get(key).await?;
let val = match val {
Some(v) => Value::from(v),
None => Value::None,
};
chn.send((Some(self), val)).await?;
}
Ok(())
}
}
impl Table {
pub(crate) async fn process(
self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
_stm: &Statement<'_>,
chn: &Sender<(Option<Thing>, Value)>,
) -> Result<(), Error> {
if ctx.is_ok() {
let beg = thing::prefix(opt.ns(), opt.db(), &self);
let end = thing::suffix(opt.ns(), opt.db(), &self);
let mut nxt: Option<Vec<u8>> = None;
loop {
if ctx.is_ok() {
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
txn.clone().lock().await.scan(min..max, 1000).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
txn.clone().lock().await.scan(min..max, 1000).await?
}
};
if !res.is_empty() {
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (k, v)) in res.into_iter().enumerate() {
if ctx.is_ok() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
Iterable::Table(v) => {
let beg = thing::prefix(opt.ns(), opt.db(), &v);
let end = thing::suffix(opt.ns(), opt.db(), &v);
let mut nxt: Option<Vec<u8>> = None;
loop {
if ctx.is_ok() {
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
txn.clone().lock().await.scan(min..max, 1000).await?
}
// Parse the key-value
let k: crate::key::thing::Thing = (&k).into();
let v: crate::sql::value::Value = (&v).into();
let t = Thing::from((k.tb, k.id));
// Process the record
chn.send((Some(t), v)).await?;
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
txn.clone().lock().await.scan(min..max, 1000).await?
}
};
if !res.is_empty() {
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (k, v)) in res.into_iter().enumerate() {
if ctx.is_ok() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
// Parse the data from the store
let key: crate::key::thing::Thing = (&k).into();
let val: crate::sql::value::Value = (&v).into();
let rid = Thing::from((key.tb, key.id));
// Create a new operable value
let val = Operable::Value(val);
// Process the record
chn.send((Some(rid), val)).await?;
}
}
continue;
}
}
continue;
break;
}
}
break;
Iterable::Mergeable(v, o) => {
// Fetch the data from the store
let key = thing::new(opt.ns(), opt.db(), &v.tb, &v.id);
let val = txn.clone().lock().await.get(key).await?;
// Parse the data from the store
let x = match val {
Some(v) => Value::from(v),
None => Value::None,
};
// Create a new operable value
let val = Operable::Mergeable(x, o);
// Process the document record
chn.send((Some(v), val)).await?;
}
Iterable::Relatable(f, v, w) => {
// Fetch the data from the store
let key = thing::new(opt.ns(), opt.db(), &v.tb, &v.id);
let val = txn.clone().lock().await.get(key).await?;
// Parse the data from the store
let x = match val {
Some(v) => Value::from(v),
None => Value::None,
};
// Create a new operable value
let val = Operable::Relatable(f, x, w);
// Process the document record
chn.send((Some(v), val)).await?;
}
}
}
Ok(())

View file

@ -1,22 +1,16 @@
use crate::ctx::Context;
use crate::dbs::Iterable;
use crate::dbs::Iterator;
use crate::dbs::Operable;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::err::Error;
use crate::key::thing;
use crate::sql::array::Array;
use crate::sql::id::Id;
use crate::sql::model::Model;
use crate::sql::object::Object;
use crate::sql::table::Table;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use async_recursion::async_recursion;
impl Value {
#[cfg_attr(feature = "parallel", async_recursion)]
#[cfg_attr(not(feature = "parallel"), async_recursion(?Send))]
impl Iterable {
pub(crate) async fn iterate(
self,
ctx: &Context<'_>,
@ -27,178 +21,101 @@ impl Value {
) -> Result<(), Error> {
if ctx.is_ok() {
match self {
Value::Object(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Array(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Model(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Thing(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Table(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
v => ite.process(ctx, opt, txn, stm, None, v).await,
}
}
Ok(())
}
}
impl Array {
#[cfg_attr(feature = "parallel", async_recursion)]
#[cfg_attr(not(feature = "parallel"), async_recursion(?Send))]
pub(crate) async fn iterate(
self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
for v in self.into_iter() {
if ctx.is_ok() {
match v {
Value::Object(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Array(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Model(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Thing(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
Value::Table(v) => v.iterate(ctx, opt, txn, stm, ite).await?,
v => ite.process(ctx, opt, txn, stm, None, v).await,
Iterable::Value(v) => {
// Pass the value through
let val = Operable::Value(v);
// Process the document record
ite.process(ctx, opt, txn, stm, None, val).await;
}
}
}
Ok(())
}
}
impl Object {
#[cfg_attr(feature = "parallel", async_recursion)]
#[cfg_attr(not(feature = "parallel"), async_recursion(?Send))]
pub(crate) async fn iterate(
self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
if ctx.is_ok() {
if let Some(Value::Thing(id)) = self.get("id") {
id.clone().iterate(ctx, opt, txn, stm, ite).await?;
}
}
Ok(())
}
}
impl Model {
pub(crate) async fn iterate(
self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
if ctx.is_ok() {
match self {
Model::Count(tb, c) => {
for _ in 0..c {
Thing {
tb: tb.to_string(),
id: Id::rand(),
}
.iterate(ctx, opt, txn, stm, ite)
.await?;
}
Iterable::Thing(v) => {
// Fetch the data from the store
let key = thing::new(opt.ns(), opt.db(), &v.tb, &v.id);
let val = txn.clone().lock().await.get(key).await?;
// Parse the data from the store
let val = Operable::Value(match val {
Some(v) => Value::from(v),
None => Value::None,
});
// Process the document record
ite.process(ctx, opt, txn, stm, Some(v), val).await;
}
Model::Range(tb, b, e) => {
for x in b..=e {
Thing {
tb: tb.to_string(),
id: Id::from(x),
}
.iterate(ctx, opt, txn, stm, ite)
.await?;
}
}
}
}
Ok(())
}
}
impl Thing {
pub(crate) async fn iterate(
self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
if ctx.is_ok() {
let key = thing::new(opt.ns(), opt.db(), &self.tb, &self.id);
let val = txn.clone().lock().await.get(key).await?;
let val = match val {
Some(v) => Value::from(v),
None => Value::None,
};
ite.process(ctx, opt, txn, stm, Some(self), val).await;
}
Ok(())
}
}
impl Table {
pub(crate) async fn iterate(
self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
ite: &mut Iterator,
) -> Result<(), Error> {
if ctx.is_ok() {
let beg = thing::prefix(opt.ns(), opt.db(), &self);
let end = thing::suffix(opt.ns(), opt.db(), &self);
let mut nxt: Option<Vec<u8>> = None;
loop {
if ctx.is_ok() {
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
txn.clone().lock().await.scan(min..max, 1000).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
txn.clone().lock().await.scan(min..max, 1000).await?
}
};
if !res.is_empty() {
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (k, v)) in res.into_iter().enumerate() {
if ctx.is_ok() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
Iterable::Table(v) => {
let beg = thing::prefix(opt.ns(), opt.db(), &v);
let end = thing::suffix(opt.ns(), opt.db(), &v);
let mut nxt: Option<Vec<u8>> = None;
loop {
if ctx.is_ok() {
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
txn.clone().lock().await.scan(min..max, 1000).await?
}
// Parse the key-value
let k: crate::key::thing::Thing = (&k).into();
let v: crate::sql::value::Value = (&v).into();
let t = Thing::from((k.tb, k.id));
// Process the record
ite.process(ctx, opt, txn, stm, Some(t), v).await;
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
txn.clone().lock().await.scan(min..max, 1000).await?
}
};
if !res.is_empty() {
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (k, v)) in res.into_iter().enumerate() {
if ctx.is_ok() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
// Parse the data from the store
let key: crate::key::thing::Thing = (&k).into();
let val: crate::sql::value::Value = (&v).into();
let rid = Thing::from((key.tb, key.id));
// Create a new operable value
let val = Operable::Value(val);
// Process the record
ite.process(ctx, opt, txn, stm, Some(rid), val).await;
}
}
continue;
}
}
continue;
break;
}
}
break;
Iterable::Mergeable(v, o) => {
// Fetch the data from the store
let key = thing::new(opt.ns(), opt.db(), &v.tb, &v.id);
let val = txn.clone().lock().await.get(key).await?;
// Parse the data from the store
let x = match val {
Some(v) => Value::from(v),
None => Value::None,
};
// Create a new operable value
let val = Operable::Mergeable(x, o);
// Process the document record
ite.process(ctx, opt, txn, stm, Some(v), val).await;
}
Iterable::Relatable(f, v, w) => {
// Fetch the data from the store
let key = thing::new(opt.ns(), opt.db(), &v.tb, &v.id);
let val = txn.clone().lock().await.get(key).await?;
// Parse the data from the store
let x = match val {
Some(v) => Value::from(v),
None => Value::None,
};
// Create a new operable value
let val = Operable::Relatable(f, x, w);
// Process the document record
ite.process(ctx, opt, txn, stm, Some(v), val).await;
}
}
}
Ok(())

View file

@ -7,7 +7,6 @@ use crate::doc::Document;
use crate::err::Error;
use crate::sql::array::Array;
use crate::sql::field::Field;
use crate::sql::id::Id;
use crate::sql::part::Part;
use crate::sql::table::Table;
use crate::sql::thing::Thing;
@ -16,16 +15,36 @@ use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::mem;
pub enum Iterable {
Value(Value),
Table(Table),
Thing(Thing),
Mergeable(Thing, Value),
Relatable(Thing, Thing, Thing),
}
pub enum Operable {
Value(Value),
Mergeable(Value, Value),
Relatable(Thing, Value, Thing),
}
pub enum Workable {
Normal,
Insert(Value),
Relate(Thing, Thing),
}
#[derive(Default)]
pub struct Iterator {
// Iterator status
run: Canceller,
// Iterator runtime error
error: Option<Error>,
// Iterator input values
readies: Vec<Value>,
// Iterator output results
results: Vec<Value>,
// Iterator input values
entries: Vec<Iterable>,
}
impl Iterator {
@ -35,16 +54,8 @@ impl Iterator {
}
// Prepares a value for processing
pub fn prepare(&mut self, val: Value) {
self.readies.push(val)
}
// Create a new record for processing
pub fn produce(&mut self, val: Table) {
self.prepare(Value::Thing(Thing {
tb: val.0,
id: Id::rand(),
}))
pub fn ingest(&mut self, val: Iterable) {
self.entries.push(val)
}
// Process the records and output
@ -324,7 +335,7 @@ impl Iterator {
stm: &Statement<'_>,
) -> Result<(), Error> {
// Process all prepared values
for v in mem::take(&mut self.readies) {
for v in mem::take(&mut self.entries) {
v.iterate(ctx, opt, txn, stm, self).await?;
}
// Everything processed ok
@ -343,7 +354,7 @@ impl Iterator {
// Run statements sequentially
false => {
// Process all prepared values
for v in mem::take(&mut self.readies) {
for v in mem::take(&mut self.entries) {
v.iterate(ctx, opt, txn, stm, self).await?;
}
// Everything processed ok
@ -354,7 +365,7 @@ impl Iterator {
// Create a new executor
let exe = executor::Executor::new();
// Take all of the iterator values
let vals = mem::take(&mut self.readies);
let vals = mem::take(&mut self.entries);
// Create a channel to shutdown
let (end, exit) = channel::bounded::<()>(1);
// Create an unbounded channel
@ -412,14 +423,20 @@ impl Iterator {
txn: &Transaction,
stm: &Statement<'_>,
thg: Option<Thing>,
val: Value,
val: Operable,
) {
// Check current context
if ctx.is_done() {
return;
}
// Setup a new workable
let val = match val {
Operable::Value(v) => (v, Workable::Normal),
Operable::Mergeable(v, o) => (v, Workable::Insert(o)),
Operable::Relatable(f, v, w) => (v, Workable::Relate(f, w)),
};
// Setup a new document
let mut doc = Document::new(thg, &val);
let mut doc = Document::new(thg, &val.0, val.1);
// Process the document
let res = match stm {
Statement::Select(_) => doc.select(ctx, opt, txn, stm).await,

View file

@ -96,6 +96,8 @@ impl<'a> Statement<'a> {
match self {
Statement::Create(v) => v.data.as_ref(),
Statement::Update(v) => v.data.as_ref(),
Statement::Relate(v) => v.data.as_ref(),
Statement::Insert(v) => v.update.as_ref(),
_ => None,
}
}
@ -157,6 +159,14 @@ impl<'a> Statement<'a> {
_ => None,
}
}
// Returns any VERSION clause if specified
#[inline]
pub fn version(&self) -> Option<&Version> {
match self {
Statement::Select(v) => v.version.as_ref(),
_ => None,
}
}
// Returns any RETURN clause if specified
#[inline]
pub fn output(&self) -> Option<&Output> {
@ -169,14 +179,6 @@ impl<'a> Statement<'a> {
_ => None,
}
}
// Returns any VERSION clause if specified
#[inline]
pub fn version(&self) -> Option<&Version> {
match self {
Statement::Select(v) => v.version.as_ref(),
_ => None,
}
}
// Returns any RETURN clause if specified
#[inline]
pub fn parallel(&self) -> bool {

View file

@ -1,40 +0,0 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::doc::Document;
use crate::err::Error;
impl<'a> Document<'a> {
pub async fn admit(
&self,
_ctx: &Context<'_>,
_opt: &Options,
_txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Check that we are altering a record
if self.id.is_none() {
return match stm {
Statement::Create(_) => Err(Error::CreateStatement {
value: self.initial.to_string(),
}),
Statement::Update(_) => Err(Error::UpdateStatement {
value: self.initial.to_string(),
}),
Statement::Relate(_) => Err(Error::RelateStatement {
value: self.initial.to_string(),
}),
Statement::Delete(_) => Err(Error::DeleteStatement {
value: self.initial.to_string(),
}),
Statement::Insert(_) => Err(Error::InsertStatement {
value: self.initial.to_string(),
}),
_ => unreachable!(),
};
}
// Carry on
Ok(())
}
}

86
lib/src/doc/alter.rs Normal file
View file

@ -0,0 +1,86 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::doc::Document;
use crate::err::Error;
use crate::sql::data::Data;
use crate::sql::operator::Operator;
use crate::sql::value::Value;
impl<'a> Document<'a> {
pub async fn alter(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Get the record id
let rid = self.id.as_ref().unwrap();
// Set default field values
self.current.to_mut().def(ctx, opt, txn, rid).await?;
// The statement has a data clause
if let Some(v) = stm.data() {
match v {
Data::SetExpression(x) => {
for x in x.iter() {
let v = x.2.compute(ctx, opt, txn, Some(&self.current)).await?;
match x.1 {
Operator::Equal => match v {
Value::Void => {
self.current.to_mut().del(ctx, opt, txn, &x.0).await?
}
_ => self.current.to_mut().set(ctx, opt, txn, &x.0, v).await?,
},
Operator::Inc => {
self.current.to_mut().increment(ctx, opt, txn, &x.0, v).await?
}
Operator::Dec => {
self.current.to_mut().decrement(ctx, opt, txn, &x.0, v).await?
}
_ => unreachable!(),
}
}
}
Data::UpdateExpression(x) => {
for x in x.iter() {
let v = x.2.compute(ctx, opt, txn, Some(&self.current)).await?;
match x.1 {
Operator::Equal => match v {
Value::Void => {
self.current.to_mut().del(ctx, opt, txn, &x.0).await?
}
_ => self.current.to_mut().set(ctx, opt, txn, &x.0, v).await?,
},
Operator::Inc => {
self.current.to_mut().increment(ctx, opt, txn, &x.0, v).await?
}
Operator::Dec => {
self.current.to_mut().decrement(ctx, opt, txn, &x.0, v).await?
}
_ => unreachable!(),
}
}
}
Data::PatchExpression(data) => {
self.current.to_mut().patch(ctx, opt, txn, data).await?
}
Data::MergeExpression(data) => {
self.current.to_mut().merge(ctx, opt, txn, data).await?
}
Data::ReplaceExpression(data) => {
self.current.to_mut().replace(ctx, opt, txn, data).await?
}
Data::ContentExpression(data) => {
self.current.to_mut().replace(ctx, opt, txn, data).await?
}
_ => unreachable!(),
};
};
// Set default field values
self.current.to_mut().def(ctx, opt, txn, rid).await?;
// Carry on
Ok(())
}
}

View file

@ -1,7 +1,9 @@
use crate::ctx::Context;
use crate::dbs::Operable;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::dbs::Workable;
use crate::doc::Document;
use crate::err::Error;
use crate::sql::thing::Thing;
@ -16,10 +18,16 @@ impl<'a> Document<'a> {
stm: &Statement<'_>,
chn: Sender<Result<Value, Error>>,
thg: Option<Thing>,
val: Value,
val: Operable,
) -> Result<(), Error> {
// Setup a new workable
let ins = match val {
Operable::Value(v) => (v, Workable::Normal),
Operable::Mergeable(v, o) => (v, Workable::Insert(o)),
Operable::Relatable(f, v, w) => (v, Workable::Relate(f, w)),
};
// Setup a new document
let mut doc = Document::new(thg, &val);
let mut doc = Document::new(thg, &ins.0, ins.1);
// Process the statement
let res = match stm {
Statement::Select(_) => doc.select(ctx, opt, txn, stm).await,

View file

@ -14,12 +14,10 @@ impl<'a> Document<'a> {
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check value type
self.admit(ctx, opt, txn, stm).await?;
// Check if exists
self.exist(ctx, opt, txn, stm).await?;
// Merge record data
self.merge(ctx, opt, txn, stm).await?;
// Alter record data
self.alter(ctx, opt, txn, stm).await?;
// Merge fields data
self.field(ctx, opt, txn, stm).await?;
// Check if allowed

View file

@ -14,8 +14,6 @@ impl<'a> Document<'a> {
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check value type
self.admit(ctx, opt, txn, stm).await?;
// Check where clause
self.check(ctx, opt, txn, stm).await?;
// Erase document

View file

@ -1,5 +1,6 @@
use crate::dbs::Options;
use crate::dbs::Transaction;
use crate::dbs::Workable;
use crate::err::Error;
use crate::sql::statements::define::DefineEventStatement;
use crate::sql::statements::define::DefineFieldStatement;
@ -9,9 +10,9 @@ use crate::sql::thing::Thing;
use crate::sql::value::Value;
use std::borrow::Cow;
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Document<'a> {
pub(super) id: Option<Thing>,
pub(super) extras: Workable,
pub(super) current: Cow<'a, Value>,
pub(super) initial: Cow<'a, Value>,
}
@ -23,9 +24,10 @@ impl<'a> From<&Document<'a>> for Vec<u8> {
}
impl<'a> Document<'a> {
pub fn new(id: Option<Thing>, val: &'a Value) -> Self {
pub fn new(id: Option<Thing>, val: &'a Value, ext: Workable) -> Self {
Document {
id,
extras: ext,
current: Cow::Borrowed(val),
initial: Cow::Borrowed(val),
}

46
lib/src/doc/edges.rs Normal file
View file

@ -0,0 +1,46 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::dbs::Workable;
use crate::doc::Document;
use crate::err::Error;
use crate::sql::graph::Dir;
impl<'a> Document<'a> {
pub async fn edges(
&self,
_ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Check if the table is a view
if self.tb(opt, txn).await?.drop {
return Ok(());
}
// Clone transaction
let run = txn.clone();
// Claim transaction
let mut run = run.lock().await;
// Get the record id
let rid = self.id.as_ref().unwrap();
// Store the record edges
if let Workable::Relate(l, r) = &self.extras {
// Store the left pointer edge
let key = crate::key::graph::new(opt.ns(), opt.db(), &l.tb, &l.id, &Dir::Out, rid);
run.set(key, self).await?;
// Store the left inner edge
let key = crate::key::graph::new(opt.ns(), opt.db(), &rid.tb, &rid.id, &Dir::In, l);
run.set(key, self).await?;
// Store the right inner edge
let key = crate::key::graph::new(opt.ns(), opt.db(), &rid.tb, &rid.id, &Dir::Out, r);
run.set(key, self).await?;
// Store the right pointer edge
let key = crate::key::graph::new(opt.ns(), opt.db(), &r.tb, &r.id, &Dir::In, rid);
run.set(key, self).await?;
}
// Carry on
Ok(())
}
}

View file

@ -9,11 +9,59 @@ use crate::sql::value::Value;
impl<'a> Document<'a> {
pub async fn insert(
&mut self,
_ctx: &Context<'_>,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement<'_>,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<Value, Error> {
todo!()
// Check current record
match self.current.is_some() {
// Run INSERT clause
false => {
// Check if allowed
self.allow(ctx, opt, txn, stm).await?;
// Merge record data
self.merge(ctx, opt, txn, stm).await?;
// Merge fields data
self.field(ctx, opt, txn, stm).await?;
// Check if allowed
self.allow(ctx, opt, txn, stm).await?;
// Store index data
self.index(ctx, opt, txn, stm).await?;
// Store record data
self.store(ctx, opt, txn, stm).await?;
// Run table queries
self.table(ctx, opt, txn, stm).await?;
// Run lives queries
self.lives(ctx, opt, txn, stm).await?;
// Run event queries
self.event(ctx, opt, txn, stm).await?;
// Yield document
self.pluck(ctx, opt, txn, stm).await
}
// Run UPDATE clause
true => {
// Check if allowed
self.allow(ctx, opt, txn, stm).await?;
// Alter record data
self.alter(ctx, opt, txn, stm).await?;
// Merge fields data
self.field(ctx, opt, txn, stm).await?;
// Check if allowed
self.allow(ctx, opt, txn, stm).await?;
// Store index data
self.index(ctx, opt, txn, stm).await?;
// Store record data
self.store(ctx, opt, txn, stm).await?;
// Run table queries
self.table(ctx, opt, txn, stm).await?;
// Run lives queries
self.lives(ctx, opt, txn, stm).await?;
// Run event queries
self.event(ctx, opt, txn, stm).await?;
// Yield document
self.pluck(ctx, opt, txn, stm).await
}
}
}
}

View file

@ -2,11 +2,9 @@ use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::dbs::Workable;
use crate::doc::Document;
use crate::err::Error;
use crate::sql::data::Data;
use crate::sql::operator::Operator;
use crate::sql::value::Value;
impl<'a> Document<'a> {
pub async fn merge(
@ -14,50 +12,16 @@ impl<'a> Document<'a> {
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Get the record id
let rid = self.id.as_ref().unwrap();
// Set default field values
self.current.to_mut().def(ctx, opt, txn, rid).await?;
// The statement has a data clause
if let Some(v) = stm.data() {
match v {
Data::SetExpression(x) => {
for x in x.iter() {
let v = x.2.compute(ctx, opt, txn, Some(&self.current)).await?;
match x.1 {
Operator::Equal => match v {
Value::Void => {
self.current.to_mut().del(ctx, opt, txn, &x.0).await?
}
_ => self.current.to_mut().set(ctx, opt, txn, &x.0, v).await?,
},
Operator::Inc => {
self.current.to_mut().increment(ctx, opt, txn, &x.0, v).await?
}
Operator::Dec => {
self.current.to_mut().decrement(ctx, opt, txn, &x.0, v).await?
}
_ => unreachable!(),
}
}
}
Data::PatchExpression(data) => {
self.current.to_mut().patch(ctx, opt, txn, data).await?
}
Data::MergeExpression(data) => {
self.current.to_mut().merge(ctx, opt, txn, data).await?
}
Data::ReplaceExpression(data) => {
self.current.to_mut().replace(ctx, opt, txn, data).await?
}
Data::ContentExpression(data) => {
self.current.to_mut().replace(ctx, opt, txn, data).await?
}
_ => unreachable!(),
};
};
// This is an INSERT statement
if let Workable::Insert(v) = &self.extras {
self.current.to_mut().merge(ctx, opt, txn, v).await?;
}
// Set default field values
self.current.to_mut().def(ctx, opt, txn, rid).await?;
// Carry on

View file

@ -3,12 +3,13 @@ pub use self::document::*;
#[cfg(feature = "parallel")]
mod compute;
mod admit;
mod allow;
mod alter;
mod check;
mod create;
mod delete;
mod document;
mod edges;
mod empty;
mod erase;
mod event;

View file

@ -14,18 +14,18 @@ impl<'a> Document<'a> {
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check value type
self.admit(ctx, opt, txn, stm).await?;
// Check if allowed
self.allow(ctx, opt, txn, stm).await?;
// Merge record data
self.merge(ctx, opt, txn, stm).await?;
// Alter record data
self.alter(ctx, opt, txn, stm).await?;
// Merge fields data
self.field(ctx, opt, txn, stm).await?;
// Check if allowed
self.allow(ctx, opt, txn, stm).await?;
// Store index data
self.index(ctx, opt, txn, stm).await?;
// Store record edges
self.edges(ctx, opt, txn, stm).await?;
// Store record data
self.store(ctx, opt, txn, stm).await?;
// Run table queries

View file

@ -14,14 +14,12 @@ impl<'a> Document<'a> {
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check value type
self.admit(ctx, opt, txn, stm).await?;
// Check where clause
self.check(ctx, opt, txn, stm).await?;
// Check if allowed
self.allow(ctx, opt, txn, stm).await?;
// Merge record data
self.merge(ctx, opt, txn, stm).await?;
// Alter record data
self.alter(ctx, opt, txn, stm).await?;
// Merge fields data
self.field(ctx, opt, txn, stm).await?;
// Check if allowed

View file

@ -8,6 +8,7 @@ use crate::sql::error::IResult;
use crate::sql::escape::escape_key;
use crate::sql::operation::{Op, Operation};
use crate::sql::serde::is_internal_serialization;
use crate::sql::thing::Thing;
use crate::sql::value::{value, Value};
use nom::branch::alt;
use nom::bytes::complete::is_not;
@ -77,6 +78,14 @@ impl IntoIterator for Object {
}
impl Object {
// Fetch the record id if there is one
pub fn rid(&self) -> Option<Thing> {
match self.get("id") {
Some(Value::Thing(v)) => Some(v.clone()),
_ => None,
}
}
// Convert this object to a diff-match-patch operation
pub fn to_operation(&self) -> Result<Operation, Error> {
match self.get("op") {
Some(o) => match self.get("path") {

View file

@ -1,4 +1,5 @@
use crate::ctx::Context;
use crate::dbs::Iterable;
use crate::dbs::Iterator;
use crate::dbs::Level;
use crate::dbs::Options;
@ -49,10 +50,47 @@ impl CreateStatement {
for w in self.what.0.iter() {
let v = w.compute(ctx, opt, txn, doc).await?;
match v {
Value::Table(v) => i.produce(v),
Value::Thing(_) => i.prepare(v),
Value::Model(_) => i.prepare(v),
Value::Array(_) => i.prepare(v),
Value::Table(v) => i.ingest(Iterable::Thing(v.generate())),
Value::Thing(v) => i.ingest(Iterable::Thing(v)),
Value::Model(v) => {
for v in v {
i.ingest(Iterable::Thing(v));
}
}
Value::Array(v) => {
for v in v {
match v {
Value::Table(v) => i.ingest(Iterable::Thing(v.generate())),
Value::Thing(v) => i.ingest(Iterable::Thing(v)),
Value::Model(v) => {
for v in v {
i.ingest(Iterable::Thing(v));
}
}
Value::Object(v) => match v.rid() {
Some(v) => i.ingest(Iterable::Thing(v)),
None => {
return Err(Error::CreateStatement {
value: v.to_string(),
})
}
},
v => {
return Err(Error::CreateStatement {
value: v.to_string(),
})
}
};
}
}
Value::Object(v) => match v.rid() {
Some(v) => i.ingest(Iterable::Thing(v)),
None => {
return Err(Error::CreateStatement {
value: v.to_string(),
})
}
},
v => {
return Err(Error::CreateStatement {
value: v.to_string(),

View file

@ -1,4 +1,5 @@
use crate::ctx::Context;
use crate::dbs::Iterable;
use crate::dbs::Iterator;
use crate::dbs::Level;
use crate::dbs::Options;
@ -50,10 +51,47 @@ impl DeleteStatement {
for w in self.what.0.iter() {
let v = w.compute(ctx, opt, txn, doc).await?;
match v {
Value::Table(_) => i.prepare(v),
Value::Thing(_) => i.prepare(v),
Value::Model(_) => i.prepare(v),
Value::Array(_) => i.prepare(v),
Value::Table(v) => i.ingest(Iterable::Table(v)),
Value::Thing(v) => i.ingest(Iterable::Thing(v)),
Value::Model(v) => {
for v in v {
i.ingest(Iterable::Thing(v));
}
}
Value::Array(v) => {
for v in v {
match v {
Value::Table(v) => i.ingest(Iterable::Table(v)),
Value::Thing(v) => i.ingest(Iterable::Thing(v)),
Value::Model(v) => {
for v in v {
i.ingest(Iterable::Thing(v));
}
}
Value::Object(v) => match v.rid() {
Some(v) => i.ingest(Iterable::Thing(v)),
None => {
return Err(Error::DeleteStatement {
value: v.to_string(),
})
}
},
v => {
return Err(Error::DeleteStatement {
value: v.to_string(),
})
}
};
}
}
Value::Object(v) => match v.rid() {
Some(v) => i.ingest(Iterable::Thing(v)),
None => {
return Err(Error::DeleteStatement {
value: v.to_string(),
})
}
},
v => {
return Err(Error::DeleteStatement {
value: v.to_string(),

View file

@ -1,4 +1,5 @@
use crate::ctx::Context;
use crate::dbs::Iterable;
use crate::dbs::Iterator;
use crate::dbs::Level;
use crate::dbs::Options;
@ -51,14 +52,40 @@ impl InsertStatement {
let opt = &opt.futures(false);
// Parse the expression
match &self.data {
Data::ValuesExpression(_) => {
todo!() // TODO: loop over each
// Check if this is a traditional statement
Data::ValuesExpression(v) => {
for v in v {
// Create a new empty base object
let mut o = Value::base();
// Set each field from the expression
for (k, v) in v.iter() {
let v = v.compute(ctx, opt, txn, None).await?;
o.set(ctx, opt, txn, k, v).await?;
}
// Specify the new table record id
let id = o.retable(&self.into)?;
// Pass the mergeable to the iterator
i.ingest(Iterable::Mergeable(id, o));
}
}
// Check if this is a modern statement
Data::SingleExpression(v) => {
let v = v.compute(ctx, opt, txn, doc).await?;
match v {
Value::Array(v) => v.into_iter().for_each(|v| i.prepare(v)),
Value::Object(_) => i.prepare(v),
Value::Array(v) => {
for v in v {
// Specify the new table record id
let id = v.retable(&self.into)?;
// Pass the mergeable to the iterator
i.ingest(Iterable::Mergeable(id, v));
}
}
Value::Object(_) => {
// Specify the new table record id
let id = v.retable(&self.into)?;
// Pass the mergeable to the iterator
i.ingest(Iterable::Mergeable(id, v));
}
v => {
return Err(Error::InsertStatement {
value: v.to_string(),

View file

@ -1,4 +1,5 @@
use crate::ctx::Context;
use crate::dbs::Iterable;
use crate::dbs::Iterator;
use crate::dbs::Level;
use crate::dbs::Options;
@ -52,20 +53,102 @@ impl RelateStatement {
let mut i = Iterator::new();
// Ensure futures are stored
let opt = &opt.futures(false);
// Loop over the select targets
for w in self.from.0.iter() {
let v = w.compute(ctx, opt, txn, doc).await?;
match v {
Value::Table(_) => i.prepare(v),
Value::Thing(_) => i.prepare(v),
Value::Model(_) => i.prepare(v),
Value::Array(_) => i.prepare(v),
v => {
return Err(Error::RelateStatement {
value: v.to_string(),
})
}
};
// Loop over the from targets
let from = {
let mut out = Vec::new();
for w in self.from.0.iter() {
let v = w.compute(ctx, opt, txn, doc).await?;
match v {
Value::Thing(v) => out.push(v),
Value::Array(v) => {
for v in v {
match v {
Value::Thing(v) => out.push(v),
Value::Object(v) => match v.rid() {
Some(v) => out.push(v),
_ => {
return Err(Error::RelateStatement {
value: v.to_string(),
})
}
},
v => {
return Err(Error::RelateStatement {
value: v.to_string(),
})
}
}
}
}
Value::Object(v) => match v.rid() {
Some(v) => out.push(v),
None => {
return Err(Error::RelateStatement {
value: v.to_string(),
})
}
},
v => {
return Err(Error::RelateStatement {
value: v.to_string(),
})
}
};
}
out
};
// Loop over the with targets
let with = {
let mut out = Vec::new();
for w in self.with.0.iter() {
let v = w.compute(ctx, opt, txn, doc).await?;
match v {
Value::Thing(v) => out.push(v),
Value::Array(v) => {
for v in v {
match v {
Value::Thing(v) => out.push(v),
Value::Object(v) => match v.rid() {
Some(v) => out.push(v),
None => {
return Err(Error::RelateStatement {
value: v.to_string(),
})
}
},
v => {
return Err(Error::RelateStatement {
value: v.to_string(),
})
}
}
}
}
Value::Object(v) => match v.rid() {
Some(v) => out.push(v),
None => {
return Err(Error::RelateStatement {
value: v.to_string(),
})
}
},
v => {
return Err(Error::RelateStatement {
value: v.to_string(),
})
}
};
}
out
};
//
for f in from.iter() {
for w in with.iter() {
let f = f.clone();
let w = w.clone();
let t = self.kind.generate();
i.ingest(Iterable::Relatable(f, t, w));
}
}
// Assign the statement
let stm = Statement::from(self);

View file

@ -1,4 +1,5 @@
use crate::ctx::Context;
use crate::dbs::Iterable;
use crate::dbs::Iterator;
use crate::dbs::Level;
use crate::dbs::Options;
@ -89,11 +90,28 @@ impl SelectStatement {
for w in self.what.0.iter() {
let v = w.compute(ctx, opt, txn, doc).await?;
match v {
Value::Table(_) => i.prepare(v),
Value::Thing(_) => i.prepare(v),
Value::Model(_) => i.prepare(v),
Value::Array(_) => i.prepare(v),
v => i.prepare(v),
Value::Table(v) => i.ingest(Iterable::Table(v)),
Value::Thing(v) => i.ingest(Iterable::Thing(v)),
Value::Model(v) => {
for v in v {
i.ingest(Iterable::Thing(v));
}
}
Value::Array(v) => {
for v in v {
match v {
Value::Table(v) => i.ingest(Iterable::Table(v)),
Value::Thing(v) => i.ingest(Iterable::Thing(v)),
Value::Model(v) => {
for v in v {
i.ingest(Iterable::Thing(v));
}
}
_ => i.ingest(Iterable::Value(v)),
}
}
}
v => i.ingest(Iterable::Value(v)),
};
}
// Assign the statement

View file

@ -1,4 +1,5 @@
use crate::ctx::Context;
use crate::dbs::Iterable;
use crate::dbs::Iterator;
use crate::dbs::Level;
use crate::dbs::Options;
@ -51,10 +52,47 @@ impl UpdateStatement {
for w in self.what.0.iter() {
let v = w.compute(ctx, opt, txn, doc).await?;
match v {
Value::Table(_) => i.prepare(v),
Value::Thing(_) => i.prepare(v),
Value::Model(_) => i.prepare(v),
Value::Array(_) => i.prepare(v),
Value::Table(v) => i.ingest(Iterable::Table(v)),
Value::Thing(v) => i.ingest(Iterable::Thing(v)),
Value::Model(v) => {
for v in v {
i.ingest(Iterable::Thing(v));
}
}
Value::Array(v) => {
for v in v {
match v {
Value::Table(v) => i.ingest(Iterable::Table(v)),
Value::Thing(v) => i.ingest(Iterable::Thing(v)),
Value::Model(v) => {
for v in v {
i.ingest(Iterable::Thing(v));
}
}
Value::Object(v) => match v.rid() {
Some(v) => i.ingest(Iterable::Thing(v)),
None => {
return Err(Error::UpdateStatement {
value: v.to_string(),
})
}
},
v => {
return Err(Error::UpdateStatement {
value: v.to_string(),
})
}
};
}
}
Value::Object(v) => match v.rid() {
Some(v) => i.ingest(Iterable::Thing(v)),
None => {
return Err(Error::UpdateStatement {
value: v.to_string(),
})
}
},
v => {
return Err(Error::UpdateStatement {
value: v.to_string(),

View file

@ -1,7 +1,9 @@
use crate::sql::common::commas;
use crate::sql::error::IResult;
use crate::sql::escape::escape_ident;
use crate::sql::id::Id;
use crate::sql::ident::{ident_raw, Ident};
use crate::sql::thing::Thing;
use nom::multi::separated_list1;
use serde::{Deserialize, Serialize};
use std::fmt;
@ -50,6 +52,15 @@ impl Deref for Table {
}
}
impl Table {
pub fn generate(&self) -> Thing {
Thing {
tb: self.0.to_owned(),
id: Id::rand(),
}
}
}
impl fmt::Display for Table {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", escape_ident(&self.0))