From 31adc4538de2b764ebc9ad0bce0780bb52ed8dc6 Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Sat, 16 Jul 2022 23:23:13 +0100 Subject: [PATCH] Implement `DEFINE TABLE AS` foreign table statements --- lib/src/doc/document.rs | 11 + lib/src/doc/table.rs | 432 +++++++++++++++++++++++++++++++++++++++- lib/src/sql/function.rs | 12 +- lib/src/sql/idiom.rs | 8 + 4 files changed, 454 insertions(+), 9 deletions(-) diff --git a/lib/src/doc/document.rs b/lib/src/doc/document.rs index 708c2815..43efadd5 100644 --- a/lib/src/doc/document.rs +++ b/lib/src/doc/document.rs @@ -67,6 +67,17 @@ impl<'a> Document<'a> { }, } } + // Get the foreign tables for this document + pub async fn ft( + &self, + opt: &Options, + txn: &Transaction, + ) -> Result, Error> { + // Get the record id + let id = self.id.as_ref().unwrap(); + // Get the table definitions + txn.clone().lock().await.all_ft(opt.ns(), opt.db(), &id.tb).await + } // Get the events for this document pub async fn ev( &self, diff --git a/lib/src/doc/table.rs b/lib/src/doc/table.rs index a4bd941a..97e3bfb2 100644 --- a/lib/src/doc/table.rs +++ b/lib/src/doc/table.rs @@ -4,15 +4,439 @@ use crate::dbs::Statement; use crate::dbs::Transaction; use crate::doc::Document; use crate::err::Error; +use crate::sql::data::Data; +use crate::sql::expression::Expression; +use crate::sql::field::{Field, Fields}; +use crate::sql::idiom::Idiom; +use crate::sql::operator::Operator; +use crate::sql::part::Part; +use crate::sql::statement::Statement as Query; +use crate::sql::statements::delete::DeleteStatement; +use crate::sql::statements::ifelse::IfelseStatement; +use crate::sql::statements::update::UpdateStatement; +use crate::sql::subquery::Subquery; +use crate::sql::thing::Thing; +use crate::sql::value::{Value, Values}; +use futures::future::try_join_all; + +type Ops = Vec<(Idiom, Operator, Value)>; + +#[derive(Clone, Debug, Eq, PartialEq)] +enum Action { + Create, + Update, + Delete, +} impl<'a> Document<'a> { pub async fn table( &self, - _ctx: &Context<'_>, - _opt: &Options, - _txn: &Transaction, - _stm: &Statement<'_>, + ctx: &Context<'_>, + opt: &Options, + txn: &Transaction, + stm: &Statement<'_>, ) -> Result<(), Error> { + // Check events + if !opt.tables { + return Ok(()); + } + // Check if forced + if !opt.force && !self.changed() { + return Ok(()); + } + // Get the record id + let rid = self.id.as_ref().unwrap(); + // Get the query action + let act = if stm.is_delete() { + Action::Delete + } else if self.is_new() { + Action::Create + } else { + Action::Update + }; + // Loop through all foreign table statements + for ft in self.ft(opt, txn).await?.iter() { + // Get the table definition + let tb = ft.view.as_ref().unwrap(); + // Check if there is a GROUP BY clause + match &tb.group { + // There is a GROUP BY clause specified + Some(group) => { + // Set the previous record id + let old = Thing { + tb: ft.name.to_raw(), + id: try_join_all( + group.iter().map(|v| v.compute(ctx, opt, txn, Some(&self.initial))), + ) + .await? + .iter() + .map(|v| format!("{}", v)) + .collect::>() + .join(" ") + .into(), + }; + // Set the current record id + let rid = Thing { + tb: ft.name.to_raw(), + id: try_join_all( + group.iter().map(|v| v.compute(ctx, opt, txn, Some(&self.current))), + ) + .await? + .iter() + .map(|v| format!("{}", v)) + .collect::>() + .join(" ") + .into(), + }; + // Check if a WHERE clause is specified + match &tb.cond { + // There is a WHERE clause specified + Some(cond) => { + match cond.compute(ctx, opt, txn, Some(&self.current)).await? { + v if v.is_truthy() => { + if !opt.force && act != Action::Create { + // Delete the old value + let act = Action::Delete; + // Modify the value in the table + let stm = UpdateStatement { + what: Values(vec![Value::from(old)]), + data: Some( + self.data(ctx, opt, txn, act, &tb.expr).await?, + ), + ..UpdateStatement::default() + }; + // Execute the statement + stm.compute(ctx, opt, txn, None).await?; + } + if act != Action::Delete { + // Update the new value + let act = Action::Update; + // Modify the value in the table + let stm = UpdateStatement { + what: Values(vec![Value::from(rid)]), + data: Some( + self.data(ctx, opt, txn, act, &tb.expr).await?, + ), + ..UpdateStatement::default() + }; + // Execute the statement + stm.compute(ctx, opt, txn, None).await?; + } + } + _ => { + if !opt.force && act != Action::Create { + // Update the new value + let act = Action::Update; + // Modify the value in the table + let stm = UpdateStatement { + what: Values(vec![Value::from(old)]), + data: Some( + self.data(ctx, opt, txn, act, &tb.expr).await?, + ), + ..UpdateStatement::default() + }; + // Execute the statement + stm.compute(ctx, opt, txn, None).await?; + } + } + } + } + // No WHERE clause is specified + None => { + if !opt.force && act != Action::Create { + // Delete the old value + let act = Action::Delete; + // Modify the value in the table + let stm = UpdateStatement { + what: Values(vec![Value::from(old)]), + data: Some(self.data(ctx, opt, txn, act, &tb.expr).await?), + ..UpdateStatement::default() + }; + // Execute the statement + stm.compute(ctx, opt, txn, None).await?; + } + if act != Action::Delete { + // Update the new value + let act = Action::Update; + // Modify the value in the table + let stm = UpdateStatement { + what: Values(vec![Value::from(rid)]), + data: Some(self.data(ctx, opt, txn, act, &tb.expr).await?), + ..UpdateStatement::default() + }; + // Execute the statement + stm.compute(ctx, opt, txn, None).await?; + } + } + } + } + // No GROUP BY clause is specified + None => { + // Set the current record id + let rid = Thing { + tb: ft.name.to_raw(), + id: rid.id.clone(), + }; + // Use the current record data + let doc = Some(self.current.as_ref()); + // Check if a WHERE clause is specified + match &tb.cond { + // There is a WHERE clause specified + Some(cond) => { + match cond.compute(ctx, opt, txn, doc).await? { + v if v.is_truthy() => { + // Define the statement + let stm = match act { + // Delete the value in the table + Action::Delete => Query::Delete(DeleteStatement { + what: Values(vec![Value::from(rid)]), + ..DeleteStatement::default() + }), + // Update the value in the table + _ => Query::Update(UpdateStatement { + what: Values(vec![Value::from(rid)]), + data: Some(Data::ReplaceExpression( + tb.expr.compute(ctx, opt, txn, doc, false).await?, + )), + ..UpdateStatement::default() + }), + }; + // Execute the statement + stm.compute(ctx, opt, txn, None).await?; + } + _ => { + // Delete the value in the table + let stm = DeleteStatement { + what: Values(vec![Value::from(rid)]), + ..DeleteStatement::default() + }; + // Execute the statement + stm.compute(ctx, opt, txn, None).await?; + } + } + } + // No WHERE clause is specified + None => { + // Define the statement + let stm = match act { + // Delete the value in the table + Action::Delete => Query::Delete(DeleteStatement { + what: Values(vec![Value::from(rid)]), + ..DeleteStatement::default() + }), + // Update the value in the table + _ => Query::Update(UpdateStatement { + what: Values(vec![Value::from(rid)]), + data: Some(Data::ReplaceExpression( + tb.expr.compute(ctx, opt, txn, doc, false).await?, + )), + ..UpdateStatement::default() + }), + }; + // Execute the statement + stm.compute(ctx, opt, txn, None).await?; + } + } + } + } + } + // Carry on Ok(()) } + // + async fn data( + &self, + ctx: &Context<'_>, + opt: &Options, + txn: &Transaction, + act: Action, + exp: &Fields, + ) -> Result { + // + let mut ops: Ops = vec![]; + // + let doc = match act { + Action::Delete => Some(self.initial.as_ref()), + Action::Update => Some(self.current.as_ref()), + _ => unreachable!(), + }; + // + for field in exp.other() { + // Process it if it is a normal field + if let Field::Alone(v) = field { + match v { + Value::Function(f) if f.is_rolling() => match f.name() { + "count" => { + let val = f.compute(ctx, opt, txn, doc).await?; + self.chg(&mut ops, &act, v.to_idiom(), val); + } + "math::sum" => { + let val = f.args()[0].compute(ctx, opt, txn, doc).await?; + self.chg(&mut ops, &act, v.to_idiom(), val); + } + "math::min" => { + let val = f.args()[0].compute(ctx, opt, txn, doc).await?; + self.min(&mut ops, &act, v.to_idiom(), val); + } + "math::max" => { + let val = f.args()[0].compute(ctx, opt, txn, doc).await?; + self.max(&mut ops, &act, v.to_idiom(), val); + } + "math::mean" => { + let val = f.args()[0].compute(ctx, opt, txn, doc).await?; + self.mean(&mut ops, &act, v.to_idiom(), val); + } + _ => unreachable!(), + }, + _ => { + let val = v.compute(ctx, opt, txn, doc).await?; + self.set(&mut ops, v.to_idiom(), val); + } + } + } + // Process it if it is a aliased field + if let Field::Alias(v, i) = field { + match v { + Value::Function(f) if f.is_rolling() => match f.name() { + "count" => { + let val = f.compute(ctx, opt, txn, doc).await?; + self.chg(&mut ops, &act, i.to_owned(), val); + } + "math::sum" => { + let val = f.args()[0].compute(ctx, opt, txn, doc).await?; + self.chg(&mut ops, &act, i.to_owned(), val); + } + "math::min" => { + let val = f.args()[0].compute(ctx, opt, txn, doc).await?; + self.min(&mut ops, &act, i.to_owned(), val); + } + "math::max" => { + let val = f.args()[0].compute(ctx, opt, txn, doc).await?; + self.max(&mut ops, &act, i.to_owned(), val); + } + "math::mean" => { + let val = f.args()[0].compute(ctx, opt, txn, doc).await?; + self.mean(&mut ops, &act, i.to_owned(), val); + } + _ => unreachable!(), + }, + _ => { + let val = v.compute(ctx, opt, txn, doc).await?; + self.set(&mut ops, i.to_owned(), val); + } + } + } + } + // + Ok(Data::SetExpression(ops)) + } + // Set the field in the foreign table + fn set(&self, ops: &mut Ops, key: Idiom, val: Value) { + ops.push((key, Operator::Equal, val)); + } + // Increment or decrement the field in the foreign table + fn chg(&self, ops: &mut Ops, act: &Action, key: Idiom, val: Value) { + ops.push(( + key, + match act { + Action::Delete => Operator::Dec, + Action::Update => Operator::Inc, + _ => unreachable!(), + }, + val, + )); + } + // Set the new minimum value for the field in the foreign table + fn min(&self, ops: &mut Ops, act: &Action, key: Idiom, val: Value) { + if act == &Action::Update { + ops.push(( + key.clone(), + Operator::Equal, + Value::Subquery(Box::new(Subquery::Ifelse(IfelseStatement { + exprs: vec![( + Value::Expression(Box::new(Expression { + l: Value::Idiom(key.clone()), + o: Operator::MoreThan, + r: val.clone(), + })), + val, + )], + close: Some(Value::Idiom(key)), + }))), + )); + } + } + // Set the new maximum value for the field in the foreign table + fn max(&self, ops: &mut Ops, act: &Action, key: Idiom, val: Value) { + if act == &Action::Update { + ops.push(( + key.clone(), + Operator::Equal, + Value::Subquery(Box::new(Subquery::Ifelse(IfelseStatement { + exprs: vec![( + Value::Expression(Box::new(Expression { + l: Value::Idiom(key.clone()), + o: Operator::LessThan, + r: val.clone(), + })), + val, + )], + close: Some(Value::Idiom(key)), + }))), + )); + } + } + // Set the new average value for the field in the foreign table + fn mean(&self, ops: &mut Ops, act: &Action, key: Idiom, val: Value) { + // + let mut key_c = Idiom::from(vec![Part::from("__")]); + key_c.0.push(Part::from(key.to_hash())); + key_c.0.push(Part::from("c")); + // + ops.push(( + key.clone(), + Operator::Equal, + Value::Expression(Box::new(Expression { + l: Value::Subquery(Box::new(Subquery::Value(Value::Expression(Box::new( + Expression { + l: Value::Subquery(Box::new(Subquery::Value(Value::Expression(Box::new( + Expression { + l: Value::Idiom(key), + o: Operator::Mul, + r: Value::Idiom(key_c.clone()), + }, + ))))), + o: match act { + Action::Delete => Operator::Sub, + Action::Update => Operator::Add, + _ => unreachable!(), + }, + r: val, + }, + ))))), + o: Operator::Div, + r: Value::Subquery(Box::new(Subquery::Value(Value::Expression(Box::new( + Expression { + l: Value::Idiom(key_c.clone()), + o: match act { + Action::Delete => Operator::Sub, + Action::Update => Operator::Add, + _ => unreachable!(), + }, + r: Value::from(1), + }, + ))))), + })), + )); + // + ops.push(( + key_c.clone(), + match act { + Action::Delete => Operator::Dec, + Action::Update => Operator::Inc, + _ => unreachable!(), + }, + Value::from(1), + )); + } } diff --git a/lib/src/sql/function.rs b/lib/src/sql/function.rs index 9bb53eb5..848d7834 100644 --- a/lib/src/sql/function.rs +++ b/lib/src/sql/function.rs @@ -32,6 +32,13 @@ impl PartialOrd for Function { } impl Function { + // Get function name if applicable + pub fn name(&self) -> &str { + match self { + Function::Normal(n, _) => n.as_str(), + _ => unreachable!(), + } + } // Get function arguments if applicable pub fn args(&self) -> &[Value] { match self { @@ -59,16 +66,11 @@ impl Function { // Check if this function is a rolling function pub fn is_rolling(&self) -> bool { match self { - Function::Normal(f, _) if f == "array::concat" => true, - Function::Normal(f, _) if f == "array::distinct" => true, - Function::Normal(f, _) if f == "array::union" => true, Function::Normal(f, _) if f == "count" => true, Function::Normal(f, _) if f == "math::max" => true, Function::Normal(f, _) if f == "math::mean" => true, Function::Normal(f, _) if f == "math::min" => true, - Function::Normal(f, _) if f == "math::stddev" => true, Function::Normal(f, _) if f == "math::sum" => true, - Function::Normal(f, _) if f == "math::variance" => true, _ => false, } } diff --git a/lib/src/sql/idiom.rs b/lib/src/sql/idiom.rs index e0f143d5..3feb45ed 100644 --- a/lib/src/sql/idiom.rs +++ b/lib/src/sql/idiom.rs @@ -8,6 +8,8 @@ use crate::sql::part::Next; use crate::sql::part::{all, field, first, graph, index, last, part, thing, Part}; use crate::sql::paths::{ID, IN, OUT}; use crate::sql::value::Value; +use md5::Digest; +use md5::Md5; use nom::branch::alt; use nom::multi::separated_list1; use nom::multi::{many0, many1}; @@ -65,6 +67,12 @@ impl Idiom { self.0.push(n); self } + // Convert this Idiom to a unique hash + pub(crate) fn to_hash(&self) -> String { + let mut hasher = Md5::new(); + hasher.update(self.to_string().as_str()); + format!("{:x}", hasher.finalize()) + } // Convert this Idiom to a JSON Path string pub(crate) fn to_path(&self) -> String { format!("/{}", self).replace(']', "").replace(&['.', '['][..], "/")