Feature: A query with aggregations should not store every record in memory (#3657)

Co-authored-by: Mees Delzenne <DelSkayn@users.noreply.github.com>
Emmanuel Keller 2024-03-12 10:48:53 +00:00 committed by GitHub
parent 21975548f2
commit 2fe4f352be
13 changed files with 1123 additions and 227 deletions
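
For context, the queries affected are GROUP BY statements whose fields are aggregate functions, such as `SELECT count(), math::sum(one) AS one, math::sum(two) AS two FROM test GROUP BY group` from the tests further down. The following is a minimal, standalone sketch of the underlying idea only; the names and types are illustrative and not SurrealDB's API. Each record is folded into a small per-group accumulator as it arrives, so memory grows with the number of groups rather than the number of records.

```rust
use std::collections::BTreeMap;

// Minimal illustration only: a per-group accumulator that keeps a running
// count and sum instead of retaining every record, so memory is O(groups).
#[derive(Default)]
struct Acc {
    count: usize,
    sum: f64,
}

fn main() {
    // (group key, value) pairs standing in for streamed records.
    let records = [("GBP", 3.1), ("GBP", 4.2), ("EUR", 2.0), ("GBP", 0.7)];

    let mut groups: BTreeMap<&str, Acc> = BTreeMap::new();
    for (key, val) in records {
        // Fold each record into its group's accumulator and drop the record.
        let acc = groups.entry(key).or_default();
        acc.count += 1;
        acc.sum += val;
    }

    for (key, acc) in &groups {
        // Derived aggregates (here the mean) are computed at output time.
        println!(
            "{key}: count={} sum={} mean={}",
            acc.count,
            acc.sum,
            acc.sum / acc.count as f64
        );
    }
}
```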

core/src/dbs/group.rs (new file)

@ -0,0 +1,367 @@
use crate::ctx::Context;
use crate::dbs::plan::Explanation;
use crate::dbs::store::StoreCollector;
use crate::dbs::{Options, Statement, Transaction};
use crate::err::Error;
use crate::sql::function::OptimisedAggregate;
use crate::sql::value::{TryAdd, TryDiv, Value};
use crate::sql::{Array, Field, Idiom};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::mem;
pub(super) struct GroupsCollector {
base: Vec<Aggregator>,
idioms: Vec<Idiom>,
grp: BTreeMap<Array, Vec<Aggregator>>,
}
#[derive(Default)]
struct Aggregator {
array: Option<Array>,
first_val: Option<Value>,
count: Option<usize>,
math_max: Option<Value>,
math_min: Option<Value>,
math_sum: Option<Value>,
math_mean: Option<(Value, usize)>,
time_max: Option<Value>,
time_min: Option<Value>,
}
impl GroupsCollector {
pub(super) fn new(stm: &Statement<'_>) -> Self {
let mut idioms_agr: HashMap<Idiom, Aggregator> = HashMap::new();
if let Some(fields) = stm.expr() {
for field in fields.other() {
if let Field::Single {
expr,
alias,
} = field
{
let idiom = alias.as_ref().cloned().unwrap_or_else(|| expr.to_idiom());
idioms_agr.entry(idiom).or_default().prepare(expr);
}
}
}
let mut base = Vec::with_capacity(idioms_agr.len());
let mut idioms = Vec::with_capacity(idioms_agr.len());
for (idiom, agr) in idioms_agr {
base.push(agr);
idioms.push(idiom);
}
Self {
base,
idioms,
grp: Default::default(),
}
}
pub(super) async fn push(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
obj: Value,
) -> Result<(), Error> {
if let Some(groups) = stm.group() {
// Create a new column set
let mut arr = Array::with_capacity(groups.len());
// Loop over each group clause
for group in groups.iter() {
// Get the value at the path
let val = obj.pick(group);
// Add the value to the column set
arr.push(val);
}
// Add to grouped collection
let agr = self
.grp
.entry(arr)
.or_insert_with(|| self.base.iter().map(|a| a.new_instance()).collect());
Self::pushes(ctx, opt, txn, agr, &self.idioms, obj).await?
}
Ok(())
}
async fn pushes(
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
agrs: &mut [Aggregator],
idioms: &[Idiom],
obj: Value,
) -> Result<(), Error> {
for (agr, idiom) in agrs.iter_mut().zip(idioms) {
let val = obj.get(ctx, opt, txn, None, idiom).await?;
agr.push(val)?;
}
Ok(())
}
pub(super) fn len(&self) -> usize {
self.grp.len()
}
pub(super) async fn output(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<StoreCollector, Error> {
let mut results = StoreCollector::default();
if let Some(fields) = stm.expr() {
let grp = mem::take(&mut self.grp);
// Loop over each grouped collection
for (_, mut aggregator) in grp {
// Create a new value
let mut obj = Value::base();
// Loop over each group clause
for field in fields.other() {
// Process the field
if let Field::Single {
expr,
alias,
} = field
{
let idiom = alias
.as_ref()
.map(Cow::Borrowed)
.unwrap_or_else(|| Cow::Owned(expr.to_idiom()));
if let Some(idioms_pos) =
self.idioms.iter().position(|i| i.eq(idiom.as_ref()))
{
let agr = &mut aggregator[idioms_pos];
match expr {
Value::Function(f) if f.is_aggregate() => {
let a = f.get_optimised_aggregate();
let x = if matches!(a, OptimisedAggregate::None) {
// The aggregation is not optimised, so compute it from the collected values
let vals = agr.take();
let x = vals
.all()
.get(ctx, opt, txn, None, idiom.as_ref())
.await?;
f.aggregate(x).compute(ctx, opt, txn, None).await?
} else {
// The aggregation is optimised, just get the value
agr.compute(a)?
};
obj.set(ctx, opt, txn, idiom.as_ref(), x).await?;
}
_ => {
let x = agr.take().first();
obj.set(ctx, opt, txn, idiom.as_ref(), x).await?;
}
}
}
}
}
// Add the object to the results
results.push(obj);
}
}
Ok(results)
}
pub(super) fn explain(&self, exp: &mut Explanation) {
let mut explain = BTreeMap::new();
let idioms: Vec<String> =
self.idioms.iter().cloned().map(|i| Value::from(i).to_string()).collect();
for (i, a) in idioms.into_iter().zip(&self.base) {
explain.insert(i, a.explain());
}
exp.add_collector("Group", vec![("idioms", explain.into())]);
}
}
impl Aggregator {
fn prepare(&mut self, expr: &Value) {
let a = match expr {
Value::Function(f) => f.get_optimised_aggregate(),
_ => {
// We set it only if we don't already have an array
if self.array.is_none() && self.first_val.is_none() {
self.first_val = Some(Value::None);
return;
}
OptimisedAggregate::None
}
};
match a {
OptimisedAggregate::None => {
if self.array.is_none() {
self.array = Some(Array::new());
// We don't need both the array and the first val
self.first_val = None;
}
}
OptimisedAggregate::Count => {
if self.count.is_none() {
self.count = Some(0);
}
}
OptimisedAggregate::MathMax => {
if self.math_max.is_none() {
self.math_max = Some(Value::None);
}
}
OptimisedAggregate::MathMin => {
if self.math_min.is_none() {
self.math_min = Some(Value::None);
}
}
OptimisedAggregate::MathSum => {
if self.math_sum.is_none() {
self.math_sum = Some(0.into());
}
}
OptimisedAggregate::MathMean => {
if self.math_mean.is_none() {
self.math_mean = Some((0.into(), 0));
}
}
OptimisedAggregate::TimeMax => {
if self.time_max.is_none() {
self.time_max = Some(Value::None);
}
}
OptimisedAggregate::TimeMin => {
if self.time_min.is_none() {
self.time_min = Some(Value::None);
}
}
}
}
fn new_instance(&self) -> Self {
Self {
array: self.array.as_ref().map(|_| Array::new()),
first_val: self.first_val.as_ref().map(|_| Value::None),
count: self.count.as_ref().map(|_| 0),
math_max: self.math_max.as_ref().map(|_| Value::None),
math_min: self.math_min.as_ref().map(|_| Value::None),
math_sum: self.math_sum.as_ref().map(|_| 0.into()),
math_mean: self.math_mean.as_ref().map(|_| (0.into(), 0)),
time_max: self.time_max.as_ref().map(|_| Value::None),
time_min: self.time_min.as_ref().map(|_| Value::None),
}
}
fn push(&mut self, val: Value) -> Result<(), Error> {
if let Some(ref mut c) = self.count {
*c += 1;
}
if val.is_number() {
if let Some(s) = self.math_sum.take() {
self.math_sum = Some(s.try_add(val.clone())?);
}
if let Some((s, i)) = self.math_mean.take() {
let s = s.try_add(val.clone())?;
self.math_mean = Some((s, i + 1));
}
if let Some(m) = self.math_min.take() {
self.math_min = Some(if m.is_none() {
val.clone()
} else {
m.min(val.clone())
});
}
if let Some(m) = self.math_max.take() {
self.math_max = Some(if m.is_none() {
val.clone()
} else {
m.max(val.clone())
});
}
}
if val.is_datetime() {
if let Some(m) = self.time_min.take() {
self.time_min = Some(if m.is_none() {
val.clone()
} else {
m.min(val.clone())
});
}
if let Some(m) = self.time_max.take() {
self.time_max = Some(if m.is_none() {
val.clone()
} else {
m.max(val.clone())
});
}
}
if let Some(ref mut a) = self.array {
a.0.push(val);
} else if let Some(ref mut v) = self.first_val {
if v.is_none() {
*v = val;
}
}
Ok(())
}
fn compute(&mut self, a: OptimisedAggregate) -> Result<Value, Error> {
Ok(match a {
OptimisedAggregate::None => Value::None,
OptimisedAggregate::Count => self.count.take().map(|v| v.into()).unwrap_or(Value::None),
OptimisedAggregate::MathMax => self.math_max.take().unwrap_or(Value::None),
OptimisedAggregate::MathMin => self.math_min.take().unwrap_or(Value::None),
OptimisedAggregate::MathSum => self.math_sum.take().unwrap_or(Value::None),
OptimisedAggregate::MathMean => {
if let Some((v, i)) = self.math_mean.take() {
v.try_div(i.into())?
} else {
Value::None
}
}
OptimisedAggregate::TimeMax => self.time_max.take().unwrap_or(Value::None),
OptimisedAggregate::TimeMin => self.time_min.take().unwrap_or(Value::None),
})
}
fn take(&mut self) -> Value {
// We return a clone because the same value may be returned for different groups
if let Some(v) = self.first_val.as_ref().cloned() {
Array::from(v).into()
} else if let Some(a) = self.array.as_ref().cloned() {
a.into()
} else {
Value::None
}
}
fn explain(&self) -> Value {
let mut collections: Vec<Value> = vec![];
if self.array.is_some() {
collections.push("array".into());
}
if self.first_val.is_some() {
collections.push("first".into());
}
if self.count.is_some() {
collections.push("count".into());
}
if self.math_mean.is_some() {
collections.push("math::mean".into());
}
if self.math_max.is_some() {
collections.push("math::max".into());
}
if self.math_min.is_some() {
collections.push("math::min".into());
}
if self.math_sum.is_some() {
collections.push("math::sum".into());
}
if self.time_max.is_some() {
collections.push("time::max".into());
}
if self.time_min.is_some() {
collections.push("time::min".into());
}
collections.into()
}
}
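
A hedged sketch of the split the Aggregator encodes; the `Agg` enum below is illustrative and not SurrealDB's API. Fields that use a recognised aggregate (`count`, `math::sum`, `math::mean`, `time::min`, and so on) only need a scalar accumulator per group, while any other expression falls back to retaining the full array of values so it can be evaluated at output time. This is why the explain output in the tests below reports `'array'` for a field like `year`, but `'count'` or `'math::mean'` for optimised ones.

```rust
// Illustrative sketch only (names are not SurrealDB's API): recognised
// aggregates keep a scalar accumulator per group; anything else has to
// retain every value so it can be evaluated at output time.
enum Agg {
    Count(usize),
    Sum(f64),
    Mean { sum: f64, n: usize },
    // Fallback: the expression is not a recognised aggregate.
    Array(Vec<f64>),
}

impl Agg {
    fn push(&mut self, v: f64) {
        match self {
            Agg::Count(c) => *c += 1,
            Agg::Sum(s) => *s += v,
            Agg::Mean { sum, n } => {
                *sum += v;
                *n += 1;
            }
            Agg::Array(a) => a.push(v),
        }
    }

    fn finish(&self) -> f64 {
        match self {
            Agg::Count(c) => *c as f64,
            Agg::Sum(s) => *s,
            Agg::Mean { sum, n } => *sum / (*n).max(1) as f64,
            // A real implementation would hand the retained values back to
            // the expression evaluator here.
            Agg::Array(a) => a.iter().copied().sum(),
        }
    }
}

fn main() {
    let values = [1.7, 4.7, 3.2];

    let mut count = Agg::Count(0);
    let mut total = Agg::Sum(0.0);
    let mut mean = Agg::Mean { sum: 0.0, n: 0 };
    let mut raw = Agg::Array(Vec::new());
    for v in values {
        count.push(v);
        total.push(v);
        mean.push(v);
        raw.push(v); // only this variant grows with the number of records
    }
    println!(
        "count={} sum={} mean={} (array fallback kept {} values)",
        count.finish(),
        total.finish(),
        mean.finish(),
        values.len()
    );
}
```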

core/src/dbs/iterator.rs

@ -3,7 +3,8 @@ use crate::ctx::Context;
#[cfg(not(target_arch = "wasm32"))]
use crate::dbs::distinct::AsyncDistinct;
use crate::dbs::distinct::SyncDistinct;
use crate::dbs::explanation::Explanation;
use crate::dbs::plan::Plan;
use crate::dbs::result::Results;
use crate::dbs::Statement;
use crate::dbs::{Options, Transaction};
use crate::doc::Document;
@ -11,17 +12,13 @@ use crate::err::Error;
use crate::idx::docids::DocId;
use crate::idx::planner::executor::IteratorRef;
use crate::idx::planner::IterationStage;
use crate::sql::array::Array;
use crate::sql::edges::Edges;
use crate::sql::field::Field;
use crate::sql::range::Range;
use crate::sql::table::Table;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use async_recursion::async_recursion;
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::mem;
#[derive(Clone)]
@ -67,8 +64,7 @@ pub(crate) struct Iterator {
// Iterator runtime error
error: Option<Error>,
// Iterator output results
// TODO: Should be stored on disk / (mmap?)
results: Vec<Value>,
results: Results,
// Iterator input values
entries: Vec<Iterable>,
}
@ -80,7 +76,7 @@ impl Clone for Iterator {
limit: self.limit,
start: self.start,
error: None,
results: vec![],
results: Results::default(),
entries: self.entries.clone(),
}
}
@ -299,10 +295,11 @@ impl Iterator {
self.setup_limit(&cancel_ctx, opt, txn, stm).await?;
// Process the query START clause
self.setup_start(&cancel_ctx, opt, txn, stm).await?;
// Prepare the results with possible optimisations on groups
self.results = self.results.prepare(stm);
// Extract the expected behaviour depending on the presence of EXPLAIN with or without FULL
let (do_iterate, mut explanation) = Explanation::new(ctx, stm.explain(), &self.entries);
if do_iterate {
let mut plan = Plan::new(ctx, stm, &self.entries, &self.results);
if plan.do_iterate {
// Process prepared values
if let Some(qp) = ctx.get_query_planner() {
while let Some(s) = qp.next_iteration_stage().await {
@ -321,30 +318,33 @@ impl Iterator {
// Process any SPLIT clause
self.output_split(ctx, opt, txn, stm).await?;
// Process any GROUP clause
self.output_group(ctx, opt, txn, stm).await?;
self.results = self.results.group(ctx, opt, txn, stm).await?;
// Process any ORDER clause
self.output_order(ctx, opt, txn, stm).await?;
// Process any START clause
self.output_start(ctx, opt, txn, stm).await?;
// Process any LIMIT clause
self.output_limit(ctx, opt, txn, stm).await?;
// Process any START & LIMIT clause
self.results.start_limit(self.start.as_ref(), self.limit.as_ref());
if let Some(e) = &mut explanation {
if let Some(e) = &mut plan.explanation {
e.add_fetch(self.results.len());
self.results.clear();
} else {
// Process any FETCH clause
self.output_fetch(ctx, opt, txn, stm).await?;
}
}
// Extract the output from the result
let mut results = self.results.take();
// Output the explanation if any
if let Some(e) = explanation {
e.output(&mut self.results);
if let Some(e) = plan.explanation {
results.clear();
for v in e.output() {
results.push(v)
}
}
// Output the results
Ok(mem::take(&mut self.results).into())
Ok(results.into())
}
#[inline]
@ -387,7 +387,7 @@ impl Iterator {
// Loop over each split clause
for split in splits.iter() {
// Get the query result
let res = mem::take(&mut self.results);
let res = self.results.take();
// Loop over each value
for obj in &res {
// Get the value at the path
@ -401,7 +401,7 @@ impl Iterator {
// Set the value at the path
obj.set(ctx, opt, txn, split, val).await?;
// Add the object to the results
self.results.push(obj);
self.results.push(ctx, opt, txn, stm, obj).await?;
}
}
_ => {
@ -410,7 +410,7 @@ impl Iterator {
// Set the value at the path
obj.set(ctx, opt, txn, split, val).await?;
// Add the object to the results
self.results.push(obj);
self.results.push(ctx, opt, txn, stm, obj).await?;
}
}
}
@ -418,87 +418,6 @@ impl Iterator {
}
Ok(())
}
#[inline]
async fn output_group(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(fields) = stm.expr() {
if let Some(groups) = stm.group() {
// Create the new grouped collection
let mut grp: BTreeMap<Array, Array> = BTreeMap::new();
// Get the query result
let res = mem::take(&mut self.results);
// Loop over each value
for obj in res {
// Create a new column set
let mut arr = Array::with_capacity(groups.len());
// Loop over each group clause
for group in groups.iter() {
// Get the value at the path
let val = obj.pick(group);
// Set the value at the path
arr.push(val);
}
// Add to grouped collection
match grp.get_mut(&arr) {
Some(v) => v.push(obj),
None => {
grp.insert(arr, Array::from(obj));
}
}
}
// Loop over each grouped collection
for (_, vals) in grp {
// Create a new value
let mut obj = Value::base();
// Save the collected values
let vals = Value::from(vals);
// Loop over each group clause
for field in fields.other() {
// Process the field
if let Field::Single {
expr,
alias,
} = field
{
let idiom = alias
.as_ref()
.map(Cow::Borrowed)
.unwrap_or_else(|| Cow::Owned(expr.to_idiom()));
match expr {
Value::Function(f) if f.is_aggregate() => {
let x =
vals.all().get(ctx, opt, txn, None, idiom.as_ref()).await?;
let x = f.aggregate(x).compute(ctx, opt, txn, None).await?;
obj.set(ctx, opt, txn, idiom.as_ref(), x).await?;
}
_ => {
let x = vals.first();
let x = if let Some(alias) = alias {
let cur = (&x).into();
alias.compute(ctx, opt, txn, Some(&cur)).await?
} else {
let cur = (&x).into();
expr.compute(ctx, opt, txn, Some(&cur)).await?
};
obj.set(ctx, opt, txn, idiom.as_ref(), x).await?;
}
}
}
}
// Add the object to the results
self.results.push(obj);
}
}
}
Ok(())
}
#[inline]
async fn output_order(
&mut self,
@ -538,34 +457,6 @@ impl Iterator {
Ok(())
}
#[inline]
async fn output_start(
&mut self,
_ctx: &Context<'_>,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(v) = self.start {
self.results = mem::take(&mut self.results).into_iter().skip(v).collect();
}
Ok(())
}
#[inline]
async fn output_limit(
&mut self,
_ctx: &Context<'_>,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(v) = self.limit {
self.results = mem::take(&mut self.results).into_iter().take(v).collect();
}
Ok(())
}
#[inline]
async fn output_fetch(
&mut self,
@ -672,7 +563,7 @@ impl Iterator {
let aproc = async {
// Process all processed values
while let Ok(r) = vals.recv().await {
self.result(r, stm);
self.result(ctx, opt, txn, stm, r).await;
}
// Shutdown the executor
let _ = end.send(()).await;
@ -701,11 +592,18 @@ impl Iterator {
// Process the document
let res = Document::process(ctx, opt, txn, stm, pro).await;
// Process the result
self.result(res, stm);
self.result(ctx, opt, txn, stm, res).await;
}
/// Accept a processed record result
fn result(&mut self, res: Result<Value, Error>, stm: &Statement<'_>) {
async fn result(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
res: Result<Value, Error>,
) {
// Process the result
match res {
Err(Error::Ignore) => {
@ -716,7 +614,13 @@ impl Iterator {
self.run.cancel();
return;
}
Ok(v) => self.results.push(v),
Ok(v) => {
if let Err(e) = self.results.push(ctx, opt, txn, stm, v).await {
self.error = Some(e);
self.run.cancel();
return;
}
}
}
// Check if we can exit
if stm.group().is_none() && stm.order().is_none() {

core/src/dbs/mod.rs

@ -4,10 +4,10 @@
//! and executors to process the operations. This module also gives a `context` to the transaction.
mod distinct;
mod executor;
mod explanation;
mod iterator;
mod notification;
mod options;
mod plan;
mod response;
mod session;
mod statement;
@ -29,6 +29,9 @@ pub mod capabilities;
pub use self::capabilities::Capabilities;
pub mod node;
mod group;
mod processor;
mod result;
mod store;
#[cfg(test)]
pub(crate) mod test;

core/src/dbs/plan.rs (renamed from core/src/dbs/explanation.rs)

@ -1,21 +1,25 @@
use crate::ctx::Context;
use crate::dbs::Iterable;
use crate::sql::{Explain, Object, Value};
use crate::dbs::result::Results;
use crate::dbs::{Iterable, Statement};
use crate::sql::{Object, Value};
use std::collections::HashMap;
#[derive(Default)]
pub(super) struct Explanation(Vec<ExplainItem>);
pub(super) struct Plan {
pub(super) do_iterate: bool,
pub(super) explanation: Option<Explanation>,
}
impl Explanation {
impl Plan {
pub(super) fn new(
ctx: &Context<'_>,
e: Option<&Explain>,
stm: &Statement<'_>,
iterables: &Vec<Iterable>,
) -> (bool, Option<Self>) {
match e {
results: &Results,
) -> Self {
let (do_iterate, explanation) = match stm.explain() {
None => (true, None),
Some(e) => {
let mut exp = Self::default();
let mut exp = Explanation::default();
for i in iterables {
exp.add_iter(ctx, i);
}
@ -24,11 +28,21 @@ impl Explanation {
exp.add_fallback(reason.to_string());
}
}
results.explain(&mut exp);
(e.0, Some(exp))
}
};
Self {
do_iterate,
explanation,
}
}
}
#[derive(Default)]
pub(super) struct Explanation(Vec<ExplainItem>);
impl Explanation {
fn add_iter(&mut self, ctx: &Context<'_>, iter: &Iterable) {
self.0.push(ExplainItem::new_iter(ctx, iter));
}
@ -37,14 +51,19 @@ impl Explanation {
self.0.push(ExplainItem::new_fetch(count));
}
pub(super) fn add_collector(
&mut self,
collector_type: &str,
details: Vec<(&'static str, Value)>,
) {
self.0.push(ExplainItem::new_collector(collector_type, details));
}
fn add_fallback(&mut self, reason: String) {
self.0.push(ExplainItem::new_fallback(reason));
}
pub(super) fn output(self, results: &mut Vec<Value>) {
for e in self.0 {
results.push(e.into());
}
pub(super) fn output(self) -> Vec<Value> {
self.0.into_iter().map(|e| e.into()).collect()
}
}
@ -83,7 +102,7 @@ impl ExplainItem {
details: vec![("thing", Value::Thing(t.to_owned()))],
},
Iterable::Defer(t) => Self {
name: "Iterate Thing".into(),
name: "Iterate Defer".into(),
details: vec![("thing", Value::Thing(t.to_owned()))],
},
Iterable::Range(r) => Self {
@ -120,6 +139,17 @@ impl ExplainItem {
}
}
}
pub(super) fn new_collector(
collector_type: &str,
mut details: Vec<(&'static str, Value)>,
) -> ExplainItem {
details.insert(0, ("type", collector_type.into()));
Self {
name: "Collector".into(),
details,
}
}
}
impl From<ExplainItem> for Value {

core/src/dbs/result.rs (new file)

@ -0,0 +1,125 @@
use crate::ctx::Context;
use crate::dbs::group::GroupsCollector;
use crate::dbs::plan::Explanation;
use crate::dbs::store::StoreCollector;
use crate::dbs::{Options, Statement, Transaction};
use crate::err::Error;
use crate::sql::Value;
use std::cmp::Ordering;
use std::slice::IterMut;
pub(super) enum Results {
None,
Store(StoreCollector),
Groups(GroupsCollector),
}
impl Default for Results {
fn default() -> Self {
Self::None
}
}
impl Results {
pub(super) fn prepare(&mut self, stm: &Statement<'_>) -> Self {
if stm.expr().is_some() && stm.group().is_some() {
Self::Groups(GroupsCollector::new(stm))
} else {
Self::Store(StoreCollector::default())
}
}
pub(super) async fn push(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
val: Value,
) -> Result<(), Error> {
match self {
Results::None => {}
Results::Store(s) => {
s.push(val);
}
Results::Groups(g) => {
g.push(ctx, opt, txn, stm, val).await?;
}
}
Ok(())
}
pub(super) fn sort_by<F>(&mut self, compare: F)
where
F: FnMut(&Value, &Value) -> Ordering,
{
if let Results::Store(s) = self {
s.sort_by(compare)
}
}
pub(super) fn start_limit(&mut self, start: Option<&usize>, limit: Option<&usize>) {
if let Results::Store(s) = self {
if let Some(&start) = start {
s.start(start);
}
if let Some(&limit) = limit {
s.limit(limit);
}
}
}
pub(super) fn len(&self) -> usize {
match self {
Results::None => 0,
Results::Store(s) => s.len(),
Results::Groups(g) => g.len(),
}
}
pub(super) async fn group(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<Self, Error> {
Ok(match self {
Self::None => Self::None,
Self::Store(s) => Self::Store(s.take_store()),
Self::Groups(g) => Self::Store(g.output(ctx, opt, txn, stm).await?),
})
}
pub(super) fn take(&mut self) -> Vec<Value> {
if let Self::Store(s) = self {
s.take_vec()
} else {
vec![]
}
}
pub(super) fn explain(&self, exp: &mut Explanation) {
match self {
Results::None => exp.add_collector("None", vec![]),
Results::Store(s) => {
s.explain(exp);
}
Results::Groups(g) => {
g.explain(exp);
}
}
}
}
impl<'a> IntoIterator for &'a mut Results {
type Item = &'a mut Value;
type IntoIter = IterMut<'a, Value>;
fn into_iter(self) -> Self::IntoIter {
if let Results::Store(s) = self {
s.into_iter()
} else {
[].iter_mut()
}
}
}

core/src/dbs/store.rs (new file)

@ -0,0 +1,53 @@
use crate::dbs::plan::Explanation;
use crate::sql::value::Value;
use std::cmp::Ordering;
use std::mem;
#[derive(Default)]
// TODO: Use surreal-kv once the number of records reaches a given threshold
pub(super) struct StoreCollector(Vec<Value>);
impl StoreCollector {
pub(super) fn push(&mut self, val: Value) {
self.0.push(val);
}
// When surreal-kv is used, the key will be used to sort the records in surreal-kv
pub(super) fn sort_by<F>(&mut self, compare: F)
where
F: FnMut(&Value, &Value) -> Ordering,
{
self.0.sort_by(compare);
}
pub(super) fn len(&self) -> usize {
self.0.len()
}
pub(super) fn start(&mut self, start: usize) {
self.0 = mem::take(&mut self.0).into_iter().skip(start).collect();
}
pub(super) fn limit(&mut self, limit: usize) {
self.0 = mem::take(&mut self.0).into_iter().take(limit).collect();
}
pub(super) fn take_vec(&mut self) -> Vec<Value> {
mem::take(&mut self.0)
}
pub(super) fn take_store(&mut self) -> Self {
Self(self.take_vec())
}
pub(super) fn explain(&self, exp: &mut Explanation) {
exp.add_collector("Store", vec![]);
}
}
impl<'a> IntoIterator for &'a mut StoreCollector {
type Item = &'a mut Value;
type IntoIter = std::slice::IterMut<'a, Value>;
fn into_iter(self) -> Self::IntoIter {
self.0.iter_mut()
}
}


@ -31,6 +31,17 @@ pub enum Function {
// Add new variants here
}
pub(crate) enum OptimisedAggregate {
None,
Count,
MathMax,
MathMin,
MathSum,
MathMean,
TimeMax,
TimeMin,
}
impl PartialOrd for Function {
#[inline]
fn partial_cmp(&self, _: &Self) -> Option<Ordering> {
@ -142,6 +153,18 @@ impl Function {
_ => false,
}
}
pub(crate) fn get_optimised_aggregate(&self) -> OptimisedAggregate {
match self {
Self::Normal(f, _) if f == "count" => OptimisedAggregate::Count,
Self::Normal(f, _) if f == "math::max" => OptimisedAggregate::MathMax,
Self::Normal(f, _) if f == "math::mean" => OptimisedAggregate::MathMean,
Self::Normal(f, _) if f == "math::min" => OptimisedAggregate::MathMin,
Self::Normal(f, _) if f == "math::sum" => OptimisedAggregate::MathSum,
Self::Normal(f, _) if f == "time::max" => OptimisedAggregate::TimeMax,
Self::Normal(f, _) if f == "time::min" => OptimisedAggregate::TimeMin,
_ => OptimisedAggregate::None,
}
}
}
impl Function {


@ -31,6 +31,17 @@ pub enum Function {
// Add new variants here
}
pub(crate) enum OptimisedAggregate {
None,
Count,
MathMax,
MathMin,
MathSum,
MathMean,
TimeMax,
TimeMin,
}
impl PartialOrd for Function {
#[inline]
fn partial_cmp(&self, _: &Self) -> Option<Ordering> {
@ -142,6 +153,18 @@ impl Function {
_ => false,
}
}
pub(crate) fn get_optimised_aggregate(&self) -> OptimisedAggregate {
match self {
Self::Normal(f, _) if f == "count" => OptimisedAggregate::Count,
Self::Normal(f, _) if f == "math::max" => OptimisedAggregate::MathMax,
Self::Normal(f, _) if f == "math::mean" => OptimisedAggregate::MathMean,
Self::Normal(f, _) if f == "math::min" => OptimisedAggregate::MathMin,
Self::Normal(f, _) if f == "math::sum" => OptimisedAggregate::MathSum,
Self::Normal(f, _) if f == "time::max" => OptimisedAggregate::TimeMax,
Self::Normal(f, _) if f == "time::min" => OptimisedAggregate::TimeMin,
_ => OptimisedAggregate::None,
}
}
}
impl Function {


@ -7,7 +7,7 @@ use surrealdb::err::Error;
use surrealdb::sql::Value;
#[tokio::test]
async fn select_limit_fetch() -> Result<(), Error> {
async fn select_aggregate() -> Result<(), Error> {
let sql = "
CREATE temperature:1 SET country = 'GBP', time = d'2020-01-01T08:00:00Z';
CREATE temperature:2 SET country = 'GBP', time = d'2020-02-01T08:00:00Z';
@ -19,12 +19,13 @@ async fn select_limit_fetch() -> Result<(), Error> {
CREATE temperature:8 SET country = 'AUD', time = d'2021-01-01T08:00:00Z';
CREATE temperature:9 SET country = 'CHF', time = d'2023-01-01T08:00:00Z';
SELECT *, time::year(time) AS year FROM temperature;
SELECT count(), time::year(time) AS year, country FROM temperature GROUP BY country, year;
SELECT count(), time::min(time) as min, time::max(time) as max, time::year(time) AS year, country FROM temperature GROUP BY country, year;
SELECT count(), time::min(time) as min, time::max(time) as max, time::year(time) AS year, country FROM temperature GROUP BY country, year EXPLAIN;
";
let dbs = new_ds().await?;
let ses = Session::owner().with_ns("test").with_db("test");
let res = &mut dbs.execute(sql, &ses, None).await?;
assert_eq!(res.len(), 11);
assert_eq!(res.len(), 12);
//
let tmp = res.remove(0).result?;
let val = Value::parse(
@ -198,40 +199,88 @@ async fn select_limit_fetch() -> Result<(), Error> {
let tmp = res.remove(0).result?;
let val = Value::parse(
"[
{
count: 1,
country: 'AUD',
year: 2021
},
{
count: 1,
country: 'CHF',
year: 2023
},
{
count: 1,
country: 'EUR',
year: 2021
},
{
count: 3,
country: 'GBP',
year: 2020
},
{
count: 2,
country: 'GBP',
year: 2021
},
{
count: 1,
country: 'USD',
year: 2021
}
]",
{
count: 1,
country: 'AUD',
max: d'2021-01-01T08:00:00Z',
min: d'2021-01-01T08:00:00Z',
year: 2021
},
{
count: 1,
country: 'CHF',
max: d'2023-01-01T08:00:00Z',
min: d'2023-01-01T08:00:00Z',
year: 2023
},
{
count: 1,
country: 'EUR',
max: d'2021-01-01T08:00:00Z',
min: d'2021-01-01T08:00:00Z',
year: 2021
},
{
count: 3,
country: 'GBP',
max: d'2020-03-01T08:00:00Z',
min: d'2020-01-01T08:00:00Z',
year: 2020
},
{
count: 2,
country: 'GBP',
max: d'2021-01-01T08:00:00Z',
min: d'2021-01-01T08:00:00Z',
year: 2021
},
{
count: 1,
country: 'USD',
max: d'2021-01-01T08:00:00Z',
min: d'2021-01-01T08:00:00Z',
year: 2021
}
]",
);
assert_eq!(tmp, val);
//
let tmp = res.remove(0).result?;
let val = Value::parse(
"[
{
detail: {
table: 'temperature'
},
operation: 'Iterate Table'
},
{
detail: {
idioms: {
count: [
'count'
],
country: [
'first'
],
max: [
'time::max'
],
min: [
'time::min'
],
year: [
'array'
]
},
type: 'Group'
},
operation: 'Collector'
}
]",
);
assert_eq!(format!("{tmp:#}"), format!("{val:#}"));
//
Ok(())
}
@ -242,13 +291,14 @@ async fn select_multi_aggregate() -> Result<(), Error> {
CREATE test:2 SET group = 1, one = 4.7, two = 3.9;
CREATE test:3 SET group = 2, one = 3.2, two = 9.7;
CREATE test:4 SET group = 2, one = 4.4, two = 3.0;
SELECT group, math::sum(one) AS one, math::sum(two) AS two FROM test GROUP BY group;
SELECT group, math::sum(two) AS two, math::sum(one) AS one FROM test GROUP BY group;
SELECT group, math::sum(one) AS one, math::sum(two) AS two, math::min(one) as min FROM test GROUP BY group;
SELECT group, math::sum(two) AS two, math::sum(one) AS one, math::max(two) as max, math::mean(one) as mean FROM test GROUP BY group;
SELECT group, math::sum(two) AS two, math::sum(one) AS one, math::max(two) as max, math::mean(one) as mean FROM test GROUP BY group EXPLAIN;
";
let dbs = new_ds().await?;
let ses = Session::owner().with_ns("test").with_db("test");
let res = &mut dbs.execute(sql, &ses, None).await?;
assert_eq!(res.len(), 6);
assert_eq!(res.len(), 7);
//
let tmp = res.remove(0).result?;
let val = Value::parse(
@ -305,37 +355,78 @@ async fn select_multi_aggregate() -> Result<(), Error> {
let tmp = res.remove(0).result?;
let val = Value::parse(
"[
{
group: 1,
one: 6.4,
two: 6.3,
},
{
group: 2,
one: 7.6000000000000005,
two: 12.7,
}
]",
{
group: 1,
min: 1.7,
one: 6.4,
two: 6.3
},
{
group: 2,
min: 3.2f,
one: 7.6000000000000005,
two: 12.7
}
]",
);
assert_eq!(tmp, val);
//
let tmp = res.remove(0).result?;
let val = Value::parse(
"[
{
group: 1,
one: 6.4,
two: 6.3,
},
{
group: 2,
one: 7.6000000000000005,
two: 12.7,
}
]",
{
group: 1,
max: 3.9,
mean: 3.2,
one: 6.4,
two: 6.3
},
{
group: 2,
max: 9.7,
mean: 3.8000000000000003,
one: 7.6000000000000005,
two: 12.7
}
]",
);
assert_eq!(tmp, val);
//
let tmp = res.remove(0).result?;
let val = Value::parse(
"[
{
detail: {
table: 'test'
},
operation: 'Iterate Table'
},
{
detail: {
idioms: {
group: [
'first'
],
max: [
'math::max'
],
mean: [
'math::mean'
],
one: [
'math::sum'
],
two: [
'math::sum'
]
},
type: 'Group'
},
operation: 'Collector'
}
]",
);
assert_eq!(format!("{tmp:#}"), format!("{val:#}"));
Ok(())
}
@ -349,11 +440,12 @@ async fn select_multi_aggregate_composed() -> Result<(), Error> {
SELECT group, math::sum(math::floor(one)) AS one, math::sum(math::floor(two)) AS two FROM test GROUP BY group;
SELECT group, math::sum(math::round(one)) AS one, math::sum(math::round(two)) AS two FROM test GROUP BY group;
SELECT group, math::sum(math::ceil(one)) AS one, math::sum(math::ceil(two)) AS two FROM test GROUP BY group;
SELECT group, math::sum(math::ceil(one)) AS one, math::sum(math::ceil(two)) AS two FROM test GROUP BY group EXPLAIN;
";
let dbs = new_ds().await?;
let ses = Session::owner().with_ns("test").with_db("test");
let res = &mut dbs.execute(sql, &ses, None).await?;
assert_eq!(res.len(), 7);
assert_eq!(res.len(), 8);
//
let tmp = res.remove(0).result?;
let val = Value::parse(
@ -458,5 +550,35 @@ async fn select_multi_aggregate_composed() -> Result<(), Error> {
);
assert_eq!(tmp, val);
//
let tmp = res.remove(0).result?;
let val = Value::parse(
"[
{
detail: {
table: 'test'
},
operation: 'Iterate Table'
},
{
detail: {
idioms: {
group: [
'first'
],
one: [
'math::sum'
],
two: [
'math::sum'
]
},
type: 'Group'
},
operation: 'Collector'
}
]",
);
assert_eq!(format!("{tmp:#}"), format!("{val:#}"));
//
Ok(())
}


@ -36,6 +36,12 @@ async fn select_where_matches_using_index() -> Result<(), Error> {
table: 'blog',
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]",
);
@ -81,6 +87,12 @@ async fn select_where_matches_without_using_index_iterator() -> Result<(), Error
},
operation: 'Iterate Table'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
},
{
detail: {
count: 1,
@ -140,6 +152,12 @@ async fn select_where_matches_using_index_and_arrays(parallel: bool) -> Result<(
table: 'blog',
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]",
);
@ -209,6 +227,12 @@ async fn select_where_matches_using_index_and_objects(parallel: bool) -> Result<
table: 'blog',
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]",
);
@ -438,6 +462,12 @@ async fn select_where_matches_without_complex_query() -> Result<(), Error> {
table: 'page'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]",
);


@ -208,6 +208,12 @@ fn table_explain(fetch_count: usize) -> String {
}},
operation: 'Iterate Table'
}},
{{
detail: {{
type: 'Store'
}},
operation: 'Collector'
}},
{{
detail: {{
count: {fetch_count}
@ -233,6 +239,12 @@ fn table_explain_no_index(fetch_count: usize) -> String {
}},
operation: 'Fallback'
}},
{{
detail: {{
type: 'Store'
}},
operation: 'Collector'
}},
{{
detail: {{
count: {fetch_count}
@ -250,6 +262,12 @@ const THREE_TABLE_EXPLAIN: &str = "[
},
operation: 'Iterate Table'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
},
{
detail: {
count: 3
@ -292,6 +310,12 @@ const THREE_MULTI_INDEX_EXPLAIN: &str = "[
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
},
{
detail: {
count: 3
@ -312,6 +336,12 @@ const SINGLE_INDEX_FT_EXPLAIN: &str = "[
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
},
{
detail: {
count: 1
@ -332,6 +362,12 @@ const SINGLE_INDEX_UNIQ_EXPLAIN: &str = "[
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
},
{
detail: {
count: 1
@ -352,6 +388,12 @@ const SINGLE_INDEX_IDX_EXPLAIN: &str = "[
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
},
{
detail: {
count: 1
@ -383,6 +425,12 @@ const TWO_MULTI_INDEX_EXPLAIN: &str = "[
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
},
{
detail: {
count: 2
@ -413,6 +461,12 @@ async fn select_with_no_index_unary_operator() -> Result<(), Error> {
reason: 'WITH NOINDEX'
},
operation: 'Fallback'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]"#,
);
@ -441,6 +495,12 @@ async fn select_unsupported_unary_operator() -> Result<(), Error> {
reason: 'unary expressions not supported'
},
operation: 'Fallback'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]"#,
);
@ -525,6 +585,12 @@ const EXPLAIN_FROM_TO: &str = r"[
table: 'test'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]";
@ -566,6 +632,12 @@ const EXPLAIN_FROM_INCL_TO: &str = r"[
table: 'test'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]";
@ -611,6 +683,12 @@ const EXPLAIN_FROM_TO_INCL: &str = r"[
table: 'test'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]";
@ -656,6 +734,12 @@ const EXPLAIN_FROM_INCL_TO_INCL: &str = r"[
table: 'test'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]";
@ -744,6 +828,12 @@ const EXPLAIN_LESS: &str = r"[
table: 'test'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]";
@ -779,6 +869,12 @@ const EXPLAIN_LESS_OR_EQUAL: &str = r"[
table: 'test'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]";
@ -818,6 +914,12 @@ const EXPLAIN_MORE: &str = r"[
table: 'test'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]";
@ -853,6 +955,12 @@ const EXPLAIN_MORE_OR_EQUAL: &str = r"[
table: 'test'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]";
@ -903,6 +1011,12 @@ async fn select_with_idiom_param_value() -> Result<(), Error> {
table: 'person'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]"#,
);
@ -945,6 +1059,12 @@ const CONTAINS_TABLE_EXPLAIN: &str = r"[
reason: 'NO INDEX FOUND'
},
operation: 'Fallback'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]";
@ -996,9 +1116,6 @@ async fn select_contains() -> Result<(), Error> {
const INDEX_EXPLAIN: &str = r"[
{
detail: {
table: 'student'
},
detail: {
plan: {
index: 'subject_idx',
@ -1008,6 +1125,12 @@ async fn select_contains() -> Result<(), Error> {
table: 'student',
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]";
const RESULT: &str = r"[
@ -1036,9 +1159,6 @@ async fn select_contains_all() -> Result<(), Error> {
"#;
const INDEX_EXPLAIN: &str = r"[
{
detail: {
table: 'student'
},
detail: {
plan: {
index: 'subject_idx',
@ -1048,6 +1168,12 @@ async fn select_contains_all() -> Result<(), Error> {
table: 'student',
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]";
const RESULT: &str = r"[
@ -1076,9 +1202,6 @@ async fn select_contains_any() -> Result<(), Error> {
"#;
const INDEX_EXPLAIN: &str = r"[
{
detail: {
table: 'student'
},
detail: {
plan: {
index: 'subject_idx',
@ -1088,6 +1211,12 @@ async fn select_contains_any() -> Result<(), Error> {
table: 'student',
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]";
const RESULT: &str = r"[
@ -1123,9 +1252,6 @@ async fn select_unique_contains() -> Result<(), Error> {
const INDEX_EXPLAIN: &str = r"[
{
detail: {
table: 'student'
},
detail: {
plan: {
index: 'subject_idx',
@ -1135,6 +1261,12 @@ async fn select_unique_contains() -> Result<(), Error> {
table: 'student',
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]";
const RESULT: &str = r"[
@ -1182,6 +1314,12 @@ async fn select_with_datetime_value() -> Result<(), Error> {
table: 'test_user'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]"#,
);
@ -1239,6 +1377,12 @@ async fn select_with_uuid_value() -> Result<(), Error> {
table: 'sessions'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]"#,
);
@ -1294,6 +1438,12 @@ async fn select_with_in_operator() -> Result<(), Error> {
table: 'user'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]"#,
);
@ -1366,6 +1516,12 @@ async fn select_with_in_operator_uniq_index() -> Result<(), Error> {
table: 'apprenants'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]"#,
);


@ -218,6 +218,12 @@ async fn select_expression_value() -> Result<(), Error> {
},
operation: 'Iterate Table'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
},
{
detail: {
count: 2,
@ -521,6 +527,12 @@ async fn select_where_field_is_thing_and_with_index() -> Result<(), Error> {
table: 'post',
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]",
);
@ -540,6 +552,12 @@ async fn select_where_field_is_thing_and_with_index() -> Result<(), Error> {
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
},
{
detail: {
count: 2,
@ -597,6 +615,12 @@ async fn select_where_and_with_index() -> Result<(), Error> {
table: 'person',
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]",
);
@ -644,6 +668,12 @@ async fn select_where_and_with_unique_index() -> Result<(), Error> {
table: 'person',
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]",
);
@ -693,6 +723,12 @@ async fn select_where_and_with_fulltext_index() -> Result<(), Error> {
table: 'person',
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
}
]",
);
@ -741,7 +777,13 @@ async fn select_where_explain() -> Result<(), Error> {
table: 'software',
},
operation: 'Iterate Table'
}
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
},
]",
);
assert_eq!(tmp, val);
@ -761,6 +803,12 @@ async fn select_where_explain() -> Result<(), Error> {
},
operation: 'Iterate Table'
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
},
{
detail: {
count: 3,


@ -52,7 +52,13 @@ async fn select_where_mtree_knn() -> Result<(), Error> {
table: 'pts',
},
operation: 'Iterate Index'
}
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
},
]",
);
assert_eq!(format!("{:#}", tmp), format!("{:#}", val));
@ -199,7 +205,13 @@ async fn select_where_brut_force_knn() -> Result<(), Error> {
reason: 'NO INDEX FOUND'
},
operation: 'Fallback'
}
},
{
detail: {
type: 'Store'
},
operation: 'Collector'
},
]",
);
assert_eq!(format!("{:#}", tmp), format!("{:#}", val));