feat: implements Multi-index execution plan (#2280)

This commit is contained in:
Emmanuel Keller 2023-07-20 13:56:32 +01:00 committed by GitHub
parent cacd9a1de9
commit 063f4e6665
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 834 additions and 506 deletions

View file

@ -1,7 +1,7 @@
use crate::ctx::canceller::Canceller;
use crate::ctx::reason::Reason;
use crate::dbs::Notification;
use crate::idx::planner::executor::QueryExecutor;
use crate::idx::planner::QueryPlanner;
use crate::sql::value::Value;
use channel::Sender;
use std::borrow::Cow;
@ -34,8 +34,8 @@ pub struct Context<'a> {
values: HashMap<Cow<'static, str>, Cow<'a, Value>>,
// Stores the notification channel if available
notifications: Option<Sender<Notification>>,
// An optional query executor
query_executors: Option<Arc<HashMap<String, QueryExecutor>>>,
// An optional query planner
query_planner: Option<&'a QueryPlanner<'a>>,
}
impl<'a> Default for Context<'a> {
@ -64,7 +64,7 @@ impl<'a> Context<'a> {
deadline: None,
cancelled: Arc::new(AtomicBool::new(false)),
notifications: None,
query_executors: None,
query_planner: None,
}
}
@ -76,7 +76,7 @@ impl<'a> Context<'a> {
deadline: parent.deadline,
cancelled: Arc::new(AtomicBool::new(false)),
notifications: parent.notifications.clone(),
query_executors: parent.query_executors.clone(),
query_planner: parent.query_planner,
}
}
@ -118,9 +118,9 @@ impl<'a> Context<'a> {
self.notifications = chn.cloned()
}
/// Set the query executors
pub(crate) fn set_query_executors(&mut self, executors: HashMap<String, QueryExecutor>) {
self.query_executors = Some(Arc::new(executors));
/// Set the query planner
pub(crate) fn set_query_planner(&mut self, qp: &'a QueryPlanner) {
self.query_planner = Some(qp);
}
/// Get the timeout for this operation, if any. This is useful for
@ -133,12 +133,8 @@ impl<'a> Context<'a> {
self.notifications.clone()
}
pub(crate) fn get_query_executor(&self, tb: &str) -> Option<&QueryExecutor> {
if let Some(qe) = &self.query_executors {
qe.get(tb)
} else {
None
}
pub(crate) fn get_query_planner(&self) -> Option<&QueryPlanner> {
self.query_planner
}
/// Check if the context is done. If it returns `None` the operation may

99
lib/src/dbs/distinct.rs Normal file
View file

@ -0,0 +1,99 @@
use crate::ctx::Context;
use crate::dbs::{Iterable, Processed};
use crate::kvs::Key;
use radix_trie::Trie;
use std::default::Default;
#[cfg(not(target_arch = "wasm32"))]
use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::Mutex;
// TODO: This is currently processed in memory. In the future it should be on disk (mmap?)
type Distinct = Trie<Key, bool>;
#[derive(Default)]
pub(crate) struct SyncDistinct {
	// Keys already seen during this iteration; presence means "skip the record".
	processed: Distinct,
}

impl SyncDistinct {
	/// Builds a distinct filter only when the query planner reports that at
	/// least one of its iterators may yield duplicate records.
	pub(super) fn new(ctx: &Context<'_>) -> Option<Self> {
		ctx.get_query_planner().filter(|pla| pla.requires_distinct()).map(|_| Self::default())
	}

	/// Whether the given iterable is an index iteration whose executor flags it
	/// as potentially producing duplicates.
	fn is_distinct(ctx: &Context<'_>, i: &Iterable) -> bool {
		match i {
			Iterable::Index(t, ir, _) => ctx
				.get_query_planner()
				.and_then(|pla| pla.get_query_executor(&t.0))
				.map(|exe| exe.is_distinct(*ir))
				.unwrap_or(false),
			_ => false,
		}
	}

	/// Hands the shared distinct filter to an iterable, but only when that
	/// iterable actually needs de-duplication.
	pub(super) fn requires_distinct<'a>(
		ctx: &Context<'_>,
		dis: Option<&'a mut SyncDistinct>,
		i: &Iterable,
	) -> Option<&'a mut SyncDistinct> {
		match dis {
			Some(d) if Self::is_distinct(ctx, i) => Some(d),
			_ => None,
		}
	}

	/// Returns true when the record id was already seen; otherwise records it
	/// and returns false. Records without an id are never considered duplicates.
	pub(super) fn check_already_processed(&mut self, pro: &Processed) -> bool {
		match pro.rid.as_ref() {
			Some(rid) => {
				let key = rid.to_vec();
				if self.processed.get(&key).is_some() {
					true
				} else {
					self.processed.insert(key, true);
					false
				}
			}
			None => false,
		}
	}
}
#[cfg(not(target_arch = "wasm32"))]
#[derive(Default, Clone)]
pub(crate) struct AsyncDistinct {
	// Shared, lock-protected wrapper around the synchronous filter so that
	// parallel iterators can de-duplicate against a single seen-set.
	processed: Arc<Mutex<SyncDistinct>>,
}

#[cfg(not(target_arch = "wasm32"))]
impl AsyncDistinct {
	/// Builds a shared distinct filter only when the query planner reports
	/// that de-duplication is required.
	pub(super) fn new(ctx: &Context<'_>) -> Option<Self> {
		ctx.get_query_planner().filter(|pla| pla.requires_distinct()).map(|_| Self::default())
	}

	/// Clones the shared filter for an iterable, but only when that iterable
	/// actually needs de-duplication (cheap: an `Arc` refcount bump).
	pub(super) fn requires_distinct(
		ctx: &Context<'_>,
		dis: Option<&AsyncDistinct>,
		i: &Iterable,
	) -> Option<AsyncDistinct> {
		dis.filter(|_| SyncDistinct::is_distinct(ctx, i)).cloned()
	}

	/// Async wrapper: locks the shared set and delegates the duplicate check.
	pub(super) async fn check_already_processed(&self, pro: &Processed) -> bool {
		self.processed.lock().await.check_already_processed(pro)
	}
}

View file

@ -81,9 +81,9 @@ impl ExplainItem {
("thing-3", Value::Thing(t3.to_owned())),
],
},
Iterable::Index(t, p) => Self {
Iterable::Index(t, _, io) => Self {
name: "Iterate Index".into(),
details: vec![("table", Value::from(t.0.to_owned())), ("plan", p.explain())],
details: vec![("table", Value::from(t.0.to_owned())), ("plan", io.explain())],
},
}
}

View file

@ -1,13 +1,16 @@
use crate::ctx::Canceller;
use crate::ctx::Context;
#[cfg(not(target_arch = "wasm32"))]
use crate::dbs::distinct::AsyncDistinct;
use crate::dbs::distinct::SyncDistinct;
use crate::dbs::explanation::Explanation;
use crate::dbs::Statement;
use crate::dbs::{Options, Transaction};
use crate::doc::CursorDoc;
use crate::doc::Document;
use crate::err::Error;
use crate::idx::ft::docids::DocId;
use crate::idx::planner::plan::Plan;
use crate::idx::planner::executor::IteratorRef;
use crate::idx::planner::plan::IndexOption;
use crate::sql::array::Array;
use crate::sql::edges::Edges;
use crate::sql::field::Field;
@ -29,7 +32,14 @@ pub(crate) enum Iterable {
Edges(Edges),
Mergeable(Thing, Value),
Relatable(Thing, Thing, Thing),
Index(Table, Plan),
Index(Table, IteratorRef, IndexOption),
}
pub(crate) struct Processed {
pub(crate) ir: Option<IteratorRef>,
pub(crate) rid: Option<Thing>,
pub(crate) doc_id: Option<DocId>,
pub(crate) val: Operable,
}
pub(crate) enum Operable {
@ -55,6 +65,7 @@ pub(crate) struct Iterator {
// Iterator runtime error
error: Option<Error>,
// Iterator output results
// TODO: Should be stored on disk / (mmap?)
results: Vec<Value>,
// Iterator input values
entries: Vec<Iterable>,
@ -261,10 +272,10 @@ impl Iterator {
_ => {
let x = vals.first();
let x = if let Some(alias) = alias {
let cur = CursorDoc::new(None, None, &x);
let cur = (&x).into();
alias.compute(ctx, opt, txn, Some(&cur)).await?
} else {
let cur = CursorDoc::new(None, None, &x);
let cur = (&x).into();
expr.compute(ctx, opt, txn, Some(&cur)).await?
};
obj.set(ctx, opt, txn, idiom.as_ref(), x).await?;
@ -378,9 +389,13 @@ impl Iterator {
) -> Result<(), Error> {
// Prevent deep recursion
let opt = &opt.dive(4)?;
// If any iterator requires distinct, we need to create a global distinct instance
let mut distinct = SyncDistinct::new(ctx);
// Process all prepared values
for v in mem::take(&mut self.entries) {
v.iterate(ctx, opt, txn, stm, self).await?;
// Distinct is passed only for iterators that really require it
let dis = SyncDistinct::requires_distinct(ctx, distinct.as_mut(), &v);
v.iterate(ctx, opt, txn, stm, self, dis).await?;
}
// Everything processed ok
Ok(())
@ -401,15 +416,21 @@ impl Iterator {
match stm.parallel() {
// Run statements sequentially
false => {
// If any iterator requires distinct, we need to create a global distinct instance
let mut distinct = SyncDistinct::new(ctx);
// Process all prepared values
for v in mem::take(&mut self.entries) {
v.iterate(ctx, opt, txn, stm, self).await?;
// Distinct is passed only for iterators that really require it
let dis = SyncDistinct::requires_distinct(ctx, distinct.as_mut(), &v);
v.iterate(ctx, opt, txn, stm, self, dis).await?;
}
// Everything processed ok
Ok(())
}
// Run statements in parallel
true => {
// If any iterator requires distinct, we need to create a global distinct instance
let distinct = AsyncDistinct::new(ctx);
// Create a new executor
let e = executor::Executor::new();
// Take all of the iterator values
@ -422,7 +443,9 @@ impl Iterator {
let adocs = async {
// Process all prepared values
for v in vals {
e.spawn(v.channel(ctx, opt, txn, stm, chn.clone()))
// Distinct is passed only for iterators that really require it
let dis = AsyncDistinct::requires_distinct(ctx, distinct.as_ref(), &v);
e.spawn(v.channel(ctx, opt, txn, stm, chn.clone(), dis))
// Ensure we detach the spawned task
.detach();
}
@ -434,8 +457,8 @@ impl Iterator {
// Create an async closure for received values
let avals = async {
// Process all received values
while let Ok((k, d, v)) = docs.recv().await {
e.spawn(Document::compute(ctx, opt, txn, stm, chn.clone(), k, d, v))
while let Ok(pro) = docs.recv().await {
e.spawn(Document::compute(ctx, opt, txn, stm, chn.clone(), pro))
// Ensure we detach the spawned task
.detach();
}
@ -464,29 +487,26 @@ impl Iterator {
}
/// Process a new record Thing and Value
#[allow(clippy::too_many_arguments)]
pub async fn process(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
thg: Option<Thing>,
doc_id: Option<DocId>,
val: Operable,
pro: Processed,
) {
// Check current context
if ctx.is_done() {
return;
}
// Setup a new workable
let (val, ext) = match val {
let (val, ext) = match pro.val {
Operable::Value(v) => (v, Workable::Normal),
Operable::Mergeable(v, o) => (v, Workable::Insert(o)),
Operable::Relatable(f, v, w) => (v, Workable::Relate(f, w)),
};
// Setup a new document
let mut doc = Document::new(thg.as_ref(), doc_id, &val, ext);
let mut doc = Document::new(pro.ir, pro.rid.as_ref(), pro.doc_id, &val, ext);
// Process the document
let res = match stm {
Statement::Select(_) => doc.select(ctx, opt, txn, stm).await,

View file

@ -3,6 +3,7 @@
//! glue between the API and the response. In this module we use channels as a transport layer
//! and executors to process the operations. This module also gives a `context` to the transaction.
mod auth;
mod distinct;
mod executor;
mod explanation;
mod iterator;

View file

@ -1,8 +1,11 @@
use crate::ctx::Context;
use crate::dbs::{Iterable, Iterator, Operable, Options, Statement, Transaction};
#[cfg(not(target_arch = "wasm32"))]
use crate::dbs::distinct::AsyncDistinct;
use crate::dbs::distinct::SyncDistinct;
use crate::dbs::{Iterable, Iterator, Operable, Options, Processed, Statement, Transaction};
use crate::err::Error;
use crate::idx::ft::docids::DocId;
use crate::idx::planner::plan::Plan;
use crate::idx::planner::executor::IteratorRef;
use crate::idx::planner::plan::IndexOption;
use crate::key::{graph, thing};
use crate::sql::dir::Dir;
use crate::sql::{Edges, Range, Table, Thing, Value};
@ -18,8 +21,9 @@ impl Iterable {
txn: &Transaction,
stm: &Statement<'_>,
ite: &mut Iterator,
dis: Option<&mut SyncDistinct>,
) -> Result<(), Error> {
Processor::Iterator(ite).process_iterable(ctx, opt, txn, stm, self).await
Processor::Iterator(dis, ite).process_iterable(ctx, opt, txn, stm, self).await
}
#[cfg(not(target_arch = "wasm32"))]
@ -29,37 +33,49 @@ impl Iterable {
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
chn: Sender<(Option<Thing>, Option<DocId>, Operable)>,
chn: Sender<Processed>,
dis: Option<AsyncDistinct>,
) -> Result<(), Error> {
Processor::Channel(chn).process_iterable(ctx, opt, txn, stm, self).await
Processor::Channel(dis, chn).process_iterable(ctx, opt, txn, stm, self).await
}
}
enum Processor<'a> {
Iterator(&'a mut Iterator),
Iterator(Option<&'a mut SyncDistinct>, &'a mut Iterator),
#[cfg(not(target_arch = "wasm32"))]
Channel(Sender<(Option<Thing>, Option<DocId>, Operable)>),
Channel(Option<AsyncDistinct>, Sender<Processed>),
}
impl<'a> Processor<'a> {
#[allow(clippy::too_many_arguments)]
async fn process(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
rid: Option<Thing>,
doc_id: Option<DocId>,
val: Operable,
pro: Processed,
) -> Result<(), Error> {
match self {
Processor::Iterator(ite) => {
ite.process(ctx, opt, txn, stm, rid, doc_id, val).await;
Processor::Iterator(distinct, ite) => {
let is_processed = if let Some(d) = distinct {
d.check_already_processed(&pro)
} else {
false
};
if !is_processed {
ite.process(ctx, opt, txn, stm, pro).await;
}
}
#[cfg(not(target_arch = "wasm32"))]
Processor::Channel(chn) => {
chn.send((rid, doc_id, val)).await?;
Processor::Channel(distinct, chn) => {
let is_processed = if let Some(d) = distinct {
d.check_already_processed(&pro).await
} else {
false
};
if !is_processed {
chn.send(pro).await?;
}
}
};
Ok(())
@ -80,7 +96,9 @@ impl<'a> Processor<'a> {
Iterable::Table(v) => self.process_table(ctx, opt, txn, stm, v).await?,
Iterable::Range(v) => self.process_range(ctx, opt, txn, stm, v).await?,
Iterable::Edges(e) => self.process_edge(ctx, opt, txn, stm, e).await?,
Iterable::Index(t, p) => self.process_index(ctx, opt, txn, stm, t, p).await?,
Iterable::Index(t, ir, io) => {
self.process_index(ctx, opt, txn, stm, t, ir, io).await?
}
Iterable::Mergeable(v, o) => {
self.process_mergeable(ctx, opt, txn, stm, v, o).await?
}
@ -101,9 +119,14 @@ impl<'a> Processor<'a> {
v: Value,
) -> Result<(), Error> {
// Pass the value through
let val = Operable::Value(v);
let pro = Processed {
ir: None,
rid: None,
doc_id: None,
val: Operable::Value(v),
};
// Process the document record
self.process(ctx, opt, txn, stm, None, None, val).await
self.process(ctx, opt, txn, stm, pro).await
}
async fn process_thing(
@ -125,7 +148,13 @@ impl<'a> Processor<'a> {
None => Value::None,
});
// Process the document record
self.process(ctx, opt, txn, stm, Some(v), None, val).await?;
let pro = Processed {
ir: None,
rid: Some(v),
doc_id: None,
val,
};
self.process(ctx, opt, txn, stm, pro).await?;
// Everything ok
Ok(())
}
@ -152,7 +181,13 @@ impl<'a> Processor<'a> {
// Create a new operable value
let val = Operable::Mergeable(x, o);
// Process the document record
self.process(ctx, opt, txn, stm, Some(v), None, val).await?;
let pro = Processed {
ir: None,
rid: Some(v),
doc_id: None,
val,
};
self.process(ctx, opt, txn, stm, pro).await?;
// Everything ok
Ok(())
}
@ -181,7 +216,13 @@ impl<'a> Processor<'a> {
// Create a new operable value
let val = Operable::Relatable(f, x, w);
// Process the document record
self.process(ctx, opt, txn, stm, Some(v), None, val).await?;
let pro = Processed {
ir: None,
rid: Some(v),
doc_id: None,
val,
};
self.process(ctx, opt, txn, stm, pro).await?;
// Everything ok
Ok(())
}
@ -242,7 +283,13 @@ impl<'a> Processor<'a> {
// Create a new operable value
let val = Operable::Value(val);
// Process the record
self.process(ctx, opt, txn, stm, Some(rid), None, val).await?;
let pro = Processed {
ir: None,
rid: Some(rid),
doc_id: None,
val,
};
self.process(ctx, opt, txn, stm, pro).await?;
}
continue;
}
@ -325,7 +372,13 @@ impl<'a> Processor<'a> {
// Create a new operable value
let val = Operable::Value(val);
// Process the record
self.process(ctx, opt, txn, stm, Some(rid), None, val).await?;
let pro = Processed {
ir: None,
rid: Some(rid),
doc_id: None,
val,
};
self.process(ctx, opt, txn, stm, pro).await?;
}
continue;
}
@ -465,7 +518,13 @@ impl<'a> Processor<'a> {
None => Value::None,
});
// Process the record
self.process(ctx, opt, txn, stm, Some(rid), None, val).await?;
let pro = Processed {
ir: None,
rid: Some(rid),
doc_id: None,
val,
};
self.process(ctx, opt, txn, stm, pro).await?;
}
continue;
}
@ -476,6 +535,7 @@ impl<'a> Processor<'a> {
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn process_index(
&mut self,
ctx: &Context<'_>,
@ -483,53 +543,60 @@ impl<'a> Processor<'a> {
txn: &Transaction,
stm: &Statement<'_>,
table: Table,
plan: Plan,
ir: IteratorRef,
io: IndexOption,
) -> Result<(), Error> {
// Check that the table exists
txn.lock().await.check_ns_db_tb(opt.ns(), opt.db(), &table.0, opt.strict).await?;
let exe = ctx.get_query_executor(&table.0);
if let Some(exe) = exe {
let mut iterator = plan.new_iterator(opt, txn, exe).await?;
let mut things = iterator.next_batch(txn, 1000).await?;
while !things.is_empty() {
// Check if the context is finished
if ctx.is_done() {
break;
}
for (thing, doc_id) in things {
// Check the context
if let Some(pla) = ctx.get_query_planner() {
if let Some(exe) = pla.get_query_executor(&table.0) {
let mut iterator = exe.new_iterator(opt, ir, io).await?;
let mut things = iterator.next_batch(txn, 1000).await?;
while !things.is_empty() {
// Check if the context is finished
if ctx.is_done() {
break;
}
// If the record is from another table we can skip
if !thing.tb.eq(table.as_str()) {
continue;
for (thing, doc_id) in things {
// Check the context
if ctx.is_done() {
break;
}
// If the record is from another table we can skip
if !thing.tb.eq(table.as_str()) {
continue;
}
// Fetch the data from the store
let key = thing::new(opt.ns(), opt.db(), &table.0, &thing.id);
let val = txn.lock().await.get(key.clone()).await?;
let rid = Thing::from((key.tb, key.id));
// Parse the data from the store
let val = Operable::Value(match val {
Some(v) => Value::from(v),
None => Value::None,
});
// Process the document record
let pro = Processed {
ir: Some(ir),
rid: Some(rid),
doc_id: Some(doc_id),
val,
};
self.process(ctx, opt, txn, stm, pro).await?;
}
// Fetch the data from the store
let key = thing::new(opt.ns(), opt.db(), &table.0, &thing.id);
let val = txn.lock().await.get(key.clone()).await?;
let rid = Thing::from((key.tb, key.id));
// Parse the data from the store
let val = Operable::Value(match val {
Some(v) => Value::from(v),
None => Value::None,
});
// Process the document record
self.process(ctx, opt, txn, stm, Some(rid), Some(doc_id), val).await?;
// Collect the next batch of ids
things = iterator.next_batch(txn, 1000).await?;
}
// Collect the next batch of ids
things = iterator.next_batch(txn, 1000).await?;
// Everything ok
return Ok(());
}
// Everything ok
Ok(())
} else {
Err(Error::QueryNotExecutedDetail {
message: "The QueryExecutor has not been found.".to_string(),
})
}
Err(Error::QueryNotExecutedDetail {
message: "No QueryExecutor has not been found.".to_string(),
})
}
}

View file

@ -1,36 +1,31 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::dbs::Workable;
use crate::dbs::{Operable, Transaction};
use crate::dbs::{Options, Processed};
use crate::doc::Document;
use crate::err::Error;
use crate::idx::ft::docids::DocId;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use channel::Sender;
impl<'a> Document<'a> {
#[allow(dead_code)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn compute(
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
chn: Sender<Result<Value, Error>>,
thg: Option<Thing>,
doc_id: Option<DocId>,
val: Operable,
pro: Processed,
) -> Result<(), Error> {
// Setup a new workable
let ins = match val {
let ins = match pro.val {
Operable::Value(v) => (v, Workable::Normal),
Operable::Mergeable(v, o) => (v, Workable::Insert(o)),
Operable::Relatable(f, v, w) => (v, Workable::Relate(f, w)),
};
// Setup a new document
let mut doc = Document::new(thg.as_ref(), doc_id, &ins.0, ins.1);
let mut doc = Document::new(pro.ir, pro.rid.as_ref(), pro.doc_id, &ins.0, ins.1);
// Process the statement
let res = match stm {
Statement::Select(_) => doc.select(ctx, opt, txn, stm).await,

View file

@ -4,6 +4,7 @@ use crate::dbs::Transaction;
use crate::dbs::Workable;
use crate::err::Error;
use crate::idx::ft::docids::DocId;
use crate::idx::planner::executor::IteratorRef;
use crate::sql::statements::define::DefineEventStatement;
use crate::sql::statements::define::DefineFieldStatement;
use crate::sql::statements::define::DefineIndexStatement;
@ -23,14 +24,21 @@ pub(crate) struct Document<'a> {
}
pub struct CursorDoc<'a> {
pub(crate) ir: Option<IteratorRef>,
pub(crate) rid: Option<&'a Thing>,
pub(crate) doc: Cow<'a, Value>,
pub(crate) doc_id: Option<DocId>,
}
impl<'a> CursorDoc<'a> {
pub(crate) fn new(rid: Option<&'a Thing>, doc_id: Option<DocId>, doc: &'a Value) -> Self {
pub(crate) fn new(
ir: Option<IteratorRef>,
rid: Option<&'a Thing>,
doc_id: Option<DocId>,
doc: &'a Value,
) -> Self {
Self {
ir,
rid,
doc: Cow::Borrowed(doc),
doc_id,
@ -38,6 +46,28 @@ impl<'a> CursorDoc<'a> {
}
}
impl<'a> From<&'a Value> for CursorDoc<'a> {
fn from(doc: &'a Value) -> Self {
Self {
ir: None,
rid: None,
doc: Cow::Borrowed(doc),
doc_id: None,
}
}
}
impl<'a> From<&'a mut Value> for CursorDoc<'a> {
fn from(doc: &'a mut Value) -> Self {
Self {
ir: None,
rid: None,
doc: Cow::Borrowed(doc),
doc_id: None,
}
}
}
impl<'a> Debug for Document<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Document - id: <{:?}>", self.id)
@ -52,6 +82,7 @@ impl<'a> From<&Document<'a>> for Vec<u8> {
impl<'a> Document<'a> {
pub fn new(
ir: Option<IteratorRef>,
id: Option<&'a Thing>,
doc_id: Option<DocId>,
val: &'a Value,
@ -60,8 +91,8 @@ impl<'a> Document<'a> {
Document {
id,
extras,
current: CursorDoc::new(id, doc_id, val),
initial: CursorDoc::new(id, doc_id, val),
current: CursorDoc::new(ir, id, doc_id, val),
initial: CursorDoc::new(ir, id, doc_id, val),
}
}
}

View file

@ -171,13 +171,25 @@ pub(crate) async fn matches(
ctx: &Context<'_>,
txn: &Transaction,
doc: Option<&CursorDoc<'_>>,
e: &Expression,
exp: &Expression,
) -> Result<Value, Error> {
if let Some(doc) = doc {
if let Some(thg) = doc.rid {
if let Some(exe) = ctx.get_query_executor(&thg.tb) {
// Check the matches
return exe.matches(txn, thg, e).await;
if let Some(pla) = ctx.get_query_planner() {
if let Some(exe) = pla.get_query_executor(&thg.tb) {
// If we find the expression in `pre_match`,
// it means that we are using an Iterator::Index
// and we are iterating over documents that already matches the expression.
if let Some(ir) = doc.ir {
if let Some(e) = exe.get_iterator_expression(ir) {
if e.eq(exp) {
return Ok(Value::Bool(true));
}
}
}
// Evaluate the matches
return exe.matches(txn, thg, exp).await;
}
}
}
}

View file

@ -2,54 +2,57 @@ use crate::ctx::Context;
use crate::dbs::Transaction;
use crate::doc::CursorDoc;
use crate::err::Error;
use crate::sql::Value;
use crate::idx::planner::executor::QueryExecutor;
use crate::sql::{Thing, Value};
/// Gathers everything full-text scoring/highlighting functions need in one go:
/// the transaction, the query executor for the current record's table, the
/// cursor document, and its record id. Returns `None` as soon as any piece
/// is missing.
fn get_execution_context<'a>(
	ctx: &'a Context<'_>,
	txn: Option<&'a Transaction>,
	doc: Option<&'a CursorDoc<'_>>,
) -> Option<(&'a Transaction, &'a QueryExecutor, &'a CursorDoc<'a>, &'a Thing)> {
	let txn = txn?;
	let doc = doc?;
	let thg = doc.rid?;
	let exe = ctx.get_query_planner()?.get_query_executor(&thg.tb)?;
	Some((txn, exe, doc, thg))
}
pub async fn score(
(ctx, txn, doc): (&Context<'_>, Option<&Transaction>, Option<&CursorDoc<'_>>),
(match_ref,): (Value,),
) -> Result<Value, Error> {
if let Some(txn) = txn {
if let Some(doc) = doc {
if let Some(thg) = doc.rid {
if let Some(exe) = ctx.get_query_executor(&thg.tb) {
return exe.score(txn, &match_ref, thg, doc.doc_id).await;
}
}
}
if let Some((txn, exe, doc, thg)) = get_execution_context(ctx, txn, doc) {
exe.score(txn, &match_ref, thg, doc.doc_id).await
} else {
Ok(Value::None)
}
Ok(Value::None)
}
pub async fn highlight(
(ctx, txn, doc): (&Context<'_>, Option<&Transaction>, Option<&CursorDoc<'_>>),
(prefix, suffix, match_ref): (Value, Value, Value),
) -> Result<Value, Error> {
if let Some(txn) = txn {
if let Some(doc) = doc {
if let Some(thg) = doc.rid {
if let Some(exe) = ctx.get_query_executor(&thg.tb) {
return exe
.highlight(txn, thg, prefix, suffix, &match_ref, doc.doc.as_ref())
.await;
}
}
}
if let Some((txn, exe, doc, thg)) = get_execution_context(ctx, txn, doc) {
exe.highlight(txn, thg, prefix, suffix, &match_ref, doc.doc.as_ref()).await
} else {
Ok(Value::None)
}
Ok(Value::None)
}
pub async fn offsets(
(ctx, txn, doc): (&Context<'_>, Option<&Transaction>, Option<&CursorDoc<'_>>),
(match_ref,): (Value,),
) -> Result<Value, Error> {
if let Some(txn) = txn {
if let Some(doc) = doc {
if let Some(thg) = doc.rid {
if let Some(exe) = ctx.get_query_executor(&thg.tb) {
return exe.offsets(txn, thg, &match_ref).await;
}
}
}
if let Some((txn, exe, _, thg)) = get_execution_context(ctx, txn, doc) {
exe.offsets(txn, thg, &match_ref).await
} else {
Ok(Value::None)
}
Ok(Value::None)
}

View file

@ -17,7 +17,7 @@ use crate::idx::ft::highlighter::{Highlighter, Offseter};
use crate::idx::ft::offsets::Offsets;
use crate::idx::ft::postings::Postings;
use crate::idx::ft::scorer::BM25Scorer;
use crate::idx::ft::termdocs::TermDocs;
use crate::idx::ft::termdocs::{TermDocs, TermsDocs};
use crate::idx::ft::terms::{TermId, Terms};
use crate::idx::{btree, IndexKeyBase, SerdeState};
use crate::kvs::{Key, Transaction};
@ -320,7 +320,7 @@ impl FtIndex {
pub(super) fn new_hits_iterator(
&self,
terms_docs: Arc<Vec<Option<(TermId, RoaringTreemap)>>>,
terms_docs: TermsDocs,
) -> Result<Option<HitsIterator>, Error> {
let mut hits: Option<RoaringTreemap> = None;
for opt_term_docs in terms_docs.iter() {
@ -342,10 +342,7 @@ impl FtIndex {
Ok(None)
}
pub(super) fn new_scorer(
&self,
terms_docs: Arc<Vec<Option<(TermId, RoaringTreemap)>>>,
) -> Result<Option<BM25Scorer>, Error> {
pub(super) fn new_scorer(&self, terms_docs: TermsDocs) -> Result<Option<BM25Scorer>, Error> {
if let Some(bm25) = &self.bm25 {
return Ok(Some(BM25Scorer::new(
self.postings.clone(),

View file

@ -2,10 +2,9 @@ use crate::err::Error;
use crate::idx::ft::docids::DocId;
use crate::idx::ft::doclength::{DocLength, DocLengths};
use crate::idx::ft::postings::{Postings, TermFrequency};
use crate::idx::ft::terms::TermId;
use crate::idx::ft::termdocs::TermsDocs;
use crate::idx::ft::Bm25Params;
use crate::kvs::Transaction;
use roaring::RoaringTreemap;
use std::sync::Arc;
use tokio::sync::RwLock;
@ -13,7 +12,7 @@ pub(super) type Score = f32;
pub(crate) struct BM25Scorer {
postings: Arc<RwLock<Postings>>,
terms_docs: Arc<Vec<Option<(TermId, RoaringTreemap)>>>,
terms_docs: TermsDocs,
doc_lengths: Arc<RwLock<DocLengths>>,
average_doc_length: f32,
doc_count: f32,
@ -23,7 +22,7 @@ pub(crate) struct BM25Scorer {
impl BM25Scorer {
pub(super) fn new(
postings: Arc<RwLock<Postings>>,
terms_docs: Arc<Vec<Option<(TermId, RoaringTreemap)>>>,
terms_docs: TermsDocs,
doc_lengths: Arc<RwLock<DocLengths>>,
total_docs_length: u128,
doc_count: u64,

View file

@ -6,25 +6,28 @@ use crate::idx::ft::scorer::BM25Scorer;
use crate::idx::ft::termdocs::TermsDocs;
use crate::idx::ft::terms::TermId;
use crate::idx::ft::{FtIndex, MatchRef};
use crate::idx::planner::iterators::{
MatchesThingIterator, NonUniqueEqualThingIterator, ThingIterator, UniqueEqualThingIterator,
};
use crate::idx::planner::plan::IndexOption;
use crate::idx::planner::tree::IndexMap;
use crate::idx::IndexKeyBase;
use crate::kvs;
use crate::kvs::Key;
use crate::sql::index::Index;
use crate::sql::{Expression, Table, Thing, Value};
use roaring::RoaringTreemap;
use crate::sql::{Expression, Operator, Table, Thing, Value};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
pub(crate) type IteratorRef = u16;
pub(crate) struct QueryExecutor {
table: String,
pre_match_expression: Option<Expression>,
pre_match_entry: Option<FtEntry>,
ft_map: HashMap<String, FtIndex>,
mr_entries: HashMap<MatchRef, FtEntry>,
exp_entries: HashMap<Expression, FtEntry>,
iterators: Vec<Expression>,
}
impl QueryExecutor {
@ -33,7 +36,6 @@ impl QueryExecutor {
txn: &Transaction,
table: &Table,
index_map: IndexMap,
pre_match_expression: Option<Expression>,
) -> Result<Self, Error> {
let mut run = txn.lock().await;
@ -82,25 +84,27 @@ impl QueryExecutor {
}
}
let mut pre_match_entry = None;
if let Some(exp) = &pre_match_expression {
pre_match_entry = exp_entries.get(exp).cloned();
}
Ok(Self {
table: table.0.clone(),
pre_match_expression,
pre_match_entry,
ft_map,
mr_entries,
exp_entries,
iterators: Vec::new(),
})
}
pub(super) fn pre_match_terms_docs(&self) -> Option<TermsDocs> {
if let Some(entry) = &self.pre_match_entry {
return Some(entry.0.terms_docs.clone());
}
None
/// Registers the expression an index iterator will evaluate, and returns the
/// reference (its position in `iterators`) used to identify that iterator later.
pub(super) fn add_iterator(&mut self, exp: Expression) -> IteratorRef {
	let ir = self.iterators.len();
	self.iterators.push(exp);
	// NOTE(review): truncating `usize` -> `u16` cast; assumes fewer than 65536
	// iterators are ever registered for one query — confirm this bound holds.
	ir as IteratorRef
}
/// Whether rows produced by iterator `ir` may contain duplicates and therefore
/// require de-duplication.
/// NOTE(review): this currently answers true for every registered iterator
/// reference (any `ir` below `iterators.len()`) — confirm this coarse rule is intended.
pub(crate) fn is_distinct(&self, ir: IteratorRef) -> bool {
	(ir as usize) < self.iterators.len()
}
/// Looks up the expression that was registered for iterator reference `ir`,
/// if any (see `add_iterator`).
pub(crate) fn get_iterator_expression(&self, ir: IteratorRef) -> Option<&Expression> {
	self.iterators.get(ir as usize)
}
fn get_match_ref(match_ref: &Value) -> Option<MatchRef> {
@ -112,21 +116,68 @@ impl QueryExecutor {
}
}
/// Builds the concrete `ThingIterator` for the given index option, dispatching
/// on the kind of index declared on it.
pub(crate) async fn new_iterator(
	&self,
	opt: &Options,
	ir: IteratorRef,
	io: IndexOption,
) -> Result<ThingIterator, Error> {
	match &io.ix().index {
		// Standard (non-unique) index
		Index::Idx => Self::new_index_iterator(opt, io),
		// Unique index
		Index::Uniq => Self::new_unique_index_iterator(opt, io),
		// Full-text search index: needs `ir` to locate its MATCHES expression
		Index::Search {
			..
		} => self.new_search_index_iterator(ir, io).await,
	}
}
fn new_index_iterator(opt: &Options, io: IndexOption) -> Result<ThingIterator, Error> {
if io.op() == &Operator::Equal {
return Ok(ThingIterator::NonUniqueEqual(NonUniqueEqualThingIterator::new(
opt,
io.ix(),
io.value(),
)?));
}
Err(Error::BypassQueryPlanner)
}
fn new_unique_index_iterator(opt: &Options, io: IndexOption) -> Result<ThingIterator, Error> {
if io.op() == &Operator::Equal {
return Ok(ThingIterator::UniqueEqual(UniqueEqualThingIterator::new(
opt,
io.ix(),
io.value(),
)?));
}
Err(Error::BypassQueryPlanner)
}
/// Builds a full-text `Matches` iterator for iterator reference `ir`.
///
/// Returns `Error::BypassQueryPlanner` when the reference is unknown, the
/// operator is not a MATCHES, or no full-text index / expression entry was
/// prepared for it.
async fn new_search_index_iterator(
	&self,
	ir: IteratorRef,
	io: IndexOption,
) -> Result<ThingIterator, Error> {
	// Use the dedicated accessor rather than indexing `self.iterators`
	// directly, so the `ir -> Expression` lookup lives in one place.
	if let Some(exp) = self.get_iterator_expression(ir) {
		if let Operator::Matches(_) = io.op() {
			let ixn = &io.ix().name.0;
			if let Some(fti) = self.ft_map.get(ixn) {
				if let Some(fte) = self.exp_entries.get(exp) {
					let it = MatchesThingIterator::new(fti, fte.0.terms_docs.clone()).await?;
					return Ok(ThingIterator::Matches(it));
				}
			}
		}
	}
	Err(Error::BypassQueryPlanner)
}
pub(crate) async fn matches(
&self,
txn: &Transaction,
thg: &Thing,
exp: &Expression,
) -> Result<Value, Error> {
// If we find the expression in `pre_match_expression`,
// it means that we are using an Iterator::Index
// and we are iterating over document that already matches the expression.
if let Some(pme) = &self.pre_match_expression {
if pme.eq(exp) {
return Ok(Value::Bool(true));
}
}
// Otherwise, we look for the first possible index options, and evaluate the expression
// Does the record id match this executor's table?
if thg.tb.eq(&self.table) {
@ -244,7 +295,7 @@ struct Inner {
index_option: IndexOption,
doc_ids: Arc<RwLock<DocIds>>,
terms: Vec<Option<TermId>>,
terms_docs: Arc<Vec<Option<(TermId, RoaringTreemap)>>>,
terms_docs: TermsDocs,
scorer: Option<BM25Scorer>,
}

View file

@ -0,0 +1,126 @@
use crate::dbs::{Options, Transaction};
use crate::err::Error;
use crate::idx::ft::docids::{DocId, NO_DOC_ID};
use crate::idx::ft::termdocs::TermsDocs;
use crate::idx::ft::{FtIndex, HitsIterator};
use crate::key;
use crate::kvs::Key;
use crate::sql::statements::DefineIndexStatement;
use crate::sql::{Array, Thing, Value};
/// A unified iterator over the different physical index access methods.
pub(crate) enum ThingIterator {
	// Equality scan on a standard (non-unique) index
	NonUniqueEqual(NonUniqueEqualThingIterator),
	// Point lookup on a unique index
	UniqueEqual(UniqueEqualThingIterator),
	// Full-text search hits
	Matches(MatchesThingIterator),
}

impl ThingIterator {
	/// Fetches up to `size` (record id, doc id) pairs, dispatching to the
	/// concrete iterator. An empty result signals exhaustion.
	pub(crate) async fn next_batch(
		&mut self,
		tx: &Transaction,
		size: u32,
	) -> Result<Vec<(Thing, DocId)>, Error> {
		match self {
			ThingIterator::NonUniqueEqual(i) => i.next_batch(tx, size).await,
			ThingIterator::UniqueEqual(i) => i.next_batch(tx, size).await,
			ThingIterator::Matches(i) => i.next_batch(tx, size).await,
		}
	}
}
/// Scans a standard (non-unique) index for all entries equal to a value,
/// advancing a key range across batches.
pub(crate) struct NonUniqueEqualThingIterator {
	// Current lower bound of the remaining key range (advanced after each batch)
	beg: Vec<u8>,
	// Exclusive upper bound of the key range
	end: Vec<u8>,
}

impl NonUniqueEqualThingIterator {
	/// Computes the key range covering every index entry for value `v`.
	pub(super) fn new(
		opt: &Options,
		ix: &DefineIndexStatement,
		v: &Value,
	) -> Result<NonUniqueEqualThingIterator, Error> {
		let arr = Array::from(v.clone());
		let (beg, end) =
			key::index::Index::range_all_ids(opt.ns(), opt.db(), &ix.what, &ix.name, &arr);
		Ok(Self {
			beg,
			end,
		})
	}

	/// Scans up to `limit` keys from the remaining range, then bumps the lower
	/// bound just past the last key seen so the next call resumes after it.
	async fn next_batch(
		&mut self,
		txn: &Transaction,
		limit: u32,
	) -> Result<Vec<(Thing, DocId)>, Error> {
		let rng = self.beg.clone()..self.end.clone();
		let res = txn.lock().await.scan(rng, limit).await?;
		if let Some((key, _)) = res.last() {
			self.beg = key.clone();
			// Appending 0x00 yields the smallest key strictly greater than `key`.
			self.beg.push(0x00);
		}
		// Non-unique index entries carry no doc id, hence NO_DOC_ID.
		Ok(res.iter().map(|(_, val)| (val.into(), NO_DOC_ID)).collect())
	}
}
/// Point lookup on a unique index: yields at most one record, then is exhausted.
pub(crate) struct UniqueEqualThingIterator {
	// The single index key to fetch; taken (set to None) on first use.
	key: Option<Key>,
}

impl UniqueEqualThingIterator {
	/// Builds the unique index key for value `v`.
	pub(super) fn new(opt: &Options, ix: &DefineIndexStatement, v: &Value) -> Result<Self, Error> {
		let arr = Array::from(v.clone());
		let key = key::index::Index::new(opt.ns(), opt.db(), &ix.what, &ix.name, arr, None).into();
		Ok(Self {
			key: Some(key),
		})
	}

	/// Fetches the single record on the first call; every later call (or a
	/// missing key) yields an empty batch. `_limit` is irrelevant for one row.
	async fn next_batch(
		&mut self,
		txn: &Transaction,
		_limit: u32,
	) -> Result<Vec<(Thing, DocId)>, Error> {
		match self.key.take() {
			Some(key) => match txn.lock().await.get(key).await? {
				// Unique index entries carry no doc id, hence NO_DOC_ID.
				Some(val) => Ok(vec![(val.into(), NO_DOC_ID)]),
				None => Ok(vec![]),
			},
			None => Ok(vec![]),
		}
	}
}
/// Iterates over the hits of a full-text (MATCHES) index query.
pub(crate) struct MatchesThingIterator {
	// `None` when the full-text index produced no hits iterator, in which
	// case every batch is empty.
	hits: Option<HitsIterator>,
}
impl MatchesThingIterator {
	/// Builds an iterator over the full-text hits for the given
	/// terms/documents mapping.
	pub(super) async fn new(fti: &FtIndex, terms_docs: TermsDocs) -> Result<Self, Error> {
		Ok(Self {
			hits: fti.new_hits_iterator(terms_docs)?,
		})
	}

	/// Collects up to `limit` hits from the full-text index, holding the
	/// transaction lock for the duration of the batch.
	async fn next_batch(
		&mut self,
		txn: &Transaction,
		limit: u32,
	) -> Result<Vec<(Thing, DocId)>, Error> {
		let mut batch = Vec::new();
		if let Some(hits) = &mut self.hits {
			let mut run = txn.lock().await;
			for _ in 0..limit {
				match hits.next(&mut run).await? {
					Some(hit) => batch.push(hit),
					None => break,
				}
			}
		}
		Ok(batch)
	}
}

View file

@ -1,14 +1,15 @@
pub(crate) mod executor;
pub(crate) mod iterators;
pub(crate) mod plan;
mod tree;
use crate::ctx::Context;
use crate::dbs::{Iterable, Options, Transaction};
use crate::dbs::{Iterable, Iterator, Options, Transaction};
use crate::err::Error;
use crate::idx::planner::executor::QueryExecutor;
use crate::idx::planner::plan::{Plan, PlanBuilder};
use crate::idx::planner::tree::{Node, Tree};
use crate::sql::{Cond, Operator, Table};
use crate::idx::planner::tree::Tree;
use crate::sql::{Cond, Table};
use std::collections::HashMap;
pub(crate) struct QueryPlanner<'a> {
@ -16,6 +17,7 @@ pub(crate) struct QueryPlanner<'a> {
cond: &'a Option<Cond>,
/// There is one executor per table
executors: HashMap<String, QueryExecutor>,
requires_distinct: bool,
}
impl<'a> QueryPlanner<'a> {
@ -24,83 +26,57 @@ impl<'a> QueryPlanner<'a> {
opt,
cond,
executors: HashMap::default(),
requires_distinct: false,
}
}
pub(crate) async fn get_iterable(
pub(crate) async fn add_iterables(
&mut self,
ctx: &Context<'_>,
txn: &Transaction,
t: Table,
) -> Result<Iterable, Error> {
it: &mut Iterator,
) -> Result<(), Error> {
let res = Tree::build(ctx, self.opt, txn, &t, self.cond).await?;
if let Some((node, im)) = res {
if let Some(plan) = AllAndStrategy::build(&node)? {
let e = QueryExecutor::new(self.opt, txn, &t, im, Some(plan.e.clone())).await?;
self.executors.insert(t.0.clone(), e);
return Ok(Iterable::Index(t, plan));
let mut exe = QueryExecutor::new(self.opt, txn, &t, im).await?;
let ok = match PlanBuilder::build(node) {
Ok(plan) => match plan {
Plan::SingleIndex(exp, io) => {
let ir = exe.add_iterator(exp);
it.ingest(Iterable::Index(t.clone(), ir, io));
true
}
Plan::MultiIndex(v) => {
for (exp, io) in v {
let ir = exe.add_iterator(exp);
it.ingest(Iterable::Index(t.clone(), ir, io));
self.requires_distinct = true;
}
true
}
},
Err(Error::BypassQueryPlanner) => false,
Err(e) => return Err(e),
};
self.executors.insert(t.0.clone(), exe);
if ok {
return Ok(());
}
let e = QueryExecutor::new(self.opt, txn, &t, im, None).await?;
self.executors.insert(t.0.clone(), e);
}
Ok(Iterable::Table(t))
}
pub(crate) fn finish(self) -> Option<HashMap<String, QueryExecutor>> {
if self.executors.is_empty() {
None
} else {
Some(self.executors)
}
}
}
struct AllAndStrategy {
b: PlanBuilder,
}
/// Successful if every boolean operators are AND
/// and there is at least one condition covered by an index
impl AllAndStrategy {
fn build(node: &Node) -> Result<Option<Plan>, Error> {
let mut s = AllAndStrategy {
b: PlanBuilder::default(),
};
match s.eval_node(node) {
Ok(_) => match s.b.build() {
Ok(p) => Ok(Some(p)),
Err(Error::BypassQueryPlanner) => Ok(None),
Err(e) => Err(e),
},
Err(Error::BypassQueryPlanner) => Ok(None),
Err(e) => Err(e),
}
}
fn eval_node(&mut self, node: &Node) -> Result<(), Error> {
match node {
Node::Expression {
io: index_option,
left,
right,
exp: expression,
} => {
if let Some(io) = index_option {
self.b.add_index_option(expression.clone(), io.clone());
}
self.eval_expression(left, right, expression.operator())
}
Node::Unsupported => Err(Error::BypassQueryPlanner),
_ => Ok(()),
}
}
fn eval_expression(&mut self, left: &Node, right: &Node, op: &Operator) -> Result<(), Error> {
if op.eq(&Operator::Or) {
return Err(Error::BypassQueryPlanner);
}
self.eval_node(left)?;
self.eval_node(right)?;
it.ingest(Iterable::Table(t));
Ok(())
}
pub(crate) fn has_executors(&self) -> bool {
!self.executors.is_empty()
}
pub(crate) fn get_query_executor(&self, tb: &str) -> Option<&QueryExecutor> {
self.executors.get(tb)
}
pub(crate) fn requires_distinct(&self) -> bool {
self.requires_distinct
}
}

View file

@ -1,74 +1,100 @@
use crate::dbs::{Options, Transaction};
use crate::err::Error;
use crate::idx::btree::store::BTreeStoreType;
use crate::idx::ft::docids::{DocId, NO_DOC_ID};
use crate::idx::ft::termdocs::TermsDocs;
use crate::idx::ft::{FtIndex, HitsIterator, MatchRef};
use crate::idx::planner::executor::QueryExecutor;
use crate::idx::IndexKeyBase;
use crate::key;
use crate::kvs::Key;
use crate::sql::index::Index;
use crate::sql::scoring::Scoring;
use crate::idx::ft::MatchRef;
use crate::idx::planner::tree::Node;
use crate::sql::statements::DefineIndexStatement;
use crate::sql::{Array, Expression, Ident, Idiom, Object, Operator, Thing, Value};
use crate::sql::Object;
use crate::sql::{Expression, Idiom, Operator, Value};
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
#[derive(Default)]
pub(super) struct PlanBuilder {
indexes: Vec<(Expression, IndexOption)>,
all_and: bool,
all_exp_with_index: bool,
}
impl PlanBuilder {
pub(super) fn add_index_option(&mut self, e: Expression, i: IndexOption) {
pub(super) fn build(root: Node) -> Result<Plan, Error> {
let mut b = PlanBuilder {
indexes: Vec::new(),
all_and: true,
all_exp_with_index: true,
};
// Browse the AST and collect information
b.eval_node(root)?;
// If we didn't find any index, we're done: there is no index plan
if b.indexes.is_empty() {
return Err(Error::BypassQueryPlanner);
}
// If every boolean operator is AND then we can use the single-index plan
if b.all_and {
if let Some((e, i)) = b.indexes.pop() {
return Ok(Plan::SingleIndex(e, i));
}
}
// If every expression is backed by an index we can use the MultiIndex plan
if b.all_exp_with_index {
return Ok(Plan::MultiIndex(b.indexes));
}
Err(Error::BypassQueryPlanner)
}
fn eval_node(&mut self, node: Node) -> Result<(), Error> {
match node {
Node::Expression {
io,
left,
right,
exp,
} => {
if self.all_and && Operator::Or.eq(exp.operator()) {
self.all_and = false;
}
let is_bool = self.check_boolean_operator(exp.operator());
if let Some(io) = io {
self.add_index_option(exp, io);
} else if self.all_exp_with_index && !is_bool {
self.all_exp_with_index = false;
}
self.eval_expression(*left, *right)
}
Node::Unsupported => Err(Error::BypassQueryPlanner),
_ => Ok(()),
}
}
fn check_boolean_operator(&mut self, op: &Operator) -> bool {
match op {
Operator::Neg | Operator::Or => {
if self.all_and {
self.all_and = false;
}
true
}
Operator::And => true,
_ => false,
}
}
fn eval_expression(&mut self, left: Node, right: Node) -> Result<(), Error> {
self.eval_node(left)?;
self.eval_node(right)?;
Ok(())
}
fn add_index_option(&mut self, e: Expression, i: IndexOption) {
self.indexes.push((e, i));
}
pub(super) fn build(mut self) -> Result<Plan, Error> {
// TODO select the best option if there are several (cost based)
if let Some((e, i)) = self.indexes.pop() {
Ok(Plan::new(e, i))
} else {
Err(Error::BypassQueryPlanner)
}
}
}
pub(crate) struct Plan {
pub(super) e: Expression,
pub(super) i: IndexOption,
}
impl Plan {
pub(super) fn new(e: Expression, i: IndexOption) -> Self {
Self {
e,
i,
}
}
pub(crate) async fn new_iterator(
&self,
opt: &Options,
txn: &Transaction,
exe: &QueryExecutor,
) -> Result<ThingIterator, Error> {
self.i.new_iterator(opt, txn, exe).await
}
pub(crate) fn explain(&self) -> Value {
Value::Object(Object::from(HashMap::from([
("index", Value::from(self.i.ix().name.0.to_owned())),
("operator", Value::from(self.i.op().to_string())),
("value", self.i.value().clone()),
])))
}
pub(super) enum Plan {
SingleIndex(Expression, IndexOption),
MultiIndex(Vec<(Expression, IndexOption)>),
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub(super) struct IndexOption(Arc<Inner>);
pub(crate) struct IndexOption(Arc<Inner>);
#[derive(Debug, Eq, PartialEq, Hash)]
pub(super) struct Inner {
@ -123,193 +149,12 @@ impl IndexOption {
self.0.mr.as_ref()
}
async fn new_iterator(
&self,
opt: &Options,
txn: &Transaction,
exe: &QueryExecutor,
) -> Result<ThingIterator, Error> {
match &self.ix().index {
Index::Idx => {
if self.op() == &Operator::Equal {
return Ok(ThingIterator::NonUniqueEqual(NonUniqueEqualThingIterator::new(
opt,
self.ix(),
self.value(),
)?));
}
}
Index::Uniq => {
if self.op() == &Operator::Equal {
return Ok(ThingIterator::UniqueEqual(UniqueEqualThingIterator::new(
opt,
self.ix(),
self.value(),
)?));
}
}
Index::Search {
az,
hl,
sc,
order,
} => {
if let Operator::Matches(_) = self.op() {
let td = exe.pre_match_terms_docs();
return Ok(ThingIterator::Matches(
MatchesThingIterator::new(opt, txn, self.ix(), az, *hl, sc, *order, td)
.await?,
));
}
}
}
Err(Error::BypassQueryPlanner)
}
}
pub(crate) enum ThingIterator {
NonUniqueEqual(NonUniqueEqualThingIterator),
UniqueEqual(UniqueEqualThingIterator),
Matches(MatchesThingIterator),
}
impl ThingIterator {
pub(crate) async fn next_batch(
&mut self,
tx: &Transaction,
size: u32,
) -> Result<Vec<(Thing, DocId)>, Error> {
match self {
ThingIterator::NonUniqueEqual(i) => i.next_batch(tx, size).await,
ThingIterator::UniqueEqual(i) => i.next_batch(tx, size).await,
ThingIterator::Matches(i) => i.next_batch(tx, size).await,
}
}
}
pub(crate) struct NonUniqueEqualThingIterator {
beg: Vec<u8>,
end: Vec<u8>,
}
impl NonUniqueEqualThingIterator {
fn new(
opt: &Options,
ix: &DefineIndexStatement,
v: &Value,
) -> Result<NonUniqueEqualThingIterator, Error> {
let v = Array::from(v.clone());
let (beg, end) =
key::index::Index::range_all_ids(opt.ns(), opt.db(), &ix.what, &ix.name, &v);
Ok(Self {
beg,
end,
})
}
async fn next_batch(
&mut self,
txn: &Transaction,
limit: u32,
) -> Result<Vec<(Thing, DocId)>, Error> {
let min = self.beg.clone();
let max = self.end.clone();
let res = txn.lock().await.scan(min..max, limit).await?;
if let Some((key, _)) = res.last() {
self.beg = key.clone();
self.beg.push(0x00);
}
let res = res.iter().map(|(_, val)| (val.into(), NO_DOC_ID)).collect();
Ok(res)
}
}
pub(crate) struct UniqueEqualThingIterator {
key: Option<Key>,
}
impl UniqueEqualThingIterator {
fn new(opt: &Options, ix: &DefineIndexStatement, v: &Value) -> Result<Self, Error> {
let v = Array::from(v.clone());
let key = key::index::Index::new(opt.ns(), opt.db(), &ix.what, &ix.name, v, None).into();
Ok(Self {
key: Some(key),
})
}
async fn next_batch(
&mut self,
txn: &Transaction,
_limit: u32,
) -> Result<Vec<(Thing, DocId)>, Error> {
if let Some(key) = self.key.take() {
if let Some(val) = txn.lock().await.get(key).await? {
return Ok(vec![(val.into(), NO_DOC_ID)]);
}
}
Ok(vec![])
}
}
pub(crate) struct MatchesThingIterator {
hits: Option<HitsIterator>,
}
impl MatchesThingIterator {
#[allow(clippy::too_many_arguments)]
async fn new(
opt: &Options,
txn: &Transaction,
ix: &DefineIndexStatement,
az: &Ident,
hl: bool,
sc: &Scoring,
order: u32,
terms_docs: Option<TermsDocs>,
) -> Result<Self, Error> {
let ikb = IndexKeyBase::new(opt, ix);
if let Scoring::Bm {
..
} = sc
{
let mut run = txn.lock().await;
let az = run.get_az(opt.ns(), opt.db(), az.as_str()).await?;
let fti = FtIndex::new(&mut run, az, ikb, order, sc, hl, BTreeStoreType::Read).await?;
if let Some(terms_docs) = terms_docs {
let hits = fti.new_hits_iterator(terms_docs)?;
Ok(Self {
hits,
})
} else {
Ok(Self {
hits: None,
})
}
} else {
Err(Error::FeatureNotYetImplemented {
feature: "Vector Search",
})
}
}
async fn next_batch(
&mut self,
txn: &Transaction,
mut limit: u32,
) -> Result<Vec<(Thing, DocId)>, Error> {
let mut res = vec![];
if let Some(hits) = &mut self.hits {
let mut run = txn.lock().await;
while limit > 0 {
if let Some(hit) = hits.next(&mut run).await? {
res.push(hit);
} else {
break;
}
limit -= 1;
}
}
Ok(res)
pub(crate) fn explain(&self) -> Value {
Value::Object(Object::from(HashMap::from([
("index", Value::from(self.ix().name.0.to_owned())),
("operator", Value::from(self.op().to_string())),
("value", self.value().clone()),
])))
}
}

View file

@ -84,7 +84,7 @@ impl Fields {
if let Some(doc) = doc {
self.compute_value(ctx, opt, txn, doc, group).await
} else {
let doc = CursorDoc::new(None, None, &Value::None);
let doc = (&Value::None).into();
self.compute_value(ctx, opt, txn, &doc, group).await
}
}

View file

@ -97,7 +97,7 @@ impl SelectStatement {
let v = w.compute(ctx, opt, txn, doc).await?;
match v {
Value::Table(t) => {
i.ingest(planner.get_iterable(ctx, txn, t).await?);
planner.add_iterables(ctx, txn, t, &mut i).await?;
}
Value::Thing(v) => i.ingest(Iterable::Thing(v)),
Value::Range(v) => i.ingest(Iterable::Range(*v)),
@ -128,9 +128,9 @@ impl SelectStatement {
// Assign the statement
let stm = Statement::from(self);
// Add query executors if any
if let Some(ex) = planner.finish() {
if planner.has_executors() {
let mut ctx = Context::new(ctx);
ctx.set_query_executors(ex);
ctx.set_query_planner(&planner);
// Output the results
i.output(&ctx, opt, txn, &stm).await
} else {

View file

@ -1,6 +1,5 @@
use crate::ctx::Context;
use crate::dbs::{Options, Transaction};
use crate::doc::CursorDoc;
use crate::err::Error;
use crate::exe::try_join_all_buffered;
use crate::sql::array::Abolish;
@ -119,7 +118,7 @@ impl Value {
// iterate in reverse, and call swap_remove
let mut m = HashSet::new();
for (i, v) in v.iter().enumerate() {
let cur = CursorDoc::new(None, None, v);
let cur = v.into();
if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() {
m.insert(i);
};
@ -133,7 +132,7 @@ impl Value {
let mut p = Vec::new();
// Store the elements and positions to update
for (i, o) in v.iter_mut().enumerate() {
let cur = CursorDoc::new(None, None, o);
let cur = o.into();
if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() {
a.push(o.clone());
p.push(i);
@ -157,7 +156,7 @@ impl Value {
_ => {
let path = path.next();
for v in v.iter_mut() {
let cur = CursorDoc::new(None, None, v);
let cur = v.into();
if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() {
v.del(ctx, opt, txn, path).await?;
}

View file

@ -1,6 +1,5 @@
use crate::ctx::Context;
use crate::dbs::{Options, Transaction};
use crate::doc::CursorDoc;
use crate::err::Error;
use crate::sql::edges::Edges;
use crate::sql::field::{Field, Fields};
@ -64,7 +63,7 @@ impl Value {
Part::Where(w) => {
let path = path.next();
for v in v.iter_mut() {
let cur = CursorDoc::new(None, None, v);
let cur = v.into();
if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() {
v.fetch(ctx, opt, txn, path).await?;
}

View file

@ -123,8 +123,8 @@ impl Value {
Part::Where(w) => {
let mut a = Vec::new();
for v in v.iter() {
let cur = Some(CursorDoc::new(None, None, v));
if w.compute(ctx, opt, txn, cur.as_ref()).await?.is_truthy() {
let cur = v.into();
if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() {
a.push(v.clone());
}
}
@ -382,7 +382,7 @@ mod tests {
let doc = Value::parse("{ name: 'Tobie', something: [{ age: 34 }, { age: 36 }] }");
let idi = Idiom::parse("test.something[WHERE age > 35]");
let val = Value::parse("{ test: <future> { { something: something } } }");
let cur = CursorDoc::new(None, None, &doc);
let cur = (&doc).into();
let res = val.get(&ctx, &opt, &txn, Some(&cur), &idi).await.unwrap();
assert_eq!(
res,

View file

@ -1,6 +1,5 @@
use crate::ctx::Context;
use crate::dbs::{Options, Transaction};
use crate::doc::CursorDoc;
use crate::err::Error;
use crate::exe::try_join_all_buffered;
use crate::sql::part::Next;
@ -92,7 +91,7 @@ impl Value {
let mut p = Vec::new();
// Store the elements and positions to update
for (i, o) in v.iter_mut().enumerate() {
let cur = CursorDoc::new(None, None, o);
let cur = o.into();
if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() {
a.push(o.clone());
p.push(i);
@ -111,7 +110,7 @@ impl Value {
_ => {
let path = path.next();
for v in v.iter_mut() {
let cur = CursorDoc::new(None, None, v);
let cur = v.into();
if w.compute(ctx, opt, txn, Some(&cur)).await?.is_truthy() {
v.set(ctx, opt, txn, path, val.clone()).await?;
}

View file

@ -242,15 +242,19 @@ async fn select_where_matches_using_index_and_score() -> Result<(), Error> {
#[tokio::test]
async fn select_where_matches_without_using_index_and_score() -> Result<(), Error> {
let sql = r"
CREATE blog:1 SET title = 'the quick brown fox jumped over the lazy dog';
CREATE blog:2 SET title = 'the fast fox jumped over the lazy dog';
CREATE blog:3 SET title = 'the other animals sat there watching';
CREATE blog:4 SET title = 'the dog sat there and did nothing';
CREATE blog:1 SET title = 'the quick brown fox jumped over the lazy dog', label = 'test';
CREATE blog:2 SET title = 'the fast fox jumped over the lazy dog', label = 'test';
CREATE blog:3 SET title = 'the other animals sat there watching', label = 'test';
CREATE blog:4 SET title = 'the dog sat there and did nothing', label = 'test';
DEFINE ANALYZER simple TOKENIZERS blank,class;
DEFINE INDEX blog_title ON blog FIELDS title SEARCH ANALYZER simple BM25 HIGHLIGHTS;
LET $keywords = 'animals';
SELECT id,search::score(1) AS score FROM blog WHERE (title @1@ $keywords AND id>0) OR (title @1@ $keywords AND id<99);
SELECT id,search::score(1) + search::score(2) AS score FROM blog WHERE title @1@ 'dummy1' OR title @2@ 'dummy2';
SELECT id,search::score(1) AS score FROM blog
WHERE (title @1@ $keywords AND label = 'test')
OR (title @1@ $keywords AND label = 'test');
SELECT id,search::score(1) + search::score(2) AS score FROM blog
WHERE (title @1@ 'dummy1' AND label = 'test')
OR (title @2@ 'dummy2' AND label = 'test');
";
let dbs = Datastore::new("memory").await?;
let ses = Session::for_kv().with_ns("test").with_db("test");

109
lib/tests/planner.rs Normal file
View file

@ -0,0 +1,109 @@
mod parse;
use parse::Parse;
use surrealdb::dbs::Session;
use surrealdb::err::Error;
use surrealdb::kvs::Datastore;
use surrealdb::sql::Value;
/// Exercises the multi-index execution plan: three WHERE conditions joined
/// by OR, each backed by a different index kind (unique, non-unique,
/// full-text). Checks both the selected rows and the EXPLAIN FULL output,
/// which should report one "Iterate Index" per condition plus a final
/// "Fetch" count. When `parallel` is true the statement carries the
/// PARALLEL clause.
async fn test_select_where_iterate_multi_index(parallel: bool) -> Result<(), Error> {
	// Optional PARALLEL clause spliced into the two SELECT statements below.
	let parallel = if parallel {
		"PARALLEL"
	} else {
		""
	};
	let sql = format!(
		"
CREATE person:tobie SET name = 'Tobie', genre='m', company='SurrealDB';
CREATE person:jaime SET name = 'Jaime', genre='m', company='SurrealDB';
CREATE person:lizzie SET name = 'Lizzie', genre='f', company='SurrealDB';
CREATE person:neytiry SET name = 'Neytiri', genre='f', company='Metkayina';
DEFINE ANALYZER simple TOKENIZERS blank,class FILTERS lowercase;
DEFINE INDEX ft_company ON person FIELDS company SEARCH ANALYZER simple BM25;
DEFINE INDEX uniq_name ON TABLE person COLUMNS name UNIQUE;
DEFINE INDEX idx_genre ON TABLE person COLUMNS genre;
SELECT name FROM person WHERE name = 'Jaime' OR genre = 'm' OR company @@ 'surrealdb' {parallel};
SELECT name FROM person WHERE name = 'Jaime' OR genre = 'm' OR company @@ 'surrealdb' {parallel} EXPLAIN FULL;"
	);
	let dbs = Datastore::new("memory").await?;
	let ses = Session::for_kv().with_ns("test").with_db("test");
	let res = &mut dbs.execute(&sql, &ses, None).await?;
	// 8 setup statements + 2 SELECTs.
	assert_eq!(res.len(), 10);
	//
	// Drain the setup statements, propagating any error.
	for _ in 0..8 {
		let _ = res.remove(0).result?;
	}
	//
	// First SELECT: the union of the three index lookups, deduplicated.
	let tmp = res.remove(0).result?;
	let val = Value::parse(
		"[
{
name: 'Jaime'
},
{
name: 'Tobie'
},
{
name: 'Lizzie'
}
]",
	);
	assert_eq!(format!("{:#}", tmp), format!("{:#}", val));
	//
	// Second SELECT (EXPLAIN FULL): one index iteration per OR branch,
	// then a fetch of the 3 distinct records.
	let tmp = res.remove(0).result?;
	let val = Value::parse(
		"[
{
detail: {
plan: {
index: 'uniq_name',
operator: '=',
value: 'Jaime'
},
table: 'person',
},
operation: 'Iterate Index'
},
{
detail: {
plan: {
index: 'idx_genre',
operator: '=',
value: 'm'
},
table: 'person',
},
operation: 'Iterate Index'
},
{
detail: {
plan: {
index: 'ft_company',
operator: '@@',
value: 'surrealdb'
},
table: 'person',
},
operation: 'Iterate Index'
},
{
detail: {
count: 3
},
operation: 'Fetch'
}
]",
	);
	assert_eq!(format!("{:#}", tmp), format!("{:#}", val));
	Ok(())
}
#[tokio::test]
async fn select_where_iterate_multi_index() -> Result<(), Error> {
	// Serial execution (no PARALLEL clause).
	test_select_where_iterate_multi_index(false).await
}
#[tokio::test]
async fn select_where_iterate_multi_index_parallel() -> Result<(), Error> {
	// Same scenario with the PARALLEL clause enabled.
	test_select_where_iterate_multi_index(true).await
}