From 141e2e5e4c5075e1afc5a920a85a25f89e9c97bb Mon Sep 17 00:00:00 2001
From: Emmanuel Keller
Date: Mon, 16 Sep 2024 17:30:00 +0100
Subject: [PATCH] Select count table scan optimisation (#4285)

---
 core/src/dbs/iterator.rs          |  6 +--
 core/src/dbs/plan.rs              |  9 +++-
 core/src/dbs/processor.rs         | 72 +++++++++++++++++++++-----
 core/src/idx/planner/mod.rs       | 71 +++++++++++++-------------
 core/src/idx/planner/plan.rs      | 45 ++++++++++++++---
 core/src/kvs/scanner.rs           | 84 ++++++++++++++++++++++++++++---
 core/src/kvs/tx.rs                | 18 ++++++-
 core/src/sql/field.rs             | 19 +++++++
 core/src/sql/function.rs          |  4 ++
 core/src/sql/statements/select.rs | 14 ++----
 sdk/tests/group.rs                | 80 +++++++++++++++++++++++++++++
 11 files changed, 343 insertions(+), 79 deletions(-)

diff --git a/core/src/dbs/iterator.rs b/core/src/dbs/iterator.rs
index 4cfdf3b8..dbb7d81f 100644
--- a/core/src/dbs/iterator.rs
+++ b/core/src/dbs/iterator.rs
@@ -27,7 +27,7 @@ const TARGET: &str = "surrealdb::core::dbs";
 #[derive(Clone)]
 pub(crate) enum Iterable {
 	Value(Value),
-	Table(Table),
+	Table(Table, bool), // true = keys only
 	Thing(Thing),
 	TableRange(String, IdRange),
 	Edges(Edges),
@@ -126,7 +126,7 @@ impl Iterator {
 				}
 				_ => {
 					// Ingest the table for scanning
-					self.ingest(Iterable::Table(v))
+					self.ingest(Iterable::Table(v, false))
 				}
 			},
 			// There is no data clause so create a record id
@@ -137,7 +137,7 @@
 				}
 				_ => {
 					// Ingest the table for scanning
-					self.ingest(Iterable::Table(v))
+					self.ingest(Iterable::Table(v, false))
 				}
 			},
 		},
diff --git a/core/src/dbs/plan.rs b/core/src/dbs/plan.rs
index e0704531..05cc12a8 100644
--- a/core/src/dbs/plan.rs
+++ b/core/src/dbs/plan.rs
@@ -93,8 +93,13 @@ impl ExplainItem {
 				name: "Iterate Value".into(),
 				details: vec![("value", v.to_owned())],
 			},
-			Iterable::Table(t) => Self {
-				name: "Iterate Table".into(),
+			Iterable::Table(t, keys_only) => Self {
+				name: if *keys_only {
+					"Iterate Table Keys"
+				} else {
+					"Iterate Table"
+				}
+				.into(),
 				details: vec![("table", Value::from(t.0.to_owned()))],
 			},
 			Iterable::Thing(t) => Self {
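
The two hunks above thread a keys-only flag through Iterable::Table, and EXPLAIN reports the cheaper scan as "Iterate Table Keys". A minimal, self-contained sketch of that idea (simplified stand-in types, not SurrealDB's internal definitions):

#[derive(Debug)]
enum Iterable {
	// true = keys only, mirroring the flag added in this patch
	Table(String, bool),
}

fn explain_label(i: &Iterable) -> &'static str {
	match i {
		Iterable::Table(_, true) => "Iterate Table Keys",
		Iterable::Table(_, false) => "Iterate Table",
	}
}

fn main() {
	let scan = Iterable::Table("person".to_string(), true);
	assert_eq!(explain_label(&scan), "Iterate Table Keys");
	println!("{scan:?} -> {}", explain_label(&scan));
}
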
diff --git a/core/src/dbs/processor.rs b/core/src/dbs/processor.rs
index c1b5b4f4..b4aacb34 100644
--- a/core/src/dbs/processor.rs
+++ b/core/src/dbs/processor.rs
@@ -16,6 +16,7 @@
 use crate::sql::{Edges, Table, Thing, Value};
 use channel::Sender;
 use futures::StreamExt;
 use reblessive::tree::Stk;
+use std::borrow::Cow;
 use std::ops::Bound;
 use std::vec;
@@ -55,7 +56,7 @@ impl Iterable {

 	fn iteration_stage_check(&self, ctx: &Context) -> bool {
 		match self {
-			Iterable::Table(tb) | Iterable::Index(tb, _) => {
+			Iterable::Table(tb, _) | Iterable::Index(tb, _) => {
 				if let Some(IterationStage::BuildKnn) = ctx.get_iteration_stage() {
 					if let Some(qp) = ctx.get_query_planner() {
 						if let Some(exe) = qp.get_query_executor(tb) {
@@ -111,6 +112,19 @@ impl<'a> Processor<'a> {
 		Ok(())
 	}

+	fn check_query_planner_context<'b>(ctx: &'b Context, table: &'b Table) -> Cow<'b, Context> {
+		if let Some(qp) = ctx.get_query_planner() {
+			if let Some(exe) = qp.get_query_executor(&table.0) {
+				// We set the query executor matching the current table in the Context
+				// Avoiding search in the hashmap of the query planner for each doc
+				let mut ctx = MutableContext::new(ctx);
+				ctx.set_query_executor(exe.clone());
+				return Cow::Owned(ctx.freeze());
+			}
+		}
+		Cow::Borrowed(ctx)
+	}
+
 	async fn process_iterable(
 		&mut self,
 		stk: &mut Stk,
@@ -128,18 +142,13 @@
 					self.process_range(stk, ctx, opt, stm, tb, v).await?
 				}
 				Iterable::Edges(e) => self.process_edge(stk, ctx, opt, stm, e).await?,
-				Iterable::Table(v) => {
-					if let Some(qp) = ctx.get_query_planner() {
-						if let Some(exe) = qp.get_query_executor(&v.0) {
-							// We set the query executor matching the current table in the Context
-							// Avoiding search in the hashmap of the query planner for each doc
-							let mut ctx = MutableContext::new(ctx);
-							ctx.set_query_executor(exe.clone());
-							let ctx = ctx.freeze();
-							return self.process_table(stk, &ctx, opt, stm, &v).await;
-						}
+				Iterable::Table(v, keys_only) => {
+					let ctx = Self::check_query_planner_context(ctx, &v);
+					if keys_only {
+						self.process_table_keys(stk, &ctx, opt, stm, &v).await?
+					} else {
+						self.process_table(stk, &ctx, opt, stm, &v).await?
 					}
-					self.process_table(stk, ctx, opt, stm, &v).await?
 				}
 				Iterable::Index(t, irf) => {
 					if let Some(qp) = ctx.get_query_planner() {
@@ -340,6 +349,45 @@ impl<'a> Processor<'a> {
 		Ok(())
 	}

+	async fn process_table_keys(
+		&mut self,
+		stk: &mut Stk,
+		ctx: &Context,
+		opt: &Options,
+		stm: &Statement<'_>,
+		v: &Table,
+	) -> Result<(), Error> {
+		// Get the transaction
+		let txn = ctx.tx();
+		// Check that the table exists
+		txn.check_ns_db_tb(opt.ns()?, opt.db()?, v, opt.strict).await?;
+		// Prepare the start and end keys
+		let beg = thing::prefix(opt.ns()?, opt.db()?, v);
+		let end = thing::suffix(opt.ns()?, opt.db()?, v);
+		// Create a new iterable range
+		let mut stream = txn.stream_keys(beg..end);
+		// Loop until no more entries
+		while let Some(res) = stream.next().await {
+			// Check if the context is finished
+			if ctx.is_done() {
+				break;
+			}
+			// Parse the data from the store
+			let k = res?;
+			let key: thing::Thing = (&k).into();
+			let rid = Thing::from((key.tb, key.id));
+			// Process the record
+			let pro = Processed {
+				rid: Some(rid.into()),
+				ir: None,
+				val: Operable::Value(Value::Null.into()),
+			};
+			self.process(stk, ctx, opt, stm, pro).await?;
+		}
+		// Everything ok
+		Ok(())
+	}
+
 	async fn process_range(
 		&mut self,
 		stk: &mut Stk,
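
process_table_keys above answers a bare count by walking the table's key range and never decoding record values. A rough, stand-alone illustration of the same idea over an ordered map (a plain BTreeMap stands in for the transaction and key encoding; none of this is the real kvs API):

use std::collections::BTreeMap;
use std::ops::Bound;

/// Count every record whose key falls inside [beg, end) without reading values.
fn count_keys_in_range(store: &BTreeMap<Vec<u8>, Vec<u8>>, beg: Vec<u8>, end: Vec<u8>) -> usize {
	store.range((Bound::Included(beg), Bound::Excluded(end))).count()
}

fn main() {
	let mut store = BTreeMap::new();
	store.insert(b"person:1".to_vec(), b"{ big document }".to_vec());
	store.insert(b"person:2".to_vec(), b"{ big document }".to_vec());
	store.insert(b"post:1".to_vec(), b"{ big document }".to_vec());
	// The begin/end pair plays the role of thing::prefix / thing::suffix above.
	assert_eq!(count_keys_in_range(&store, b"person:".to_vec(), b"person;".to_vec()), 2);
}
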
diff --git a/core/src/idx/planner/mod.rs b/core/src/idx/planner/mod.rs
index c6c559da..db006551 100644
--- a/core/src/idx/planner/mod.rs
+++ b/core/src/idx/planner/mod.rs
@@ -14,18 +14,34 @@ use crate::idx::planner::iterators::IteratorRef;
 use crate::idx::planner::knn::KnnBruteForceResults;
 use crate::idx::planner::plan::{Plan, PlanBuilder};
 use crate::idx::planner::tree::Tree;
+use crate::sql::statements::SelectStatement;
 use crate::sql::with::With;
-use crate::sql::{Cond, Orders, Table};
+use crate::sql::{Cond, Fields, Groups, Orders, Table};
 use reblessive::tree::Stk;
 use std::collections::HashMap;
 use std::sync::atomic::{AtomicU8, Ordering};
-use std::sync::Arc;
+
+pub(crate) struct QueryPlannerParams<'a> {
+	fields: &'a Fields,
+	with: Option<&'a With>,
+	order: Option<&'a Orders>,
+	cond: Option<&'a Cond>,
+	group: Option<&'a Groups>,
+}
+
+impl<'a> From<&'a SelectStatement> for QueryPlannerParams<'a> {
+	fn from(stmt: &'a SelectStatement) -> Self {
+		QueryPlannerParams {
+			fields: &stmt.expr,
+			with: stmt.with.as_ref(),
+			order: stmt.order.as_ref(),
+			cond: stmt.cond.as_ref(),
+			group: stmt.group.as_ref(),
+		}
+	}
+}

 pub(crate) struct QueryPlanner {
-	opt: Arc<Options>,
-	with: Option<Arc<With>>,
-	cond: Option<Arc<Cond>>,
-	order: Option<Arc<Orders>>,
 	/// There is one executor per table
 	executors: HashMap<String, QueryExecutor>,
 	requires_distinct: bool,
@@ -36,17 +52,8 @@
 }

 impl QueryPlanner {
-	pub(crate) fn new(
-		opt: Arc<Options>,
-		with: Option<Arc<With>>,
-		cond: Option<Arc<Cond>>,
-		order: Option<Arc<Orders>>,
-	) -> Self {
+	pub(crate) fn new() -> Self {
 		Self {
-			opt,
-			with,
-			cond,
-			order,
 			executors: HashMap::default(),
 			requires_distinct: false,
 			fallbacks: vec![],
@@ -60,27 +67,22 @@
 		&mut self,
 		stk: &mut Stk,
 		ctx: &Context,
+		opt: &Options,
 		t: Table,
+		params: &QueryPlannerParams<'_>,
 		it: &mut Iterator,
 	) -> Result<(), Error> {
 		let mut is_table_iterator = false;
-		let mut tree = Tree::build(
-			stk,
-			ctx,
-			&self.opt,
-			&t,
-			self.cond.as_ref().map(|w| w.as_ref()),
-			self.with.as_ref().map(|c| c.as_ref()),
-			self.order.as_ref().map(|o| o.as_ref()),
-		)
-		.await?;
+
+		let mut tree =
+			Tree::build(stk, ctx, opt, &t, params.cond, params.with, params.order).await?;
 		let is_knn = !tree.knn_expressions.is_empty();
 		let order = tree.index_map.order_limit.take();

 		let mut exe = InnerQueryExecutor::new(
 			stk,
 			ctx,
-			&self.opt,
+			opt,
 			&t,
 			tree.index_map,
 			tree.knn_expressions,
@@ -88,12 +90,7 @@
 			tree.knn_condition,
 		)
 		.await?;
-		match PlanBuilder::build(
-			tree.root,
-			self.with.as_ref().map(|w| w.as_ref()),
-			tree.with_indexes,
-			order,
-		)? {
+		match PlanBuilder::build(tree.root, params, tree.with_indexes, order)? {
 			Plan::SingleIndex(exp, io) => {
 				if io.require_distinct() {
 					self.requires_distinct = true;
@@ -123,12 +120,12 @@
 				let ir = exe.add_iterator(IteratorEntry::Range(rq.exps, ixn, rq.from, rq.to));
 				self.add(t.clone(), Some(ir), exe, it);
 			}
-			Plan::TableIterator(fallback) => {
-				if let Some(fallback) = fallback {
-					self.fallbacks.push(fallback);
+			Plan::TableIterator(reason, keys_only) => {
+				if let Some(reason) = reason {
+					self.fallbacks.push(reason);
 				}
 				self.add(t.clone(), None, exe, it);
-				it.ingest(Iterable::Table(t));
+				it.ingest(Iterable::Table(t, keys_only));
 				is_table_iterator = true;
 			}
 		}
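
QueryPlannerParams above lets the planner borrow the SELECT clauses it needs for each add_iterables call instead of cloning them into the planner when it is built. A simplified sketch of that borrowing pattern (stand-in types and hypothetical names, not the real structs):

struct SelectStmt {
	fields: Vec<String>,
	cond: Option<String>,
	group: Option<Vec<String>>,
	order: Option<Vec<String>>,
}

struct PlannerParams<'a> {
	fields: &'a [String],
	cond: Option<&'a str>,
	group: Option<&'a [String]>,
	order: Option<&'a [String]>,
}

impl<'a> From<&'a SelectStmt> for PlannerParams<'a> {
	fn from(s: &'a SelectStmt) -> Self {
		// Borrow each clause; nothing is cloned and nothing outlives the statement.
		PlannerParams {
			fields: &s.fields,
			cond: s.cond.as_deref(),
			group: s.group.as_deref(),
			order: s.order.as_deref(),
		}
	}
}

fn main() {
	let stmt = SelectStmt {
		fields: vec!["count()".to_string()],
		cond: None,
		group: None,
		order: None,
	};
	let params: PlannerParams = (&stmt).into();
	assert!(params.cond.is_none() && params.group.is_none() && params.order.is_none());
	assert_eq!(params.fields.len(), 1);
}
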
diff --git a/core/src/idx/planner/plan.rs b/core/src/idx/planner/plan.rs
index 08363d21..4b44af47 100644
--- a/core/src/idx/planner/plan.rs
+++ b/core/src/idx/planner/plan.rs
@@ -1,6 +1,7 @@
 use crate::err::Error;
 use crate::idx::ft::MatchRef;
 use crate::idx::planner::tree::{GroupRef, IdiomCol, IdiomPosition, IndexRef, Node};
+use crate::idx::planner::QueryPlannerParams;
 use crate::sql::statements::DefineIndexStatement;
 use crate::sql::with::With;
 use crate::sql::{Array, Expression, Idiom, Number, Object};
@@ -31,13 +32,10 @@ pub(super) struct PlanBuilder {
 impl PlanBuilder {
 	pub(super) fn build(
 		root: Option<Node>,
-		with: Option<&With>,
+		params: &QueryPlannerParams,
 		with_indexes: Vec<IndexRef>,
 		order: Option<IndexOption>,
 	) -> Result<Plan, Error> {
-		if let Some(With::NoIndex) = with {
-			return Ok(Plan::TableIterator(Some("WITH NOINDEX".to_string())));
-		}
 		let mut b = PlanBuilder {
 			has_indexes: false,
 			non_range_indexes: Default::default(),
@@ -47,10 +45,18 @@
 			all_and: true,
 			all_exp_with_index: true,
 		};

+		// If we only count and there are no conditions and no aggregations, then we can only scan keys
+		let keys_only = Self::is_keys_only(params);
+
+		if let Some(With::NoIndex) = params.with {
+			return Ok(Self::table_iterator(Some("WITH NOINDEX"), keys_only));
+		}
+
 		// Browse the AST and collect information
 		if let Some(root) = &root {
 			if let Err(e) = b.eval_node(root) {
-				return Ok(Plan::TableIterator(Some(e.to_string())));
+				return Ok(Self::table_iterator(Some(&e), keys_only));
 			}
 		}
@@ -84,7 +90,32 @@
 			}
 			return Ok(Plan::MultiIndex(b.non_range_indexes, ranges));
 		}
-		Ok(Plan::TableIterator(None))
+		Ok(Self::table_iterator(None, keys_only))
+	}
+
+	fn is_keys_only(p: &QueryPlannerParams) -> bool {
+		if !p.fields.is_count_all_only() {
+			return false;
+		}
+		if p.cond.is_some() {
+			return false;
+		}
+		if let Some(g) = p.group {
+			if !g.is_empty() {
+				return false;
+			}
+		}
+		if let Some(p) = p.order {
+			if !p.is_empty() {
+				return false;
+			}
+		}
+		true
+	}
+
+	fn table_iterator(reason: Option<&str>, keys_only: bool) -> Plan {
+		let reason = reason.map(|s| s.to_string());
+		Plan::TableIterator(reason, keys_only)
 	}

 	// Check if we have an explicit list of index we can use
@@ -161,7 +192,7 @@
 
 pub(super) enum Plan {
 	/// Table full scan
-	TableIterator(Option<String>),
+	TableIterator(Option<String>, bool),
 	/// Index scan filtered on records matching a given expression
 	SingleIndex(Option<Arc<Expression>>, IndexOption),
 	/// Union of filtered index scans
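
The is_keys_only check above boils down to: the projection is exactly count() with no argument, and there is no WHERE condition, no non-empty GROUP BY, and no ORDER BY (GROUP ALL qualifies because its group list is empty). A condensed restatement of that predicate with plain booleans and lengths in place of the AST types:

/// True when a keys-only scan can answer the query, following the same rules as the patch.
fn keys_only_eligible(count_all_only: bool, has_cond: bool, group_len: usize, order_len: usize) -> bool {
	count_all_only && !has_cond && group_len == 0 && order_len == 0
}

fn main() {
	// SELECT count() FROM table GROUP ALL     -> keys only
	assert!(keys_only_eligible(true, false, 0, 0));
	// SELECT count() FROM table WHERE flag    -> values are needed to evaluate the condition
	assert!(!keys_only_eligible(true, true, 0, 0));
	// SELECT count(), foo FROM table          -> not a count-only projection
	assert!(!keys_only_eligible(false, false, 0, 0));
	// SELECT count() FROM table GROUP BY foo  -> grouping needs the documents
	assert!(!keys_only_eligible(true, false, 1, 0));
}
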
diff --git a/core/src/kvs/scanner.rs b/core/src/kvs/scanner.rs
index b422ac63..2439eac2 100644
--- a/core/src/kvs/scanner.rs
+++ b/core/src/kvs/scanner.rs
@@ -11,9 +11,7 @@ use std::ops::Range;
 use std::pin::Pin;
 use std::task::{Context, Poll};

-type Output = Result<Vec<(Key, Val)>, Error>;
-
-pub(super) struct Scanner<'a> {
+pub(super) struct Scanner<'a, I> {
 	/// The store which started this range scan
 	store: &'a Transaction,
 	/// The number of keys to fetch at once
@@ -21,16 +19,17 @@
 	// The key range for this range scan
 	range: Range<Key>,
 	// The results from the last range scan
-	results: VecDeque<(Key, Val)>,
+	results: VecDeque<I>,
+	#[allow(clippy::type_complexity)]
 	/// The currently running future to be polled
-	future: Option<Pin<Box<dyn Future<Output = Output> + 'a>>>,
+	future: Option<Pin<Box<dyn Future<Output = Result<Vec<I>, Error>> + 'a>>>,
 	/// Whether this stream should try to fetch more
 	exhausted: bool,
 	/// Version as timestamp, 0 means latest.
 	version: Option<u64>,
 }

-impl<'a> Scanner<'a> {
+impl<'a, I> Scanner<'a, I> {
 	pub fn new(
 		store: &'a Transaction,
 		batch: u32,
@@ -49,7 +48,7 @@
 	}
 }

-impl<'a> Stream for Scanner<'a> {
+impl<'a> Stream for Scanner<'a, (Key, Val)> {
 	type Item = Result<(Key, Val), Error>;
 	fn poll_next(
 		mut self: Pin<&mut Self>,
@@ -94,7 +93,9 @@
 						self.exhausted = true;
 					}
 					// Get the last element of the results
-					let last = v.last().unwrap();
+					let last = v.last().ok_or_else(|| {
+						Error::Unreachable("Last key/val can't be none".to_string())
+					})?;
 					// Start the next scan from the last result
 					self.range.start.clone_from(&last.0);
 					// Ensure we don't see the last result again
@@ -116,3 +117,70 @@
 		}
 	}
 }
+
+impl<'a> Stream for Scanner<'a, Key> {
+	type Item = Result<Key, Error>;
+	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Key, Error>>> {
+		// If we have results, return the first one
+		if let Some(v) = self.results.pop_front() {
+			return Poll::Ready(Some(Ok(v)));
+		}
+		// If we won't fetch more results then exit
+		if self.exhausted {
+			return Poll::Ready(None);
+		}
+		// Check if there is no pending future task
+		if self.future.is_none() {
+			// Set the max number of results to fetch
+			let num = std::cmp::min(*MAX_STREAM_BATCH_SIZE, self.batch);
+			// Clone the range to use when scanning
+			let range = self.range.clone();
+			// Prepare a future to scan for results
+			self.future = Some(Box::pin(self.store.keys(range, num)));
+		}
+		// Try to resolve the future
+		match self.future.as_mut().unwrap().poll_unpin(cx) {
+			// The future has now completed fully
+			Poll::Ready(result) => {
+				// Drop the completed asynchronous future
+				self.future = None;
+				// Check the result of the finished future
+				match result {
+					// The range was fetched successfully
+					Ok(v) => match v.is_empty() {
+						// There are no more results to stream
+						true => {
+							// Mark this stream as complete
+							Poll::Ready(None)
+						}
+						// There are results which need streaming
+						false => {
+							// We fetched the last elements in the range
+							if v.len() < self.batch as usize {
+								self.exhausted = true;
+							}
+							// Get the last element of the results
+							let last = v.last().ok_or_else(|| {
+								Error::Unreachable("Last key can't be none".to_string())
+							})?;
+							// Start the next scan from the last result
+							self.range.start.clone_from(last);
+							// Ensure we don't see the last result again
+							self.range.start.push(0xff);
+							// Store the fetched range results
+							self.results.extend(v);
+							// Remove the first result to return
+							let item = self.results.pop_front().unwrap();
+							// Return the first result
+							Poll::Ready(Some(Ok(item)))
+						}
+					},
+					// Return the received error
+					Err(error) => Poll::Ready(Some(Err(error))),
+				}
+			}
+			// The future has not yet completed
+			Poll::Pending => Poll::Pending,
+		}
+	}
+}
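
Making Scanner generic over its item type lets one batched-cursor loop back both the key/value stream and the new keys-only stream. A toy, synchronous version of that pagination loop (in-memory data, no async, no SurrealDB types):

struct BatchScanner<I> {
	items: Vec<I>,
	batch: usize,
	pos: usize,
}

impl<I: Clone> BatchScanner<I> {
	fn new(items: Vec<I>, batch: usize) -> Self {
		Self { items, batch, pos: 0 }
	}

	/// One "poll": fetch the next batch and advance the cursor, much like the
	/// real scanner restarting its range just past the last key it returned.
	fn next_batch(&mut self) -> Vec<I> {
		let end = (self.pos + self.batch).min(self.items.len());
		let out = self.items[self.pos..end].to_vec();
		self.pos = end;
		out
	}
}

fn main() {
	// Keys-only instantiation: the item type is just the key.
	let mut keys = BatchScanner::new(vec!["person:1", "person:2", "person:3"], 2);
	assert_eq!(keys.next_batch(), vec!["person:1", "person:2"]);
	assert_eq!(keys.next_batch(), vec!["person:3"]);
	// Key/value instantiation: the very same scanner, with a tuple item.
	let mut kvs = BatchScanner::new(vec![("person:1", "{}"), ("person:2", "{}")], 2);
	assert_eq!(kvs.next_batch().len(), 2);
}
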
diff --git a/core/src/kvs/tx.rs b/core/src/kvs/tx.rs
index 4af958cc..ab58db61 100644
--- a/core/src/kvs/tx.rs
+++ b/core/src/kvs/tx.rs
@@ -279,7 +279,7 @@ impl Transaction {
 	where
 		K: Into<Key> + Debug,
 	{
-		Scanner::new(
+		Scanner::<(Key, Val)>::new(
 			self,
 			*NORMAL_FETCH_SIZE,
 			Range {
@@ -290,6 +290,22 @@
 		)
 	}

+	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
+	pub fn stream_keys<K>(&self, rng: Range<K>) -> impl Stream<Item = Result<Key, Error>> + '_
+	where
+		K: Into<Key> + Debug,
+	{
+		Scanner::<Key>::new(
+			self,
+			*NORMAL_FETCH_SIZE,
+			Range {
+				start: rng.start.into(),
+				end: rng.end.into(),
+			},
+			None,
+		)
+	}
+
 	// --------------------------------------------------
 	// Rollback methods
 	// --------------------------------------------------
diff --git a/core/src/sql/field.rs b/core/src/sql/field.rs
index 349418eb..7badac24 100644
--- a/core/src/sql/field.rs
+++ b/core/src/sql/field.rs
@@ -41,6 +41,25 @@
 			_ => None,
 		}
 	}
+
+	/// Check if the fields are only about counting
+	pub(crate) fn is_count_all_only(&self) -> bool {
+		let mut is_count_only = false;
+		for field in &self.0 {
+			if let Field::Single {
+				expr: Value::Function(func),
+				..
+			} = field
+			{
+				if func.is_count_all() {
+					is_count_only = true;
+					continue;
+				}
+			}
+			return false;
+		}
+		is_count_only
+	}
 }

 impl Deref for Fields {
diff --git a/core/src/sql/function.rs b/core/src/sql/function.rs
index d6e6da6e..3278cdbf 100644
--- a/core/src/sql/function.rs
+++ b/core/src/sql/function.rs
@@ -180,6 +180,10 @@ impl Function {
 			_ => OptimisedAggregate::None,
 		}
 	}
+
+	pub(crate) fn is_count_all(&self) -> bool {
+		matches!(self, Self::Normal(f, p) if f == "count" && p.is_empty() )
+	}
 }

 impl Function {
diff --git a/core/src/sql/statements/select.rs b/core/src/sql/statements/select.rs
index ec6c669b..6693f9c2 100644
--- a/core/src/sql/statements/select.rs
+++ b/core/src/sql/statements/select.rs
@@ -76,13 +76,6 @@ impl SelectStatement {
 		let version = self.version.as_ref().map(|v| v.to_u64());
 		let opt =
 			Arc::new(opt.new_with_futures(false).with_projections(true).with_version(version));
-		// Get a query planner
-		let mut planner = QueryPlanner::new(
-			opt.clone(),
-			self.with.as_ref().cloned().map(|w| w.into()),
-			self.cond.as_ref().cloned().map(|c| c.into()),
-			self.order.as_ref().cloned().map(|o| o.into()),
-		);
 		// Extract the limit
 		let limit = i.setup_limit(stk, ctx, &opt, &stm).await?;
 		// Used for ONLY: is the limit 1?
@@ -103,6 +96,9 @@ impl SelectStatement {
 			}
 			None => ctx.clone(),
 		};
+		// Get a query planner
+		let mut planner = QueryPlanner::new();
+		let params = self.into();
 		// Loop over the select targets
 		for w in self.what.0.iter() {
 			let v = w.compute(stk, &ctx, &opt, doc).await?;
@@ -111,7 +107,7 @@
 					if self.only && !limit_is_one_or_zero {
 						return Err(Error::SingleOnlyOutput);
 					}
-					planner.add_iterables(stk, &ctx, t, &mut i).await?;
+					planner.add_iterables(stk, &ctx, &opt, t, &params, &mut i).await?;
 				}
 				Value::Thing(v) => match &v.id {
 					Id::Range(r) => i.ingest(Iterable::TableRange(v.tb, *r.to_owned())),
@@ -141,7 +137,7 @@
 				for v in v {
 					match v {
 						Value::Table(t) => {
-							planner.add_iterables(stk, &ctx, t, &mut i).await?;
+							planner.add_iterables(stk, &ctx, &opt, t, &params, &mut i).await?;
 						}
 						Value::Thing(v) => i.ingest(Iterable::Thing(v)),
 						Value::Edges(v) => i.ingest(Iterable::Edges(*v)),
diff --git a/sdk/tests/group.rs b/sdk/tests/group.rs
index 7ed028fc..3c3378fb 100644
--- a/sdk/tests/group.rs
+++ b/sdk/tests/group.rs
@@ -1,6 +1,7 @@
 mod parse;
 use parse::Parse;
 mod helpers;
+use crate::helpers::Test;
 use helpers::new_ds;
 use helpers::skip_ok;
 use surrealdb::dbs::Session;
@@ -734,3 +735,82 @@ async fn select_aggregate_mean_update() -> Result<(), Error> {

 	Ok(())
 }
+
+#[tokio::test]
+async fn select_count_group_all() -> Result<(), Error> {
+	let sql = r#"
+		CREATE table CONTENT { bar: "hello", foo: "Man"};
+		CREATE table CONTENT { bar: "hello", foo: "World"};
+		CREATE table CONTENT { bar: "world"};
+		SELECT COUNT() FROM table GROUP ALL EXPLAIN;
+		SELECT COUNT() FROM table GROUP ALL;
+		SELECT COUNT() FROM table EXPLAIN;
+		SELECT COUNT() FROM table;
+	"#;
+	let mut t = Test::new(sql).await?;
+	t.expect_size(7)?;
+	//
+	t.skip_ok(3)?;
+	//
+	t.expect_val(
+		r#"[
+			{
+				detail: {
+					table: 'table'
+				},
+				operation: 'Iterate Table Keys'
+			},
+			{
+				detail: {
+					idioms: {
+						count: [
+							'count'
+						]
+					},
+					type: 'Group'
+				},
+				operation: 'Collector'
+			}
+		]"#,
+	)?;
+	//
+	t.expect_val(
+		r#"[
+			{
+				count: 3
+			}
+		]"#,
+	)?;
+	//
+	t.expect_val(
+		r#"[
+			{
+				detail: {
+					table: 'table'
+				},
+				operation: 'Iterate Table Keys'
+			},
+			{
+				detail: {
+					type: 'Memory'
+				},
+				operation: 'Collector'
+			}
+		]"#,
+	)?;
+	//
+	t.expect_val(
+		r#"[
+			{
+				count: 1
+			},
+			{
+				count: 1
+			},
+			{
+				count: 1
+			}
+		]"#,
+	)?;
+	Ok(())
+}
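
End to end, the optimisation shows up through the public SDK: a bare SELECT count() ... GROUP ALL is now planned as an "Iterate Table Keys" scan, as the test above asserts. A usage sketch against the published Rust SDK (assumes the surrealdb crate with its kv-mem feature plus tokio; these public API calls are not part of this patch and may differ between SDK versions):

use surrealdb::engine::local::Mem;
use surrealdb::Surreal;

#[tokio::main]
async fn main() -> Result<(), surrealdb::Error> {
	let db = Surreal::new::<Mem>(()).await?;
	db.use_ns("test").use_db("test").await?;
	db.query("CREATE person:1, person:2, person:3").await?;
	// The EXPLAIN output should list an 'Iterate Table Keys' operation for this query.
	let explain = db.query("SELECT count() FROM person GROUP ALL EXPLAIN").await?;
	println!("{explain:#?}");
	let count = db.query("SELECT count() FROM person GROUP ALL").await?;
	println!("{count:#?}");
	Ok(())
}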