From 063f4e6665660d2fc8d184dad71551f643900e39 Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Thu, 20 Jul 2023 13:56:32 +0100 Subject: [PATCH] feat: implements Multi-index execution plan (#2280) --- lib/src/ctx/context.rs | 24 +-- lib/src/dbs/distinct.rs | 99 ++++++++++ lib/src/dbs/explanation.rs | 4 +- lib/src/dbs/iterator.rs | 52 +++-- lib/src/dbs/mod.rs | 1 + lib/src/dbs/processor.rs | 189 ++++++++++++------ lib/src/doc/compute.rs | 13 +- lib/src/doc/document.rs | 37 +++- lib/src/fnc/operate.rs | 20 +- lib/src/fnc/search.rs | 63 +++--- lib/src/idx/ft/mod.rs | 9 +- lib/src/idx/ft/scorer.rs | 7 +- lib/src/idx/planner/executor.rs | 103 +++++++--- lib/src/idx/planner/iterators.rs | 126 ++++++++++++ lib/src/idx/planner/mod.rs | 114 +++++------ lib/src/idx/planner/plan.rs | 325 ++++++++----------------------- lib/src/sql/field.rs | 2 +- lib/src/sql/statements/select.rs | 6 +- lib/src/sql/value/del.rs | 7 +- lib/src/sql/value/fetch.rs | 3 +- lib/src/sql/value/get.rs | 6 +- lib/src/sql/value/set.rs | 5 +- lib/tests/matches.rs | 16 +- lib/tests/planner.rs | 109 +++++++++++ 24 files changed, 834 insertions(+), 506 deletions(-) create mode 100644 lib/src/dbs/distinct.rs create mode 100644 lib/src/idx/planner/iterators.rs create mode 100644 lib/tests/planner.rs diff --git a/lib/src/ctx/context.rs b/lib/src/ctx/context.rs index 917142a5..a074e51d 100644 --- a/lib/src/ctx/context.rs +++ b/lib/src/ctx/context.rs @@ -1,7 +1,7 @@ use crate::ctx::canceller::Canceller; use crate::ctx::reason::Reason; use crate::dbs::Notification; -use crate::idx::planner::executor::QueryExecutor; +use crate::idx::planner::QueryPlanner; use crate::sql::value::Value; use channel::Sender; use std::borrow::Cow; @@ -34,8 +34,8 @@ pub struct Context<'a> { values: HashMap, Cow<'a, Value>>, // Stores the notification channel if available notifications: Option>, - // An optional query executor - query_executors: Option>>, + // An optional query planner + query_planner: Option<&'a QueryPlanner<'a>>, } impl<'a> Default for Context<'a> { @@ -64,7 +64,7 @@ impl<'a> Context<'a> { deadline: None, cancelled: Arc::new(AtomicBool::new(false)), notifications: None, - query_executors: None, + query_planner: None, } } @@ -76,7 +76,7 @@ impl<'a> Context<'a> { deadline: parent.deadline, cancelled: Arc::new(AtomicBool::new(false)), notifications: parent.notifications.clone(), - query_executors: parent.query_executors.clone(), + query_planner: parent.query_planner, } } @@ -118,9 +118,9 @@ impl<'a> Context<'a> { self.notifications = chn.cloned() } - /// Set the query executors - pub(crate) fn set_query_executors(&mut self, executors: HashMap) { - self.query_executors = Some(Arc::new(executors)); + /// Set the query planner + pub(crate) fn set_query_planner(&mut self, qp: &'a QueryPlanner) { + self.query_planner = Some(qp); } /// Get the timeout for this operation, if any. This is useful for @@ -133,12 +133,8 @@ impl<'a> Context<'a> { self.notifications.clone() } - pub(crate) fn get_query_executor(&self, tb: &str) -> Option<&QueryExecutor> { - if let Some(qe) = &self.query_executors { - qe.get(tb) - } else { - None - } + pub(crate) fn get_query_planner(&self) -> Option<&QueryPlanner> { + self.query_planner } /// Check if the context is done. If it returns `None` the operation may diff --git a/lib/src/dbs/distinct.rs b/lib/src/dbs/distinct.rs new file mode 100644 index 00000000..7e6dad05 --- /dev/null +++ b/lib/src/dbs/distinct.rs @@ -0,0 +1,99 @@ +use crate::ctx::Context; +use crate::dbs::{Iterable, Processed}; +use crate::kvs::Key; +use radix_trie::Trie; +use std::default::Default; +#[cfg(not(target_arch = "wasm32"))] +use std::sync::Arc; +#[cfg(not(target_arch = "wasm32"))] +use tokio::sync::Mutex; + +// TODO: This is currently processed in memory. In the future is should be on disk (mmap?) +type Distinct = Trie; + +#[derive(Default)] +pub(crate) struct SyncDistinct { + processed: Distinct, +} + +impl SyncDistinct { + pub(super) fn new(ctx: &Context<'_>) -> Option { + if let Some(pla) = ctx.get_query_planner() { + if pla.requires_distinct() { + return Some(Self::default()); + } + } + None + } + + fn is_distinct(ctx: &Context<'_>, i: &Iterable) -> bool { + if let Iterable::Index(t, ir, _) = i { + if let Some(pla) = ctx.get_query_planner() { + if let Some(exe) = pla.get_query_executor(&t.0) { + return exe.is_distinct(*ir); + } + } + } + false + } + + pub(super) fn requires_distinct<'a>( + ctx: &Context<'_>, + dis: Option<&'a mut SyncDistinct>, + i: &Iterable, + ) -> Option<&'a mut SyncDistinct> { + if dis.is_some() && Self::is_distinct(ctx, i) { + dis + } else { + None + } + } + + pub(super) fn check_already_processed(&mut self, pro: &Processed) -> bool { + if let Some(key) = pro.rid.as_ref().map(|rid| rid.to_vec()) { + if self.processed.get(&key).is_some() { + true + } else { + self.processed.insert(key, true); + false + } + } else { + false + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Default, Clone)] +pub(crate) struct AsyncDistinct { + processed: Arc>, +} + +#[cfg(not(target_arch = "wasm32"))] +impl AsyncDistinct { + pub(super) fn new(ctx: &Context<'_>) -> Option { + if let Some(pla) = ctx.get_query_planner() { + if pla.requires_distinct() { + return Some(Self::default()); + } + } + None + } + + pub(super) fn requires_distinct( + ctx: &Context<'_>, + dis: Option<&AsyncDistinct>, + i: &Iterable, + ) -> Option { + if let Some(dis) = dis { + if SyncDistinct::is_distinct(ctx, i) { + return Some(dis.clone()); + } + } + None + } + + pub(super) async fn check_already_processed(&self, pro: &Processed) -> bool { + self.processed.lock().await.check_already_processed(pro) + } +} diff --git a/lib/src/dbs/explanation.rs b/lib/src/dbs/explanation.rs index 726b2851..84d43859 100644 --- a/lib/src/dbs/explanation.rs +++ b/lib/src/dbs/explanation.rs @@ -81,9 +81,9 @@ impl ExplainItem { ("thing-3", Value::Thing(t3.to_owned())), ], }, - Iterable::Index(t, p) => Self { + Iterable::Index(t, _, io) => Self { name: "Iterate Index".into(), - details: vec![("table", Value::from(t.0.to_owned())), ("plan", p.explain())], + details: vec![("table", Value::from(t.0.to_owned())), ("plan", io.explain())], }, } } diff --git a/lib/src/dbs/iterator.rs b/lib/src/dbs/iterator.rs index 7595414f..9558fa0d 100644 --- a/lib/src/dbs/iterator.rs +++ b/lib/src/dbs/iterator.rs @@ -1,13 +1,16 @@ use crate::ctx::Canceller; use crate::ctx::Context; +#[cfg(not(target_arch = "wasm32"))] +use crate::dbs::distinct::AsyncDistinct; +use crate::dbs::distinct::SyncDistinct; use crate::dbs::explanation::Explanation; use crate::dbs::Statement; use crate::dbs::{Options, Transaction}; -use crate::doc::CursorDoc; use crate::doc::Document; use crate::err::Error; use crate::idx::ft::docids::DocId; -use crate::idx::planner::plan::Plan; +use crate::idx::planner::executor::IteratorRef; +use crate::idx::planner::plan::IndexOption; use crate::sql::array::Array; use crate::sql::edges::Edges; use crate::sql::field::Field; @@ -29,7 +32,14 @@ pub(crate) enum Iterable { Edges(Edges), Mergeable(Thing, Value), Relatable(Thing, Thing, Thing), - Index(Table, Plan), + Index(Table, IteratorRef, IndexOption), +} + +pub(crate) struct Processed { + pub(crate) ir: Option, + pub(crate) rid: Option, + pub(crate) doc_id: Option, + pub(crate) val: Operable, } pub(crate) enum Operable { @@ -55,6 +65,7 @@ pub(crate) struct Iterator { // Iterator runtime error error: Option, // Iterator output results + // TODO: Should be stored on disk / (mmap?) results: Vec, // Iterator input values entries: Vec, @@ -261,10 +272,10 @@ impl Iterator { _ => { let x = vals.first(); let x = if let Some(alias) = alias { - let cur = CursorDoc::new(None, None, &x); + let cur = (&x).into(); alias.compute(ctx, opt, txn, Some(&cur)).await? } else { - let cur = CursorDoc::new(None, None, &x); + let cur = (&x).into(); expr.compute(ctx, opt, txn, Some(&cur)).await? }; obj.set(ctx, opt, txn, idiom.as_ref(), x).await?; @@ -378,9 +389,13 @@ impl Iterator { ) -> Result<(), Error> { // Prevent deep recursion let opt = &opt.dive(4)?; + // If any iterator requires distinct, we new to create a global distinct instance + let mut distinct = SyncDistinct::new(ctx); // Process all prepared values for v in mem::take(&mut self.entries) { - v.iterate(ctx, opt, txn, stm, self).await?; + // Distinct is passed only for iterators that really requires it + let dis = SyncDistinct::requires_distinct(ctx, distinct.as_mut(), &v); + v.iterate(ctx, opt, txn, stm, self, dis).await?; } // Everything processed ok Ok(()) @@ -401,15 +416,21 @@ impl Iterator { match stm.parallel() { // Run statements sequentially false => { + // If any iterator requires distinct, we new to create a global distinct instance + let mut distinct = SyncDistinct::new(ctx); // Process all prepared values for v in mem::take(&mut self.entries) { - v.iterate(ctx, opt, txn, stm, self).await?; + // Distinct is passed only for iterators that really requires it + let dis = SyncDistinct::requires_distinct(ctx, distinct.as_mut(), &v); + v.iterate(ctx, opt, txn, stm, self, dis).await?; } // Everything processed ok Ok(()) } // Run statements in parallel true => { + // If any iterator requires distinct, we new to create a global distinct instance + let distinct = AsyncDistinct::new(ctx); // Create a new executor let e = executor::Executor::new(); // Take all of the iterator values @@ -422,7 +443,9 @@ impl Iterator { let adocs = async { // Process all prepared values for v in vals { - e.spawn(v.channel(ctx, opt, txn, stm, chn.clone())) + // Distinct is passed only for iterators that really requires it + let dis = AsyncDistinct::requires_distinct(ctx, distinct.as_ref(), &v); + e.spawn(v.channel(ctx, opt, txn, stm, chn.clone(), dis)) // Ensure we detach the spawned task .detach(); } @@ -434,8 +457,8 @@ impl Iterator { // Create an async closure for received values let avals = async { // Process all received values - while let Ok((k, d, v)) = docs.recv().await { - e.spawn(Document::compute(ctx, opt, txn, stm, chn.clone(), k, d, v)) + while let Ok(pro) = docs.recv().await { + e.spawn(Document::compute(ctx, opt, txn, stm, chn.clone(), pro)) // Ensure we detach the spawned task .detach(); } @@ -464,29 +487,26 @@ impl Iterator { } /// Process a new record Thing and Value - #[allow(clippy::too_many_arguments)] pub async fn process( &mut self, ctx: &Context<'_>, opt: &Options, txn: &Transaction, stm: &Statement<'_>, - thg: Option, - doc_id: Option, - val: Operable, + pro: Processed, ) { // Check current context if ctx.is_done() { return; } // Setup a new workable - let (val, ext) = match val { + let (val, ext) = match pro.val { Operable::Value(v) => (v, Workable::Normal), Operable::Mergeable(v, o) => (v, Workable::Insert(o)), Operable::Relatable(f, v, w) => (v, Workable::Relate(f, w)), }; // Setup a new document - let mut doc = Document::new(thg.as_ref(), doc_id, &val, ext); + let mut doc = Document::new(pro.ir, pro.rid.as_ref(), pro.doc_id, &val, ext); // Process the document let res = match stm { Statement::Select(_) => doc.select(ctx, opt, txn, stm).await, diff --git a/lib/src/dbs/mod.rs b/lib/src/dbs/mod.rs index c95b7403..cf925ed9 100644 --- a/lib/src/dbs/mod.rs +++ b/lib/src/dbs/mod.rs @@ -3,6 +3,7 @@ //! glue between the API and the response. In this module we use channels as a transport layer //! and executors to process the operations. This module also gives a `context` to the transaction. mod auth; +mod distinct; mod executor; mod explanation; mod iterator; diff --git a/lib/src/dbs/processor.rs b/lib/src/dbs/processor.rs index 027dafec..bccf9d19 100644 --- a/lib/src/dbs/processor.rs +++ b/lib/src/dbs/processor.rs @@ -1,8 +1,11 @@ use crate::ctx::Context; -use crate::dbs::{Iterable, Iterator, Operable, Options, Statement, Transaction}; +#[cfg(not(target_arch = "wasm32"))] +use crate::dbs::distinct::AsyncDistinct; +use crate::dbs::distinct::SyncDistinct; +use crate::dbs::{Iterable, Iterator, Operable, Options, Processed, Statement, Transaction}; use crate::err::Error; -use crate::idx::ft::docids::DocId; -use crate::idx::planner::plan::Plan; +use crate::idx::planner::executor::IteratorRef; +use crate::idx::planner::plan::IndexOption; use crate::key::{graph, thing}; use crate::sql::dir::Dir; use crate::sql::{Edges, Range, Table, Thing, Value}; @@ -18,8 +21,9 @@ impl Iterable { txn: &Transaction, stm: &Statement<'_>, ite: &mut Iterator, + dis: Option<&mut SyncDistinct>, ) -> Result<(), Error> { - Processor::Iterator(ite).process_iterable(ctx, opt, txn, stm, self).await + Processor::Iterator(dis, ite).process_iterable(ctx, opt, txn, stm, self).await } #[cfg(not(target_arch = "wasm32"))] @@ -29,37 +33,49 @@ impl Iterable { opt: &Options, txn: &Transaction, stm: &Statement<'_>, - chn: Sender<(Option, Option, Operable)>, + chn: Sender, + dis: Option, ) -> Result<(), Error> { - Processor::Channel(chn).process_iterable(ctx, opt, txn, stm, self).await + Processor::Channel(dis, chn).process_iterable(ctx, opt, txn, stm, self).await } } enum Processor<'a> { - Iterator(&'a mut Iterator), + Iterator(Option<&'a mut SyncDistinct>, &'a mut Iterator), #[cfg(not(target_arch = "wasm32"))] - Channel(Sender<(Option, Option, Operable)>), + Channel(Option, Sender), } impl<'a> Processor<'a> { - #[allow(clippy::too_many_arguments)] async fn process( &mut self, ctx: &Context<'_>, opt: &Options, txn: &Transaction, stm: &Statement<'_>, - rid: Option, - doc_id: Option, - val: Operable, + pro: Processed, ) -> Result<(), Error> { match self { - Processor::Iterator(ite) => { - ite.process(ctx, opt, txn, stm, rid, doc_id, val).await; + Processor::Iterator(distinct, ite) => { + let is_processed = if let Some(d) = distinct { + d.check_already_processed(&pro) + } else { + false + }; + if !is_processed { + ite.process(ctx, opt, txn, stm, pro).await; + } } #[cfg(not(target_arch = "wasm32"))] - Processor::Channel(chn) => { - chn.send((rid, doc_id, val)).await?; + Processor::Channel(distinct, chn) => { + let is_processed = if let Some(d) = distinct { + d.check_already_processed(&pro).await + } else { + false + }; + if !is_processed { + chn.send(pro).await?; + } } }; Ok(()) @@ -80,7 +96,9 @@ impl<'a> Processor<'a> { Iterable::Table(v) => self.process_table(ctx, opt, txn, stm, v).await?, Iterable::Range(v) => self.process_range(ctx, opt, txn, stm, v).await?, Iterable::Edges(e) => self.process_edge(ctx, opt, txn, stm, e).await?, - Iterable::Index(t, p) => self.process_index(ctx, opt, txn, stm, t, p).await?, + Iterable::Index(t, ir, io) => { + self.process_index(ctx, opt, txn, stm, t, ir, io).await? + } Iterable::Mergeable(v, o) => { self.process_mergeable(ctx, opt, txn, stm, v, o).await? } @@ -101,9 +119,14 @@ impl<'a> Processor<'a> { v: Value, ) -> Result<(), Error> { // Pass the value through - let val = Operable::Value(v); + let pro = Processed { + ir: None, + rid: None, + doc_id: None, + val: Operable::Value(v), + }; // Process the document record - self.process(ctx, opt, txn, stm, None, None, val).await + self.process(ctx, opt, txn, stm, pro).await } async fn process_thing( @@ -125,7 +148,13 @@ impl<'a> Processor<'a> { None => Value::None, }); // Process the document record - self.process(ctx, opt, txn, stm, Some(v), None, val).await?; + let pro = Processed { + ir: None, + rid: Some(v), + doc_id: None, + val, + }; + self.process(ctx, opt, txn, stm, pro).await?; // Everything ok Ok(()) } @@ -152,7 +181,13 @@ impl<'a> Processor<'a> { // Create a new operable value let val = Operable::Mergeable(x, o); // Process the document record - self.process(ctx, opt, txn, stm, Some(v), None, val).await?; + let pro = Processed { + ir: None, + rid: Some(v), + doc_id: None, + val, + }; + self.process(ctx, opt, txn, stm, pro).await?; // Everything ok Ok(()) } @@ -181,7 +216,13 @@ impl<'a> Processor<'a> { // Create a new operable value let val = Operable::Relatable(f, x, w); // Process the document record - self.process(ctx, opt, txn, stm, Some(v), None, val).await?; + let pro = Processed { + ir: None, + rid: Some(v), + doc_id: None, + val, + }; + self.process(ctx, opt, txn, stm, pro).await?; // Everything ok Ok(()) } @@ -242,7 +283,13 @@ impl<'a> Processor<'a> { // Create a new operable value let val = Operable::Value(val); // Process the record - self.process(ctx, opt, txn, stm, Some(rid), None, val).await?; + let pro = Processed { + ir: None, + rid: Some(rid), + doc_id: None, + val, + }; + self.process(ctx, opt, txn, stm, pro).await?; } continue; } @@ -325,7 +372,13 @@ impl<'a> Processor<'a> { // Create a new operable value let val = Operable::Value(val); // Process the record - self.process(ctx, opt, txn, stm, Some(rid), None, val).await?; + let pro = Processed { + ir: None, + rid: Some(rid), + doc_id: None, + val, + }; + self.process(ctx, opt, txn, stm, pro).await?; } continue; } @@ -465,7 +518,13 @@ impl<'a> Processor<'a> { None => Value::None, }); // Process the record - self.process(ctx, opt, txn, stm, Some(rid), None, val).await?; + let pro = Processed { + ir: None, + rid: Some(rid), + doc_id: None, + val, + }; + self.process(ctx, opt, txn, stm, pro).await?; } continue; } @@ -476,6 +535,7 @@ impl<'a> Processor<'a> { Ok(()) } + #[allow(clippy::too_many_arguments)] async fn process_index( &mut self, ctx: &Context<'_>, @@ -483,53 +543,60 @@ impl<'a> Processor<'a> { txn: &Transaction, stm: &Statement<'_>, table: Table, - plan: Plan, + ir: IteratorRef, + io: IndexOption, ) -> Result<(), Error> { // Check that the table exists txn.lock().await.check_ns_db_tb(opt.ns(), opt.db(), &table.0, opt.strict).await?; - let exe = ctx.get_query_executor(&table.0); - if let Some(exe) = exe { - let mut iterator = plan.new_iterator(opt, txn, exe).await?; - let mut things = iterator.next_batch(txn, 1000).await?; - while !things.is_empty() { - // Check if the context is finished - if ctx.is_done() { - break; - } - - for (thing, doc_id) in things { - // Check the context + if let Some(pla) = ctx.get_query_planner() { + if let Some(exe) = pla.get_query_executor(&table.0) { + let mut iterator = exe.new_iterator(opt, ir, io).await?; + let mut things = iterator.next_batch(txn, 1000).await?; + while !things.is_empty() { + // Check if the context is finished if ctx.is_done() { break; } - // If the record is from another table we can skip - if !thing.tb.eq(table.as_str()) { - continue; + for (thing, doc_id) in things { + // Check the context + if ctx.is_done() { + break; + } + + // If the record is from another table we can skip + if !thing.tb.eq(table.as_str()) { + continue; + } + + // Fetch the data from the store + let key = thing::new(opt.ns(), opt.db(), &table.0, &thing.id); + let val = txn.lock().await.get(key.clone()).await?; + let rid = Thing::from((key.tb, key.id)); + // Parse the data from the store + let val = Operable::Value(match val { + Some(v) => Value::from(v), + None => Value::None, + }); + // Process the document record + let pro = Processed { + ir: Some(ir), + rid: Some(rid), + doc_id: Some(doc_id), + val, + }; + self.process(ctx, opt, txn, stm, pro).await?; } - // Fetch the data from the store - let key = thing::new(opt.ns(), opt.db(), &table.0, &thing.id); - let val = txn.lock().await.get(key.clone()).await?; - let rid = Thing::from((key.tb, key.id)); - // Parse the data from the store - let val = Operable::Value(match val { - Some(v) => Value::from(v), - None => Value::None, - }); - // Process the document record - self.process(ctx, opt, txn, stm, Some(rid), Some(doc_id), val).await?; + // Collect the next batch of ids + things = iterator.next_batch(txn, 1000).await?; } - - // Collect the next batch of ids - things = iterator.next_batch(txn, 1000).await?; + // Everything ok + return Ok(()); } - // Everything ok - Ok(()) - } else { - Err(Error::QueryNotExecutedDetail { - message: "The QueryExecutor has not been found.".to_string(), - }) } + Err(Error::QueryNotExecutedDetail { + message: "No QueryExecutor has not been found.".to_string(), + }) } } diff --git a/lib/src/doc/compute.rs b/lib/src/doc/compute.rs index a5652490..bfcac51d 100644 --- a/lib/src/doc/compute.rs +++ b/lib/src/doc/compute.rs @@ -1,36 +1,31 @@ use crate::ctx::Context; -use crate::dbs::Options; use crate::dbs::Statement; use crate::dbs::Workable; use crate::dbs::{Operable, Transaction}; +use crate::dbs::{Options, Processed}; use crate::doc::Document; use crate::err::Error; -use crate::idx::ft::docids::DocId; -use crate::sql::thing::Thing; use crate::sql::value::Value; use channel::Sender; impl<'a> Document<'a> { #[allow(dead_code)] - #[allow(clippy::too_many_arguments)] pub(crate) async fn compute( ctx: &Context<'_>, opt: &Options, txn: &Transaction, stm: &Statement<'_>, chn: Sender>, - thg: Option, - doc_id: Option, - val: Operable, + pro: Processed, ) -> Result<(), Error> { // Setup a new workable - let ins = match val { + let ins = match pro.val { Operable::Value(v) => (v, Workable::Normal), Operable::Mergeable(v, o) => (v, Workable::Insert(o)), Operable::Relatable(f, v, w) => (v, Workable::Relate(f, w)), }; // Setup a new document - let mut doc = Document::new(thg.as_ref(), doc_id, &ins.0, ins.1); + let mut doc = Document::new(pro.ir, pro.rid.as_ref(), pro.doc_id, &ins.0, ins.1); // Process the statement let res = match stm { Statement::Select(_) => doc.select(ctx, opt, txn, stm).await, diff --git a/lib/src/doc/document.rs b/lib/src/doc/document.rs index 14b9e75d..13ba8553 100644 --- a/lib/src/doc/document.rs +++ b/lib/src/doc/document.rs @@ -4,6 +4,7 @@ use crate::dbs::Transaction; use crate::dbs::Workable; use crate::err::Error; use crate::idx::ft::docids::DocId; +use crate::idx::planner::executor::IteratorRef; use crate::sql::statements::define::DefineEventStatement; use crate::sql::statements::define::DefineFieldStatement; use crate::sql::statements::define::DefineIndexStatement; @@ -23,14 +24,21 @@ pub(crate) struct Document<'a> { } pub struct CursorDoc<'a> { + pub(crate) ir: Option, pub(crate) rid: Option<&'a Thing>, pub(crate) doc: Cow<'a, Value>, pub(crate) doc_id: Option, } impl<'a> CursorDoc<'a> { - pub(crate) fn new(rid: Option<&'a Thing>, doc_id: Option, doc: &'a Value) -> Self { + pub(crate) fn new( + ir: Option, + rid: Option<&'a Thing>, + doc_id: Option, + doc: &'a Value, + ) -> Self { Self { + ir, rid, doc: Cow::Borrowed(doc), doc_id, @@ -38,6 +46,28 @@ impl<'a> CursorDoc<'a> { } } +impl<'a> From<&'a Value> for CursorDoc<'a> { + fn from(doc: &'a Value) -> Self { + Self { + ir: None, + rid: None, + doc: Cow::Borrowed(doc), + doc_id: None, + } + } +} + +impl<'a> From<&'a mut Value> for CursorDoc<'a> { + fn from(doc: &'a mut Value) -> Self { + Self { + ir: None, + rid: None, + doc: Cow::Borrowed(doc), + doc_id: None, + } + } +} + impl<'a> Debug for Document<'a> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "Document - id: <{:?}>", self.id) @@ -52,6 +82,7 @@ impl<'a> From<&Document<'a>> for Vec { impl<'a> Document<'a> { pub fn new( + ir: Option, id: Option<&'a Thing>, doc_id: Option, val: &'a Value, @@ -60,8 +91,8 @@ impl<'a> Document<'a> { Document { id, extras, - current: CursorDoc::new(id, doc_id, val), - initial: CursorDoc::new(id, doc_id, val), + current: CursorDoc::new(ir, id, doc_id, val), + initial: CursorDoc::new(ir, id, doc_id, val), } } } diff --git a/lib/src/fnc/operate.rs b/lib/src/fnc/operate.rs index 07ff67de..aa9cc69e 100644 --- a/lib/src/fnc/operate.rs +++ b/lib/src/fnc/operate.rs @@ -171,13 +171,25 @@ pub(crate) async fn matches( ctx: &Context<'_>, txn: &Transaction, doc: Option<&CursorDoc<'_>>, - e: &Expression, + exp: &Expression, ) -> Result { if let Some(doc) = doc { if let Some(thg) = doc.rid { - if let Some(exe) = ctx.get_query_executor(&thg.tb) { - // Check the matches - return exe.matches(txn, thg, e).await; + if let Some(pla) = ctx.get_query_planner() { + if let Some(exe) = pla.get_query_executor(&thg.tb) { + // If we find the expression in `pre_match`, + // it means that we are using an Iterator::Index + // and we are iterating over documents that already matches the expression. + if let Some(ir) = doc.ir { + if let Some(e) = exe.get_iterator_expression(ir) { + if e.eq(exp) { + return Ok(Value::Bool(true)); + } + } + } + // Evaluate the matches + return exe.matches(txn, thg, exp).await; + } } } } diff --git a/lib/src/fnc/search.rs b/lib/src/fnc/search.rs index 771cb33d..f3e06935 100644 --- a/lib/src/fnc/search.rs +++ b/lib/src/fnc/search.rs @@ -2,54 +2,57 @@ use crate::ctx::Context; use crate::dbs::Transaction; use crate::doc::CursorDoc; use crate::err::Error; -use crate::sql::Value; +use crate::idx::planner::executor::QueryExecutor; +use crate::sql::{Thing, Value}; + +fn get_execution_context<'a>( + ctx: &'a Context<'_>, + txn: Option<&'a Transaction>, + doc: Option<&'a CursorDoc<'_>>, +) -> Option<(&'a Transaction, &'a QueryExecutor, &'a CursorDoc<'a>, &'a Thing)> { + if let Some(txn) = txn { + if let Some(doc) = doc { + if let Some(thg) = doc.rid { + if let Some(pla) = ctx.get_query_planner() { + if let Some(exe) = pla.get_query_executor(&thg.tb) { + return Some((txn, exe, doc, thg)); + } + } + } + } + } + None +} pub async fn score( (ctx, txn, doc): (&Context<'_>, Option<&Transaction>, Option<&CursorDoc<'_>>), (match_ref,): (Value,), ) -> Result { - if let Some(txn) = txn { - if let Some(doc) = doc { - if let Some(thg) = doc.rid { - if let Some(exe) = ctx.get_query_executor(&thg.tb) { - return exe.score(txn, &match_ref, thg, doc.doc_id).await; - } - } - } + if let Some((txn, exe, doc, thg)) = get_execution_context(ctx, txn, doc) { + exe.score(txn, &match_ref, thg, doc.doc_id).await + } else { + Ok(Value::None) } - Ok(Value::None) } pub async fn highlight( (ctx, txn, doc): (&Context<'_>, Option<&Transaction>, Option<&CursorDoc<'_>>), (prefix, suffix, match_ref): (Value, Value, Value), ) -> Result { - if let Some(txn) = txn { - if let Some(doc) = doc { - if let Some(thg) = doc.rid { - if let Some(exe) = ctx.get_query_executor(&thg.tb) { - return exe - .highlight(txn, thg, prefix, suffix, &match_ref, doc.doc.as_ref()) - .await; - } - } - } + if let Some((txn, exe, doc, thg)) = get_execution_context(ctx, txn, doc) { + exe.highlight(txn, thg, prefix, suffix, &match_ref, doc.doc.as_ref()).await + } else { + Ok(Value::None) } - Ok(Value::None) } pub async fn offsets( (ctx, txn, doc): (&Context<'_>, Option<&Transaction>, Option<&CursorDoc<'_>>), (match_ref,): (Value,), ) -> Result { - if let Some(txn) = txn { - if let Some(doc) = doc { - if let Some(thg) = doc.rid { - if let Some(exe) = ctx.get_query_executor(&thg.tb) { - return exe.offsets(txn, thg, &match_ref).await; - } - } - } + if let Some((txn, exe, _, thg)) = get_execution_context(ctx, txn, doc) { + exe.offsets(txn, thg, &match_ref).await + } else { + Ok(Value::None) } - Ok(Value::None) } diff --git a/lib/src/idx/ft/mod.rs b/lib/src/idx/ft/mod.rs index 6e6706a2..7384c86a 100644 --- a/lib/src/idx/ft/mod.rs +++ b/lib/src/idx/ft/mod.rs @@ -17,7 +17,7 @@ use crate::idx::ft::highlighter::{Highlighter, Offseter}; use crate::idx::ft::offsets::Offsets; use crate::idx::ft::postings::Postings; use crate::idx::ft::scorer::BM25Scorer; -use crate::idx::ft::termdocs::TermDocs; +use crate::idx::ft::termdocs::{TermDocs, TermsDocs}; use crate::idx::ft::terms::{TermId, Terms}; use crate::idx::{btree, IndexKeyBase, SerdeState}; use crate::kvs::{Key, Transaction}; @@ -320,7 +320,7 @@ impl FtIndex { pub(super) fn new_hits_iterator( &self, - terms_docs: Arc>>, + terms_docs: TermsDocs, ) -> Result, Error> { let mut hits: Option = None; for opt_term_docs in terms_docs.iter() { @@ -342,10 +342,7 @@ impl FtIndex { Ok(None) } - pub(super) fn new_scorer( - &self, - terms_docs: Arc>>, - ) -> Result, Error> { + pub(super) fn new_scorer(&self, terms_docs: TermsDocs) -> Result, Error> { if let Some(bm25) = &self.bm25 { return Ok(Some(BM25Scorer::new( self.postings.clone(), diff --git a/lib/src/idx/ft/scorer.rs b/lib/src/idx/ft/scorer.rs index f22d3a13..eea8457d 100644 --- a/lib/src/idx/ft/scorer.rs +++ b/lib/src/idx/ft/scorer.rs @@ -2,10 +2,9 @@ use crate::err::Error; use crate::idx::ft::docids::DocId; use crate::idx::ft::doclength::{DocLength, DocLengths}; use crate::idx::ft::postings::{Postings, TermFrequency}; -use crate::idx::ft::terms::TermId; +use crate::idx::ft::termdocs::TermsDocs; use crate::idx::ft::Bm25Params; use crate::kvs::Transaction; -use roaring::RoaringTreemap; use std::sync::Arc; use tokio::sync::RwLock; @@ -13,7 +12,7 @@ pub(super) type Score = f32; pub(crate) struct BM25Scorer { postings: Arc>, - terms_docs: Arc>>, + terms_docs: TermsDocs, doc_lengths: Arc>, average_doc_length: f32, doc_count: f32, @@ -23,7 +22,7 @@ pub(crate) struct BM25Scorer { impl BM25Scorer { pub(super) fn new( postings: Arc>, - terms_docs: Arc>>, + terms_docs: TermsDocs, doc_lengths: Arc>, total_docs_length: u128, doc_count: u64, diff --git a/lib/src/idx/planner/executor.rs b/lib/src/idx/planner/executor.rs index 9f3593fb..b422c295 100644 --- a/lib/src/idx/planner/executor.rs +++ b/lib/src/idx/planner/executor.rs @@ -6,25 +6,28 @@ use crate::idx::ft::scorer::BM25Scorer; use crate::idx::ft::termdocs::TermsDocs; use crate::idx::ft::terms::TermId; use crate::idx::ft::{FtIndex, MatchRef}; +use crate::idx::planner::iterators::{ + MatchesThingIterator, NonUniqueEqualThingIterator, ThingIterator, UniqueEqualThingIterator, +}; use crate::idx::planner::plan::IndexOption; use crate::idx::planner::tree::IndexMap; use crate::idx::IndexKeyBase; use crate::kvs; use crate::kvs::Key; use crate::sql::index::Index; -use crate::sql::{Expression, Table, Thing, Value}; -use roaring::RoaringTreemap; +use crate::sql::{Expression, Operator, Table, Thing, Value}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; +pub(crate) type IteratorRef = u16; + pub(crate) struct QueryExecutor { table: String, - pre_match_expression: Option, - pre_match_entry: Option, ft_map: HashMap, mr_entries: HashMap, exp_entries: HashMap, + iterators: Vec, } impl QueryExecutor { @@ -33,7 +36,6 @@ impl QueryExecutor { txn: &Transaction, table: &Table, index_map: IndexMap, - pre_match_expression: Option, ) -> Result { let mut run = txn.lock().await; @@ -82,25 +84,27 @@ impl QueryExecutor { } } - let mut pre_match_entry = None; - if let Some(exp) = &pre_match_expression { - pre_match_entry = exp_entries.get(exp).cloned(); - } Ok(Self { table: table.0.clone(), - pre_match_expression, - pre_match_entry, ft_map, mr_entries, exp_entries, + iterators: Vec::new(), }) } - pub(super) fn pre_match_terms_docs(&self) -> Option { - if let Some(entry) = &self.pre_match_entry { - return Some(entry.0.terms_docs.clone()); - } - None + pub(super) fn add_iterator(&mut self, exp: Expression) -> IteratorRef { + let ir = self.iterators.len(); + self.iterators.push(exp); + ir as IteratorRef + } + + pub(crate) fn is_distinct(&self, ir: IteratorRef) -> bool { + (ir as usize) < self.iterators.len() + } + + pub(crate) fn get_iterator_expression(&self, ir: IteratorRef) -> Option<&Expression> { + self.iterators.get(ir as usize) } fn get_match_ref(match_ref: &Value) -> Option { @@ -112,21 +116,68 @@ impl QueryExecutor { } } + pub(crate) async fn new_iterator( + &self, + opt: &Options, + ir: IteratorRef, + io: IndexOption, + ) -> Result { + match &io.ix().index { + Index::Idx => Self::new_index_iterator(opt, io), + Index::Uniq => Self::new_unique_index_iterator(opt, io), + Index::Search { + .. + } => self.new_search_index_iterator(ir, io).await, + } + } + + fn new_index_iterator(opt: &Options, io: IndexOption) -> Result { + if io.op() == &Operator::Equal { + return Ok(ThingIterator::NonUniqueEqual(NonUniqueEqualThingIterator::new( + opt, + io.ix(), + io.value(), + )?)); + } + Err(Error::BypassQueryPlanner) + } + + fn new_unique_index_iterator(opt: &Options, io: IndexOption) -> Result { + if io.op() == &Operator::Equal { + return Ok(ThingIterator::UniqueEqual(UniqueEqualThingIterator::new( + opt, + io.ix(), + io.value(), + )?)); + } + Err(Error::BypassQueryPlanner) + } + + async fn new_search_index_iterator( + &self, + ir: IteratorRef, + io: IndexOption, + ) -> Result { + if let Some(exp) = self.iterators.get(ir as usize) { + if let Operator::Matches(_) = io.op() { + let ixn = &io.ix().name.0; + if let Some(fti) = self.ft_map.get(ixn) { + if let Some(fte) = self.exp_entries.get(exp) { + let it = MatchesThingIterator::new(fti, fte.0.terms_docs.clone()).await?; + return Ok(ThingIterator::Matches(it)); + } + } + } + } + Err(Error::BypassQueryPlanner) + } + pub(crate) async fn matches( &self, txn: &Transaction, thg: &Thing, exp: &Expression, ) -> Result { - // If we find the expression in `pre_match_expression`, - // it means that we are using an Iterator::Index - // and we are iterating over document that already matches the expression. - if let Some(pme) = &self.pre_match_expression { - if pme.eq(exp) { - return Ok(Value::Bool(true)); - } - } - // Otherwise, we look for the first possible index options, and evaluate the expression // Does the record id match this executor's table? if thg.tb.eq(&self.table) { @@ -244,7 +295,7 @@ struct Inner { index_option: IndexOption, doc_ids: Arc>, terms: Vec>, - terms_docs: Arc>>, + terms_docs: TermsDocs, scorer: Option, } diff --git a/lib/src/idx/planner/iterators.rs b/lib/src/idx/planner/iterators.rs new file mode 100644 index 00000000..f3c999de --- /dev/null +++ b/lib/src/idx/planner/iterators.rs @@ -0,0 +1,126 @@ +use crate::dbs::{Options, Transaction}; +use crate::err::Error; +use crate::idx::ft::docids::{DocId, NO_DOC_ID}; +use crate::idx::ft::termdocs::TermsDocs; +use crate::idx::ft::{FtIndex, HitsIterator}; +use crate::key; +use crate::kvs::Key; +use crate::sql::statements::DefineIndexStatement; +use crate::sql::{Array, Thing, Value}; + +pub(crate) enum ThingIterator { + NonUniqueEqual(NonUniqueEqualThingIterator), + UniqueEqual(UniqueEqualThingIterator), + Matches(MatchesThingIterator), +} + +impl ThingIterator { + pub(crate) async fn next_batch( + &mut self, + tx: &Transaction, + size: u32, + ) -> Result, Error> { + match self { + ThingIterator::NonUniqueEqual(i) => i.next_batch(tx, size).await, + ThingIterator::UniqueEqual(i) => i.next_batch(tx, size).await, + ThingIterator::Matches(i) => i.next_batch(tx, size).await, + } + } +} + +pub(crate) struct NonUniqueEqualThingIterator { + beg: Vec, + end: Vec, +} + +impl NonUniqueEqualThingIterator { + pub(super) fn new( + opt: &Options, + ix: &DefineIndexStatement, + v: &Value, + ) -> Result { + let v = Array::from(v.clone()); + let (beg, end) = + key::index::Index::range_all_ids(opt.ns(), opt.db(), &ix.what, &ix.name, &v); + Ok(Self { + beg, + end, + }) + } + + async fn next_batch( + &mut self, + txn: &Transaction, + limit: u32, + ) -> Result, Error> { + let min = self.beg.clone(); + let max = self.end.clone(); + let res = txn.lock().await.scan(min..max, limit).await?; + if let Some((key, _)) = res.last() { + self.beg = key.clone(); + self.beg.push(0x00); + } + let res = res.iter().map(|(_, val)| (val.into(), NO_DOC_ID)).collect(); + Ok(res) + } +} + +pub(crate) struct UniqueEqualThingIterator { + key: Option, +} + +impl UniqueEqualThingIterator { + pub(super) fn new(opt: &Options, ix: &DefineIndexStatement, v: &Value) -> Result { + let v = Array::from(v.clone()); + let key = key::index::Index::new(opt.ns(), opt.db(), &ix.what, &ix.name, v, None).into(); + Ok(Self { + key: Some(key), + }) + } + + async fn next_batch( + &mut self, + txn: &Transaction, + _limit: u32, + ) -> Result, Error> { + if let Some(key) = self.key.take() { + if let Some(val) = txn.lock().await.get(key).await? { + return Ok(vec![(val.into(), NO_DOC_ID)]); + } + } + Ok(vec![]) + } +} + +pub(crate) struct MatchesThingIterator { + hits: Option, +} + +impl MatchesThingIterator { + pub(super) async fn new(fti: &FtIndex, terms_docs: TermsDocs) -> Result { + let hits = fti.new_hits_iterator(terms_docs)?; + Ok(Self { + hits, + }) + } + + async fn next_batch( + &mut self, + txn: &Transaction, + mut limit: u32, + ) -> Result, Error> { + let mut res = vec![]; + if let Some(hits) = &mut self.hits { + let mut run = txn.lock().await; + while limit > 0 { + if let Some(hit) = hits.next(&mut run).await? { + res.push(hit); + } else { + break; + } + limit -= 1; + } + } + Ok(res) + } +} diff --git a/lib/src/idx/planner/mod.rs b/lib/src/idx/planner/mod.rs index d0553e73..b0253301 100644 --- a/lib/src/idx/planner/mod.rs +++ b/lib/src/idx/planner/mod.rs @@ -1,14 +1,15 @@ pub(crate) mod executor; +pub(crate) mod iterators; pub(crate) mod plan; mod tree; use crate::ctx::Context; -use crate::dbs::{Iterable, Options, Transaction}; +use crate::dbs::{Iterable, Iterator, Options, Transaction}; use crate::err::Error; use crate::idx::planner::executor::QueryExecutor; use crate::idx::planner::plan::{Plan, PlanBuilder}; -use crate::idx::planner::tree::{Node, Tree}; -use crate::sql::{Cond, Operator, Table}; +use crate::idx::planner::tree::Tree; +use crate::sql::{Cond, Table}; use std::collections::HashMap; pub(crate) struct QueryPlanner<'a> { @@ -16,6 +17,7 @@ pub(crate) struct QueryPlanner<'a> { cond: &'a Option, /// There is one executor per table executors: HashMap, + requires_distinct: bool, } impl<'a> QueryPlanner<'a> { @@ -24,83 +26,57 @@ impl<'a> QueryPlanner<'a> { opt, cond, executors: HashMap::default(), + requires_distinct: false, } } - pub(crate) async fn get_iterable( + pub(crate) async fn add_iterables( &mut self, ctx: &Context<'_>, txn: &Transaction, t: Table, - ) -> Result { + it: &mut Iterator, + ) -> Result<(), Error> { let res = Tree::build(ctx, self.opt, txn, &t, self.cond).await?; if let Some((node, im)) = res { - if let Some(plan) = AllAndStrategy::build(&node)? { - let e = QueryExecutor::new(self.opt, txn, &t, im, Some(plan.e.clone())).await?; - self.executors.insert(t.0.clone(), e); - return Ok(Iterable::Index(t, plan)); + let mut exe = QueryExecutor::new(self.opt, txn, &t, im).await?; + let ok = match PlanBuilder::build(node) { + Ok(plan) => match plan { + Plan::SingleIndex(exp, io) => { + let ir = exe.add_iterator(exp); + it.ingest(Iterable::Index(t.clone(), ir, io)); + true + } + Plan::MultiIndex(v) => { + for (exp, io) in v { + let ir = exe.add_iterator(exp); + it.ingest(Iterable::Index(t.clone(), ir, io)); + self.requires_distinct = true; + } + true + } + }, + Err(Error::BypassQueryPlanner) => false, + Err(e) => return Err(e), + }; + self.executors.insert(t.0.clone(), exe); + if ok { + return Ok(()); } - let e = QueryExecutor::new(self.opt, txn, &t, im, None).await?; - self.executors.insert(t.0.clone(), e); } - Ok(Iterable::Table(t)) - } - - pub(crate) fn finish(self) -> Option> { - if self.executors.is_empty() { - None - } else { - Some(self.executors) - } - } -} - -struct AllAndStrategy { - b: PlanBuilder, -} - -/// Successful if every boolean operators are AND -/// and there is at least one condition covered by an index -impl AllAndStrategy { - fn build(node: &Node) -> Result, Error> { - let mut s = AllAndStrategy { - b: PlanBuilder::default(), - }; - match s.eval_node(node) { - Ok(_) => match s.b.build() { - Ok(p) => Ok(Some(p)), - Err(Error::BypassQueryPlanner) => Ok(None), - Err(e) => Err(e), - }, - Err(Error::BypassQueryPlanner) => Ok(None), - Err(e) => Err(e), - } - } - - fn eval_node(&mut self, node: &Node) -> Result<(), Error> { - match node { - Node::Expression { - io: index_option, - left, - right, - exp: expression, - } => { - if let Some(io) = index_option { - self.b.add_index_option(expression.clone(), io.clone()); - } - self.eval_expression(left, right, expression.operator()) - } - Node::Unsupported => Err(Error::BypassQueryPlanner), - _ => Ok(()), - } - } - - fn eval_expression(&mut self, left: &Node, right: &Node, op: &Operator) -> Result<(), Error> { - if op.eq(&Operator::Or) { - return Err(Error::BypassQueryPlanner); - } - self.eval_node(left)?; - self.eval_node(right)?; + it.ingest(Iterable::Table(t)); Ok(()) } + + pub(crate) fn has_executors(&self) -> bool { + !self.executors.is_empty() + } + + pub(crate) fn get_query_executor(&self, tb: &str) -> Option<&QueryExecutor> { + self.executors.get(tb) + } + + pub(crate) fn requires_distinct(&self) -> bool { + self.requires_distinct + } } diff --git a/lib/src/idx/planner/plan.rs b/lib/src/idx/planner/plan.rs index 36a72d1a..4e5ee528 100644 --- a/lib/src/idx/planner/plan.rs +++ b/lib/src/idx/planner/plan.rs @@ -1,74 +1,100 @@ -use crate::dbs::{Options, Transaction}; use crate::err::Error; -use crate::idx::btree::store::BTreeStoreType; -use crate::idx::ft::docids::{DocId, NO_DOC_ID}; -use crate::idx::ft::termdocs::TermsDocs; -use crate::idx::ft::{FtIndex, HitsIterator, MatchRef}; -use crate::idx::planner::executor::QueryExecutor; -use crate::idx::IndexKeyBase; -use crate::key; -use crate::kvs::Key; -use crate::sql::index::Index; -use crate::sql::scoring::Scoring; +use crate::idx::ft::MatchRef; +use crate::idx::planner::tree::Node; use crate::sql::statements::DefineIndexStatement; -use crate::sql::{Array, Expression, Ident, Idiom, Object, Operator, Thing, Value}; +use crate::sql::Object; +use crate::sql::{Expression, Idiom, Operator, Value}; use std::collections::HashMap; use std::hash::Hash; use std::sync::Arc; -#[derive(Default)] pub(super) struct PlanBuilder { indexes: Vec<(Expression, IndexOption)>, + all_and: bool, + all_exp_with_index: bool, } impl PlanBuilder { - pub(super) fn add_index_option(&mut self, e: Expression, i: IndexOption) { + pub(super) fn build(root: Node) -> Result { + let mut b = PlanBuilder { + indexes: Vec::new(), + all_and: true, + all_exp_with_index: true, + }; + // Browse the AST and collect information + b.eval_node(root)?; + // If we didn't found any index, we're done with no index plan + if b.indexes.is_empty() { + return Err(Error::BypassQueryPlanner); + } + // If every boolean operator are AND then we can use the single index plan + if b.all_and { + if let Some((e, i)) = b.indexes.pop() { + return Ok(Plan::SingleIndex(e, i)); + } + } + // If every expression is backed by an index with can use the MultiIndex plan + if b.all_exp_with_index { + return Ok(Plan::MultiIndex(b.indexes)); + } + Err(Error::BypassQueryPlanner) + } + + fn eval_node(&mut self, node: Node) -> Result<(), Error> { + match node { + Node::Expression { + io, + left, + right, + exp, + } => { + if self.all_and && Operator::Or.eq(exp.operator()) { + self.all_and = false; + } + let is_bool = self.check_boolean_operator(exp.operator()); + if let Some(io) = io { + self.add_index_option(exp, io); + } else if self.all_exp_with_index && !is_bool { + self.all_exp_with_index = false; + } + self.eval_expression(*left, *right) + } + Node::Unsupported => Err(Error::BypassQueryPlanner), + _ => Ok(()), + } + } + + fn check_boolean_operator(&mut self, op: &Operator) -> bool { + match op { + Operator::Neg | Operator::Or => { + if self.all_and { + self.all_and = false; + } + true + } + Operator::And => true, + _ => false, + } + } + + fn eval_expression(&mut self, left: Node, right: Node) -> Result<(), Error> { + self.eval_node(left)?; + self.eval_node(right)?; + Ok(()) + } + + fn add_index_option(&mut self, e: Expression, i: IndexOption) { self.indexes.push((e, i)); } - - pub(super) fn build(mut self) -> Result { - // TODO select the best option if there are several (cost based) - if let Some((e, i)) = self.indexes.pop() { - Ok(Plan::new(e, i)) - } else { - Err(Error::BypassQueryPlanner) - } - } } -pub(crate) struct Plan { - pub(super) e: Expression, - pub(super) i: IndexOption, -} - -impl Plan { - pub(super) fn new(e: Expression, i: IndexOption) -> Self { - Self { - e, - i, - } - } - - pub(crate) async fn new_iterator( - &self, - opt: &Options, - txn: &Transaction, - exe: &QueryExecutor, - ) -> Result { - self.i.new_iterator(opt, txn, exe).await - } - - pub(crate) fn explain(&self) -> Value { - Value::Object(Object::from(HashMap::from([ - ("index", Value::from(self.i.ix().name.0.to_owned())), - ("operator", Value::from(self.i.op().to_string())), - ("value", self.i.value().clone()), - ]))) - } +pub(super) enum Plan { + SingleIndex(Expression, IndexOption), + MultiIndex(Vec<(Expression, IndexOption)>), } #[derive(Debug, Clone, Eq, PartialEq, Hash)] -pub(super) struct IndexOption(Arc); +pub(crate) struct IndexOption(Arc); #[derive(Debug, Eq, PartialEq, Hash)] pub(super) struct Inner { @@ -123,193 +149,12 @@ impl IndexOption { self.0.mr.as_ref() } - async fn new_iterator( - &self, - opt: &Options, - txn: &Transaction, - exe: &QueryExecutor, - ) -> Result { - match &self.ix().index { - Index::Idx => { - if self.op() == &Operator::Equal { - return Ok(ThingIterator::NonUniqueEqual(NonUniqueEqualThingIterator::new( - opt, - self.ix(), - self.value(), - )?)); - } - } - Index::Uniq => { - if self.op() == &Operator::Equal { - return Ok(ThingIterator::UniqueEqual(UniqueEqualThingIterator::new( - opt, - self.ix(), - self.value(), - )?)); - } - } - Index::Search { - az, - hl, - sc, - order, - } => { - if let Operator::Matches(_) = self.op() { - let td = exe.pre_match_terms_docs(); - return Ok(ThingIterator::Matches( - MatchesThingIterator::new(opt, txn, self.ix(), az, *hl, sc, *order, td) - .await?, - )); - } - } - } - Err(Error::BypassQueryPlanner) - } -} - -pub(crate) enum ThingIterator { - NonUniqueEqual(NonUniqueEqualThingIterator), - UniqueEqual(UniqueEqualThingIterator), - Matches(MatchesThingIterator), -} - -impl ThingIterator { - pub(crate) async fn next_batch( - &mut self, - tx: &Transaction, - size: u32, - ) -> Result, Error> { - match self { - ThingIterator::NonUniqueEqual(i) => i.next_batch(tx, size).await, - ThingIterator::UniqueEqual(i) => i.next_batch(tx, size).await, - ThingIterator::Matches(i) => i.next_batch(tx, size).await, - } - } -} - -pub(crate) struct NonUniqueEqualThingIterator { - beg: Vec, - end: Vec, -} - -impl NonUniqueEqualThingIterator { - fn new( - opt: &Options, - ix: &DefineIndexStatement, - v: &Value, - ) -> Result { - let v = Array::from(v.clone()); - let (beg, end) = - key::index::Index::range_all_ids(opt.ns(), opt.db(), &ix.what, &ix.name, &v); - Ok(Self { - beg, - end, - }) - } - - async fn next_batch( - &mut self, - txn: &Transaction, - limit: u32, - ) -> Result, Error> { - let min = self.beg.clone(); - let max = self.end.clone(); - let res = txn.lock().await.scan(min..max, limit).await?; - if let Some((key, _)) = res.last() { - self.beg = key.clone(); - self.beg.push(0x00); - } - let res = res.iter().map(|(_, val)| (val.into(), NO_DOC_ID)).collect(); - Ok(res) - } -} - -pub(crate) struct UniqueEqualThingIterator { - key: Option, -} - -impl UniqueEqualThingIterator { - fn new(opt: &Options, ix: &DefineIndexStatement, v: &Value) -> Result { - let v = Array::from(v.clone()); - let key = key::index::Index::new(opt.ns(), opt.db(), &ix.what, &ix.name, v, None).into(); - Ok(Self { - key: Some(key), - }) - } - - async fn next_batch( - &mut self, - txn: &Transaction, - _limit: u32, - ) -> Result, Error> { - if let Some(key) = self.key.take() { - if let Some(val) = txn.lock().await.get(key).await? { - return Ok(vec![(val.into(), NO_DOC_ID)]); - } - } - Ok(vec![]) - } -} - -pub(crate) struct MatchesThingIterator { - hits: Option, -} - -impl MatchesThingIterator { - #[allow(clippy::too_many_arguments)] - async fn new( - opt: &Options, - txn: &Transaction, - ix: &DefineIndexStatement, - az: &Ident, - hl: bool, - sc: &Scoring, - order: u32, - terms_docs: Option, - ) -> Result { - let ikb = IndexKeyBase::new(opt, ix); - if let Scoring::Bm { - .. - } = sc - { - let mut run = txn.lock().await; - let az = run.get_az(opt.ns(), opt.db(), az.as_str()).await?; - let fti = FtIndex::new(&mut run, az, ikb, order, sc, hl, BTreeStoreType::Read).await?; - if let Some(terms_docs) = terms_docs { - let hits = fti.new_hits_iterator(terms_docs)?; - Ok(Self { - hits, - }) - } else { - Ok(Self { - hits: None, - }) - } - } else { - Err(Error::FeatureNotYetImplemented { - feature: "Vector Search", - }) - } - } - - async fn next_batch( - &mut self, - txn: &Transaction, - mut limit: u32, - ) -> Result, Error> { - let mut res = vec![]; - if let Some(hits) = &mut self.hits { - let mut run = txn.lock().await; - while limit > 0 { - if let Some(hit) = hits.next(&mut run).await? { - res.push(hit); - } else { - break; - } - limit -= 1; - } - } - Ok(res) + pub(crate) fn explain(&self) -> Value { + Value::Object(Object::from(HashMap::from([ + ("index", Value::from(self.ix().name.0.to_owned())), + ("operator", Value::from(self.op().to_string())), + ("value", self.value().clone()), + ]))) } } diff --git a/lib/src/sql/field.rs b/lib/src/sql/field.rs index 5fd1e852..431fa96d 100644 --- a/lib/src/sql/field.rs +++ b/lib/src/sql/field.rs @@ -84,7 +84,7 @@ impl Fields { if let Some(doc) = doc { self.compute_value(ctx, opt, txn, doc, group).await } else { - let doc = CursorDoc::new(None, None, &Value::None); + let doc = (&Value::None).into(); self.compute_value(ctx, opt, txn, &doc, group).await } } diff --git a/lib/src/sql/statements/select.rs b/lib/src/sql/statements/select.rs index 39707cb1..8493ccaa 100644 --- a/lib/src/sql/statements/select.rs +++ b/lib/src/sql/statements/select.rs @@ -97,7 +97,7 @@ impl SelectStatement { let v = w.compute(ctx, opt, txn, doc).await?; match v { Value::Table(t) => { - i.ingest(planner.get_iterable(ctx, txn, t).await?); + planner.add_iterables(ctx, txn, t, &mut i).await?; } Value::Thing(v) => i.ingest(Iterable::Thing(v)), Value::Range(v) => i.ingest(Iterable::Range(*v)), @@ -128,9 +128,9 @@ impl SelectStatement { // Assign the statement let stm = Statement::from(self); // Add query executors if any - if let Some(ex) = planner.finish() { + if planner.has_executors() { let mut ctx = Context::new(ctx); - ctx.set_query_executors(ex); + ctx.set_query_planner(&planner); // Output the results i.output(&ctx, opt, txn, &stm).await } else { diff --git a/lib/src/sql/value/del.rs b/lib/src/sql/value/del.rs index 4a0c75f9..77ea638f 100644 --- a/lib/src/sql/value/del.rs +++ b/lib/src/sql/value/del.rs @@ -1,6 +1,5 @@ use crate::ctx::Context; use crate::dbs::{Options, Transaction}; -use crate::doc::CursorDoc; use crate::err::Error; use crate::exe::try_join_all_buffered; use crate::sql::array::Abolish; @@ -119,7 +118,7 @@ impl Value { // iterate in reverse, and call swap_remove let mut m = HashSet::new(); for (i, v) in v.iter().enumerate() { - let cur = CursorDoc::new(None, None, v); + let cur = v.into(); if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() { m.insert(i); }; @@ -133,7 +132,7 @@ impl Value { let mut p = Vec::new(); // Store the elements and positions to update for (i, o) in v.iter_mut().enumerate() { - let cur = CursorDoc::new(None, None, o); + let cur = o.into(); if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() { a.push(o.clone()); p.push(i); @@ -157,7 +156,7 @@ impl Value { _ => { let path = path.next(); for v in v.iter_mut() { - let cur = CursorDoc::new(None, None, v); + let cur = v.into(); if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() { v.del(ctx, opt, txn, path).await?; } diff --git a/lib/src/sql/value/fetch.rs b/lib/src/sql/value/fetch.rs index 5783cea1..3466bdb3 100644 --- a/lib/src/sql/value/fetch.rs +++ b/lib/src/sql/value/fetch.rs @@ -1,6 +1,5 @@ use crate::ctx::Context; use crate::dbs::{Options, Transaction}; -use crate::doc::CursorDoc; use crate::err::Error; use crate::sql::edges::Edges; use crate::sql::field::{Field, Fields}; @@ -64,7 +63,7 @@ impl Value { Part::Where(w) => { let path = path.next(); for v in v.iter_mut() { - let cur = CursorDoc::new(None, None, v); + let cur = v.into(); if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() { v.fetch(ctx, opt, txn, path).await?; } diff --git a/lib/src/sql/value/get.rs b/lib/src/sql/value/get.rs index 7cf0efd3..4850090a 100644 --- a/lib/src/sql/value/get.rs +++ b/lib/src/sql/value/get.rs @@ -123,8 +123,8 @@ impl Value { Part::Where(w) => { let mut a = Vec::new(); for v in v.iter() { - let cur = Some(CursorDoc::new(None, None, v)); - if w.compute(ctx, opt, txn, cur.as_ref()).await?.is_truthy() { + let cur = v.into(); + if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() { a.push(v.clone()); } } @@ -382,7 +382,7 @@ mod tests { let doc = Value::parse("{ name: 'Tobie', something: [{ age: 34 }, { age: 36 }] }"); let idi = Idiom::parse("test.something[WHERE age > 35]"); let val = Value::parse("{ test: { { something: something } } }"); - let cur = CursorDoc::new(None, None, &doc); + let cur = (&doc).into(); let res = val.get(&ctx, &opt, &txn, Some(&cur), &idi).await.unwrap(); assert_eq!( res, diff --git a/lib/src/sql/value/set.rs b/lib/src/sql/value/set.rs index cb404da2..b0b89594 100644 --- a/lib/src/sql/value/set.rs +++ b/lib/src/sql/value/set.rs @@ -1,6 +1,5 @@ use crate::ctx::Context; use crate::dbs::{Options, Transaction}; -use crate::doc::CursorDoc; use crate::err::Error; use crate::exe::try_join_all_buffered; use crate::sql::part::Next; @@ -92,7 +91,7 @@ impl Value { let mut p = Vec::new(); // Store the elements and positions to update for (i, o) in v.iter_mut().enumerate() { - let cur = CursorDoc::new(None, None, o); + let cur = o.into(); if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() { a.push(o.clone()); p.push(i); @@ -111,7 +110,7 @@ impl Value { _ => { let path = path.next(); for v in v.iter_mut() { - let cur = CursorDoc::new(None, None, v); + let cur = v.into(); if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() { v.set(ctx, opt, txn, path, val.clone()).await?; } diff --git a/lib/tests/matches.rs b/lib/tests/matches.rs index b44c9532..2c6de6bb 100644 --- a/lib/tests/matches.rs +++ b/lib/tests/matches.rs @@ -242,15 +242,19 @@ async fn select_where_matches_using_index_and_score() -> Result<(), Error> { #[tokio::test] async fn select_where_matches_without_using_index_and_score() -> Result<(), Error> { let sql = r" - CREATE blog:1 SET title = 'the quick brown fox jumped over the lazy dog'; - CREATE blog:2 SET title = 'the fast fox jumped over the lazy dog'; - CREATE blog:3 SET title = 'the other animals sat there watching'; - CREATE blog:4 SET title = 'the dog sat there and did nothing'; + CREATE blog:1 SET title = 'the quick brown fox jumped over the lazy dog', label = 'test'; + CREATE blog:2 SET title = 'the fast fox jumped over the lazy dog', label = 'test'; + CREATE blog:3 SET title = 'the other animals sat there watching', label = 'test'; + CREATE blog:4 SET title = 'the dog sat there and did nothing', label = 'test'; DEFINE ANALYZER simple TOKENIZERS blank,class; DEFINE INDEX blog_title ON blog FIELDS title SEARCH ANALYZER simple BM25 HIGHLIGHTS; LET $keywords = 'animals'; - SELECT id,search::score(1) AS score FROM blog WHERE (title @1@ $keywords AND id>0) OR (title @1@ $keywords AND id<99); - SELECT id,search::score(1) + search::score(2) AS score FROM blog WHERE title @1@ 'dummy1' OR title @2@ 'dummy2'; + SELECT id,search::score(1) AS score FROM blog + WHERE (title @1@ $keywords AND label = 'test') + OR (title @1@ $keywords AND label = 'test'); + SELECT id,search::score(1) + search::score(2) AS score FROM blog + WHERE (title @1@ 'dummy1' AND label = 'test') + OR (title @2@ 'dummy2' AND label = 'test'); "; let dbs = Datastore::new("memory").await?; let ses = Session::for_kv().with_ns("test").with_db("test"); diff --git a/lib/tests/planner.rs b/lib/tests/planner.rs new file mode 100644 index 00000000..0949efbf --- /dev/null +++ b/lib/tests/planner.rs @@ -0,0 +1,109 @@ +mod parse; + +use parse::Parse; +use surrealdb::dbs::Session; +use surrealdb::err::Error; +use surrealdb::kvs::Datastore; +use surrealdb::sql::Value; + +async fn test_select_where_iterate_multi_index(parallel: bool) -> Result<(), Error> { + let parallel = if parallel { + "PARALLEL" + } else { + "" + }; + let sql = format!( + " + CREATE person:tobie SET name = 'Tobie', genre='m', company='SurrealDB'; + CREATE person:jaime SET name = 'Jaime', genre='m', company='SurrealDB'; + CREATE person:lizzie SET name = 'Lizzie', genre='f', company='SurrealDB'; + CREATE person:neytiry SET name = 'Neytiri', genre='f', company='Metkayina'; + DEFINE ANALYZER simple TOKENIZERS blank,class FILTERS lowercase; + DEFINE INDEX ft_company ON person FIELDS company SEARCH ANALYZER simple BM25; + DEFINE INDEX uniq_name ON TABLE person COLUMNS name UNIQUE; + DEFINE INDEX idx_genre ON TABLE person COLUMNS genre; + SELECT name FROM person WHERE name = 'Jaime' OR genre = 'm' OR company @@ 'surrealdb' {parallel}; + SELECT name FROM person WHERE name = 'Jaime' OR genre = 'm' OR company @@ 'surrealdb' {parallel} EXPLAIN FULL;" + ); + let dbs = Datastore::new("memory").await?; + let ses = Session::for_kv().with_ns("test").with_db("test"); + let res = &mut dbs.execute(&sql, &ses, None).await?; + assert_eq!(res.len(), 10); + // + for _ in 0..8 { + let _ = res.remove(0).result?; + } + // + let tmp = res.remove(0).result?; + let val = Value::parse( + "[ + { + name: 'Jaime' + }, + { + name: 'Tobie' + }, + { + name: 'Lizzie' + } + ]", + ); + assert_eq!(format!("{:#}", tmp), format!("{:#}", val)); + // + let tmp = res.remove(0).result?; + let val = Value::parse( + "[ + { + detail: { + plan: { + index: 'uniq_name', + operator: '=', + value: 'Jaime' + }, + table: 'person', + }, + operation: 'Iterate Index' + }, + { + detail: { + plan: { + index: 'idx_genre', + operator: '=', + value: 'm' + }, + table: 'person', + }, + operation: 'Iterate Index' + }, + { + detail: { + plan: { + index: 'ft_company', + operator: '@@', + value: 'surrealdb' + }, + table: 'person', + }, + operation: 'Iterate Index' + }, + { + detail: { + count: 3 + }, + operation: 'Fetch' + } + ]", + ); + assert_eq!(format!("{:#}", tmp), format!("{:#}", val)); + Ok(()) +} + +#[tokio::test] +async fn select_where_iterate_multi_index() -> Result<(), Error> { + test_select_where_iterate_multi_index(false).await +} + +#[tokio::test] +async fn select_where_iterate_multi_index_parallel() -> Result<(), Error> { + test_select_where_iterate_multi_index(true).await +}