Select count table scan optimisation (#4285)

This commit is contained in:
Emmanuel Keller 2024-09-16 17:30:00 +01:00 committed by GitHub
parent 9af0082376
commit 141e2e5e4c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 343 additions and 79 deletions

View file

@ -27,7 +27,7 @@ const TARGET: &str = "surrealdb::core::dbs";
#[derive(Clone)]
pub(crate) enum Iterable {
Value(Value),
Table(Table),
Table(Table, bool), // true = keys only
Thing(Thing),
TableRange(String, IdRange),
Edges(Edges),
@ -126,7 +126,7 @@ impl Iterator {
}
_ => {
// Ingest the table for scanning
self.ingest(Iterable::Table(v))
self.ingest(Iterable::Table(v, false))
}
},
// There is no data clause so create a record id
@ -137,7 +137,7 @@ impl Iterator {
}
_ => {
// Ingest the table for scanning
self.ingest(Iterable::Table(v))
self.ingest(Iterable::Table(v, false))
}
},
},

View file

@ -93,8 +93,13 @@ impl ExplainItem {
name: "Iterate Value".into(),
details: vec![("value", v.to_owned())],
},
Iterable::Table(t) => Self {
name: "Iterate Table".into(),
Iterable::Table(t, keys_only) => Self {
name: if *keys_only {
"Iterate Table Keys"
} else {
"Iterate Table"
}
.into(),
details: vec![("table", Value::from(t.0.to_owned()))],
},
Iterable::Thing(t) => Self {

View file

@ -16,6 +16,7 @@ use crate::sql::{Edges, Table, Thing, Value};
use channel::Sender;
use futures::StreamExt;
use reblessive::tree::Stk;
use std::borrow::Cow;
use std::ops::Bound;
use std::vec;
@ -55,7 +56,7 @@ impl Iterable {
fn iteration_stage_check(&self, ctx: &Context) -> bool {
match self {
Iterable::Table(tb) | Iterable::Index(tb, _) => {
Iterable::Table(tb, _) | Iterable::Index(tb, _) => {
if let Some(IterationStage::BuildKnn) = ctx.get_iteration_stage() {
if let Some(qp) = ctx.get_query_planner() {
if let Some(exe) = qp.get_query_executor(tb) {
@ -111,6 +112,19 @@ impl<'a> Processor<'a> {
Ok(())
}
fn check_query_planner_context<'b>(ctx: &'b Context, table: &'b Table) -> Cow<'b, Context> {
if let Some(qp) = ctx.get_query_planner() {
if let Some(exe) = qp.get_query_executor(&table.0) {
// We set the query executor matching the current table in the Context
// Avoiding search in the hashmap of the query planner for each doc
let mut ctx = MutableContext::new(ctx);
ctx.set_query_executor(exe.clone());
return Cow::Owned(ctx.freeze());
}
}
Cow::Borrowed(ctx)
}
async fn process_iterable(
&mut self,
stk: &mut Stk,
@ -128,18 +142,13 @@ impl<'a> Processor<'a> {
self.process_range(stk, ctx, opt, stm, tb, v).await?
}
Iterable::Edges(e) => self.process_edge(stk, ctx, opt, stm, e).await?,
Iterable::Table(v) => {
if let Some(qp) = ctx.get_query_planner() {
if let Some(exe) = qp.get_query_executor(&v.0) {
// We set the query executor matching the current table in the Context
// Avoiding search in the hashmap of the query planner for each doc
let mut ctx = MutableContext::new(ctx);
ctx.set_query_executor(exe.clone());
let ctx = ctx.freeze();
return self.process_table(stk, &ctx, opt, stm, &v).await;
}
Iterable::Table(v, keys_only) => {
let ctx = Self::check_query_planner_context(ctx, &v);
if keys_only {
self.process_table_keys(stk, &ctx, opt, stm, &v).await?
} else {
self.process_table(stk, &ctx, opt, stm, &v).await?
}
self.process_table(stk, ctx, opt, stm, &v).await?
}
Iterable::Index(t, irf) => {
if let Some(qp) = ctx.get_query_planner() {
@ -340,6 +349,45 @@ impl<'a> Processor<'a> {
Ok(())
}
async fn process_table_keys(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
v: &Table,
) -> Result<(), Error> {
// Get the transaction
let txn = ctx.tx();
// Check that the table exists
txn.check_ns_db_tb(opt.ns()?, opt.db()?, v, opt.strict).await?;
// Prepare the start and end keys
let beg = thing::prefix(opt.ns()?, opt.db()?, v);
let end = thing::suffix(opt.ns()?, opt.db()?, v);
// Create a new iterable range
let mut stream = txn.stream_keys(beg..end);
// Loop until no more entries
while let Some(res) = stream.next().await {
// Check if the context is finished
if ctx.is_done() {
break;
}
// Parse the data from the store
let k = res?;
let key: thing::Thing = (&k).into();
let rid = Thing::from((key.tb, key.id));
// Process the record
let pro = Processed {
rid: Some(rid.into()),
ir: None,
val: Operable::Value(Value::Null.into()),
};
self.process(stk, ctx, opt, stm, pro).await?;
}
// Everything ok
Ok(())
}
async fn process_range(
&mut self,
stk: &mut Stk,

View file

@ -14,18 +14,34 @@ use crate::idx::planner::iterators::IteratorRef;
use crate::idx::planner::knn::KnnBruteForceResults;
use crate::idx::planner::plan::{Plan, PlanBuilder};
use crate::idx::planner::tree::Tree;
use crate::sql::statements::SelectStatement;
use crate::sql::with::With;
use crate::sql::{Cond, Orders, Table};
use crate::sql::{Cond, Fields, Groups, Orders, Table};
use reblessive::tree::Stk;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
pub(crate) struct QueryPlannerParams<'a> {
fields: &'a Fields,
with: Option<&'a With>,
order: Option<&'a Orders>,
cond: Option<&'a Cond>,
group: Option<&'a Groups>,
}
impl<'a> From<&'a SelectStatement> for QueryPlannerParams<'a> {
fn from(stmt: &'a SelectStatement) -> Self {
QueryPlannerParams {
fields: &stmt.expr,
with: stmt.with.as_ref(),
order: stmt.order.as_ref(),
cond: stmt.cond.as_ref(),
group: stmt.group.as_ref(),
}
}
}
pub(crate) struct QueryPlanner {
opt: Arc<Options>,
with: Option<Arc<With>>,
cond: Option<Arc<Cond>>,
order: Option<Arc<Orders>>,
/// There is one executor per table
executors: HashMap<String, QueryExecutor>,
requires_distinct: bool,
@ -36,17 +52,8 @@ pub(crate) struct QueryPlanner {
}
impl QueryPlanner {
pub(crate) fn new(
opt: Arc<Options>,
with: Option<Arc<With>>,
cond: Option<Arc<Cond>>,
order: Option<Arc<Orders>>,
) -> Self {
pub(crate) fn new() -> Self {
Self {
opt,
with,
cond,
order,
executors: HashMap::default(),
requires_distinct: false,
fallbacks: vec![],
@ -60,27 +67,22 @@ impl QueryPlanner {
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
t: Table,
params: &QueryPlannerParams<'_>,
it: &mut Iterator,
) -> Result<(), Error> {
let mut is_table_iterator = false;
let mut tree = Tree::build(
stk,
ctx,
&self.opt,
&t,
self.cond.as_ref().map(|w| w.as_ref()),
self.with.as_ref().map(|c| c.as_ref()),
self.order.as_ref().map(|o| o.as_ref()),
)
.await?;
let mut tree =
Tree::build(stk, ctx, opt, &t, params.cond, params.with, params.order).await?;
let is_knn = !tree.knn_expressions.is_empty();
let order = tree.index_map.order_limit.take();
let mut exe = InnerQueryExecutor::new(
stk,
ctx,
&self.opt,
opt,
&t,
tree.index_map,
tree.knn_expressions,
@ -88,12 +90,7 @@ impl QueryPlanner {
tree.knn_condition,
)
.await?;
match PlanBuilder::build(
tree.root,
self.with.as_ref().map(|w| w.as_ref()),
tree.with_indexes,
order,
)? {
match PlanBuilder::build(tree.root, params, tree.with_indexes, order)? {
Plan::SingleIndex(exp, io) => {
if io.require_distinct() {
self.requires_distinct = true;
@ -123,12 +120,12 @@ impl QueryPlanner {
let ir = exe.add_iterator(IteratorEntry::Range(rq.exps, ixn, rq.from, rq.to));
self.add(t.clone(), Some(ir), exe, it);
}
Plan::TableIterator(fallback) => {
if let Some(fallback) = fallback {
self.fallbacks.push(fallback);
Plan::TableIterator(reason, keys_only) => {
if let Some(reason) = reason {
self.fallbacks.push(reason);
}
self.add(t.clone(), None, exe, it);
it.ingest(Iterable::Table(t));
it.ingest(Iterable::Table(t, keys_only));
is_table_iterator = true;
}
}

View file

@ -1,6 +1,7 @@
use crate::err::Error;
use crate::idx::ft::MatchRef;
use crate::idx::planner::tree::{GroupRef, IdiomCol, IdiomPosition, IndexRef, Node};
use crate::idx::planner::QueryPlannerParams;
use crate::sql::statements::DefineIndexStatement;
use crate::sql::with::With;
use crate::sql::{Array, Expression, Idiom, Number, Object};
@ -31,13 +32,10 @@ pub(super) struct PlanBuilder {
impl PlanBuilder {
pub(super) fn build(
root: Option<Node>,
with: Option<&With>,
params: &QueryPlannerParams,
with_indexes: Vec<IndexRef>,
order: Option<IndexOption>,
) -> Result<Plan, Error> {
if let Some(With::NoIndex) = with {
return Ok(Plan::TableIterator(Some("WITH NOINDEX".to_string())));
}
let mut b = PlanBuilder {
has_indexes: false,
non_range_indexes: Default::default(),
@ -47,10 +45,18 @@ impl PlanBuilder {
all_and: true,
all_exp_with_index: true,
};
// If we only count and there are no conditions and no aggregations, then we can only scan keys
let keys_only = Self::is_keys_only(params);
if let Some(With::NoIndex) = params.with {
return Ok(Self::table_iterator(Some("WITH NOINDEX"), keys_only));
}
// Browse the AST and collect information
if let Some(root) = &root {
if let Err(e) = b.eval_node(root) {
return Ok(Plan::TableIterator(Some(e.to_string())));
return Ok(Self::table_iterator(Some(&e), keys_only));
}
}
@ -84,7 +90,32 @@ impl PlanBuilder {
}
return Ok(Plan::MultiIndex(b.non_range_indexes, ranges));
}
Ok(Plan::TableIterator(None))
Ok(Self::table_iterator(None, keys_only))
}
fn is_keys_only(p: &QueryPlannerParams) -> bool {
if !p.fields.is_count_all_only() {
return false;
}
if p.cond.is_some() {
return false;
}
if let Some(g) = p.group {
if !g.is_empty() {
return false;
}
}
if let Some(p) = p.order {
if !p.is_empty() {
return false;
}
}
true
}
fn table_iterator(reason: Option<&str>, keys_only: bool) -> Plan {
let reason = reason.map(|s| s.to_string());
Plan::TableIterator(reason, keys_only)
}
// Check if we have an explicit list of index we can use
@ -161,7 +192,7 @@ impl PlanBuilder {
pub(super) enum Plan {
/// Table full scan
TableIterator(Option<String>),
TableIterator(Option<String>, bool),
/// Index scan filtered on records matching a given expression
SingleIndex(Option<Arc<Expression>>, IndexOption),
/// Union of filtered index scans

View file

@ -11,9 +11,7 @@ use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};
type Output = Result<Vec<(Key, Val)>, Error>;
pub(super) struct Scanner<'a> {
pub(super) struct Scanner<'a, I> {
/// The store which started this range scan
store: &'a Transaction,
/// The number of keys to fetch at once
@ -21,16 +19,17 @@ pub(super) struct Scanner<'a> {
// The key range for this range scan
range: Range<Key>,
// The results from the last range scan
results: VecDeque<(Key, Val)>,
results: VecDeque<I>,
#[allow(clippy::type_complexity)]
/// The currently running future to be polled
future: Option<Pin<Box<dyn Future<Output = Output> + 'a>>>,
future: Option<Pin<Box<dyn Future<Output = Result<Vec<I>, Error>> + 'a>>>,
/// Whether this stream should try to fetch more
exhausted: bool,
/// Version as timestamp, 0 means latest.
version: Option<u64>,
}
impl<'a> Scanner<'a> {
impl<'a, I> Scanner<'a, I> {
pub fn new(
store: &'a Transaction,
batch: u32,
@ -49,7 +48,7 @@ impl<'a> Scanner<'a> {
}
}
impl<'a> Stream for Scanner<'a> {
impl<'a> Stream for Scanner<'a, (Key, Val)> {
type Item = Result<(Key, Val), Error>;
fn poll_next(
mut self: Pin<&mut Self>,
@ -94,7 +93,9 @@ impl<'a> Stream for Scanner<'a> {
self.exhausted = true;
}
// Get the last element of the results
let last = v.last().unwrap();
let last = v.last().ok_or_else(|| {
Error::Unreachable("Last key/val can't be none".to_string())
})?;
// Start the next scan from the last result
self.range.start.clone_from(&last.0);
// Ensure we don't see the last result again
@ -116,3 +117,70 @@ impl<'a> Stream for Scanner<'a> {
}
}
}
impl<'a> Stream for Scanner<'a, Key> {
type Item = Result<Key, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Key, Error>>> {
// If we have results, return the first one
if let Some(v) = self.results.pop_front() {
return Poll::Ready(Some(Ok(v)));
}
// If we won't fetch more results then exit
if self.exhausted {
return Poll::Ready(None);
}
// Check if there is no pending future task
if self.future.is_none() {
// Set the max number of results to fetch
let num = std::cmp::min(*MAX_STREAM_BATCH_SIZE, self.batch);
// Clone the range to use when scanning
let range = self.range.clone();
// Prepare a future to scan for results
self.future = Some(Box::pin(self.store.keys(range, num)));
}
// Try to resolve the future
match self.future.as_mut().unwrap().poll_unpin(cx) {
// The future has now completed fully
Poll::Ready(result) => {
// Drop the completed asynchronous future
self.future = None;
// Check the result of the finished future
match result {
// The range was fetched successfully
Ok(v) => match v.is_empty() {
// There are no more results to stream
true => {
// Mark this stream as complete
Poll::Ready(None)
}
// There are results which need streaming
false => {
// We fetched the last elements in the range
if v.len() < self.batch as usize {
self.exhausted = true;
}
// Get the last element of the results
let last = v.last().ok_or_else(|| {
Error::Unreachable("Last key can't be none".to_string())
})?;
// Start the next scan from the last result
self.range.start.clone_from(last);
// Ensure we don't see the last result again
self.range.start.push(0xff);
// Store the fetched range results
self.results.extend(v);
// Remove the first result to return
let item = self.results.pop_front().unwrap();
// Return the first result
Poll::Ready(Some(Ok(item)))
}
},
// Return the received error
Err(error) => Poll::Ready(Some(Err(error))),
}
}
// The future has not yet completed
Poll::Pending => Poll::Pending,
}
}
}

View file

@ -279,7 +279,7 @@ impl Transaction {
where
K: Into<Key> + Debug,
{
Scanner::new(
Scanner::<(Key, Val)>::new(
self,
*NORMAL_FETCH_SIZE,
Range {
@ -290,6 +290,22 @@ impl Transaction {
)
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
pub fn stream_keys<K>(&self, rng: Range<K>) -> impl Stream<Item = Result<Key, Error>> + '_
where
K: Into<Key> + Debug,
{
Scanner::<Key>::new(
self,
*NORMAL_FETCH_SIZE,
Range {
start: rng.start.into(),
end: rng.end.into(),
},
None,
)
}
// --------------------------------------------------
// Rollback methods
// --------------------------------------------------

View file

@ -41,6 +41,25 @@ impl Fields {
_ => None,
}
}
/// Check if the fields are only about counting
pub(crate) fn is_count_all_only(&self) -> bool {
let mut is_count_only = false;
for field in &self.0 {
if let Field::Single {
expr: Value::Function(func),
..
} = field
{
if func.is_count_all() {
is_count_only = true;
continue;
}
}
return false;
}
is_count_only
}
}
impl Deref for Fields {

View file

@ -180,6 +180,10 @@ impl Function {
_ => OptimisedAggregate::None,
}
}
pub(crate) fn is_count_all(&self) -> bool {
matches!(self, Self::Normal(f, p) if f == "count" && p.is_empty() )
}
}
impl Function {

View file

@ -76,13 +76,6 @@ impl SelectStatement {
let version = self.version.as_ref().map(|v| v.to_u64());
let opt =
Arc::new(opt.new_with_futures(false).with_projections(true).with_version(version));
// Get a query planner
let mut planner = QueryPlanner::new(
opt.clone(),
self.with.as_ref().cloned().map(|w| w.into()),
self.cond.as_ref().cloned().map(|c| c.into()),
self.order.as_ref().cloned().map(|o| o.into()),
);
// Extract the limit
let limit = i.setup_limit(stk, ctx, &opt, &stm).await?;
// Used for ONLY: is the limit 1?
@ -103,6 +96,9 @@ impl SelectStatement {
}
None => ctx.clone(),
};
// Get a query planner
let mut planner = QueryPlanner::new();
let params = self.into();
// Loop over the select targets
for w in self.what.0.iter() {
let v = w.compute(stk, &ctx, &opt, doc).await?;
@ -111,7 +107,7 @@ impl SelectStatement {
if self.only && !limit_is_one_or_zero {
return Err(Error::SingleOnlyOutput);
}
planner.add_iterables(stk, &ctx, t, &mut i).await?;
planner.add_iterables(stk, &ctx, &opt, t, &params, &mut i).await?;
}
Value::Thing(v) => match &v.id {
Id::Range(r) => i.ingest(Iterable::TableRange(v.tb, *r.to_owned())),
@ -141,7 +137,7 @@ impl SelectStatement {
for v in v {
match v {
Value::Table(t) => {
planner.add_iterables(stk, &ctx, t, &mut i).await?;
planner.add_iterables(stk, &ctx, &opt, t, &params, &mut i).await?;
}
Value::Thing(v) => i.ingest(Iterable::Thing(v)),
Value::Edges(v) => i.ingest(Iterable::Edges(*v)),

View file

@ -1,6 +1,7 @@
mod parse;
use parse::Parse;
mod helpers;
use crate::helpers::Test;
use helpers::new_ds;
use helpers::skip_ok;
use surrealdb::dbs::Session;
@ -734,3 +735,82 @@ async fn select_aggregate_mean_update() -> Result<(), Error> {
Ok(())
}
#[tokio::test]
async fn select_count_group_all() -> Result<(), Error> {
let sql = r#"
CREATE table CONTENT { bar: "hello", foo: "Man"};
CREATE table CONTENT { bar: "hello", foo: "World"};
CREATE table CONTENT { bar: "world"};
SELECT COUNT() FROM table GROUP ALL EXPLAIN;
SELECT COUNT() FROM table GROUP ALL;
SELECT COUNT() FROM table EXPLAIN;
SELECT COUNT() FROM table;
"#;
let mut t = Test::new(sql).await?;
t.expect_size(7)?;
//
t.skip_ok(3)?;
//
t.expect_val(
r#"[
{
detail: {
table: 'table'
},
operation: 'Iterate Table Keys'
},
{
detail: {
idioms: {
count: [
'count'
]
},
type: 'Group'
},
operation: 'Collector'
}
]"#,
)?;
//
t.expect_val(
r#"[
{
count: 3
}
]"#,
)?;
//
t.expect_val(
r#"[
{
detail: {
table: 'table'
},
operation: 'Iterate Table Keys'
},
{
detail: {
type: 'Memory'
},
operation: 'Collector'
}
]"#,
)?;
//
t.expect_val(
r#"[
{
count: 1
},
{
count: 1
},
{
count: 1
}
]"#,
)?;
Ok(())
}