Query planner strategy for record links (#3668)

This commit is contained in:
Emmanuel Keller 2024-04-16 19:05:09 +01:00 committed by GitHub
parent 1007a30ea4
commit a82c0ec0ad
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 1233 additions and 342 deletions

View file

@ -19,19 +19,18 @@ use crate::sql::thing::Thing;
use crate::sql::value::Value; use crate::sql::value::Value;
use async_recursion::async_recursion; use async_recursion::async_recursion;
use std::mem; use std::mem;
use std::sync::Arc;
#[derive(Clone)] #[derive(Clone)]
pub(crate) enum Iterable { pub(crate) enum Iterable {
Value(Value), Value(Value),
Table(Arc<Table>), Table(Table),
Thing(Thing), Thing(Thing),
Range(Range), Range(Range),
Edges(Edges), Edges(Edges),
Defer(Thing), Defer(Thing),
Mergeable(Thing, Value), Mergeable(Thing, Value),
Relatable(Thing, Thing, Thing), Relatable(Thing, Thing, Thing),
Index(Arc<Table>, IteratorRef), Index(Table, IteratorRef),
} }
pub(crate) struct Processed { pub(crate) struct Processed {
@ -118,7 +117,7 @@ impl Iterator {
} }
_ => { _ => {
// Ingest the table for scanning // Ingest the table for scanning
self.ingest(Iterable::Table(Arc::new(v))) self.ingest(Iterable::Table(v))
} }
}, },
// There is no data clause so create a record id // There is no data clause so create a record id
@ -129,7 +128,7 @@ impl Iterator {
} }
_ => { _ => {
// Ingest the table for scanning // Ingest the table for scanning
self.ingest(Iterable::Table(Arc::new(v))) self.ingest(Iterable::Table(v))
} }
}, },
}, },

View file

@ -127,10 +127,10 @@ impl<'a> Processor<'a> {
// Avoiding search in the hashmap of the query planner for each doc // Avoiding search in the hashmap of the query planner for each doc
let mut ctx = Context::new(ctx); let mut ctx = Context::new(ctx);
ctx.set_query_executor(exe.clone()); ctx.set_query_executor(exe.clone());
return self.process_table(&ctx, opt, txn, stm, v.as_ref()).await; return self.process_table(&ctx, opt, txn, stm, &v).await;
} }
} }
self.process_table(ctx, opt, txn, stm, v.as_ref()).await? self.process_table(ctx, opt, txn, stm, &v).await?
} }
Iterable::Range(v) => self.process_range(ctx, opt, txn, stm, v).await?, Iterable::Range(v) => self.process_range(ctx, opt, txn, stm, v).await?,
Iterable::Edges(e) => self.process_edge(ctx, opt, txn, stm, e).await?, Iterable::Edges(e) => self.process_edge(ctx, opt, txn, stm, e).await?,
@ -141,10 +141,10 @@ impl<'a> Processor<'a> {
// Avoiding search in the hashmap of the query planner for each doc // Avoiding search in the hashmap of the query planner for each doc
let mut ctx = Context::new(ctx); let mut ctx = Context::new(ctx);
ctx.set_query_executor(exe.clone()); ctx.set_query_executor(exe.clone());
return self.process_index(&ctx, opt, txn, stm, t.as_ref(), ir).await; return self.process_index(&ctx, opt, txn, stm, &t, ir).await;
} }
} }
self.process_index(ctx, opt, txn, stm, t.as_ref(), ir).await? self.process_index(ctx, opt, txn, stm, &t, ir).await?
} }
Iterable::Mergeable(v, o) => { Iterable::Mergeable(v, o) => {
self.process_mergeable(ctx, opt, txn, stm, v, o).await? self.process_mergeable(ctx, opt, txn, stm, v, o).await?
@ -563,7 +563,8 @@ impl<'a> Processor<'a> {
txn.lock().await.check_ns_db_tb(opt.ns(), opt.db(), &table.0, opt.strict).await?; txn.lock().await.check_ns_db_tb(opt.ns(), opt.db(), &table.0, opt.strict).await?;
if let Some(exe) = ctx.get_query_executor() { if let Some(exe) = ctx.get_query_executor() {
if let Some(mut iterator) = exe.new_iterator(opt, ir).await? { if let Some(mut iterator) = exe.new_iterator(opt, ir).await? {
let mut things = iterator.next_batch(txn, PROCESSOR_BATCH_SIZE).await?; let mut things = Vec::new();
iterator.next_batch(txn, PROCESSOR_BATCH_SIZE, &mut things).await?;
while !things.is_empty() { while !things.is_empty() {
// Check if the context is finished // Check if the context is finished
if ctx.is_done() { if ctx.is_done() {
@ -601,7 +602,8 @@ impl<'a> Processor<'a> {
} }
// Collect the next batch of ids // Collect the next batch of ids
things = iterator.next_batch(txn, PROCESSOR_BATCH_SIZE).await?; things = Vec::new();
iterator.next_batch(txn, PROCESSOR_BATCH_SIZE, &mut things).await?;
} }
// Everything ok // Everything ok
return Ok(()); return Ok(());

View file

@ -212,15 +212,19 @@ fn get_executor_option<'a>(
pub(crate) async fn matches( pub(crate) async fn matches(
ctx: &Context<'_>, ctx: &Context<'_>,
opt: &Options,
txn: &Transaction, txn: &Transaction,
doc: Option<&CursorDoc<'_>>, doc: Option<&CursorDoc<'_>>,
exp: &Expression, exp: &Expression,
l: Value,
r: Value,
) -> Result<Value, Error> { ) -> Result<Value, Error> {
match get_executor_option(ctx, doc, exp) { let res = match get_executor_option(ctx, doc, exp) {
ExecutorOption::PreMatch => Ok(Value::Bool(true)), ExecutorOption::PreMatch => true,
ExecutorOption::None => Ok(Value::Bool(false)), ExecutorOption::None => false,
ExecutorOption::Execute(exe, thg) => exe.matches(txn, thg, exp).await, ExecutorOption::Execute(exe, thg) => exe.matches(ctx, opt, txn, thg, exp, l, r).await?,
} };
Ok(res.into())
} }
pub(crate) async fn knn( pub(crate) async fn knn(

View file

@ -11,7 +11,6 @@ pub(super) enum FilteringStage {
Indexing, Indexing,
Querying, Querying,
} }
pub(super) enum Filter { pub(super) enum Filter {
Stemmer(Stemmer), Stemmer(Stemmer),
Ascii, Ascii,

View file

@ -6,7 +6,7 @@ use crate::idx::ft::analyzer::tokenizer::{Tokenizer, Tokens};
use crate::idx::ft::doclength::DocLength; use crate::idx::ft::doclength::DocLength;
use crate::idx::ft::offsets::{Offset, OffsetRecords}; use crate::idx::ft::offsets::{Offset, OffsetRecords};
use crate::idx::ft::postings::TermFrequency; use crate::idx::ft::postings::TermFrequency;
use crate::idx::ft::terms::{TermId, Terms}; use crate::idx::ft::terms::{TermId, TermLen, Terms};
use crate::sql::statements::DefineAnalyzerStatement; use crate::sql::statements::DefineAnalyzerStatement;
use crate::sql::tokenizer::Tokenizer as SqlTokenizer; use crate::sql::tokenizer::Tokenizer as SqlTokenizer;
use crate::sql::Value; use crate::sql::Value;
@ -34,31 +34,96 @@ impl From<DefineAnalyzerStatement> for Analyzer {
} }
} }
} }
/// Ordered list of term lookups: one entry per looked-up term, holding the
/// term id and a term length, or `None` when no id was found for the term.
pub(in crate::idx) type TermsList = Vec<Option<(TermId, TermLen)>>;
/// The distinct term ids resolved from some analysed content, plus a flag
/// recording whether at least one term could not be resolved to an id.
pub(in crate::idx) struct TermsSet {
    set: HashSet<TermId>,
    has_unknown_terms: bool,
}

impl TermsSet {
    /// Returns `true` when this set is able to match a document.
    ///
    /// If the set contains terms that are unknown in the index, or if there
    /// are no terms in the set at all, it cannot match any document.
    pub(in crate::idx) fn is_matchable(&self) -> bool {
        !self.has_unknown_terms && !self.set.is_empty()
    }

    /// Returns `true` when every term id of `self` is also present in `other`.
    ///
    /// A set flagged as having unknown terms is never reported as a subset.
    pub(in crate::idx) fn is_subset(&self, other: &TermsSet) -> bool {
        !self.has_unknown_terms && self.set.is_subset(&other.set)
    }
}
impl Analyzer { impl Analyzer {
pub(super) async fn extract_terms( pub(super) async fn extract_querying_terms(
&self, &self,
ctx: &Context<'_>, ctx: &Context<'_>,
opt: &Options, opt: &Options,
txn: &Transaction, txn: &Transaction,
t: &Terms, t: &Terms,
query_string: String, content: String,
) -> Result<Vec<Option<(TermId, u32)>>, Error> { ) -> Result<(TermsList, TermsSet), Error> {
let tokens = let tokens = self.generate_tokens(ctx, opt, txn, FilteringStage::Querying, content).await?;
self.generate_tokens(ctx, opt, txn, FilteringStage::Querying, query_string).await?; // We extract the term ids
// We first collect every unique terms let mut list = Vec::with_capacity(tokens.list().len());
// as it can contains duplicates let mut unique_tokens = HashSet::new();
let mut terms = HashSet::new(); let mut set = HashSet::new();
for token in tokens.list() {
terms.insert(token);
}
// Now we can extract the term ids
let mut res = Vec::with_capacity(terms.len());
let mut tx = txn.lock().await; let mut tx = txn.lock().await;
for term in terms { let mut has_unknown_terms = false;
let opt_term_id = t.get_term_id(&mut tx, tokens.get_token_string(term)?).await?; for token in tokens.list() {
res.push(opt_term_id.map(|tid| (tid, term.get_char_len()))); // Tokens can contains duplicated, not need to evaluate them again
if unique_tokens.insert(token) {
// Is the term known in the index?
let opt_term_id = t.get_term_id(&mut tx, tokens.get_token_string(token)?).await?;
list.push(opt_term_id.map(|tid| (tid, token.get_char_len())));
if let Some(term_id) = opt_term_id {
set.insert(term_id);
} else {
has_unknown_terms = true;
} }
Ok(res) }
}
Ok((
list,
TermsSet {
set,
has_unknown_terms,
},
))
}
pub(in crate::idx) async fn extract_indexing_terms(
&self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
t: &Terms,
content: Value,
) -> Result<TermsSet, Error> {
let mut tv = Vec::new();
self.analyze_value(ctx, opt, txn, content, FilteringStage::Indexing, &mut tv).await?;
let mut set = HashSet::new();
let mut has_unknown_terms = false;
let mut tx = txn.lock().await;
for tokens in tv {
for token in tokens.list() {
if let Some(term_id) =
t.get_term_id(&mut tx, tokens.get_token_string(token)?).await?
{
set.insert(term_id);
} else {
has_unknown_terms = true;
}
}
}
Ok(TermsSet {
set,
has_unknown_terms,
})
} }
/// This method is used for indexing. /// This method is used for indexing.

View file

@ -5,7 +5,7 @@ use crate::idx::ft::offsets::{Offset, Position};
use crate::sql::tokenizer::Tokenizer as SqlTokenizer; use crate::sql::tokenizer::Tokenizer as SqlTokenizer;
use crate::sql::Value; use crate::sql::Value;
pub(super) struct Tokens { pub(in crate::idx) struct Tokens {
/// The input string /// The input string
i: String, i: String,
/// The final list of tokens /// The final list of tokens

View file

@ -11,14 +11,14 @@ use crate::ctx::Context;
use crate::dbs::{Options, Transaction}; use crate::dbs::{Options, Transaction};
use crate::err::Error; use crate::err::Error;
use crate::idx::docids::{DocId, DocIds}; use crate::idx::docids::{DocId, DocIds};
use crate::idx::ft::analyzer::Analyzer; use crate::idx::ft::analyzer::{Analyzer, TermsList, TermsSet};
use crate::idx::ft::doclength::DocLengths; use crate::idx::ft::doclength::DocLengths;
use crate::idx::ft::highlighter::{Highlighter, Offseter}; use crate::idx::ft::highlighter::{Highlighter, Offseter};
use crate::idx::ft::offsets::Offsets; use crate::idx::ft::offsets::Offsets;
use crate::idx::ft::postings::Postings; use crate::idx::ft::postings::Postings;
use crate::idx::ft::scorer::BM25Scorer; use crate::idx::ft::scorer::BM25Scorer;
use crate::idx::ft::termdocs::{TermDocs, TermsDocs}; use crate::idx::ft::termdocs::{TermDocs, TermsDocs};
use crate::idx::ft::terms::{TermId, Terms}; use crate::idx::ft::terms::{TermId, TermLen, Terms};
use crate::idx::trees::btree::BStatistics; use crate::idx::trees::btree::BStatistics;
use crate::idx::trees::store::IndexStores; use crate::idx::trees::store::IndexStores;
use crate::idx::{IndexKeyBase, VersionedSerdeState}; use crate::idx::{IndexKeyBase, VersionedSerdeState};
@ -39,7 +39,7 @@ use tokio::sync::RwLock;
pub(crate) type MatchRef = u8; pub(crate) type MatchRef = u8;
pub(crate) struct FtIndex { pub(crate) struct FtIndex {
analyzer: Analyzer, analyzer: Arc<Analyzer>,
state_key: Key, state_key: Key,
index_key_base: IndexKeyBase, index_key_base: IndexKeyBase,
state: State, state: State,
@ -164,7 +164,7 @@ impl FtIndex {
index_key_base, index_key_base,
bm25, bm25,
highlighting: p.hl, highlighting: p.hl,
analyzer: az.into(), analyzer: Arc::new(az.into()),
doc_ids, doc_ids,
doc_lengths, doc_lengths,
postings, postings,
@ -178,6 +178,14 @@ impl FtIndex {
self.doc_ids.clone() self.doc_ids.clone()
} }
/// Returns a new shared handle to the terms store of this index.
pub(super) fn terms(&self) -> Arc<RwLock<Terms>> {
    // Only the reference count is bumped; the underlying store is shared.
    Arc::clone(&self.terms)
}
/// Returns a new shared handle to the analyzer of this index.
pub(super) fn analyzer(&self) -> Arc<Analyzer> {
    // Only the reference count is bumped; the analyzer itself is shared.
    Arc::clone(&self.analyzer)
}
pub(crate) async fn remove_document( pub(crate) async fn remove_document(
&mut self, &mut self,
txn: &Transaction, txn: &Transaction,
@ -326,22 +334,22 @@ impl FtIndex {
Ok(()) Ok(())
} }
pub(super) async fn extract_terms( pub(super) async fn extract_querying_terms(
&self, &self,
ctx: &Context<'_>, ctx: &Context<'_>,
opt: &Options, opt: &Options,
txn: &Transaction, txn: &Transaction,
query_string: String, query_string: String,
) -> Result<Vec<Option<(TermId, u32)>>, Error> { ) -> Result<(TermsList, TermsSet), Error> {
let t = self.terms.read().await; let t = self.terms.read().await;
let terms = self.analyzer.extract_terms(ctx, opt, txn, &t, query_string).await?; let res = self.analyzer.extract_querying_terms(ctx, opt, txn, &t, query_string).await?;
Ok(terms) Ok(res)
} }
pub(super) async fn get_terms_docs( pub(super) async fn get_terms_docs(
&self, &self,
tx: &mut kvs::Transaction, tx: &mut kvs::Transaction,
terms: &Vec<Option<(TermId, u32)>>, terms: &TermsList,
) -> Result<Vec<Option<(TermId, RoaringTreemap)>>, Error> { ) -> Result<Vec<Option<(TermId, RoaringTreemap)>>, Error> {
let mut terms_docs = Vec::with_capacity(terms.len()); let mut terms_docs = Vec::with_capacity(terms.len());
for opt_term in terms { for opt_term in terms {
@ -402,7 +410,7 @@ impl FtIndex {
&self, &self,
tx: &mut kvs::Transaction, tx: &mut kvs::Transaction,
thg: &Thing, thg: &Thing,
terms: &[Option<(TermId, u32)>], terms: &[Option<(TermId, TermLen)>],
prefix: Value, prefix: Value,
suffix: Value, suffix: Value,
partial: bool, partial: bool,
@ -538,9 +546,10 @@ mod tests {
fti: &FtIndex, fti: &FtIndex,
qs: &str, qs: &str,
) -> (Option<HitsIterator>, BM25Scorer) { ) -> (Option<HitsIterator>, BM25Scorer) {
let t = fti.extract_terms(ctx, opt, txn, qs.to_string()).await.unwrap(); let (term_list, _) =
fti.extract_querying_terms(ctx, opt, txn, qs.to_string()).await.unwrap();
let mut tx = txn.lock().await; let mut tx = txn.lock().await;
let td = Arc::new(fti.get_terms_docs(&mut tx, &t).await.unwrap()); let td = Arc::new(fti.get_terms_docs(&mut tx, &term_list).await.unwrap());
drop(tx); drop(tx);
let scr = fti.new_scorer(td.clone()).unwrap().unwrap(); let scr = fti.new_scorer(td.clone()).unwrap().unwrap();
let hits = fti.new_hits_iterator(td).unwrap(); let hits = fti.new_hits_iterator(td).unwrap();

View file

@ -9,8 +9,9 @@ use roaring::RoaringTreemap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
pub(crate) type TermId = u64; pub(crate) type TermId = u64;
pub(crate) type TermLen = u32;
pub(super) struct Terms { pub(in crate::idx) struct Terms {
state_key: Key, state_key: Key,
index_key_base: IndexKeyBase, index_key_base: IndexKeyBase,
btree: BTree<FstKeys>, btree: BTree<FstKeys>,

View file

@ -3,19 +3,20 @@ use crate::dbs::{Options, Transaction};
use crate::doc::CursorDoc; use crate::doc::CursorDoc;
use crate::err::Error; use crate::err::Error;
use crate::idx::docids::{DocId, DocIds}; use crate::idx::docids::{DocId, DocIds};
use crate::idx::ft::analyzer::{Analyzer, TermsList, TermsSet};
use crate::idx::ft::scorer::BM25Scorer; use crate::idx::ft::scorer::BM25Scorer;
use crate::idx::ft::termdocs::TermsDocs; use crate::idx::ft::termdocs::TermsDocs;
use crate::idx::ft::terms::TermId; use crate::idx::ft::terms::Terms;
use crate::idx::ft::{FtIndex, MatchRef}; use crate::idx::ft::{FtIndex, MatchRef};
use crate::idx::planner::iterators::{ use crate::idx::planner::iterators::{
DocIdsIterator, IndexEqualThingIterator, IndexRangeThingIterator, IndexUnionThingIterator, DocIdsIterator, IndexEqualThingIterator, IndexJoinThingIterator, IndexRangeThingIterator,
MatchesThingIterator, ThingIterator, UniqueEqualThingIterator, UniqueRangeThingIterator, IndexUnionThingIterator, MatchesThingIterator, ThingIterator, UniqueEqualThingIterator,
UniqueUnionThingIterator, UniqueJoinThingIterator, UniqueRangeThingIterator, UniqueUnionThingIterator,
}; };
use crate::idx::planner::knn::KnnPriorityList; use crate::idx::planner::knn::KnnPriorityList;
use crate::idx::planner::plan::IndexOperator::Matches; use crate::idx::planner::plan::IndexOperator::Matches;
use crate::idx::planner::plan::{IndexOperator, IndexOption, RangeValue}; use crate::idx::planner::plan::{IndexOperator, IndexOption, RangeValue};
use crate::idx::planner::tree::{IndexRef, IndexesMap}; use crate::idx::planner::tree::{IdiomPosition, IndexRef, IndexesMap};
use crate::idx::planner::{IterationStage, KnnSet}; use crate::idx::planner::{IterationStage, KnnSet};
use crate::idx::trees::mtree::MTreeIndex; use crate::idx::trees::mtree::MTreeIndex;
use crate::idx::IndexKeyBase; use crate::idx::IndexKeyBase;
@ -28,9 +29,8 @@ use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
pub(super) type KnnEntry = (KnnPriorityList, Arc<Idiom>, Arc<Vec<Number>>, Distance); pub(super) type KnnEntry = (KnnPriorityList, Idiom, Arc<Vec<Number>>, Distance);
pub(super) type KnnExpressions = pub(super) type KnnExpressions = HashMap<Arc<Expression>, (u32, Idiom, Arc<Vec<Number>>, Distance)>;
HashMap<Arc<Expression>, (u32, Arc<Idiom>, Arc<Vec<Number>>, Distance)>;
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct QueryExecutor(Arc<InnerQueryExecutor>); pub(crate) struct QueryExecutor(Arc<InnerQueryExecutor>);
@ -60,16 +60,17 @@ pub(super) enum IteratorEntry {
} }
impl IteratorEntry { impl IteratorEntry {
pub(super) fn explain(&self, e: &mut HashMap<&str, Value>) -> IndexRef { pub(super) fn explain(&self, ix_def: &[DefineIndexStatement]) -> Value {
match self { match self {
Self::Single(_, io) => { Self::Single(_, io) => io.explain(ix_def),
io.explain(e);
io.ix_ref()
}
Self::Range(_, ir, from, to) => { Self::Range(_, ir, from, to) => {
let mut e = HashMap::default();
if let Some(ix) = ix_def.get(*ir as usize) {
e.insert("index", Value::from(ix.name.0.to_owned()));
}
e.insert("from", Value::from(from)); e.insert("from", Value::from(from));
e.insert("to", Value::from(to)); e.insert("to", Value::from(to));
*ir Value::from(Object::from(e))
} }
} }
} }
@ -227,7 +228,7 @@ impl QueryExecutor {
!self.0.knn_entries.is_empty() !self.0.knn_entries.is_empty()
} }
/// Returns `true` if either the expression is matching the current iterator. /// Returns `true` if the expression is matching the current iterator.
pub(crate) fn is_iterator_expression(&self, ir: IteratorRef, exp: &Expression) -> bool { pub(crate) fn is_iterator_expression(&self, ir: IteratorRef, exp: &Expression) -> bool {
match self.0.it_entries.get(ir as usize) { match self.0.it_entries.get(ir as usize) {
Some(IteratorEntry::Single(e, ..)) => exp.eq(e.as_ref()), Some(IteratorEntry::Single(e, ..)) => exp.eq(e.as_ref()),
@ -238,14 +239,7 @@ impl QueryExecutor {
pub(crate) fn explain(&self, itr: IteratorRef) -> Value { pub(crate) fn explain(&self, itr: IteratorRef) -> Value {
match self.0.it_entries.get(itr as usize) { match self.0.it_entries.get(itr as usize) {
Some(ie) => { Some(ie) => ie.explain(self.0.index_definitions.as_slice()),
let mut e = HashMap::default();
let ir = ie.explain(&mut e);
if let Some(ix) = self.0.index_definitions.get(ir as usize) {
e.insert("index", Value::from(ix.name.0.to_owned()));
}
Value::from(Object::from(e))
}
None => Value::None, None => Value::None,
} }
} }
@ -282,10 +276,12 @@ impl QueryExecutor {
it_ref: IteratorRef, it_ref: IteratorRef,
io: &IndexOption, io: &IndexOption,
) -> Result<Option<ThingIterator>, Error> { ) -> Result<Option<ThingIterator>, Error> {
if let Some(ix) = self.0.index_definitions.get(io.ix_ref() as usize) { if let Some(ix) = self.get_index_def(io.ix_ref()) {
match ix.index { match ix.index {
Index::Idx => Ok(Self::new_index_iterator(opt, ix, io.clone())), Index::Idx => Ok(self.new_index_iterator(opt, it_ref, ix, io.clone()).await?),
Index::Uniq => Ok(Self::new_unique_index_iterator(opt, ix, io.clone())), Index::Uniq => {
Ok(self.new_unique_index_iterator(opt, it_ref, ix, io.clone()).await?)
}
Index::Search { Index::Search {
.. ..
} => self.new_search_index_iterator(it_ref, io.clone()).await, } => self.new_search_index_iterator(it_ref, io.clone()).await,
@ -296,20 +292,27 @@ impl QueryExecutor {
} }
} }
fn new_index_iterator( async fn new_index_iterator(
&self,
opt: &Options, opt: &Options,
it_ref: IteratorRef,
ix: &DefineIndexStatement, ix: &DefineIndexStatement,
io: IndexOption, io: IndexOption,
) -> Option<ThingIterator> { ) -> Result<Option<ThingIterator>, Error> {
match io.op() { Ok(match io.op() {
IndexOperator::Equality(value) => { IndexOperator::Equality(value) => Some(ThingIterator::IndexEqual(
Some(ThingIterator::IndexEqual(IndexEqualThingIterator::new(opt, ix, value))) IndexEqualThingIterator::new(opt.ns(), opt.db(), &ix.what, &ix.name, value),
} )),
IndexOperator::Union(value) => { IndexOperator::Union(value) => Some(ThingIterator::IndexUnion(
Some(ThingIterator::IndexUnion(IndexUnionThingIterator::new(opt, ix, value))) IndexUnionThingIterator::new(opt.ns(), opt.db(), &ix.what, &ix.name, value),
)),
IndexOperator::Join(ios) => {
let iterators = self.build_iterators(opt, it_ref, ios).await?;
let index_join = Box::new(IndexJoinThingIterator::new(opt, ix, iterators));
Some(ThingIterator::IndexJoin(index_join))
} }
_ => None, _ => None,
} })
} }
fn new_range_iterator( fn new_range_iterator(
@ -319,16 +322,26 @@ impl QueryExecutor {
from: &RangeValue, from: &RangeValue,
to: &RangeValue, to: &RangeValue,
) -> Option<ThingIterator> { ) -> Option<ThingIterator> {
if let Some(ix) = self.0.index_definitions.get(ir as usize) { if let Some(ix) = self.get_index_def(ir) {
match ix.index { match ix.index {
Index::Idx => { Index::Idx => {
return Some(ThingIterator::IndexRange(IndexRangeThingIterator::new( return Some(ThingIterator::IndexRange(IndexRangeThingIterator::new(
opt, ix, from, to, opt.ns(),
opt.db(),
&ix.what,
&ix.name,
from,
to,
))) )))
} }
Index::Uniq => { Index::Uniq => {
return Some(ThingIterator::UniqueRange(UniqueRangeThingIterator::new( return Some(ThingIterator::UniqueRange(UniqueRangeThingIterator::new(
opt, ix, from, to, opt.ns(),
opt.db(),
&ix.what,
&ix.name,
from,
to,
))) )))
} }
_ => {} _ => {}
@ -337,20 +350,27 @@ impl QueryExecutor {
None None
} }
fn new_unique_index_iterator( async fn new_unique_index_iterator(
&self,
opt: &Options, opt: &Options,
it_ref: IteratorRef,
ix: &DefineIndexStatement, ix: &DefineIndexStatement,
io: IndexOption, io: IndexOption,
) -> Option<ThingIterator> { ) -> Result<Option<ThingIterator>, Error> {
match io.op() { Ok(match io.op() {
IndexOperator::Equality(value) => { IndexOperator::Equality(value) => Some(ThingIterator::UniqueEqual(
Some(ThingIterator::UniqueEqual(UniqueEqualThingIterator::new(opt, ix, value))) UniqueEqualThingIterator::new(opt.ns(), opt.db(), &ix.what, &ix.name, value),
} )),
IndexOperator::Union(value) => { IndexOperator::Union(value) => {
Some(ThingIterator::UniqueUnion(UniqueUnionThingIterator::new(opt, ix, value))) Some(ThingIterator::UniqueUnion(UniqueUnionThingIterator::new(opt, ix, value)))
} }
_ => None, IndexOperator::Join(ios) => {
let iterators = self.build_iterators(opt, it_ref, ios).await?;
let unique_join = Box::new(UniqueJoinThingIterator::new(opt, ix, iterators));
Some(ThingIterator::UniqueJoin(unique_join))
} }
_ => None,
})
} }
async fn new_search_index_iterator( async fn new_search_index_iterator(
@ -381,40 +401,43 @@ impl QueryExecutor {
None None
} }
/// Builds the iterators for a list of index options.
///
/// Each option is handed to `new_single_iterator`; options that yield no
/// iterator are skipped, so the result may be shorter than `ios`. The
/// iterators are queued in the same order as their options.
///
/// # Errors
/// Propagates the first error returned while building an iterator.
async fn build_iterators(
    &self,
    opt: &Options,
    it_ref: IteratorRef,
    ios: &[IndexOption],
) -> Result<VecDeque<ThingIterator>, Error> {
    let mut queue = VecDeque::with_capacity(ios.len());
    for index_option in ios {
        // NOTE(review): the future is boxed, presumably because building a
        // join iterator re-enters this function recursively — confirm.
        let built = Box::pin(self.new_single_iterator(opt, it_ref, index_option)).await?;
        // An `Option` iterates over zero or one item, so `None` adds nothing.
        queue.extend(built);
    }
    Ok(queue)
}
/// Looks up the index definition stored at position `ir`.
///
/// Returns `None` when `ir` does not designate a stored definition.
fn get_index_def(&self, ir: IndexRef) -> Option<&DefineIndexStatement> {
    let position = ir as usize;
    self.0.index_definitions.get(position)
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn matches( pub(crate) async fn matches(
&self, &self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction, txn: &Transaction,
thg: &Thing, thg: &Thing,
exp: &Expression, exp: &Expression,
) -> Result<Value, Error> { l: Value,
// Otherwise, we look for the first possible index options, and evaluate the expression r: Value,
// Does the record id match this executor's table? ) -> Result<bool, Error> {
if thg.tb.eq(&self.0.table) {
if let Some(ft) = self.0.exp_entries.get(exp) { if let Some(ft) = self.0.exp_entries.get(exp) {
let mut run = txn.lock().await; if let Some(ix_def) = self.get_index_def(ft.0.index_option.ix_ref()) {
let doc_key: Key = thg.into(); if self.0.table.eq(&ix_def.what.0) {
if let Some(doc_id) = return self.matches_with_doc_id(txn, thg, ft).await;
ft.0.doc_ids.read().await.get_doc_id(&mut run, doc_key).await?
{
let term_goals = ft.0.terms_docs.len();
// If there is no terms, it can't be a match
if term_goals == 0 {
return Ok(Value::Bool(false));
}
for opt_td in ft.0.terms_docs.iter() {
if let Some((_, docs)) = opt_td {
if !docs.contains(doc_id) {
return Ok(Value::Bool(false));
}
} else {
// If one of the term is missing, it can't be a match
return Ok(Value::Bool(false));
} }
} }
return Ok(Value::Bool(true)); return self.matches_with_value(ctx, opt, txn, ft, l, r).await;
}
return Ok(Value::Bool(false));
}
} }
// If no previous case were successful, we end up with a user error // If no previous case were successful, we end up with a user error
@ -423,6 +446,60 @@ impl QueryExecutor {
}) })
} }
/// Checks whether the record `thg` matches the full-text entry `ft` by
/// looking up its document id in the per-term document sets.
///
/// The record matches only when it has a document id, the entry holds at
/// least one term, and every term has a document set containing that id.
///
/// # Errors
/// Propagates any error raised while resolving the document id.
async fn matches_with_doc_id(
    &self,
    txn: &Transaction,
    thg: &Thing,
    ft: &FtEntry,
) -> Result<bool, Error> {
    let mut run = txn.lock().await;
    let doc_key: Key = thg.into();
    let matched = match ft.0.doc_ids.read().await.get_doc_id(&mut run, doc_key).await? {
        // No document id was found for this record, so it can't be a match
        None => false,
        Some(doc_id) => {
            let terms_docs = &ft.0.terms_docs;
            // If there are no terms, it can't be a match. Otherwise every term
            // must be present and its documents must include this record.
            !terms_docs.is_empty()
                && terms_docs
                    .iter()
                    .all(|term_docs| matches!(term_docs, Some((_, docs)) if docs.contains(doc_id)))
        }
    };
    Ok(matched)
}
/// Checks whether a value matches the full-text entry `ft` by analysing the
/// value itself rather than looking the record up in the index.
///
/// The value is taken from `l` or `r` according to the idiom position stored
/// in the entry's index option: `r` for `Left`, `l` for `Right`. The match
/// succeeds when the query terms are a subset of the terms extracted from
/// that value.
///
/// # Errors
/// Propagates any error raised while extracting the terms of the value.
async fn matches_with_value(
    &self,
    ctx: &Context<'_>,
    opt: &Options,
    txn: &Transaction,
    ft: &FtEntry,
    l: Value,
    r: Value,
) -> Result<bool, Error> {
    let query_terms = &ft.0.query_terms_set;
    // If the query terms contain terms that are unknown in the index,
    // or if there are no terms in the query,
    // we are sure that it does not match any document
    if !query_terms.is_matchable() {
        return Ok(false);
    }
    // Pick the operand selected by the idiom position
    let record_value = match ft.0.index_option.id_pos() {
        IdiomPosition::Right => l,
        IdiomPosition::Left => r,
    };
    let terms_guard = ft.0.terms.read().await;
    // Extract the terms set from the record
    let record_terms = ft
        .0
        .analyzer
        .extract_indexing_terms(ctx, opt, txn, &terms_guard, record_value)
        .await?;
    Ok(query_terms.is_subset(&record_terms))
}
fn get_ft_entry(&self, match_ref: &Value) -> Option<&FtEntry> { fn get_ft_entry(&self, match_ref: &Value) -> Option<&FtEntry> {
if let Some(mr) = Self::get_match_ref(match_ref) { if let Some(mr) = Self::get_match_ref(match_ref) {
self.0.mr_entries.get(&mr) self.0.mr_entries.get(&mr)
@ -457,7 +534,7 @@ impl QueryExecutor {
.highlight( .highlight(
&mut run, &mut run,
thg, thg,
&e.0.terms, &e.0.query_terms_list,
prefix, prefix,
suffix, suffix,
partial, partial,
@ -478,7 +555,7 @@ impl QueryExecutor {
) -> Result<Value, Error> { ) -> Result<Value, Error> {
if let Some((e, ft)) = self.get_ft_entry_and_index(&match_ref) { if let Some((e, ft)) = self.get_ft_entry_and_index(&match_ref) {
let mut run = txn.lock().await; let mut run = txn.lock().await;
return ft.extract_offsets(&mut run, thg, &e.0.terms, partial).await; return ft.extract_offsets(&mut run, thg, &e.0.query_terms_list, partial).await;
} }
Ok(Value::None) Ok(Value::None)
} }
@ -515,7 +592,10 @@ struct FtEntry(Arc<Inner>);
struct Inner { struct Inner {
index_option: IndexOption, index_option: IndexOption,
doc_ids: Arc<RwLock<DocIds>>, doc_ids: Arc<RwLock<DocIds>>,
terms: Vec<Option<(TermId, u32)>>, analyzer: Arc<Analyzer>,
query_terms_set: TermsSet,
query_terms_list: TermsList,
terms: Arc<RwLock<Terms>>,
terms_docs: TermsDocs, terms_docs: TermsDocs,
scorer: Option<BM25Scorer>, scorer: Option<BM25Scorer>,
} }
@ -529,14 +609,18 @@ impl FtEntry {
io: IndexOption, io: IndexOption,
) -> Result<Option<Self>, Error> { ) -> Result<Option<Self>, Error> {
if let Matches(qs, _) = io.op() { if let Matches(qs, _) = io.op() {
let terms = ft.extract_terms(ctx, opt, txn, qs.to_owned()).await?; let (terms_list, terms_set) =
ft.extract_querying_terms(ctx, opt, txn, qs.to_owned()).await?;
let mut tx = txn.lock().await; let mut tx = txn.lock().await;
let terms_docs = Arc::new(ft.get_terms_docs(&mut tx, &terms).await?); let terms_docs = Arc::new(ft.get_terms_docs(&mut tx, &terms_list).await?);
Ok(Some(Self(Arc::new(Inner { Ok(Some(Self(Arc::new(Inner {
index_option: io, index_option: io,
doc_ids: ft.doc_ids(), doc_ids: ft.doc_ids(),
analyzer: ft.analyzer(),
query_terms_set: terms_set,
query_terms_list: terms_list,
scorer: ft.new_scorer(terms_docs.clone())?, scorer: ft.new_scorer(terms_docs.clone())?,
terms, terms: ft.terms(),
terms_docs, terms_docs,
})))) }))))
} else { } else {

View file

@ -7,7 +7,8 @@ use crate::idx::planner::plan::RangeValue;
use crate::key::index::Index; use crate::key::index::Index;
use crate::kvs::{Key, Limit, ScanPage}; use crate::kvs::{Key, Limit, ScanPage};
use crate::sql::statements::DefineIndexStatement; use crate::sql::statements::DefineIndexStatement;
use crate::sql::{Array, Thing, Value}; use crate::sql::{Array, Ident, Thing, Value};
use radix_trie::Trie;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -16,54 +17,76 @@ pub(crate) enum ThingIterator {
IndexEqual(IndexEqualThingIterator), IndexEqual(IndexEqualThingIterator),
IndexRange(IndexRangeThingIterator), IndexRange(IndexRangeThingIterator),
IndexUnion(IndexUnionThingIterator), IndexUnion(IndexUnionThingIterator),
IndexJoin(Box<IndexJoinThingIterator>),
UniqueEqual(UniqueEqualThingIterator), UniqueEqual(UniqueEqualThingIterator),
UniqueRange(UniqueRangeThingIterator), UniqueRange(UniqueRangeThingIterator),
UniqueUnion(UniqueUnionThingIterator), UniqueUnion(UniqueUnionThingIterator),
UniqueJoin(Box<UniqueJoinThingIterator>),
Matches(MatchesThingIterator), Matches(MatchesThingIterator),
Knn(DocIdsIterator), Knn(DocIdsIterator),
} }
impl ThingIterator { impl ThingIterator {
pub(crate) async fn next_batch( pub(crate) async fn next_batch<T: ThingCollector>(
&mut self, &mut self,
tx: &Transaction, tx: &Transaction,
size: u32, size: u32,
) -> Result<Vec<(Thing, Option<DocId>)>, Error> { collector: &mut T,
) -> Result<usize, Error> {
match self { match self {
ThingIterator::IndexEqual(i) => i.next_batch(tx, size).await, Self::IndexEqual(i) => i.next_batch(tx, size, collector).await,
ThingIterator::UniqueEqual(i) => i.next_batch(tx).await, Self::UniqueEqual(i) => i.next_batch(tx, collector).await,
ThingIterator::IndexRange(i) => i.next_batch(tx, size).await, Self::IndexRange(i) => i.next_batch(tx, size, collector).await,
ThingIterator::UniqueRange(i) => i.next_batch(tx, size).await, Self::UniqueRange(i) => i.next_batch(tx, size, collector).await,
ThingIterator::IndexUnion(i) => i.next_batch(tx, size).await, Self::IndexUnion(i) => i.next_batch(tx, size, collector).await,
ThingIterator::UniqueUnion(i) => i.next_batch(tx, size).await, Self::UniqueUnion(i) => i.next_batch(tx, size, collector).await,
ThingIterator::Matches(i) => i.next_batch(tx, size).await, Self::Matches(i) => i.next_batch(tx, size, collector).await,
ThingIterator::Knn(i) => i.next_batch(tx, size).await, Self::Knn(i) => i.next_batch(tx, size, collector).await,
Self::IndexJoin(i) => Box::pin(i.next_batch(tx, size, collector)).await,
Self::UniqueJoin(i) => Box::pin(i.next_batch(tx, size, collector)).await,
} }
} }
} }
/// A sink receiving the records produced by an iterator batch.
pub(crate) trait ThingCollector {
    /// Adds one record, with its optional document id, to the collector.
    fn add(&mut self, thing: Thing, doc_id: Option<DocId>);
}

/// Collects into a `Vec`, appending each record at the end.
impl ThingCollector for Vec<(Thing, Option<DocId>)> {
    fn add(&mut self, thing: Thing, doc_id: Option<DocId>) {
        let entry = (thing, doc_id);
        self.push(entry);
    }
}

/// Collects into a `VecDeque`, appending each record at the back.
impl ThingCollector for VecDeque<(Thing, Option<DocId>)> {
    fn add(&mut self, thing: Thing, doc_id: Option<DocId>) {
        let entry = (thing, doc_id);
        self.push_back(entry);
    }
}
pub(crate) struct IndexEqualThingIterator { pub(crate) struct IndexEqualThingIterator {
beg: Vec<u8>, beg: Vec<u8>,
end: Vec<u8>, end: Vec<u8>,
} }
impl IndexEqualThingIterator { impl IndexEqualThingIterator {
pub(super) fn new(opt: &Options, ix: &DefineIndexStatement, v: &Value) -> Self { pub(super) fn new(ns: &str, db: &str, ix_what: &Ident, ix_name: &Ident, v: &Value) -> Self {
let a = Array::from(v.clone()); let a = Array::from(v.clone());
let beg = Index::prefix_ids_beg(opt.ns(), opt.db(), &ix.what, &ix.name, &a); let beg = Index::prefix_ids_beg(ns, db, ix_what, ix_name, &a);
let end = Index::prefix_ids_end(opt.ns(), opt.db(), &ix.what, &ix.name, &a); let end = Index::prefix_ids_end(ns, db, ix_what, ix_name, &a);
Self { Self {
beg, beg,
end, end,
} }
} }
async fn next_scan( async fn next_scan<T: ThingCollector>(
txn: &Transaction, txn: &Transaction,
beg: &mut Vec<u8>, beg: &mut Vec<u8>,
end: &[u8], end: &[u8],
limit: u32, limit: u32,
) -> Result<Vec<(Thing, Option<DocId>)>, Error> { collector: &mut T,
) -> Result<usize, Error> {
let min = beg.clone(); let min = beg.clone();
let max = end.to_owned(); let max = end.to_owned();
let res = txn let res = txn
@ -83,16 +106,18 @@ impl IndexEqualThingIterator {
key.push(0x00); key.push(0x00);
*beg = key; *beg = key;
} }
let res = res.iter().map(|(_, val)| (val.into(), None)).collect(); let count = res.len();
Ok(res) res.into_iter().for_each(|(_, val)| collector.add(val.into(), None));
Ok(count)
} }
async fn next_batch( async fn next_batch<T: ThingCollector>(
&mut self, &mut self,
txn: &Transaction, txn: &Transaction,
limit: u32, limit: u32,
) -> Result<Vec<(Thing, Option<DocId>)>, Error> { collector: &mut T,
Self::next_scan(txn, &mut self.beg, &self.end, limit).await ) -> Result<usize, Error> {
Self::next_scan(txn, &mut self.beg, &self.end, limit, collector).await
} }
} }
@ -146,47 +171,62 @@ pub(crate) struct IndexRangeThingIterator {
impl IndexRangeThingIterator { impl IndexRangeThingIterator {
pub(super) fn new( pub(super) fn new(
opt: &Options, ns: &str,
ix: &DefineIndexStatement, db: &str,
ix_what: &Ident,
ix_name: &Ident,
from: &RangeValue, from: &RangeValue,
to: &RangeValue, to: &RangeValue,
) -> Self { ) -> Self {
let beg = Self::compute_beg(opt, ix, from); let beg = Self::compute_beg(ns, db, ix_what, ix_name, from);
let end = Self::compute_end(opt, ix, to); let end = Self::compute_end(ns, db, ix_what, ix_name, to);
Self { Self {
r: RangeScan::new(beg, from.inclusive, end, to.inclusive), r: RangeScan::new(beg, from.inclusive, end, to.inclusive),
} }
} }
fn compute_beg(opt: &Options, ix: &DefineIndexStatement, from: &RangeValue) -> Vec<u8> { fn compute_beg(
ns: &str,
db: &str,
ix_what: &Ident,
ix_name: &Ident,
from: &RangeValue,
) -> Vec<u8> {
if from.value == Value::None { if from.value == Value::None {
return Index::prefix_beg(opt.ns(), opt.db(), &ix.what, &ix.name); return Index::prefix_beg(ns, db, ix_what, ix_name);
} }
let fd = Array::from(from.value.to_owned()); let fd = Array::from(from.value.to_owned());
if from.inclusive { if from.inclusive {
Index::prefix_ids_beg(opt.ns(), opt.db(), &ix.what, &ix.name, &fd) Index::prefix_ids_beg(ns, db, ix_what, ix_name, &fd)
} else { } else {
Index::prefix_ids_end(opt.ns(), opt.db(), &ix.what, &ix.name, &fd) Index::prefix_ids_end(ns, db, ix_what, ix_name, &fd)
} }
} }
fn compute_end(opt: &Options, ix: &DefineIndexStatement, to: &RangeValue) -> Vec<u8> { fn compute_end(
ns: &str,
db: &str,
ix_what: &Ident,
ix_name: &Ident,
to: &RangeValue,
) -> Vec<u8> {
if to.value == Value::None { if to.value == Value::None {
return Index::prefix_end(opt.ns(), opt.db(), &ix.what, &ix.name); return Index::prefix_end(ns, db, ix_what, ix_name);
} }
let fd = Array::from(to.value.to_owned()); let fd = Array::from(to.value.to_owned());
if to.inclusive { if to.inclusive {
Index::prefix_ids_end(opt.ns(), opt.db(), &ix.what, &ix.name, &fd) Index::prefix_ids_end(ns, db, ix_what, ix_name, &fd)
} else { } else {
Index::prefix_ids_beg(opt.ns(), opt.db(), &ix.what, &ix.name, &fd) Index::prefix_ids_beg(ns, db, ix_what, ix_name, &fd)
} }
} }
async fn next_batch( async fn next_batch<T: ThingCollector>(
&mut self, &mut self,
txn: &Transaction, txn: &Transaction,
limit: u32, limit: u32,
) -> Result<Vec<(Thing, Option<DocId>)>, Error> { collector: &mut T,
) -> Result<usize, Error> {
let min = self.r.beg.clone(); let min = self.r.beg.clone();
let max = self.r.end.clone(); let max = self.r.end.clone();
let res = txn let res = txn
@ -205,13 +245,14 @@ impl IndexRangeThingIterator {
self.r.beg.clone_from(key); self.r.beg.clone_from(key);
self.r.beg.push(0x00); self.r.beg.push(0x00);
} }
let mut r = Vec::with_capacity(res.len()); let mut count = 0;
for (k, v) in res { for (k, v) in res {
if self.r.matches(&k) { if self.r.matches(&k) {
r.push((v.into(), None)); collector.add(v.into(), None);
count += 1;
} }
} }
Ok(r) Ok(count)
} }
} }
@ -221,14 +262,14 @@ pub(crate) struct IndexUnionThingIterator {
} }
impl IndexUnionThingIterator { impl IndexUnionThingIterator {
pub(super) fn new(opt: &Options, ix: &DefineIndexStatement, a: &Array) -> Self { pub(super) fn new(ns: &str, db: &str, ix_what: &Ident, ix_name: &Ident, a: &Array) -> Self {
// We create a VecDeque to hold the prefix keys (begin and end) for each value in the array. // We create a VecDeque to hold the prefix keys (begin and end) for each value in the array.
let mut values: VecDeque<(Vec<u8>, Vec<u8>)> = let mut values: VecDeque<(Vec<u8>, Vec<u8>)> =
a.0.iter() a.0.iter()
.map(|v| { .map(|v| {
let a = Array::from(v.clone()); let a = Array::from(v.clone());
let beg = Index::prefix_ids_beg(opt.ns(), opt.db(), &ix.what, &ix.name, &a); let beg = Index::prefix_ids_beg(ns, db, ix_what, ix_name, &a);
let end = Index::prefix_ids_end(opt.ns(), opt.db(), &ix.what, &ix.name, &a); let end = Index::prefix_ids_end(ns, db, ix_what, ix_name, &a);
(beg, end) (beg, end)
}) })
.collect(); .collect();
@ -239,19 +280,148 @@ impl IndexUnionThingIterator {
} }
} }
async fn next_batch( async fn next_batch<T: ThingCollector>(
&mut self, &mut self,
txn: &Transaction, txn: &Transaction,
limit: u32, limit: u32,
) -> Result<Vec<(Thing, Option<DocId>)>, Error> { collector: &mut T,
) -> Result<usize, Error> {
while let Some(r) = &mut self.current { while let Some(r) = &mut self.current {
let res = IndexEqualThingIterator::next_scan(txn, &mut r.0, &r.1, limit).await?; let count =
if !res.is_empty() { IndexEqualThingIterator::next_scan(txn, &mut r.0, &r.1, limit, collector).await?;
return Ok(res); if count != 0 {
return Ok(count);
} }
self.current = self.values.pop_front(); self.current = self.values.pop_front();
} }
Ok(vec![]) Ok(0)
}
}
/// State shared by the join iterators: things produced by a queue of
/// "remote" iterators are turned, one by one, into a "local" iterator
/// on the index identified by `ns`/`db`/`ix_what`/`ix_name`.
struct JoinThingIterator {
/// Namespace name, copied from the options at construction.
ns: String,
/// Database name, copied from the options at construction.
db: String,
/// `what` of the index definition, passed to the local iterator factory.
ix_what: Ident,
/// Name of the index definition, passed to the local iterator factory.
ix_name: Ident,
/// Remote iterators not yet started; consumed from the front.
remote_iterators: VecDeque<ThingIterator>,
/// Remote iterator currently being drained, if any.
current_remote: Option<ThingIterator>,
/// Things of the last batch fetched from `current_remote` that are still to be processed.
current_remote_batch: VecDeque<(Thing, Option<DocId>)>,
/// Local iterator built for the most recent not-yet-seen remote thing.
current_local: Option<ThingIterator>,
/// Keys of the remote things already handled; used as a set (the value is always `true`).
distinct: Trie<Key, bool>,
}
impl JoinThingIterator {
    /// Creates a join state over `remote_iterators`.
    ///
    /// The namespace and database names are copied from `opt`, and the index
    /// `what`/`name` identifiers are cloned from `ix`, so the iterator owns
    /// everything it needs to build local iterators later. No remote iterator
    /// is started here: the current remote, its batch and the current local
    /// iterator all start empty.
    pub(super) fn new(
        opt: &Options,
        ix: &DefineIndexStatement,
        remote_iterators: VecDeque<ThingIterator>,
    ) -> Self {
        let ns = opt.ns().to_string();
        let db = opt.db().to_string();
        let ix_what = ix.what.clone();
        let ix_name = ix.name.clone();
        Self {
            ns,
            db,
            ix_what,
            ix_name,
            remote_iterators,
            current_remote: None,
            current_remote_batch: VecDeque::new(),
            current_local: None,
            distinct: Default::default(),
        }
    }
}
impl JoinThingIterator {
    /// Refills `current_remote_batch` from the remote iterators.
    ///
    /// The current remote iterator is asked for a batch (the buffer is cleared
    /// first). When it yields nothing, the next iterator is taken from the
    /// queue and tried in turn.
    ///
    /// Returns `Ok(true)` when the buffer holds at least one thing, and
    /// `Ok(false)` when every remote iterator is exhausted.
    async fn next_current_remote_batch(
        &mut self,
        tx: &Transaction,
        limit: u32,
    ) -> Result<bool, Error> {
        loop {
            if let Some(remote) = self.current_remote.as_mut() {
                self.current_remote_batch.clear();
                let fetched =
                    remote.next_batch(tx, limit, &mut self.current_remote_batch).await?;
                if fetched > 0 {
                    return Ok(true);
                }
            }
            // The current remote iterator (if any) is drained: move to the next one.
            match self.remote_iterators.pop_front() {
                Some(next) => self.current_remote = Some(next),
                None => {
                    self.current_remote = None;
                    return Ok(false);
                }
            }
        }
    }

    /// Installs the next local iterator.
    ///
    /// Buffered remote things are consumed from the front; the first one whose
    /// key has not been seen before is handed to `new_iter` (together with the
    /// namespace, database and index identifiers) and the result becomes
    /// `current_local`. When the buffer runs dry it is refilled from the
    /// remote iterators.
    ///
    /// Returns `Ok(true)` when a new local iterator was installed, and
    /// `Ok(false)` when there is no remote thing left.
    async fn next_current_local<F>(
        &mut self,
        tx: &Transaction,
        limit: u32,
        new_iter: F,
    ) -> Result<bool, Error>
    where
        F: Fn(&str, &str, &Ident, &Ident, Value) -> ThingIterator,
    {
        loop {
            while let Some((thing, _)) = self.current_remote_batch.pop_front() {
                let key: Key = (&thing).into();
                let value = Value::from(thing);
                // `insert` returns the previous entry: `None` means first occurrence.
                let first_seen = self.distinct.insert(key, true).is_none();
                if !first_seen {
                    continue;
                }
                let local = new_iter(&self.ns, &self.db, &self.ix_what, &self.ix_name, value);
                self.current_local = Some(local);
                return Ok(true);
            }
            let refilled = self.next_current_remote_batch(tx, limit).await?;
            if !refilled {
                return Ok(false);
            }
        }
    }

    /// Adds the next non-empty batch of the join to `collector`.
    ///
    /// The current local iterator is drained first; once it yields nothing, a
    /// new local iterator is built from the next unseen remote thing via
    /// `new_iter`.
    ///
    /// Returns the number of things added, or `Ok(0)` once the join is exhausted.
    async fn next_batch<T: ThingCollector, F>(
        &mut self,
        tx: &Transaction,
        limit: u32,
        collector: &mut T,
        new_iter: F,
    ) -> Result<usize, Error>
    where
        F: Fn(&str, &str, &Ident, &Ident, Value) -> ThingIterator + Copy,
    {
        loop {
            if let Some(local) = self.current_local.as_mut() {
                let added = local.next_batch(tx, limit, collector).await?;
                if added > 0 {
                    return Ok(added);
                }
            }
            let has_local = self.next_current_local(tx, limit, new_iter).await?;
            if !has_local {
                return Ok(0);
            }
        }
    }
}
/// Join iterator whose local lookups are performed with an `IndexEqualThingIterator`.
pub(crate) struct IndexJoinThingIterator(JoinThingIterator);
impl IndexJoinThingIterator {
/// Wraps a new `JoinThingIterator` built from the options, the index
/// definition and the queue of remote iterators.
pub(super) fn new(
    opt: &Options,
    ix: &DefineIndexStatement,
    remote_iterators: VecDeque<ThingIterator>,
) -> Self {
    let inner = JoinThingIterator::new(opt, ix, remote_iterators);
    Self(inner)
}
async fn next_batch<T: ThingCollector>(
&mut self,
tx: &Transaction,
limit: u32,
collector: &mut T,
) -> Result<usize, Error> {
let new_iter = |ns: &str, db: &str, ix_what: &Ident, ix_name: &Ident, value: Value| {
let it = IndexEqualThingIterator::new(ns, db, ix_what, ix_name, &value);
ThingIterator::IndexEqual(it)
};
self.0.next_batch(tx, limit, collector, new_iter).await
} }
} }
@ -260,24 +430,27 @@ pub(crate) struct UniqueEqualThingIterator {
} }
impl UniqueEqualThingIterator { impl UniqueEqualThingIterator {
pub(super) fn new(opt: &Options, ix: &DefineIndexStatement, v: &Value) -> Self { pub(super) fn new(ns: &str, db: &str, ix_what: &Ident, ix_name: &Ident, v: &Value) -> Self {
let a = Array::from(v.to_owned()); let a = Array::from(v.to_owned());
let key = Index::new(opt.ns(), opt.db(), &ix.what, &ix.name, &a, None).into(); let key = Index::new(ns, db, ix_what, ix_name, &a, None).into();
Self { Self {
key: Some(key), key: Some(key),
} }
} }
async fn next_batch( async fn next_batch<T: ThingCollector>(
&mut self, &mut self,
txn: &Transaction, txn: &Transaction,
) -> Result<Vec<(Thing, Option<DocId>)>, Error> { collector: &mut T,
) -> Result<usize, Error> {
let mut count = 0;
if let Some(key) = self.key.take() { if let Some(key) = self.key.take() {
if let Some(val) = txn.lock().await.get(key).await? { if let Some(val) = txn.lock().await.get(key).await? {
return Ok(vec![(val.into(), None)]); collector.add(val.into(), None);
count += 1;
} }
} }
Ok(vec![]) Ok(count)
} }
} }
@ -288,51 +461,59 @@ pub(crate) struct UniqueRangeThingIterator {
impl UniqueRangeThingIterator { impl UniqueRangeThingIterator {
pub(super) fn new( pub(super) fn new(
opt: &Options, ns: &str,
ix: &DefineIndexStatement, db: &str,
ix_what: &Ident,
ix_name: &Ident,
from: &RangeValue, from: &RangeValue,
to: &RangeValue, to: &RangeValue,
) -> Self { ) -> Self {
let beg = Self::compute_beg(opt, ix, from); let beg = Self::compute_beg(ns, db, ix_what, ix_name, from);
let end = Self::compute_end(opt, ix, to); let end = Self::compute_end(ns, db, ix_what, ix_name, to);
Self { Self {
r: RangeScan::new(beg, from.inclusive, end, to.inclusive), r: RangeScan::new(beg, from.inclusive, end, to.inclusive),
done: false, done: false,
} }
} }
fn compute_beg(opt: &Options, ix: &DefineIndexStatement, from: &RangeValue) -> Vec<u8> { fn compute_beg(
ns: &str,
db: &str,
ix_what: &Ident,
ix_name: &Ident,
from: &RangeValue,
) -> Vec<u8> {
if from.value == Value::None { if from.value == Value::None {
return Index::prefix_beg(opt.ns(), opt.db(), &ix.what, &ix.name); return Index::prefix_beg(ns, db, ix_what, ix_name);
} }
Index::new( Index::new(ns, db, ix_what, ix_name, &Array::from(from.value.to_owned()), None)
opt.ns(),
opt.db(),
&ix.what,
&ix.name,
&Array::from(from.value.to_owned()),
None,
)
.encode() .encode()
.unwrap() .unwrap()
} }
fn compute_end(opt: &Options, ix: &DefineIndexStatement, to: &RangeValue) -> Vec<u8> { fn compute_end(
ns: &str,
db: &str,
ix_what: &Ident,
ix_name: &Ident,
to: &RangeValue,
) -> Vec<u8> {
if to.value == Value::None { if to.value == Value::None {
return Index::prefix_end(opt.ns(), opt.db(), &ix.what, &ix.name); return Index::prefix_end(ns, db, ix_what, ix_name);
} }
Index::new(opt.ns(), opt.db(), &ix.what, &ix.name, &Array::from(to.value.to_owned()), None) Index::new(ns, db, ix_what, ix_name, &Array::from(to.value.to_owned()), None)
.encode() .encode()
.unwrap() .unwrap()
} }
async fn next_batch( async fn next_batch<T: ThingCollector>(
&mut self, &mut self,
txn: &Transaction, txn: &Transaction,
mut limit: u32, mut limit: u32,
) -> Result<Vec<(Thing, Option<DocId>)>, Error> { collector: &mut T,
) -> Result<usize, Error> {
if self.done { if self.done {
return Ok(vec![]); return Ok(0);
} }
let min = self.r.beg.clone(); let min = self.r.beg.clone();
let max = self.r.end.clone(); let max = self.r.end.clone();
@ -347,26 +528,27 @@ impl UniqueRangeThingIterator {
limit, limit,
) )
.await?; .await?;
let res = res.values; let mut count = 0;
let mut r = Vec::with_capacity(res.len()); for (k, v) in res.values {
for (k, v) in res {
limit -= 1; limit -= 1;
if limit == 0 { if limit == 0 {
self.r.beg = k; self.r.beg = k;
return Ok(r); return Ok(count);
} }
if self.r.matches(&k) { if self.r.matches(&k) {
r.push((v.into(), None)); collector.add(v.into(), None);
count += 1;
} }
} }
let end = self.r.end.clone(); let end = self.r.end.clone();
if self.r.matches(&end) { if self.r.matches(&end) {
if let Some(v) = tx.get(end).await? { if let Some(v) = tx.get(end).await? {
r.push((v.into(), None)); collector.add(v.into(), None);
count += 1;
} }
} }
self.done = true; self.done = true;
Ok(r) Ok(count)
} }
} }
@ -390,22 +572,49 @@ impl UniqueUnionThingIterator {
} }
} }
async fn next_batch( async fn next_batch<T: ThingCollector>(
&mut self, &mut self,
txn: &Transaction, txn: &Transaction,
limit: u32, limit: u32,
) -> Result<Vec<(Thing, Option<DocId>)>, Error> { collector: &mut T,
) -> Result<usize, Error> {
let mut run = txn.lock().await; let mut run = txn.lock().await;
let mut res = vec![]; let mut count = 0;
while let Some(key) = self.keys.pop_front() { while let Some(key) = self.keys.pop_front() {
if let Some(val) = run.get(key).await? { if let Some(val) = run.get(key).await? {
res.push((val.into(), None)); collector.add(val.into(), None);
} count += 1;
if res.len() >= limit as usize { if count >= limit {
return Ok(res); break;
} }
} }
Ok(res) }
Ok(count as usize)
}
}
/// Join iterator whose local lookups are performed with a `UniqueEqualThingIterator`.
pub(crate) struct UniqueJoinThingIterator(JoinThingIterator);
impl UniqueJoinThingIterator {
/// Wraps a new `JoinThingIterator` built from the options, the index
/// definition and the queue of remote iterators.
pub(super) fn new(
    opt: &Options,
    ix: &DefineIndexStatement,
    remote_iterators: VecDeque<ThingIterator>,
) -> Self {
    let inner = JoinThingIterator::new(opt, ix, remote_iterators);
    Self(inner)
}
async fn next_batch<T: ThingCollector>(
&mut self,
tx: &Transaction,
limit: u32,
collector: &mut T,
) -> Result<usize, Error> {
let new_iter = |ns: &str, db: &str, ix_what: &Ident, ix_name: &Ident, value: Value| {
let it = UniqueEqualThingIterator::new(ns, db, ix_what, ix_name, &value);
ThingIterator::UniqueEqual(it)
};
self.0.next_batch(tx, limit, collector, new_iter).await
} }
} }
@ -421,24 +630,25 @@ impl MatchesThingIterator {
}) })
} }
async fn next_batch( async fn next_batch<T: ThingCollector>(
&mut self, &mut self,
txn: &Transaction, txn: &Transaction,
mut limit: u32, limit: u32,
) -> Result<Vec<(Thing, Option<DocId>)>, Error> { collector: &mut T,
let mut res = vec![]; ) -> Result<usize, Error> {
let mut count = 0;
if let Some(hits) = &mut self.hits { if let Some(hits) = &mut self.hits {
let mut run = txn.lock().await; let mut run = txn.lock().await;
while limit > 0 { while limit > count {
if let Some((thg, doc_id)) = hits.next(&mut run).await? { if let Some((thg, doc_id)) = hits.next(&mut run).await? {
res.push((thg, Some(doc_id))); collector.add(thg, Some(doc_id));
count += 1;
} else { } else {
break; break;
} }
limit -= 1;
} }
} }
Ok(res) Ok(count as usize)
} }
} }
@ -454,25 +664,26 @@ impl DocIdsIterator {
res, res,
} }
} }
async fn next_batch( async fn next_batch<T: ThingCollector>(
&mut self, &mut self,
txn: &Transaction, txn: &Transaction,
mut limit: u32, limit: u32,
) -> Result<Vec<(Thing, Option<DocId>)>, Error> { collector: &mut T,
let mut res = vec![]; ) -> Result<usize, Error> {
let mut tx = txn.lock().await; let mut tx = txn.lock().await;
while limit > 0 { let mut count = 0;
while limit > count {
if let Some(doc_id) = self.res.pop_front() { if let Some(doc_id) = self.res.pop_front() {
if let Some(doc_key) = if let Some(doc_key) =
self.doc_ids.read().await.get_doc_key(&mut tx, doc_id).await? self.doc_ids.read().await.get_doc_key(&mut tx, doc_id).await?
{ {
res.push((doc_key.into(), Some(doc_id))); collector.add(doc_key.into(), Some(doc_id));
limit -= 1; count += 1;
} }
} else { } else {
break; break;
} }
} }
Ok(res) Ok(count as usize)
} }
} }

View file

@ -53,7 +53,6 @@ impl<'a> QueryPlanner<'a> {
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut is_table_iterator = false; let mut is_table_iterator = false;
let mut is_knn = false; let mut is_knn = false;
let t = Arc::new(t);
match Tree::build(ctx, self.opt, txn, &t, self.cond, self.with).await? { match Tree::build(ctx, self.opt, txn, &t, self.cond, self.with).await? {
Some(tree) => { Some(tree) => {
is_knn = is_knn || !tree.knn_expressions.is_empty(); is_knn = is_knn || !tree.knn_expressions.is_empty();
@ -117,7 +116,7 @@ impl<'a> QueryPlanner<'a> {
fn add( fn add(
&mut self, &mut self,
tb: Arc<Table>, tb: Table,
irf: Option<IteratorRef>, irf: Option<IteratorRef>,
exe: InnerQueryExecutor, exe: InnerQueryExecutor,
it: &mut Iterator, it: &mut Iterator,

View file

@ -1,6 +1,7 @@
use crate::err::Error; use crate::err::Error;
use crate::idx::ft::MatchRef; use crate::idx::ft::MatchRef;
use crate::idx::planner::tree::{GroupRef, IndexRef, Node}; use crate::idx::planner::tree::{GroupRef, IdiomPosition, IndexRef, Node};
use crate::sql::statements::DefineIndexStatement;
use crate::sql::with::With; use crate::sql::with::With;
use crate::sql::{Array, Idiom, Object}; use crate::sql::{Array, Idiom, Object};
use crate::sql::{Expression, Operator, Value}; use crate::sql::{Expression, Operator, Value};
@ -162,48 +163,53 @@ pub(super) enum Plan {
SingleIndexRange(IndexRef, UnionRangeQueryBuilder), SingleIndexRange(IndexRef, UnionRangeQueryBuilder),
} }
#[derive(Debug, Clone, Eq, PartialEq, Hash)] #[derive(Debug, Eq, PartialEq, Hash, Clone)]
pub(crate) struct IndexOption(Arc<Inner>); pub(super) struct IndexOption {
/// A reference to the index definition
#[derive(Debug, Eq, PartialEq, Hash)]
pub(super) struct Inner {
ir: IndexRef, ir: IndexRef,
id: Arc<Idiom>, id: Idiom,
op: IndexOperator, id_pos: IdiomPosition,
op: Arc<IndexOperator>,
} }
#[derive(Debug, Eq, PartialEq, Hash)] #[derive(Debug, Eq, PartialEq, Hash)]
pub(super) enum IndexOperator { pub(super) enum IndexOperator {
Equality(Value), Equality(Value),
Union(Array), Union(Array),
Join(Vec<IndexOption>),
RangePart(Operator, Value), RangePart(Operator, Value),
Matches(String, Option<MatchRef>), Matches(String, Option<MatchRef>),
Knn(Array, u32), Knn(Array, u32),
} }
impl IndexOption { impl IndexOption {
pub(super) fn new(ir: IndexRef, id: Arc<Idiom>, op: IndexOperator) -> Self { pub(super) fn new(ir: IndexRef, id: Idiom, id_pos: IdiomPosition, op: IndexOperator) -> Self {
Self(Arc::new(Inner { Self {
ir, ir,
id, id,
op, id_pos,
})) op: Arc::new(op),
}
} }
pub(super) fn require_distinct(&self) -> bool { pub(super) fn require_distinct(&self) -> bool {
matches!(self.0.op, IndexOperator::Union(_)) matches!(self.op.as_ref(), IndexOperator::Union(_))
} }
pub(super) fn ix_ref(&self) -> IndexRef { pub(super) fn ix_ref(&self) -> IndexRef {
self.0.ir self.ir
} }
pub(super) fn op(&self) -> &IndexOperator { pub(super) fn op(&self) -> &IndexOperator {
&self.0.op self.op.as_ref()
} }
pub(super) fn id_ref(&self) -> &Idiom { pub(super) fn id_ref(&self) -> &Idiom {
self.0.id.as_ref() &self.id
}
pub(super) fn id_pos(&self) -> IdiomPosition {
self.id_pos
} }
fn reduce_array(v: &Value) -> Value { fn reduce_array(v: &Value) -> Value {
@ -215,7 +221,11 @@ impl IndexOption {
v.clone() v.clone()
} }
pub(crate) fn explain(&self, e: &mut HashMap<&str, Value>) { pub(crate) fn explain(&self, ix_def: &[DefineIndexStatement]) -> Value {
let mut e = HashMap::new();
if let Some(ix) = ix_def.get(self.ir as usize) {
e.insert("index", Value::from(ix.name.0.to_owned()));
}
match self.op() { match self.op() {
IndexOperator::Equality(v) => { IndexOperator::Equality(v) => {
e.insert("operator", Value::from(Operator::Equal.to_string())); e.insert("operator", Value::from(Operator::Equal.to_string()));
@ -225,6 +235,15 @@ impl IndexOption {
e.insert("operator", Value::from("union")); e.insert("operator", Value::from("union"));
e.insert("value", Value::Array(a.clone())); e.insert("value", Value::Array(a.clone()));
} }
IndexOperator::Join(ios) => {
e.insert("operator", Value::from("join"));
let mut joins = Vec::with_capacity(ios.len());
for io in ios {
joins.push(io.explain(ix_def));
}
let joins = Value::from(joins);
e.insert("joins", joins);
}
IndexOperator::Matches(qs, a) => { IndexOperator::Matches(qs, a) => {
e.insert("operator", Value::from(Operator::Matches(*a).to_string())); e.insert("operator", Value::from(Operator::Matches(*a).to_string()));
e.insert("value", Value::from(qs.to_owned())); e.insert("value", Value::from(qs.to_owned()));
@ -238,6 +257,7 @@ impl IndexOption {
e.insert("value", Value::Array(a.clone())); e.insert("value", Value::Array(a.clone()));
} }
}; };
Value::from(e)
} }
} }
@ -391,23 +411,25 @@ impl UnionRangeQueryBuilder {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::idx::planner::plan::{IndexOperator, IndexOption, RangeValue}; use crate::idx::planner::plan::{IndexOperator, IndexOption, RangeValue};
use crate::idx::planner::tree::IdiomPosition;
use crate::sql::{Array, Idiom, Value}; use crate::sql::{Array, Idiom, Value};
use crate::syn::Parse; use crate::syn::Parse;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc;
#[test] #[test]
fn test_hash_index_option() { fn test_hash_index_option() {
let mut set = HashSet::new(); let mut set = HashSet::new();
let io1 = IndexOption::new( let io1 = IndexOption::new(
1, 1,
Arc::new(Idiom::parse("test")), Idiom::parse("test"),
IdiomPosition::Right,
IndexOperator::Equality(Value::Array(Array::from(vec!["test"]))), IndexOperator::Equality(Value::Array(Array::from(vec!["test"]))),
); );
let io2 = IndexOption::new( let io2 = IndexOption::new(
1, 1,
Arc::new(Idiom::parse("test")), Idiom::parse("test"),
IdiomPosition::Right,
IndexOperator::Equality(Value::Array(Array::from(vec!["test"]))), IndexOperator::Equality(Value::Array(Array::from(vec!["test"]))),
); );

View file

@ -3,10 +3,11 @@ use crate::dbs::{Options, Transaction};
use crate::err::Error; use crate::err::Error;
use crate::idx::planner::executor::KnnExpressions; use crate::idx::planner::executor::KnnExpressions;
use crate::idx::planner::plan::{IndexOperator, IndexOption}; use crate::idx::planner::plan::{IndexOperator, IndexOption};
use crate::kvs;
use crate::sql::index::{Distance, Index}; use crate::sql::index::{Distance, Index};
use crate::sql::statements::DefineIndexStatement; use crate::sql::statements::{DefineFieldStatement, DefineIndexStatement};
use crate::sql::{ use crate::sql::{
Array, Cond, Expression, Idiom, Number, Operator, Part, Subquery, Table, Value, With, Array, Cond, Expression, Idiom, Kind, Number, Operator, Part, Subquery, Table, Value, With,
}; };
use async_recursion::async_recursion; use async_recursion::async_recursion;
use std::collections::HashMap; use std::collections::HashMap;
@ -51,16 +52,26 @@ struct TreeBuilder<'a> {
txn: &'a Transaction, txn: &'a Transaction,
table: &'a Table, table: &'a Table,
with: &'a Option<With>, with: &'a Option<With>,
indexes: Option<Arc<[DefineIndexStatement]>>, schemas: HashMap<Table, SchemaCache>,
idioms_indexes: HashMap<Table, HashMap<Idiom, LocalIndexRefs>>,
resolved_expressions: HashMap<Arc<Expression>, ResolvedExpression>, resolved_expressions: HashMap<Arc<Expression>, ResolvedExpression>,
resolved_idioms: HashMap<Arc<Idiom>, Arc<Idiom>>, resolved_idioms: HashMap<Idiom, Node>,
idioms_indexes: HashMap<Arc<Idiom>, Option<Arc<Vec<IndexRef>>>>,
index_map: IndexesMap, index_map: IndexesMap,
with_indexes: Vec<IndexRef>, with_indexes: Vec<IndexRef>,
knn_expressions: KnnExpressions, knn_expressions: KnnExpressions,
idioms_record_options: HashMap<Idiom, RecordOptions>,
group_sequence: GroupRef, group_sequence: GroupRef,
} }
/// Index candidates for an idiom that goes through a record-link field:
/// the part matching the field is resolved on the queried table, and the
/// remainder on each table the field may point to.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub(super) struct RecordOptions {
/// Index references resolved for the record field itself on the queried table.
locals: LocalIndexRefs,
/// One entry per linked table: the remaining idiom and the index references resolved for it on that table.
remotes: RemoteIndexRefs,
}
/// Index references resolved for an idiom on a single table.
pub(super) type LocalIndexRefs = Vec<IndexRef>;
/// Shared list of `(remote idiom, index references)` pairs.
pub(super) type RemoteIndexRefs = Arc<Vec<(Idiom, LocalIndexRefs)>>;
impl<'a> TreeBuilder<'a> { impl<'a> TreeBuilder<'a> {
fn new( fn new(
ctx: &'a Context<'_>, ctx: &'a Context<'_>,
@ -79,28 +90,28 @@ impl<'a> TreeBuilder<'a> {
txn, txn,
table, table,
with, with,
indexes: None, schemas: Default::default(),
idioms_indexes: Default::default(),
resolved_expressions: Default::default(), resolved_expressions: Default::default(),
resolved_idioms: Default::default(), resolved_idioms: Default::default(),
idioms_indexes: Default::default(),
index_map: Default::default(), index_map: Default::default(),
with_indexes, with_indexes,
knn_expressions: Default::default(), knn_expressions: Default::default(),
idioms_record_options: Default::default(),
group_sequence: 0, group_sequence: 0,
} }
} }
async fn lazy_cache_indexes(&mut self) -> Result<(), Error> { async fn lazy_load_schema_resolver(
if self.indexes.is_none() { &mut self,
let indexes = self tx: &mut kvs::Transaction,
.txn table: &Table,
.clone() ) -> Result<(), Error> {
.lock() if self.schemas.contains_key(table) {
.await return Ok(());
.all_tb_indexes(self.opt.ns(), self.opt.db(), &self.table.0)
.await?;
self.indexes = Some(indexes);
} }
let l = SchemaCache::new(self.opt, table, tx).await?;
self.schemas.insert(table.clone(), l);
Ok(()) Ok(())
} }
@ -139,11 +150,8 @@ impl<'a> TreeBuilder<'a> {
async fn eval_idiom(&mut self, group: GroupRef, i: &Idiom) -> Result<Node, Error> { async fn eval_idiom(&mut self, group: GroupRef, i: &Idiom) -> Result<Node, Error> {
// Check if the idiom has already been resolved // Check if the idiom has already been resolved
if let Some(i) = self.resolved_idioms.get(i) { if let Some(node) = self.resolved_idioms.get(i).cloned() {
if let Some(Some(irs)) = self.idioms_indexes.get(i).cloned() { return Ok(node);
return Ok(Node::IndexedField(i.clone(), irs));
}
return Ok(Node::NonIndexedField(i.clone()));
}; };
// Compute the idiom value if it is a param // Compute the idiom value if it is a param
@ -154,25 +162,38 @@ impl<'a> TreeBuilder<'a> {
} }
} }
self.lazy_cache_indexes().await?; let n = self.resolve_idiom(i).await?;
self.resolved_idioms.insert(i.clone(), n.clone());
let i = Arc::new(i.clone()); Ok(n)
}
self.resolved_idioms.insert(i.clone(), i.clone()); async fn resolve_idiom(&mut self, i: &Idiom) -> Result<Node, Error> {
let mut tx = self.txn.lock().await;
self.lazy_load_schema_resolver(&mut tx, self.table).await?;
// Try to detect if it matches an index // Try to detect if it matches an index
if let Some(irs) = self.resolve_indexes(&i) { if let Some(schema) = self.schemas.get(self.table).cloned() {
let irs = self.resolve_indexes(self.table, i, &schema);
if !irs.is_empty() {
return Ok(Node::IndexedField(i.clone(), irs)); return Ok(Node::IndexedField(i.clone(), irs));
} }
// Try to detect an indexed record field
Ok(Node::NonIndexedField(i)) if let Some(ro) = self.resolve_record_field(&mut tx, schema.fields.as_ref(), i).await? {
return Ok(Node::RecordField(i.clone(), ro));
}
}
Ok(Node::NonIndexedField(i.clone()))
} }
fn resolve_indexes(&mut self, i: &Arc<Idiom>) -> Option<Arc<Vec<IndexRef>>> { fn resolve_indexes(&mut self, t: &Table, i: &Idiom, schema: &SchemaCache) -> Vec<IndexRef> {
let mut res = None; if let Some(m) = self.idioms_indexes.get(t) {
if let Some(indexes) = &self.indexes { if let Some(irs) = m.get(i).cloned() {
return irs;
}
}
let mut irs = Vec::new(); let mut irs = Vec::new();
for ix in indexes.as_ref() { for ix in schema.indexes.iter() {
if ix.cols.len() == 1 && ix.cols[0].eq(i) { if ix.cols.len() == 1 && ix.cols[0].eq(i) {
let ixr = self.index_map.definitions.len() as IndexRef; let ixr = self.index_map.definitions.len() as IndexRef;
if let Some(With::Index(ixs)) = self.with { if let Some(With::Index(ixs)) = self.with {
@ -184,12 +205,57 @@ impl<'a> TreeBuilder<'a> {
irs.push(ixr); irs.push(ixr);
} }
} }
if !irs.is_empty() { if let Some(e) = self.idioms_indexes.get_mut(t) {
res = Some(Arc::new(irs)); e.insert(i.clone(), irs.clone());
} else {
self.idioms_indexes.insert(t.clone(), HashMap::from([(i.clone(), irs.clone())]));
}
irs
}
async fn resolve_record_field(
&mut self,
tx: &mut kvs::Transaction,
fields: &[DefineFieldStatement],
idiom: &Idiom,
) -> Result<Option<RecordOptions>, Error> {
for field in fields.iter() {
if let Some(Kind::Record(tables)) = &field.kind {
if idiom.starts_with(&field.name.0) {
let (local_field, remote_field) = idiom.0.split_at(field.name.0.len());
if remote_field.is_empty() {
return Ok(None);
}
let local_field = Idiom::from(local_field);
self.lazy_load_schema_resolver(tx, self.table).await?;
let locals;
if let Some(shema) = self.schemas.get(self.table).cloned() {
locals = self.resolve_indexes(self.table, &local_field, &shema);
} else {
return Ok(None);
}
let remote_field = Idiom::from(remote_field);
let mut remotes = vec![];
for table in tables {
self.lazy_load_schema_resolver(tx, table).await?;
if let Some(shema) = self.schemas.get(table).cloned() {
let remote_irs = self.resolve_indexes(table, &remote_field, &shema);
remotes.push((remote_field.clone(), remote_irs));
} else {
return Ok(None);
} }
} }
self.idioms_indexes.insert(i.clone(), res.clone()); let ro = RecordOptions {
res locals,
remotes: Arc::new(remotes),
};
self.idioms_record_options.insert(idiom.clone(), ro.clone());
return Ok(Some(ro));
}
}
}
Ok(None)
} }
async fn eval_expression(&mut self, group: GroupRef, e: &Expression) -> Result<Node, Error> { async fn eval_expression(&mut self, group: GroupRef, e: &Expression) -> Result<Node, Error> {
@ -210,23 +276,25 @@ impl<'a> TreeBuilder<'a> {
let left = Arc::new(self.eval_value(group, l).await?); let left = Arc::new(self.eval_value(group, l).await?);
let right = Arc::new(self.eval_value(group, r).await?); let right = Arc::new(self.eval_value(group, r).await?);
let mut io = None; let mut io = None;
if let Some((id, irs)) = left.is_indexed_field() { if let Some((id, local_irs, remote_irs)) = left.is_indexed_field() {
io = self.lookup_index_option( io = self.lookup_index_options(
irs.as_slice(),
o, o,
id, id,
&right, &right,
&exp, &exp,
IdiomPosition::Left, IdiomPosition::Left,
local_irs,
remote_irs,
)?; )?;
} else if let Some((id, irs)) = right.is_indexed_field() { } else if let Some((id, local_irs, remote_irs)) = right.is_indexed_field() {
io = self.lookup_index_option( io = self.lookup_index_options(
irs.as_slice(),
o, o,
id, id,
&left, &left,
&exp, &exp,
IdiomPosition::Right, IdiomPosition::Right,
local_irs,
remote_irs,
)?; )?;
} else if let Some(id) = left.is_non_indexed_field() { } else if let Some(id) = left.is_non_indexed_field() {
self.eval_knn(id, &right, &exp)?; self.eval_knn(id, &right, &exp)?;
@ -236,7 +304,7 @@ impl<'a> TreeBuilder<'a> {
let re = ResolvedExpression { let re = ResolvedExpression {
group, group,
exp: exp.clone(), exp: exp.clone(),
io: io.clone(), io,
left: left.clone(), left: left.clone(),
right: right.clone(), right: right.clone(),
}; };
@ -246,11 +314,41 @@ impl<'a> TreeBuilder<'a> {
} }
} }
/// Finds the index option usable for one side of an expression.
///
/// Without remote references this is a plain lookup over `local_irs`.
/// With remote references (record-link idiom), an option must be found for
/// every remote entry, in order; the first entry without one makes the whole
/// lookup return `Ok(None)`. The remote options are then wrapped in an
/// `IndexOperator::Join` attached to the local index reference returned by
/// `lookup_join_index_ref`; when no such local index exists the result is `Ok(None)`.
// The lookup needs the operator, both sides of the expression and both sets of index references.
#[allow(clippy::too_many_arguments)]
fn lookup_index_options(
    &mut self,
    o: &Operator,
    id: &Idiom,
    node: &Node,
    exp: &Arc<Expression>,
    p: IdiomPosition,
    local_irs: LocalIndexRefs,
    remote_irs: Option<RemoteIndexRefs>,
) -> Result<Option<IndexOption>, Error> {
    let remotes = match remote_irs {
        Some(remotes) => remotes,
        None => {
            // No record link involved: only the local indexes are candidates.
            let local = self.lookup_index_option(local_irs.as_slice(), o, id, node, exp, p)?;
            return Ok(local);
        }
    };
    let mut joined = Vec::with_capacity(remotes.len());
    for (remote_id, remote_refs) in remotes.iter() {
        match self.lookup_index_option(remote_refs.as_slice(), o, remote_id, node, exp, p)? {
            Some(option) => joined.push(option),
            None => return Ok(None),
        }
    }
    let join = self
        .lookup_join_index_ref(local_irs.as_slice())
        .map(|ir| IndexOption::new(ir, id.clone(), p, IndexOperator::Join(joined)));
    Ok(join)
}
fn lookup_index_option( fn lookup_index_option(
&mut self, &mut self,
irs: &[IndexRef], irs: &[IndexRef],
op: &Operator, op: &Operator,
id: Arc<Idiom>, id: &Idiom,
n: &Node, n: &Node,
e: &Arc<Expression>, e: &Arc<Expression>,
p: IdiomPosition, p: IdiomPosition,
@ -263,10 +361,10 @@ impl<'a> TreeBuilder<'a> {
Index::Search { Index::Search {
.. ..
} => Self::eval_matches_operator(op, n), } => Self::eval_matches_operator(op, n),
Index::MTree(_) => self.eval_indexed_knn(e, op, n, id.clone())?, Index::MTree(_) => self.eval_indexed_knn(e, op, n, id)?,
}; };
if let Some(op) = op { if let Some(op) = op {
let io = IndexOption::new(*ir, id, op); let io = IndexOption::new(*ir, id.clone(), p, op);
self.index_map.options.push((e.clone(), io.clone())); self.index_map.options.push((e.clone(), io.clone()));
return Ok(Some(io)); return Ok(Some(io));
} }
@ -274,6 +372,19 @@ impl<'a> TreeBuilder<'a> {
} }
Ok(None) Ok(None)
} }
/// Returns the first reference in `irs` whose definition in
/// `index_map.definitions` is an `Index::Idx` or `Index::Uniq` index.
///
/// References with no matching definition, or pointing to any other kind of
/// index, are ignored. Returns `None` when no reference qualifies.
fn lookup_join_index_ref(&self, irs: &[IndexRef]) -> Option<IndexRef> {
    irs.iter().copied().find(|ir| {
        self.index_map
            .definitions
            .get(*ir as usize)
            .map_or(false, |ix| matches!(&ix.index, Index::Idx | Index::Uniq))
    })
}
fn eval_matches_operator(op: &Operator, n: &Node) -> Option<IndexOperator> { fn eval_matches_operator(op: &Operator, n: &Node) -> Option<IndexOperator> {
if let Some(v) = n.is_computed() { if let Some(v) = n.is_computed() {
if let Operator::Matches(mr) = op { if let Operator::Matches(mr) = op {
@ -288,14 +399,14 @@ impl<'a> TreeBuilder<'a> {
exp: &Arc<Expression>, exp: &Arc<Expression>,
op: &Operator, op: &Operator,
n: &Node, n: &Node,
id: Arc<Idiom>, id: &Idiom,
) -> Result<Option<IndexOperator>, Error> { ) -> Result<Option<IndexOperator>, Error> {
if let Operator::Knn(k, d) = op { if let Operator::Knn(k, d) = op {
if let Node::Computed(v) = n { if let Node::Computed(v) = n {
let vec: Vec<Number> = v.as_ref().try_into()?; let vec: Vec<Number> = v.as_ref().try_into()?;
self.knn_expressions.insert( self.knn_expressions.insert(
exp.clone(), exp.clone(),
(*k, id, Arc::new(vec), d.clone().unwrap_or(Distance::Euclidean)), (*k, id.clone(), Arc::new(vec), d.clone().unwrap_or(Distance::Euclidean)),
); );
if let Value::Array(a) = v.as_ref() { if let Value::Array(a) = v.as_ref() {
match d { match d {
@ -310,13 +421,13 @@ impl<'a> TreeBuilder<'a> {
Ok(None) Ok(None)
} }
fn eval_knn(&mut self, id: Arc<Idiom>, val: &Node, exp: &Arc<Expression>) -> Result<(), Error> { fn eval_knn(&mut self, id: &Idiom, val: &Node, exp: &Arc<Expression>) -> Result<(), Error> {
if let Operator::Knn(k, d) = exp.operator() { if let Operator::Knn(k, d) = exp.operator() {
if let Node::Computed(v) = val { if let Node::Computed(v) = val {
let vec: Vec<Number> = v.as_ref().try_into()?; let vec: Vec<Number> = v.as_ref().try_into()?;
self.knn_expressions.insert( self.knn_expressions.insert(
exp.clone(), exp.clone(),
(*k, id, Arc::new(vec), d.clone().unwrap_or(Distance::Euclidean)), (*k, id.clone(), Arc::new(vec), d.clone().unwrap_or(Distance::Euclidean)),
); );
} }
} }
@ -367,6 +478,23 @@ pub(super) struct IndexesMap {
pub(super) definitions: Vec<DefineIndexStatement>, pub(super) definitions: Vec<DefineIndexStatement>,
} }
/// Index and field definitions of a single table, loaded together.
#[derive(Clone)]
struct SchemaCache {
    /// Index definitions returned by `all_tb_indexes` for the table.
    indexes: Arc<[DefineIndexStatement]>,
    /// Field definitions returned by `all_tb_fields` for the table.
    fields: Arc<[DefineFieldStatement]>,
}

impl SchemaCache {
    /// Loads the index definitions, then the field definitions, of `table`
    /// through `tx`, using the namespace and database taken from `opt`.
    ///
    /// Any error returned by the transaction is propagated.
    async fn new(opt: &Options, table: &Table, tx: &mut kvs::Transaction) -> Result<Self, Error> {
        // Struct fields are evaluated in written order: indexes, then fields
        Ok(Self {
            indexes: tx.all_tb_indexes(opt.ns(), opt.db(), table).await?,
            fields: tx.all_tb_fields(opt.ns(), opt.db(), table).await?,
        })
    }
}
pub(super) type GroupRef = u16; pub(super) type GroupRef = u16;
#[derive(Debug, Clone, Eq, PartialEq, Hash)] #[derive(Debug, Clone, Eq, PartialEq, Hash)]
@ -378,8 +506,9 @@ pub(super) enum Node {
right: Arc<Node>, right: Arc<Node>,
exp: Arc<Expression>, exp: Arc<Expression>,
}, },
IndexedField(Arc<Idiom>, Arc<Vec<IndexRef>>), IndexedField(Idiom, Vec<IndexRef>),
NonIndexedField(Arc<Idiom>), RecordField(Idiom, RecordOptions),
NonIndexedField(Idiom),
Computed(Arc<Value>), Computed(Arc<Value>),
Unsupported(String), Unsupported(String),
} }
@ -393,28 +522,31 @@ impl Node {
} }
} }
pub(super) fn is_indexed_field(&self) -> Option<(Arc<Idiom>, Arc<Vec<IndexRef>>)> { pub(super) fn is_indexed_field(
if let Node::IndexedField(id, irs) = self { &self,
Some((id.clone(), irs.clone())) ) -> Option<(&Idiom, LocalIndexRefs, Option<RemoteIndexRefs>)> {
} else { match self {
None Node::IndexedField(id, irs) => Some((id, irs.clone(), None)),
Node::RecordField(id, ro) => Some((id, ro.locals.clone(), Some(ro.remotes.clone()))),
_ => None,
} }
} }
pub(super) fn is_non_indexed_field(&self) -> Option<Arc<Idiom>> { pub(super) fn is_non_indexed_field(&self) -> Option<&Idiom> {
if let Node::NonIndexedField(id) = self { if let Node::NonIndexedField(id) = self {
Some(id.clone()) Some(id)
} else { } else {
None None
} }
} }
} }
#[derive(Clone, Copy)] #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
enum IdiomPosition { pub(super) enum IdiomPosition {
Left, Left,
Right, Right,
} }
impl IdiomPosition { impl IdiomPosition {
// Reverses the operator for non-commutative operators // Reverses the operator for non-commutative operators
fn transform(&self, op: &Operator) -> Operator { fn transform(&self, op: &Operator) -> Operator {

View file

@ -185,7 +185,7 @@ impl Expression {
Operator::NoneInside => fnc::operate::inside_none(&l, &r), Operator::NoneInside => fnc::operate::inside_none(&l, &r),
Operator::Outside => fnc::operate::outside(&l, &r), Operator::Outside => fnc::operate::outside(&l, &r),
Operator::Intersects => fnc::operate::intersects(&l, &r), Operator::Intersects => fnc::operate::intersects(&l, &r),
Operator::Matches(_) => fnc::operate::matches(ctx, txn, doc, self).await, Operator::Matches(_) => fnc::operate::matches(ctx, opt, txn, doc, self, l, r).await,
Operator::Knn(_, _) => fnc::operate::knn(ctx, opt, txn, doc, self).await, Operator::Knn(_, _) => fnc::operate::knn(ctx, opt, txn, doc, self).await,
_ => unreachable!(), _ => unreachable!(),
} }

View file

@ -1914,3 +1914,367 @@ async fn select_with_in_operator_multiple_indexes() -> Result<(), Error> {
assert_eq!(format!("{:#}", tmp), format!("{:#}", val)); assert_eq!(format!("{:#}", tmp), format!("{:#}", val));
Ok(()) Ok(())
} }
/// Filtering `i` on a field of the linked record (`t.name`) when no index is
/// defined: the query must return both records and the EXPLAIN output must
/// report a table iteration with a 'NO INDEX FOUND' fallback.
#[tokio::test]
async fn select_with_record_id_link_no_index() -> Result<(), Error> {
    let dbs = new_ds().await?;
    let ses = Session::owner().with_ns("test").with_db("test");
    // Schema and data, followed by the query and its EXPLAIN variant
    let sql = "
DEFINE FIELD name ON TABLE t TYPE string;
DEFINE FIELD t ON TABLE i TYPE record(t);
CREATE t:1 SET name = 'h';
CREATE t:2 SET name = 'h';
CREATE i:A SET t = t:1;
CREATE i:B SET t = t:2;
SELECT * FROM i WHERE t.name = 'h';
SELECT * FROM i WHERE t.name = 'h' EXPLAIN;
";
    let mut responses = dbs.execute(&sql, &ses, None).await?;
    // The six set-up statements must succeed
    assert_eq!(responses.len(), 8);
    skip_ok(&mut responses, 6)?;
    // Result of the plain query
    let records = responses.remove(0).result?;
    let expected_records = Value::parse(
        r#"[
{ "id": i:A, "t": t:1 },
{ "id": i:B, "t": t:2 }
]"#,
    );
    assert_eq!(format!("{records:#}"), format!("{expected_records:#}"));
    // Result of the EXPLAIN query
    let plan = responses.remove(0).result?;
    let expected_plan = Value::parse(
        r#"[
{
detail: {
table: 'i'
},
operation: 'Iterate Table'
},
{
detail: {
reason: 'NO INDEX FOUND'
},
operation: 'Fallback'
},
{
detail: {
type: 'Memory'
},
operation: 'Collector'
}
]"#,
    );
    assert_eq!(format!("{plan:#}"), format!("{expected_plan:#}"));
    Ok(())
}
/// Filtering `i` on `t.name` when both the link field (`i.t`) and the remote
/// field (`t.name`) have a standard index: the EXPLAIN output must show a
/// 'join' plan on `i_t_id` over `t_name_idx`, and the query must return both
/// records.
#[tokio::test]
async fn select_with_record_id_link_index() -> Result<(), Error> {
    let dbs = new_ds().await?;
    let ses = Session::owner().with_ns("test").with_db("test");
    // Indexes, schema and data, followed by the EXPLAIN query and the query
    let sql = "
DEFINE INDEX i_t_id ON TABLE i COLUMNS t;
DEFINE INDEX t_name_idx ON TABLE t COLUMNS name;
DEFINE FIELD name ON TABLE t TYPE string;
DEFINE FIELD t ON TABLE i TYPE record(t);
CREATE t:1 SET name = 'h';
CREATE t:2 SET name = 'h';
CREATE i:A SET t = t:1;
CREATE i:B SET t = t:2;
SELECT * FROM i WHERE t.name = 'h' EXPLAIN;
SELECT * FROM i WHERE t.name = 'h';
";
    let mut responses = dbs.execute(&sql, &ses, None).await?;
    // The eight set-up statements must succeed
    assert_eq!(responses.len(), 10);
    skip_ok(&mut responses, 8)?;
    // Records the final query is expected to return
    let expected_records = Value::parse(
        r#"[
{ "id": i:A, "t": t:1 },
{ "id": i:B, "t": t:2 }
]"#,
    );
    // Result of the EXPLAIN query
    let plan = responses.remove(0).result?;
    let expected_plan = Value::parse(
        r#"[
{
detail: {
plan: {
index: 'i_t_id',
joins: [
{
index: 't_name_idx',
operator: '=',
value: 'h'
}
],
operator: 'join'
},
table: 'i'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Memory'
},
operation: 'Collector'
}
]"#,
    );
    assert_eq!(format!("{plan:#}"), format!("{expected_plan:#}"));
    // Result of the plain query
    let records = responses.remove(0).result?;
    assert_eq!(format!("{records:#}"), format!("{expected_records:#}"));
    Ok(())
}
/// Same scenario as the standard-index case, but the index on the link field
/// (`i.t`) is UNIQUE: the EXPLAIN output must show a 'join' plan on
/// `i_t_unique_id` over `t_name_idx`, and the query must return both records.
#[tokio::test]
async fn select_with_record_id_link_unique_index() -> Result<(), Error> {
    let dbs = new_ds().await?;
    let ses = Session::owner().with_ns("test").with_db("test");
    // Indexes, schema and data, followed by the EXPLAIN query and the query
    let sql = "
DEFINE INDEX i_t_unique_id ON TABLE i COLUMNS t UNIQUE;
DEFINE INDEX t_name_idx ON TABLE t COLUMNS name;
DEFINE FIELD name ON TABLE t TYPE string;
DEFINE FIELD t ON TABLE i TYPE record(t);
CREATE t:1 SET name = 'h';
CREATE t:2 SET name = 'h';
CREATE i:A SET t = t:1;
CREATE i:B SET t = t:2;
SELECT * FROM i WHERE t.name = 'h' EXPLAIN;
SELECT * FROM i WHERE t.name = 'h';
";
    let mut responses = dbs.execute(&sql, &ses, None).await?;
    // The eight set-up statements must succeed
    assert_eq!(responses.len(), 10);
    skip_ok(&mut responses, 8)?;
    // Records the final query is expected to return
    let expected_records = Value::parse(
        r#"[
{ "id": i:A, "t": t:1 },
{ "id": i:B, "t": t:2 }
]"#,
    );
    // Result of the EXPLAIN query
    let plan = responses.remove(0).result?;
    let expected_plan = Value::parse(
        r#"[
{
detail: {
plan: {
index: 'i_t_unique_id',
joins: [
{
index: 't_name_idx',
operator: '=',
value: 'h'
}
],
operator: 'join'
},
table: 'i'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Memory'
},
operation: 'Collector'
}
]"#,
    );
    assert_eq!(format!("{plan:#}"), format!("{expected_plan:#}"));
    // Result of the plain query
    let records = responses.remove(0).result?;
    assert_eq!(format!("{records:#}"), format!("{expected_records:#}"));
    Ok(())
}
/// Filtering `i` with `t.name IN [...]` when the remote field (`t.name`) has
/// a UNIQUE index and the link field (`i.t`) a standard one: the EXPLAIN
/// output must show a 'join' plan on `i_t_id` whose remote part is a 'union'
/// on `t_name_unique_idx`, and the query must return both records.
#[tokio::test]
async fn select_with_record_id_link_unique_remote_index() -> Result<(), Error> {
    let dbs = new_ds().await?;
    let ses = Session::owner().with_ns("test").with_db("test");
    // Indexes, schema and data, followed by the EXPLAIN query and the query
    let sql = "
DEFINE INDEX i_t_id ON TABLE i COLUMNS t;
DEFINE INDEX t_name_unique_idx ON TABLE t COLUMNS name UNIQUE;
DEFINE FIELD name ON TABLE t TYPE string;
DEFINE FIELD t ON TABLE i TYPE record(t);
CREATE t:1 SET name = 'a';
CREATE t:2 SET name = 'b';
CREATE i:A SET t = t:1;
CREATE i:B SET t = t:2;
SELECT * FROM i WHERE t.name IN ['a', 'b'] EXPLAIN;
SELECT * FROM i WHERE t.name IN ['a', 'b'];
";
    let mut responses = dbs.execute(&sql, &ses, None).await?;
    // The eight set-up statements must succeed
    assert_eq!(responses.len(), 10);
    skip_ok(&mut responses, 8)?;
    // Records the final query is expected to return
    let expected_records = Value::parse(
        r#"[
{ "id": i:A, "t": t:1 },
{ "id": i:B, "t": t:2 }
]"#,
    );
    // Result of the EXPLAIN query
    let plan = responses.remove(0).result?;
    let expected_plan = Value::parse(
        r#"[
{
detail: {
plan: {
index: 'i_t_id',
joins: [
{
index: 't_name_unique_idx',
operator: 'union',
value: [
'a',
'b'
]
}
],
operator: 'join'
},
table: 'i'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Memory'
},
operation: 'Collector'
}
]"#,
    );
    assert_eq!(format!("{plan:#}"), format!("{expected_plan:#}"));
    // Result of the plain query
    let records = responses.remove(0).result?;
    assert_eq!(format!("{records:#}"), format!("{expected_records:#}"));
    Ok(())
}
/// Filtering `i` with a full-text match on the linked record
/// (`t.name @@ 'world'`) when `t.name` has a SEARCH index and the link field
/// (`i.t`) a standard index: the EXPLAIN output must show a 'join' plan on
/// `i_t_id` over `t_name_search_idx`, and the query must return `i:A`.
#[tokio::test]
async fn select_with_record_id_link_full_text_index() -> Result<(), Error> {
    let dbs = new_ds().await?;
    let ses = Session::owner().with_ns("test").with_db("test");
    // Analyzer, indexes, schema and data, followed by the EXPLAIN query and the query
    let sql = "
DEFINE ANALYZER name TOKENIZERS class FILTERS lowercase,ngram(1,128);
DEFINE INDEX t_name_search_idx ON TABLE t COLUMNS name SEARCH ANALYZER name BM25 HIGHLIGHTS;
DEFINE INDEX i_t_id ON TABLE i COLUMNS t;
DEFINE FIELD name ON TABLE t TYPE string;
DEFINE FIELD t ON TABLE i TYPE record(t);
CREATE t:1 SET name = 'Hello World';
CREATE i:A SET t = t:1;
SELECT * FROM i WHERE t.name @@ 'world' EXPLAIN;
SELECT * FROM i WHERE t.name @@ 'world';
";
    let mut responses = dbs.execute(&sql, &ses, None).await?;
    // The seven set-up statements must succeed
    assert_eq!(responses.len(), 9);
    skip_ok(&mut responses, 7)?;
    // Result of the EXPLAIN query
    let plan = responses.remove(0).result?;
    let expected_plan = Value::parse(
        r#"[
{
detail: {
plan: {
index: 'i_t_id',
joins: [
{
index: 't_name_search_idx',
operator: '@@',
value: 'world'
}
],
operator: 'join'
},
table: 'i'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Memory'
},
operation: 'Collector'
}
]"#,
    );
    assert_eq!(format!("{plan:#}"), format!("{expected_plan:#}"));
    // Result of the plain query
    let records = responses.remove(0).result?;
    let expected_records = Value::parse(r#"[{ "id": i:A, "t": t:1}]"#);
    assert_eq!(format!("{records:#}"), format!("{expected_records:#}"));
    Ok(())
}
/// Filtering `i` with a full-text match on the linked record
/// (`t.name @@ 'world'`) when `t.name` has a SEARCH index but the link field
/// (`i.t`) has no index: the EXPLAIN output must report a table iteration
/// with a 'NO INDEX FOUND' fallback, and the query must still return `i:A`.
#[tokio::test]
async fn select_with_record_id_link_full_text_no_record_index() -> Result<(), Error> {
    let dbs = new_ds().await?;
    let ses = Session::owner().with_ns("test").with_db("test");
    // Analyzer, index, schema and data, followed by the EXPLAIN query and the query
    let sql = "
DEFINE ANALYZER name TOKENIZERS class FILTERS lowercase,ngram(1,128);
DEFINE INDEX t_name_search_idx ON TABLE t COLUMNS name SEARCH ANALYZER name BM25 HIGHLIGHTS;
DEFINE FIELD name ON TABLE t TYPE string;
DEFINE FIELD t ON TABLE i TYPE record(t);
CREATE t:1 SET name = 'Hello World';
CREATE i:A SET t = t:1;
SELECT * FROM i WHERE t.name @@ 'world' EXPLAIN;
SELECT * FROM i WHERE t.name @@ 'world';
";
    let mut responses = dbs.execute(&sql, &ses, None).await?;
    // The six set-up statements must succeed
    assert_eq!(responses.len(), 8);
    skip_ok(&mut responses, 6)?;
    // Result of the EXPLAIN query
    let plan = responses.remove(0).result?;
    let expected_plan = Value::parse(
        r#"[
{
detail: {
table: 'i'
},
operation: 'Iterate Table'
},
{
detail: {
reason: 'NO INDEX FOUND'
},
operation: 'Fallback'
},
{
detail: {
type: 'Memory'
},
operation: 'Collector'
}
]"#,
    );
    assert_eq!(format!("{plan:#}"), format!("{expected_plan:#}"));
    // Result of the plain query
    let records = responses.remove(0).result?;
    let expected_records = Value::parse(r#"[{ "id": i:A, "t": t:1}]"#);
    assert_eq!(format!("{records:#}"), format!("{expected_records:#}"));
    Ok(())
}