diff --git a/core/src/dbs/iterator.rs b/core/src/dbs/iterator.rs index ca1d3896..f4862574 100644 --- a/core/src/dbs/iterator.rs +++ b/core/src/dbs/iterator.rs @@ -19,19 +19,18 @@ use crate::sql::thing::Thing; use crate::sql::value::Value; use async_recursion::async_recursion; use std::mem; -use std::sync::Arc; #[derive(Clone)] pub(crate) enum Iterable { Value(Value), - Table(Arc), + Table(Table), Thing(Thing), Range(Range), Edges(Edges), Defer(Thing), Mergeable(Thing, Value), Relatable(Thing, Thing, Thing), - Index(Arc
, IteratorRef), + Index(Table, IteratorRef), } pub(crate) struct Processed { @@ -118,7 +117,7 @@ impl Iterator { } _ => { // Ingest the table for scanning - self.ingest(Iterable::Table(Arc::new(v))) + self.ingest(Iterable::Table(v)) } }, // There is no data clause so create a record id @@ -129,7 +128,7 @@ impl Iterator { } _ => { // Ingest the table for scanning - self.ingest(Iterable::Table(Arc::new(v))) + self.ingest(Iterable::Table(v)) } }, }, diff --git a/core/src/dbs/processor.rs b/core/src/dbs/processor.rs index 6d8dea7a..b97da805 100644 --- a/core/src/dbs/processor.rs +++ b/core/src/dbs/processor.rs @@ -127,10 +127,10 @@ impl<'a> Processor<'a> { // Avoiding search in the hashmap of the query planner for each doc let mut ctx = Context::new(ctx); ctx.set_query_executor(exe.clone()); - return self.process_table(&ctx, opt, txn, stm, v.as_ref()).await; + return self.process_table(&ctx, opt, txn, stm, &v).await; } } - self.process_table(ctx, opt, txn, stm, v.as_ref()).await? + self.process_table(ctx, opt, txn, stm, &v).await? } Iterable::Range(v) => self.process_range(ctx, opt, txn, stm, v).await?, Iterable::Edges(e) => self.process_edge(ctx, opt, txn, stm, e).await?, @@ -141,10 +141,10 @@ impl<'a> Processor<'a> { // Avoiding search in the hashmap of the query planner for each doc let mut ctx = Context::new(ctx); ctx.set_query_executor(exe.clone()); - return self.process_index(&ctx, opt, txn, stm, t.as_ref(), ir).await; + return self.process_index(&ctx, opt, txn, stm, &t, ir).await; } } - self.process_index(ctx, opt, txn, stm, t.as_ref(), ir).await? + self.process_index(ctx, opt, txn, stm, &t, ir).await? } Iterable::Mergeable(v, o) => { self.process_mergeable(ctx, opt, txn, stm, v, o).await? @@ -563,7 +563,8 @@ impl<'a> Processor<'a> { txn.lock().await.check_ns_db_tb(opt.ns(), opt.db(), &table.0, opt.strict).await?; if let Some(exe) = ctx.get_query_executor() { if let Some(mut iterator) = exe.new_iterator(opt, ir).await? { - let mut things = iterator.next_batch(txn, PROCESSOR_BATCH_SIZE).await?; + let mut things = Vec::new(); + iterator.next_batch(txn, PROCESSOR_BATCH_SIZE, &mut things).await?; while !things.is_empty() { // Check if the context is finished if ctx.is_done() { @@ -601,7 +602,8 @@ impl<'a> Processor<'a> { } // Collect the next batch of ids - things = iterator.next_batch(txn, PROCESSOR_BATCH_SIZE).await?; + things = Vec::new(); + iterator.next_batch(txn, PROCESSOR_BATCH_SIZE, &mut things).await?; } // Everything ok return Ok(()); diff --git a/core/src/fnc/operate.rs b/core/src/fnc/operate.rs index c8efece4..21f40467 100644 --- a/core/src/fnc/operate.rs +++ b/core/src/fnc/operate.rs @@ -212,15 +212,19 @@ fn get_executor_option<'a>( pub(crate) async fn matches( ctx: &Context<'_>, + opt: &Options, txn: &Transaction, doc: Option<&CursorDoc<'_>>, exp: &Expression, + l: Value, + r: Value, ) -> Result { - match get_executor_option(ctx, doc, exp) { - ExecutorOption::PreMatch => Ok(Value::Bool(true)), - ExecutorOption::None => Ok(Value::Bool(false)), - ExecutorOption::Execute(exe, thg) => exe.matches(txn, thg, exp).await, - } + let res = match get_executor_option(ctx, doc, exp) { + ExecutorOption::PreMatch => true, + ExecutorOption::None => false, + ExecutorOption::Execute(exe, thg) => exe.matches(ctx, opt, txn, thg, exp, l, r).await?, + }; + Ok(res.into()) } pub(crate) async fn knn( diff --git a/core/src/idx/ft/analyzer/filter.rs b/core/src/idx/ft/analyzer/filter.rs index 36566f43..af1adc9a 100644 --- a/core/src/idx/ft/analyzer/filter.rs +++ b/core/src/idx/ft/analyzer/filter.rs @@ -11,7 +11,6 @@ pub(super) enum FilteringStage { Indexing, Querying, } - pub(super) enum Filter { Stemmer(Stemmer), Ascii, diff --git a/core/src/idx/ft/analyzer/mod.rs b/core/src/idx/ft/analyzer/mod.rs index 10c2f70f..60f7ec66 100644 --- a/core/src/idx/ft/analyzer/mod.rs +++ b/core/src/idx/ft/analyzer/mod.rs @@ -6,7 +6,7 @@ use crate::idx::ft::analyzer::tokenizer::{Tokenizer, Tokens}; use crate::idx::ft::doclength::DocLength; use crate::idx::ft::offsets::{Offset, OffsetRecords}; use crate::idx::ft::postings::TermFrequency; -use crate::idx::ft::terms::{TermId, Terms}; +use crate::idx::ft::terms::{TermId, TermLen, Terms}; use crate::sql::statements::DefineAnalyzerStatement; use crate::sql::tokenizer::Tokenizer as SqlTokenizer; use crate::sql::Value; @@ -34,31 +34,96 @@ impl From for Analyzer { } } } + +pub(in crate::idx) type TermsList = Vec>; + +pub(in crate::idx) struct TermsSet { + set: HashSet, + has_unknown_terms: bool, +} + +impl TermsSet { + /// If the query TermsSet contains terms that are unknown in the index + /// of if there is no terms in the set then + /// we are sure that it does not match any document + pub(in crate::idx) fn is_matchable(&self) -> bool { + !(self.has_unknown_terms || self.set.is_empty()) + } + + pub(in crate::idx) fn is_subset(&self, other: &TermsSet) -> bool { + if self.has_unknown_terms { + return false; + } + self.set.is_subset(&other.set) + } +} + impl Analyzer { - pub(super) async fn extract_terms( + pub(super) async fn extract_querying_terms( &self, ctx: &Context<'_>, opt: &Options, txn: &Transaction, t: &Terms, - query_string: String, - ) -> Result>, Error> { - let tokens = - self.generate_tokens(ctx, opt, txn, FilteringStage::Querying, query_string).await?; - // We first collect every unique terms - // as it can contains duplicates - let mut terms = HashSet::new(); - for token in tokens.list() { - terms.insert(token); - } - // Now we can extract the term ids - let mut res = Vec::with_capacity(terms.len()); + content: String, + ) -> Result<(TermsList, TermsSet), Error> { + let tokens = self.generate_tokens(ctx, opt, txn, FilteringStage::Querying, content).await?; + // We extract the term ids + let mut list = Vec::with_capacity(tokens.list().len()); + let mut unique_tokens = HashSet::new(); + let mut set = HashSet::new(); let mut tx = txn.lock().await; - for term in terms { - let opt_term_id = t.get_term_id(&mut tx, tokens.get_token_string(term)?).await?; - res.push(opt_term_id.map(|tid| (tid, term.get_char_len()))); + let mut has_unknown_terms = false; + for token in tokens.list() { + // Tokens can contains duplicated, not need to evaluate them again + if unique_tokens.insert(token) { + // Is the term known in the index? + let opt_term_id = t.get_term_id(&mut tx, tokens.get_token_string(token)?).await?; + list.push(opt_term_id.map(|tid| (tid, token.get_char_len()))); + if let Some(term_id) = opt_term_id { + set.insert(term_id); + } else { + has_unknown_terms = true; + } + } } - Ok(res) + Ok(( + list, + TermsSet { + set, + has_unknown_terms, + }, + )) + } + + pub(in crate::idx) async fn extract_indexing_terms( + &self, + ctx: &Context<'_>, + opt: &Options, + txn: &Transaction, + t: &Terms, + content: Value, + ) -> Result { + let mut tv = Vec::new(); + self.analyze_value(ctx, opt, txn, content, FilteringStage::Indexing, &mut tv).await?; + let mut set = HashSet::new(); + let mut has_unknown_terms = false; + let mut tx = txn.lock().await; + for tokens in tv { + for token in tokens.list() { + if let Some(term_id) = + t.get_term_id(&mut tx, tokens.get_token_string(token)?).await? + { + set.insert(term_id); + } else { + has_unknown_terms = true; + } + } + } + Ok(TermsSet { + set, + has_unknown_terms, + }) } /// This method is used for indexing. diff --git a/core/src/idx/ft/analyzer/tokenizer.rs b/core/src/idx/ft/analyzer/tokenizer.rs index 2c6f0e53..3c9e617d 100644 --- a/core/src/idx/ft/analyzer/tokenizer.rs +++ b/core/src/idx/ft/analyzer/tokenizer.rs @@ -5,7 +5,7 @@ use crate::idx::ft::offsets::{Offset, Position}; use crate::sql::tokenizer::Tokenizer as SqlTokenizer; use crate::sql::Value; -pub(super) struct Tokens { +pub(in crate::idx) struct Tokens { /// The input string i: String, /// The final list of tokens diff --git a/core/src/idx/ft/mod.rs b/core/src/idx/ft/mod.rs index 62e4146e..112cbf0f 100644 --- a/core/src/idx/ft/mod.rs +++ b/core/src/idx/ft/mod.rs @@ -11,14 +11,14 @@ use crate::ctx::Context; use crate::dbs::{Options, Transaction}; use crate::err::Error; use crate::idx::docids::{DocId, DocIds}; -use crate::idx::ft::analyzer::Analyzer; +use crate::idx::ft::analyzer::{Analyzer, TermsList, TermsSet}; use crate::idx::ft::doclength::DocLengths; use crate::idx::ft::highlighter::{Highlighter, Offseter}; use crate::idx::ft::offsets::Offsets; use crate::idx::ft::postings::Postings; use crate::idx::ft::scorer::BM25Scorer; use crate::idx::ft::termdocs::{TermDocs, TermsDocs}; -use crate::idx::ft::terms::{TermId, Terms}; +use crate::idx::ft::terms::{TermId, TermLen, Terms}; use crate::idx::trees::btree::BStatistics; use crate::idx::trees::store::IndexStores; use crate::idx::{IndexKeyBase, VersionedSerdeState}; @@ -39,7 +39,7 @@ use tokio::sync::RwLock; pub(crate) type MatchRef = u8; pub(crate) struct FtIndex { - analyzer: Analyzer, + analyzer: Arc, state_key: Key, index_key_base: IndexKeyBase, state: State, @@ -164,7 +164,7 @@ impl FtIndex { index_key_base, bm25, highlighting: p.hl, - analyzer: az.into(), + analyzer: Arc::new(az.into()), doc_ids, doc_lengths, postings, @@ -178,6 +178,14 @@ impl FtIndex { self.doc_ids.clone() } + pub(super) fn terms(&self) -> Arc> { + self.terms.clone() + } + + pub(super) fn analyzer(&self) -> Arc { + self.analyzer.clone() + } + pub(crate) async fn remove_document( &mut self, txn: &Transaction, @@ -326,22 +334,22 @@ impl FtIndex { Ok(()) } - pub(super) async fn extract_terms( + pub(super) async fn extract_querying_terms( &self, ctx: &Context<'_>, opt: &Options, txn: &Transaction, query_string: String, - ) -> Result>, Error> { + ) -> Result<(TermsList, TermsSet), Error> { let t = self.terms.read().await; - let terms = self.analyzer.extract_terms(ctx, opt, txn, &t, query_string).await?; - Ok(terms) + let res = self.analyzer.extract_querying_terms(ctx, opt, txn, &t, query_string).await?; + Ok(res) } pub(super) async fn get_terms_docs( &self, tx: &mut kvs::Transaction, - terms: &Vec>, + terms: &TermsList, ) -> Result>, Error> { let mut terms_docs = Vec::with_capacity(terms.len()); for opt_term in terms { @@ -402,7 +410,7 @@ impl FtIndex { &self, tx: &mut kvs::Transaction, thg: &Thing, - terms: &[Option<(TermId, u32)>], + terms: &[Option<(TermId, TermLen)>], prefix: Value, suffix: Value, partial: bool, @@ -538,9 +546,10 @@ mod tests { fti: &FtIndex, qs: &str, ) -> (Option, BM25Scorer) { - let t = fti.extract_terms(ctx, opt, txn, qs.to_string()).await.unwrap(); + let (term_list, _) = + fti.extract_querying_terms(ctx, opt, txn, qs.to_string()).await.unwrap(); let mut tx = txn.lock().await; - let td = Arc::new(fti.get_terms_docs(&mut tx, &t).await.unwrap()); + let td = Arc::new(fti.get_terms_docs(&mut tx, &term_list).await.unwrap()); drop(tx); let scr = fti.new_scorer(td.clone()).unwrap().unwrap(); let hits = fti.new_hits_iterator(td).unwrap(); diff --git a/core/src/idx/ft/terms.rs b/core/src/idx/ft/terms.rs index 59aa9e79..afca3cb9 100644 --- a/core/src/idx/ft/terms.rs +++ b/core/src/idx/ft/terms.rs @@ -9,8 +9,9 @@ use roaring::RoaringTreemap; use serde::{Deserialize, Serialize}; pub(crate) type TermId = u64; +pub(crate) type TermLen = u32; -pub(super) struct Terms { +pub(in crate::idx) struct Terms { state_key: Key, index_key_base: IndexKeyBase, btree: BTree, diff --git a/core/src/idx/planner/executor.rs b/core/src/idx/planner/executor.rs index 4de9ec1b..a3bbb695 100644 --- a/core/src/idx/planner/executor.rs +++ b/core/src/idx/planner/executor.rs @@ -3,19 +3,20 @@ use crate::dbs::{Options, Transaction}; use crate::doc::CursorDoc; use crate::err::Error; use crate::idx::docids::{DocId, DocIds}; +use crate::idx::ft::analyzer::{Analyzer, TermsList, TermsSet}; use crate::idx::ft::scorer::BM25Scorer; use crate::idx::ft::termdocs::TermsDocs; -use crate::idx::ft::terms::TermId; +use crate::idx::ft::terms::Terms; use crate::idx::ft::{FtIndex, MatchRef}; use crate::idx::planner::iterators::{ - DocIdsIterator, IndexEqualThingIterator, IndexRangeThingIterator, IndexUnionThingIterator, - MatchesThingIterator, ThingIterator, UniqueEqualThingIterator, UniqueRangeThingIterator, - UniqueUnionThingIterator, + DocIdsIterator, IndexEqualThingIterator, IndexJoinThingIterator, IndexRangeThingIterator, + IndexUnionThingIterator, MatchesThingIterator, ThingIterator, UniqueEqualThingIterator, + UniqueJoinThingIterator, UniqueRangeThingIterator, UniqueUnionThingIterator, }; use crate::idx::planner::knn::KnnPriorityList; use crate::idx::planner::plan::IndexOperator::Matches; use crate::idx::planner::plan::{IndexOperator, IndexOption, RangeValue}; -use crate::idx::planner::tree::{IndexRef, IndexesMap}; +use crate::idx::planner::tree::{IdiomPosition, IndexRef, IndexesMap}; use crate::idx::planner::{IterationStage, KnnSet}; use crate::idx::trees::mtree::MTreeIndex; use crate::idx::IndexKeyBase; @@ -28,9 +29,8 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; use tokio::sync::RwLock; -pub(super) type KnnEntry = (KnnPriorityList, Arc, Arc>, Distance); -pub(super) type KnnExpressions = - HashMap, (u32, Arc, Arc>, Distance)>; +pub(super) type KnnEntry = (KnnPriorityList, Idiom, Arc>, Distance); +pub(super) type KnnExpressions = HashMap, (u32, Idiom, Arc>, Distance)>; #[derive(Clone)] pub(crate) struct QueryExecutor(Arc); @@ -60,16 +60,17 @@ pub(super) enum IteratorEntry { } impl IteratorEntry { - pub(super) fn explain(&self, e: &mut HashMap<&str, Value>) -> IndexRef { + pub(super) fn explain(&self, ix_def: &[DefineIndexStatement]) -> Value { match self { - Self::Single(_, io) => { - io.explain(e); - io.ix_ref() - } + Self::Single(_, io) => io.explain(ix_def), Self::Range(_, ir, from, to) => { + let mut e = HashMap::default(); + if let Some(ix) = ix_def.get(*ir as usize) { + e.insert("index", Value::from(ix.name.0.to_owned())); + } e.insert("from", Value::from(from)); e.insert("to", Value::from(to)); - *ir + Value::from(Object::from(e)) } } } @@ -227,7 +228,7 @@ impl QueryExecutor { !self.0.knn_entries.is_empty() } - /// Returns `true` if either the expression is matching the current iterator. + /// Returns `true` if the expression is matching the current iterator. pub(crate) fn is_iterator_expression(&self, ir: IteratorRef, exp: &Expression) -> bool { match self.0.it_entries.get(ir as usize) { Some(IteratorEntry::Single(e, ..)) => exp.eq(e.as_ref()), @@ -238,14 +239,7 @@ impl QueryExecutor { pub(crate) fn explain(&self, itr: IteratorRef) -> Value { match self.0.it_entries.get(itr as usize) { - Some(ie) => { - let mut e = HashMap::default(); - let ir = ie.explain(&mut e); - if let Some(ix) = self.0.index_definitions.get(ir as usize) { - e.insert("index", Value::from(ix.name.0.to_owned())); - } - Value::from(Object::from(e)) - } + Some(ie) => ie.explain(self.0.index_definitions.as_slice()), None => Value::None, } } @@ -282,10 +276,12 @@ impl QueryExecutor { it_ref: IteratorRef, io: &IndexOption, ) -> Result, Error> { - if let Some(ix) = self.0.index_definitions.get(io.ix_ref() as usize) { + if let Some(ix) = self.get_index_def(io.ix_ref()) { match ix.index { - Index::Idx => Ok(Self::new_index_iterator(opt, ix, io.clone())), - Index::Uniq => Ok(Self::new_unique_index_iterator(opt, ix, io.clone())), + Index::Idx => Ok(self.new_index_iterator(opt, it_ref, ix, io.clone()).await?), + Index::Uniq => { + Ok(self.new_unique_index_iterator(opt, it_ref, ix, io.clone()).await?) + } Index::Search { .. } => self.new_search_index_iterator(it_ref, io.clone()).await, @@ -296,20 +292,27 @@ impl QueryExecutor { } } - fn new_index_iterator( + async fn new_index_iterator( + &self, opt: &Options, + it_ref: IteratorRef, ix: &DefineIndexStatement, io: IndexOption, - ) -> Option { - match io.op() { - IndexOperator::Equality(value) => { - Some(ThingIterator::IndexEqual(IndexEqualThingIterator::new(opt, ix, value))) - } - IndexOperator::Union(value) => { - Some(ThingIterator::IndexUnion(IndexUnionThingIterator::new(opt, ix, value))) + ) -> Result, Error> { + Ok(match io.op() { + IndexOperator::Equality(value) => Some(ThingIterator::IndexEqual( + IndexEqualThingIterator::new(opt.ns(), opt.db(), &ix.what, &ix.name, value), + )), + IndexOperator::Union(value) => Some(ThingIterator::IndexUnion( + IndexUnionThingIterator::new(opt.ns(), opt.db(), &ix.what, &ix.name, value), + )), + IndexOperator::Join(ios) => { + let iterators = self.build_iterators(opt, it_ref, ios).await?; + let index_join = Box::new(IndexJoinThingIterator::new(opt, ix, iterators)); + Some(ThingIterator::IndexJoin(index_join)) } _ => None, - } + }) } fn new_range_iterator( @@ -319,16 +322,26 @@ impl QueryExecutor { from: &RangeValue, to: &RangeValue, ) -> Option { - if let Some(ix) = self.0.index_definitions.get(ir as usize) { + if let Some(ix) = self.get_index_def(ir) { match ix.index { Index::Idx => { return Some(ThingIterator::IndexRange(IndexRangeThingIterator::new( - opt, ix, from, to, + opt.ns(), + opt.db(), + &ix.what, + &ix.name, + from, + to, ))) } Index::Uniq => { return Some(ThingIterator::UniqueRange(UniqueRangeThingIterator::new( - opt, ix, from, to, + opt.ns(), + opt.db(), + &ix.what, + &ix.name, + from, + to, ))) } _ => {} @@ -337,20 +350,27 @@ impl QueryExecutor { None } - fn new_unique_index_iterator( + async fn new_unique_index_iterator( + &self, opt: &Options, + it_ref: IteratorRef, ix: &DefineIndexStatement, io: IndexOption, - ) -> Option { - match io.op() { - IndexOperator::Equality(value) => { - Some(ThingIterator::UniqueEqual(UniqueEqualThingIterator::new(opt, ix, value))) - } + ) -> Result, Error> { + Ok(match io.op() { + IndexOperator::Equality(value) => Some(ThingIterator::UniqueEqual( + UniqueEqualThingIterator::new(opt.ns(), opt.db(), &ix.what, &ix.name, value), + )), IndexOperator::Union(value) => { Some(ThingIterator::UniqueUnion(UniqueUnionThingIterator::new(opt, ix, value))) } + IndexOperator::Join(ios) => { + let iterators = self.build_iterators(opt, it_ref, ios).await?; + let unique_join = Box::new(UniqueJoinThingIterator::new(opt, ix, iterators)); + Some(ThingIterator::UniqueJoin(unique_join)) + } _ => None, - } + }) } async fn new_search_index_iterator( @@ -381,40 +401,43 @@ impl QueryExecutor { None } + async fn build_iterators( + &self, + opt: &Options, + it_ref: IteratorRef, + ios: &[IndexOption], + ) -> Result, Error> { + let mut iterators = VecDeque::with_capacity(ios.len()); + for io in ios { + if let Some(it) = Box::pin(self.new_single_iterator(opt, it_ref, io)).await? { + iterators.push_back(it); + } + } + Ok(iterators) + } + + fn get_index_def(&self, ir: IndexRef) -> Option<&DefineIndexStatement> { + self.0.index_definitions.get(ir as usize) + } + + #[allow(clippy::too_many_arguments)] pub(crate) async fn matches( &self, + ctx: &Context<'_>, + opt: &Options, txn: &Transaction, thg: &Thing, exp: &Expression, - ) -> Result { - // Otherwise, we look for the first possible index options, and evaluate the expression - // Does the record id match this executor's table? - if thg.tb.eq(&self.0.table) { - if let Some(ft) = self.0.exp_entries.get(exp) { - let mut run = txn.lock().await; - let doc_key: Key = thg.into(); - if let Some(doc_id) = - ft.0.doc_ids.read().await.get_doc_id(&mut run, doc_key).await? - { - let term_goals = ft.0.terms_docs.len(); - // If there is no terms, it can't be a match - if term_goals == 0 { - return Ok(Value::Bool(false)); - } - for opt_td in ft.0.terms_docs.iter() { - if let Some((_, docs)) = opt_td { - if !docs.contains(doc_id) { - return Ok(Value::Bool(false)); - } - } else { - // If one of the term is missing, it can't be a match - return Ok(Value::Bool(false)); - } - } - return Ok(Value::Bool(true)); + l: Value, + r: Value, + ) -> Result { + if let Some(ft) = self.0.exp_entries.get(exp) { + if let Some(ix_def) = self.get_index_def(ft.0.index_option.ix_ref()) { + if self.0.table.eq(&ix_def.what.0) { + return self.matches_with_doc_id(txn, thg, ft).await; } - return Ok(Value::Bool(false)); } + return self.matches_with_value(ctx, opt, txn, ft, l, r).await; } // If no previous case were successful, we end up with a user error @@ -423,6 +446,60 @@ impl QueryExecutor { }) } + async fn matches_with_doc_id( + &self, + txn: &Transaction, + thg: &Thing, + ft: &FtEntry, + ) -> Result { + let mut run = txn.lock().await; + let doc_key: Key = thg.into(); + if let Some(doc_id) = ft.0.doc_ids.read().await.get_doc_id(&mut run, doc_key).await? { + let term_goals = ft.0.terms_docs.len(); + // If there is no terms, it can't be a match + if term_goals == 0 { + return Ok(false); + } + for opt_td in ft.0.terms_docs.iter() { + if let Some((_, docs)) = opt_td { + if !docs.contains(doc_id) { + return Ok(false); + } + } else { + // If one of the term is missing, it can't be a match + return Ok(false); + } + } + return Ok(true); + } + Ok(false) + } + + async fn matches_with_value( + &self, + ctx: &Context<'_>, + opt: &Options, + txn: &Transaction, + ft: &FtEntry, + l: Value, + r: Value, + ) -> Result { + // If the query terms contains terms that are unknown in the index + // of if there is not terms in the query + // we are sure that it does not match any document + if !ft.0.query_terms_set.is_matchable() { + return Ok(false); + } + let v = match ft.0.index_option.id_pos() { + IdiomPosition::Left => r, + IdiomPosition::Right => l, + }; + let terms = ft.0.terms.read().await; + // Extract the terms set from the record + let t = ft.0.analyzer.extract_indexing_terms(ctx, opt, txn, &terms, v).await?; + Ok(ft.0.query_terms_set.is_subset(&t)) + } + fn get_ft_entry(&self, match_ref: &Value) -> Option<&FtEntry> { if let Some(mr) = Self::get_match_ref(match_ref) { self.0.mr_entries.get(&mr) @@ -457,7 +534,7 @@ impl QueryExecutor { .highlight( &mut run, thg, - &e.0.terms, + &e.0.query_terms_list, prefix, suffix, partial, @@ -478,7 +555,7 @@ impl QueryExecutor { ) -> Result { if let Some((e, ft)) = self.get_ft_entry_and_index(&match_ref) { let mut run = txn.lock().await; - return ft.extract_offsets(&mut run, thg, &e.0.terms, partial).await; + return ft.extract_offsets(&mut run, thg, &e.0.query_terms_list, partial).await; } Ok(Value::None) } @@ -515,7 +592,10 @@ struct FtEntry(Arc); struct Inner { index_option: IndexOption, doc_ids: Arc>, - terms: Vec>, + analyzer: Arc, + query_terms_set: TermsSet, + query_terms_list: TermsList, + terms: Arc>, terms_docs: TermsDocs, scorer: Option, } @@ -529,14 +609,18 @@ impl FtEntry { io: IndexOption, ) -> Result, Error> { if let Matches(qs, _) = io.op() { - let terms = ft.extract_terms(ctx, opt, txn, qs.to_owned()).await?; + let (terms_list, terms_set) = + ft.extract_querying_terms(ctx, opt, txn, qs.to_owned()).await?; let mut tx = txn.lock().await; - let terms_docs = Arc::new(ft.get_terms_docs(&mut tx, &terms).await?); + let terms_docs = Arc::new(ft.get_terms_docs(&mut tx, &terms_list).await?); Ok(Some(Self(Arc::new(Inner { index_option: io, doc_ids: ft.doc_ids(), + analyzer: ft.analyzer(), + query_terms_set: terms_set, + query_terms_list: terms_list, scorer: ft.new_scorer(terms_docs.clone())?, - terms, + terms: ft.terms(), terms_docs, })))) } else { diff --git a/core/src/idx/planner/iterators.rs b/core/src/idx/planner/iterators.rs index 054ca346..ed624d34 100644 --- a/core/src/idx/planner/iterators.rs +++ b/core/src/idx/planner/iterators.rs @@ -7,7 +7,8 @@ use crate::idx::planner::plan::RangeValue; use crate::key::index::Index; use crate::kvs::{Key, Limit, ScanPage}; use crate::sql::statements::DefineIndexStatement; -use crate::sql::{Array, Thing, Value}; +use crate::sql::{Array, Ident, Thing, Value}; +use radix_trie::Trie; use std::collections::VecDeque; use std::sync::Arc; use tokio::sync::RwLock; @@ -16,54 +17,76 @@ pub(crate) enum ThingIterator { IndexEqual(IndexEqualThingIterator), IndexRange(IndexRangeThingIterator), IndexUnion(IndexUnionThingIterator), + IndexJoin(Box), UniqueEqual(UniqueEqualThingIterator), UniqueRange(UniqueRangeThingIterator), UniqueUnion(UniqueUnionThingIterator), + UniqueJoin(Box), Matches(MatchesThingIterator), Knn(DocIdsIterator), } impl ThingIterator { - pub(crate) async fn next_batch( + pub(crate) async fn next_batch( &mut self, tx: &Transaction, size: u32, - ) -> Result)>, Error> { + collector: &mut T, + ) -> Result { match self { - ThingIterator::IndexEqual(i) => i.next_batch(tx, size).await, - ThingIterator::UniqueEqual(i) => i.next_batch(tx).await, - ThingIterator::IndexRange(i) => i.next_batch(tx, size).await, - ThingIterator::UniqueRange(i) => i.next_batch(tx, size).await, - ThingIterator::IndexUnion(i) => i.next_batch(tx, size).await, - ThingIterator::UniqueUnion(i) => i.next_batch(tx, size).await, - ThingIterator::Matches(i) => i.next_batch(tx, size).await, - ThingIterator::Knn(i) => i.next_batch(tx, size).await, + Self::IndexEqual(i) => i.next_batch(tx, size, collector).await, + Self::UniqueEqual(i) => i.next_batch(tx, collector).await, + Self::IndexRange(i) => i.next_batch(tx, size, collector).await, + Self::UniqueRange(i) => i.next_batch(tx, size, collector).await, + Self::IndexUnion(i) => i.next_batch(tx, size, collector).await, + Self::UniqueUnion(i) => i.next_batch(tx, size, collector).await, + Self::Matches(i) => i.next_batch(tx, size, collector).await, + Self::Knn(i) => i.next_batch(tx, size, collector).await, + Self::IndexJoin(i) => Box::pin(i.next_batch(tx, size, collector)).await, + Self::UniqueJoin(i) => Box::pin(i.next_batch(tx, size, collector)).await, } } } +pub(crate) trait ThingCollector { + fn add(&mut self, thing: Thing, doc_id: Option); +} + +impl ThingCollector for Vec<(Thing, Option)> { + fn add(&mut self, thing: Thing, doc_id: Option) { + self.push((thing, doc_id)); + } +} + +impl ThingCollector for VecDeque<(Thing, Option)> { + fn add(&mut self, thing: Thing, doc_id: Option) { + self.push_back((thing, doc_id)); + } +} + pub(crate) struct IndexEqualThingIterator { beg: Vec, end: Vec, } impl IndexEqualThingIterator { - pub(super) fn new(opt: &Options, ix: &DefineIndexStatement, v: &Value) -> Self { + pub(super) fn new(ns: &str, db: &str, ix_what: &Ident, ix_name: &Ident, v: &Value) -> Self { let a = Array::from(v.clone()); - let beg = Index::prefix_ids_beg(opt.ns(), opt.db(), &ix.what, &ix.name, &a); - let end = Index::prefix_ids_end(opt.ns(), opt.db(), &ix.what, &ix.name, &a); + let beg = Index::prefix_ids_beg(ns, db, ix_what, ix_name, &a); + let end = Index::prefix_ids_end(ns, db, ix_what, ix_name, &a); Self { beg, end, } } - async fn next_scan( + async fn next_scan( txn: &Transaction, beg: &mut Vec, end: &[u8], limit: u32, - ) -> Result)>, Error> { + collector: &mut T, + ) -> Result { let min = beg.clone(); let max = end.to_owned(); let res = txn @@ -83,16 +106,18 @@ impl IndexEqualThingIterator { key.push(0x00); *beg = key; } - let res = res.iter().map(|(_, val)| (val.into(), None)).collect(); - Ok(res) + let count = res.len(); + res.into_iter().for_each(|(_, val)| collector.add(val.into(), None)); + Ok(count) } - async fn next_batch( + async fn next_batch( &mut self, txn: &Transaction, limit: u32, - ) -> Result)>, Error> { - Self::next_scan(txn, &mut self.beg, &self.end, limit).await + collector: &mut T, + ) -> Result { + Self::next_scan(txn, &mut self.beg, &self.end, limit, collector).await } } @@ -146,47 +171,62 @@ pub(crate) struct IndexRangeThingIterator { impl IndexRangeThingIterator { pub(super) fn new( - opt: &Options, - ix: &DefineIndexStatement, + ns: &str, + db: &str, + ix_what: &Ident, + ix_name: &Ident, from: &RangeValue, to: &RangeValue, ) -> Self { - let beg = Self::compute_beg(opt, ix, from); - let end = Self::compute_end(opt, ix, to); + let beg = Self::compute_beg(ns, db, ix_what, ix_name, from); + let end = Self::compute_end(ns, db, ix_what, ix_name, to); Self { r: RangeScan::new(beg, from.inclusive, end, to.inclusive), } } - fn compute_beg(opt: &Options, ix: &DefineIndexStatement, from: &RangeValue) -> Vec { + fn compute_beg( + ns: &str, + db: &str, + ix_what: &Ident, + ix_name: &Ident, + from: &RangeValue, + ) -> Vec { if from.value == Value::None { - return Index::prefix_beg(opt.ns(), opt.db(), &ix.what, &ix.name); + return Index::prefix_beg(ns, db, ix_what, ix_name); } let fd = Array::from(from.value.to_owned()); if from.inclusive { - Index::prefix_ids_beg(opt.ns(), opt.db(), &ix.what, &ix.name, &fd) + Index::prefix_ids_beg(ns, db, ix_what, ix_name, &fd) } else { - Index::prefix_ids_end(opt.ns(), opt.db(), &ix.what, &ix.name, &fd) + Index::prefix_ids_end(ns, db, ix_what, ix_name, &fd) } } - fn compute_end(opt: &Options, ix: &DefineIndexStatement, to: &RangeValue) -> Vec { + fn compute_end( + ns: &str, + db: &str, + ix_what: &Ident, + ix_name: &Ident, + to: &RangeValue, + ) -> Vec { if to.value == Value::None { - return Index::prefix_end(opt.ns(), opt.db(), &ix.what, &ix.name); + return Index::prefix_end(ns, db, ix_what, ix_name); } let fd = Array::from(to.value.to_owned()); if to.inclusive { - Index::prefix_ids_end(opt.ns(), opt.db(), &ix.what, &ix.name, &fd) + Index::prefix_ids_end(ns, db, ix_what, ix_name, &fd) } else { - Index::prefix_ids_beg(opt.ns(), opt.db(), &ix.what, &ix.name, &fd) + Index::prefix_ids_beg(ns, db, ix_what, ix_name, &fd) } } - async fn next_batch( + async fn next_batch( &mut self, txn: &Transaction, limit: u32, - ) -> Result)>, Error> { + collector: &mut T, + ) -> Result { let min = self.r.beg.clone(); let max = self.r.end.clone(); let res = txn @@ -205,13 +245,14 @@ impl IndexRangeThingIterator { self.r.beg.clone_from(key); self.r.beg.push(0x00); } - let mut r = Vec::with_capacity(res.len()); + let mut count = 0; for (k, v) in res { if self.r.matches(&k) { - r.push((v.into(), None)); + collector.add(v.into(), None); + count += 1; } } - Ok(r) + Ok(count) } } @@ -221,14 +262,14 @@ pub(crate) struct IndexUnionThingIterator { } impl IndexUnionThingIterator { - pub(super) fn new(opt: &Options, ix: &DefineIndexStatement, a: &Array) -> Self { + pub(super) fn new(ns: &str, db: &str, ix_what: &Ident, ix_name: &Ident, a: &Array) -> Self { // We create a VecDeque to hold the prefix keys (begin and end) for each value in the array. let mut values: VecDeque<(Vec, Vec)> = a.0.iter() .map(|v| { let a = Array::from(v.clone()); - let beg = Index::prefix_ids_beg(opt.ns(), opt.db(), &ix.what, &ix.name, &a); - let end = Index::prefix_ids_end(opt.ns(), opt.db(), &ix.what, &ix.name, &a); + let beg = Index::prefix_ids_beg(ns, db, ix_what, ix_name, &a); + let end = Index::prefix_ids_end(ns, db, ix_what, ix_name, &a); (beg, end) }) .collect(); @@ -239,19 +280,148 @@ impl IndexUnionThingIterator { } } - async fn next_batch( + async fn next_batch( &mut self, txn: &Transaction, limit: u32, - ) -> Result)>, Error> { + collector: &mut T, + ) -> Result { while let Some(r) = &mut self.current { - let res = IndexEqualThingIterator::next_scan(txn, &mut r.0, &r.1, limit).await?; - if !res.is_empty() { - return Ok(res); + let count = + IndexEqualThingIterator::next_scan(txn, &mut r.0, &r.1, limit, collector).await?; + if count != 0 { + return Ok(count); } self.current = self.values.pop_front(); } - Ok(vec![]) + Ok(0) + } +} + +struct JoinThingIterator { + ns: String, + db: String, + ix_what: Ident, + ix_name: Ident, + remote_iterators: VecDeque, + current_remote: Option, + current_remote_batch: VecDeque<(Thing, Option)>, + current_local: Option, + distinct: Trie, +} + +impl JoinThingIterator { + pub(super) fn new( + opt: &Options, + ix: &DefineIndexStatement, + remote_iterators: VecDeque, + ) -> Self { + Self { + ns: opt.ns().to_string(), + db: opt.db().to_string(), + ix_what: ix.what.clone(), + ix_name: ix.name.clone(), + current_remote: None, + current_remote_batch: VecDeque::with_capacity(0), + remote_iterators, + current_local: None, + distinct: Default::default(), + } + } +} + +impl JoinThingIterator { + async fn next_current_remote_batch( + &mut self, + tx: &Transaction, + limit: u32, + ) -> Result { + loop { + if let Some(it) = &mut self.current_remote { + self.current_remote_batch.clear(); + if it.next_batch(tx, limit, &mut self.current_remote_batch).await? > 0 { + return Ok(true); + } + } + self.current_remote = self.remote_iterators.pop_front(); + if self.current_remote.is_none() { + return Ok(false); + } + } + } + + async fn next_current_local( + &mut self, + tx: &Transaction, + limit: u32, + new_iter: F, + ) -> Result + where + F: Fn(&str, &str, &Ident, &Ident, Value) -> ThingIterator, + { + loop { + while let Some((thing, _)) = self.current_remote_batch.pop_front() { + let k: Key = (&thing).into(); + let value = Value::from(thing); + if self.distinct.insert(k, true).is_none() { + self.current_local = + Some(new_iter(&self.ns, &self.db, &self.ix_what, &self.ix_name, value)); + return Ok(true); + } + } + if !self.next_current_remote_batch(tx, limit).await? { + break; + } + } + Ok(false) + } + + async fn next_batch( + &mut self, + tx: &Transaction, + limit: u32, + collector: &mut T, + new_iter: F, + ) -> Result + where + F: Fn(&str, &str, &Ident, &Ident, Value) -> ThingIterator + Copy, + { + loop { + if let Some(current_local) = &mut self.current_local { + let n = current_local.next_batch(tx, limit, collector).await?; + if n > 0 { + return Ok(n); + } + } + if !self.next_current_local(tx, limit, new_iter).await? { + return Ok(0); + } + } + } +} + +pub(crate) struct IndexJoinThingIterator(JoinThingIterator); + +impl IndexJoinThingIterator { + pub(super) fn new( + opt: &Options, + ix: &DefineIndexStatement, + remote_iterators: VecDeque, + ) -> Self { + Self(JoinThingIterator::new(opt, ix, remote_iterators)) + } + + async fn next_batch( + &mut self, + tx: &Transaction, + limit: u32, + collector: &mut T, + ) -> Result { + let new_iter = |ns: &str, db: &str, ix_what: &Ident, ix_name: &Ident, value: Value| { + let it = IndexEqualThingIterator::new(ns, db, ix_what, ix_name, &value); + ThingIterator::IndexEqual(it) + }; + self.0.next_batch(tx, limit, collector, new_iter).await } } @@ -260,24 +430,27 @@ pub(crate) struct UniqueEqualThingIterator { } impl UniqueEqualThingIterator { - pub(super) fn new(opt: &Options, ix: &DefineIndexStatement, v: &Value) -> Self { + pub(super) fn new(ns: &str, db: &str, ix_what: &Ident, ix_name: &Ident, v: &Value) -> Self { let a = Array::from(v.to_owned()); - let key = Index::new(opt.ns(), opt.db(), &ix.what, &ix.name, &a, None).into(); + let key = Index::new(ns, db, ix_what, ix_name, &a, None).into(); Self { key: Some(key), } } - async fn next_batch( + async fn next_batch( &mut self, txn: &Transaction, - ) -> Result)>, Error> { + collector: &mut T, + ) -> Result { + let mut count = 0; if let Some(key) = self.key.take() { if let Some(val) = txn.lock().await.get(key).await? { - return Ok(vec![(val.into(), None)]); + collector.add(val.into(), None); + count += 1; } } - Ok(vec![]) + Ok(count) } } @@ -288,51 +461,59 @@ pub(crate) struct UniqueRangeThingIterator { impl UniqueRangeThingIterator { pub(super) fn new( - opt: &Options, - ix: &DefineIndexStatement, + ns: &str, + db: &str, + ix_what: &Ident, + ix_name: &Ident, from: &RangeValue, to: &RangeValue, ) -> Self { - let beg = Self::compute_beg(opt, ix, from); - let end = Self::compute_end(opt, ix, to); + let beg = Self::compute_beg(ns, db, ix_what, ix_name, from); + let end = Self::compute_end(ns, db, ix_what, ix_name, to); Self { r: RangeScan::new(beg, from.inclusive, end, to.inclusive), done: false, } } - fn compute_beg(opt: &Options, ix: &DefineIndexStatement, from: &RangeValue) -> Vec { + fn compute_beg( + ns: &str, + db: &str, + ix_what: &Ident, + ix_name: &Ident, + from: &RangeValue, + ) -> Vec { if from.value == Value::None { - return Index::prefix_beg(opt.ns(), opt.db(), &ix.what, &ix.name); + return Index::prefix_beg(ns, db, ix_what, ix_name); } - Index::new( - opt.ns(), - opt.db(), - &ix.what, - &ix.name, - &Array::from(from.value.to_owned()), - None, - ) - .encode() - .unwrap() - } - - fn compute_end(opt: &Options, ix: &DefineIndexStatement, to: &RangeValue) -> Vec { - if to.value == Value::None { - return Index::prefix_end(opt.ns(), opt.db(), &ix.what, &ix.name); - } - Index::new(opt.ns(), opt.db(), &ix.what, &ix.name, &Array::from(to.value.to_owned()), None) + Index::new(ns, db, ix_what, ix_name, &Array::from(from.value.to_owned()), None) .encode() .unwrap() } - async fn next_batch( + fn compute_end( + ns: &str, + db: &str, + ix_what: &Ident, + ix_name: &Ident, + to: &RangeValue, + ) -> Vec { + if to.value == Value::None { + return Index::prefix_end(ns, db, ix_what, ix_name); + } + Index::new(ns, db, ix_what, ix_name, &Array::from(to.value.to_owned()), None) + .encode() + .unwrap() + } + + async fn next_batch( &mut self, txn: &Transaction, mut limit: u32, - ) -> Result)>, Error> { + collector: &mut T, + ) -> Result { if self.done { - return Ok(vec![]); + return Ok(0); } let min = self.r.beg.clone(); let max = self.r.end.clone(); @@ -347,26 +528,27 @@ impl UniqueRangeThingIterator { limit, ) .await?; - let res = res.values; - let mut r = Vec::with_capacity(res.len()); - for (k, v) in res { + let mut count = 0; + for (k, v) in res.values { limit -= 1; if limit == 0 { self.r.beg = k; - return Ok(r); + return Ok(count); } if self.r.matches(&k) { - r.push((v.into(), None)); + collector.add(v.into(), None); + count += 1; } } let end = self.r.end.clone(); if self.r.matches(&end) { if let Some(v) = tx.get(end).await? { - r.push((v.into(), None)); + collector.add(v.into(), None); + count += 1; } } self.done = true; - Ok(r) + Ok(count) } } @@ -390,22 +572,49 @@ impl UniqueUnionThingIterator { } } - async fn next_batch( + async fn next_batch( &mut self, txn: &Transaction, limit: u32, - ) -> Result)>, Error> { + collector: &mut T, + ) -> Result { let mut run = txn.lock().await; - let mut res = vec![]; + let mut count = 0; while let Some(key) = self.keys.pop_front() { if let Some(val) = run.get(key).await? { - res.push((val.into(), None)); - } - if res.len() >= limit as usize { - return Ok(res); + collector.add(val.into(), None); + count += 1; + if count >= limit { + break; + } } } - Ok(res) + Ok(count as usize) + } +} + +pub(crate) struct UniqueJoinThingIterator(JoinThingIterator); + +impl UniqueJoinThingIterator { + pub(super) fn new( + opt: &Options, + ix: &DefineIndexStatement, + remote_iterators: VecDeque, + ) -> Self { + Self(JoinThingIterator::new(opt, ix, remote_iterators)) + } + + async fn next_batch( + &mut self, + tx: &Transaction, + limit: u32, + collector: &mut T, + ) -> Result { + let new_iter = |ns: &str, db: &str, ix_what: &Ident, ix_name: &Ident, value: Value| { + let it = UniqueEqualThingIterator::new(ns, db, ix_what, ix_name, &value); + ThingIterator::UniqueEqual(it) + }; + self.0.next_batch(tx, limit, collector, new_iter).await } } @@ -421,24 +630,25 @@ impl MatchesThingIterator { }) } - async fn next_batch( + async fn next_batch( &mut self, txn: &Transaction, - mut limit: u32, - ) -> Result)>, Error> { - let mut res = vec![]; + limit: u32, + collector: &mut T, + ) -> Result { + let mut count = 0; if let Some(hits) = &mut self.hits { let mut run = txn.lock().await; - while limit > 0 { + while limit > count { if let Some((thg, doc_id)) = hits.next(&mut run).await? { - res.push((thg, Some(doc_id))); + collector.add(thg, Some(doc_id)); + count += 1; } else { break; } - limit -= 1; } } - Ok(res) + Ok(count as usize) } } @@ -454,25 +664,26 @@ impl DocIdsIterator { res, } } - async fn next_batch( + async fn next_batch( &mut self, txn: &Transaction, - mut limit: u32, - ) -> Result)>, Error> { - let mut res = vec![]; + limit: u32, + collector: &mut T, + ) -> Result { let mut tx = txn.lock().await; - while limit > 0 { + let mut count = 0; + while limit > count { if let Some(doc_id) = self.res.pop_front() { if let Some(doc_key) = self.doc_ids.read().await.get_doc_key(&mut tx, doc_id).await? { - res.push((doc_key.into(), Some(doc_id))); - limit -= 1; + collector.add(doc_key.into(), Some(doc_id)); + count += 1; } } else { break; } } - Ok(res) + Ok(count as usize) } } diff --git a/core/src/idx/planner/mod.rs b/core/src/idx/planner/mod.rs index adbb1ae9..f7114ebf 100644 --- a/core/src/idx/planner/mod.rs +++ b/core/src/idx/planner/mod.rs @@ -53,7 +53,6 @@ impl<'a> QueryPlanner<'a> { ) -> Result<(), Error> { let mut is_table_iterator = false; let mut is_knn = false; - let t = Arc::new(t); match Tree::build(ctx, self.opt, txn, &t, self.cond, self.with).await? { Some(tree) => { is_knn = is_knn || !tree.knn_expressions.is_empty(); @@ -117,7 +116,7 @@ impl<'a> QueryPlanner<'a> { fn add( &mut self, - tb: Arc
, + tb: Table, irf: Option, exe: InnerQueryExecutor, it: &mut Iterator, diff --git a/core/src/idx/planner/plan.rs b/core/src/idx/planner/plan.rs index c7ed4c8f..7e54c539 100644 --- a/core/src/idx/planner/plan.rs +++ b/core/src/idx/planner/plan.rs @@ -1,6 +1,7 @@ use crate::err::Error; use crate::idx::ft::MatchRef; -use crate::idx::planner::tree::{GroupRef, IndexRef, Node}; +use crate::idx::planner::tree::{GroupRef, IdiomPosition, IndexRef, Node}; +use crate::sql::statements::DefineIndexStatement; use crate::sql::with::With; use crate::sql::{Array, Idiom, Object}; use crate::sql::{Expression, Operator, Value}; @@ -162,48 +163,53 @@ pub(super) enum Plan { SingleIndexRange(IndexRef, UnionRangeQueryBuilder), } -#[derive(Debug, Clone, Eq, PartialEq, Hash)] -pub(crate) struct IndexOption(Arc); - -#[derive(Debug, Eq, PartialEq, Hash)] -pub(super) struct Inner { +#[derive(Debug, Eq, PartialEq, Hash, Clone)] +pub(super) struct IndexOption { + /// A reference o the index definition ir: IndexRef, - id: Arc, - op: IndexOperator, + id: Idiom, + id_pos: IdiomPosition, + op: Arc, } #[derive(Debug, Eq, PartialEq, Hash)] pub(super) enum IndexOperator { Equality(Value), Union(Array), + Join(Vec), RangePart(Operator, Value), Matches(String, Option), Knn(Array, u32), } impl IndexOption { - pub(super) fn new(ir: IndexRef, id: Arc, op: IndexOperator) -> Self { - Self(Arc::new(Inner { + pub(super) fn new(ir: IndexRef, id: Idiom, id_pos: IdiomPosition, op: IndexOperator) -> Self { + Self { ir, id, - op, - })) + id_pos, + op: Arc::new(op), + } } pub(super) fn require_distinct(&self) -> bool { - matches!(self.0.op, IndexOperator::Union(_)) + matches!(self.op.as_ref(), IndexOperator::Union(_)) } pub(super) fn ix_ref(&self) -> IndexRef { - self.0.ir + self.ir } pub(super) fn op(&self) -> &IndexOperator { - &self.0.op + self.op.as_ref() } pub(super) fn id_ref(&self) -> &Idiom { - self.0.id.as_ref() + &self.id + } + + pub(super) fn id_pos(&self) -> IdiomPosition { + self.id_pos } fn reduce_array(v: &Value) -> Value { @@ -215,7 +221,11 @@ impl IndexOption { v.clone() } - pub(crate) fn explain(&self, e: &mut HashMap<&str, Value>) { + pub(crate) fn explain(&self, ix_def: &[DefineIndexStatement]) -> Value { + let mut e = HashMap::new(); + if let Some(ix) = ix_def.get(self.ir as usize) { + e.insert("index", Value::from(ix.name.0.to_owned())); + } match self.op() { IndexOperator::Equality(v) => { e.insert("operator", Value::from(Operator::Equal.to_string())); @@ -225,6 +235,15 @@ impl IndexOption { e.insert("operator", Value::from("union")); e.insert("value", Value::Array(a.clone())); } + IndexOperator::Join(ios) => { + e.insert("operator", Value::from("join")); + let mut joins = Vec::with_capacity(ios.len()); + for io in ios { + joins.push(io.explain(ix_def)); + } + let joins = Value::from(joins); + e.insert("joins", joins); + } IndexOperator::Matches(qs, a) => { e.insert("operator", Value::from(Operator::Matches(*a).to_string())); e.insert("value", Value::from(qs.to_owned())); @@ -238,6 +257,7 @@ impl IndexOption { e.insert("value", Value::Array(a.clone())); } }; + Value::from(e) } } @@ -391,23 +411,25 @@ impl UnionRangeQueryBuilder { #[cfg(test)] mod tests { use crate::idx::planner::plan::{IndexOperator, IndexOption, RangeValue}; + use crate::idx::planner::tree::IdiomPosition; use crate::sql::{Array, Idiom, Value}; use crate::syn::Parse; use std::collections::HashSet; - use std::sync::Arc; #[test] fn test_hash_index_option() { let mut set = HashSet::new(); let io1 = IndexOption::new( 1, - Arc::new(Idiom::parse("test")), + Idiom::parse("test"), + IdiomPosition::Right, IndexOperator::Equality(Value::Array(Array::from(vec!["test"]))), ); let io2 = IndexOption::new( 1, - Arc::new(Idiom::parse("test")), + Idiom::parse("test"), + IdiomPosition::Right, IndexOperator::Equality(Value::Array(Array::from(vec!["test"]))), ); diff --git a/core/src/idx/planner/tree.rs b/core/src/idx/planner/tree.rs index 79a58e37..5da5c46b 100644 --- a/core/src/idx/planner/tree.rs +++ b/core/src/idx/planner/tree.rs @@ -3,10 +3,11 @@ use crate::dbs::{Options, Transaction}; use crate::err::Error; use crate::idx::planner::executor::KnnExpressions; use crate::idx::planner::plan::{IndexOperator, IndexOption}; +use crate::kvs; use crate::sql::index::{Distance, Index}; -use crate::sql::statements::DefineIndexStatement; +use crate::sql::statements::{DefineFieldStatement, DefineIndexStatement}; use crate::sql::{ - Array, Cond, Expression, Idiom, Number, Operator, Part, Subquery, Table, Value, With, + Array, Cond, Expression, Idiom, Kind, Number, Operator, Part, Subquery, Table, Value, With, }; use async_recursion::async_recursion; use std::collections::HashMap; @@ -51,16 +52,26 @@ struct TreeBuilder<'a> { txn: &'a Transaction, table: &'a Table, with: &'a Option, - indexes: Option>, + schemas: HashMap, + idioms_indexes: HashMap>, resolved_expressions: HashMap, ResolvedExpression>, - resolved_idioms: HashMap, Arc>, - idioms_indexes: HashMap, Option>>>, + resolved_idioms: HashMap, index_map: IndexesMap, with_indexes: Vec, knn_expressions: KnnExpressions, + idioms_record_options: HashMap, group_sequence: GroupRef, } +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub(super) struct RecordOptions { + locals: LocalIndexRefs, + remotes: RemoteIndexRefs, +} + +pub(super) type LocalIndexRefs = Vec; +pub(super) type RemoteIndexRefs = Arc>; + impl<'a> TreeBuilder<'a> { fn new( ctx: &'a Context<'_>, @@ -79,28 +90,28 @@ impl<'a> TreeBuilder<'a> { txn, table, with, - indexes: None, + schemas: Default::default(), + idioms_indexes: Default::default(), resolved_expressions: Default::default(), resolved_idioms: Default::default(), - idioms_indexes: Default::default(), index_map: Default::default(), with_indexes, knn_expressions: Default::default(), + idioms_record_options: Default::default(), group_sequence: 0, } } - async fn lazy_cache_indexes(&mut self) -> Result<(), Error> { - if self.indexes.is_none() { - let indexes = self - .txn - .clone() - .lock() - .await - .all_tb_indexes(self.opt.ns(), self.opt.db(), &self.table.0) - .await?; - self.indexes = Some(indexes); + async fn lazy_load_schema_resolver( + &mut self, + tx: &mut kvs::Transaction, + table: &Table, + ) -> Result<(), Error> { + if self.schemas.contains_key(table) { + return Ok(()); } + let l = SchemaCache::new(self.opt, table, tx).await?; + self.schemas.insert(table.clone(), l); Ok(()) } @@ -139,11 +150,8 @@ impl<'a> TreeBuilder<'a> { async fn eval_idiom(&mut self, group: GroupRef, i: &Idiom) -> Result { // Check if the idiom has already been resolved - if let Some(i) = self.resolved_idioms.get(i) { - if let Some(Some(irs)) = self.idioms_indexes.get(i).cloned() { - return Ok(Node::IndexedField(i.clone(), irs)); - } - return Ok(Node::NonIndexedField(i.clone())); + if let Some(node) = self.resolved_idioms.get(i).cloned() { + return Ok(node); }; // Compute the idiom value if it is a param @@ -154,42 +162,100 @@ impl<'a> TreeBuilder<'a> { } } - self.lazy_cache_indexes().await?; + let n = self.resolve_idiom(i).await?; + self.resolved_idioms.insert(i.clone(), n.clone()); - let i = Arc::new(i.clone()); - - self.resolved_idioms.insert(i.clone(), i.clone()); - - // Try to detect if it matches an index - if let Some(irs) = self.resolve_indexes(&i) { - return Ok(Node::IndexedField(i.clone(), irs)); - } - - Ok(Node::NonIndexedField(i)) + Ok(n) } - fn resolve_indexes(&mut self, i: &Arc) -> Option>> { - let mut res = None; - if let Some(indexes) = &self.indexes { - let mut irs = Vec::new(); - for ix in indexes.as_ref() { - if ix.cols.len() == 1 && ix.cols[0].eq(i) { - let ixr = self.index_map.definitions.len() as IndexRef; - if let Some(With::Index(ixs)) = self.with { - if ixs.contains(&ix.name.0) { - self.with_indexes.push(ixr); - } - } - self.index_map.definitions.push(ix.clone()); - irs.push(ixr); - } - } + async fn resolve_idiom(&mut self, i: &Idiom) -> Result { + let mut tx = self.txn.lock().await; + self.lazy_load_schema_resolver(&mut tx, self.table).await?; + + // Try to detect if it matches an index + if let Some(schema) = self.schemas.get(self.table).cloned() { + let irs = self.resolve_indexes(self.table, i, &schema); if !irs.is_empty() { - res = Some(Arc::new(irs)); + return Ok(Node::IndexedField(i.clone(), irs)); + } + // Try to detect an indexed record field + if let Some(ro) = self.resolve_record_field(&mut tx, schema.fields.as_ref(), i).await? { + return Ok(Node::RecordField(i.clone(), ro)); } } - self.idioms_indexes.insert(i.clone(), res.clone()); - res + Ok(Node::NonIndexedField(i.clone())) + } + + fn resolve_indexes(&mut self, t: &Table, i: &Idiom, schema: &SchemaCache) -> Vec { + if let Some(m) = self.idioms_indexes.get(t) { + if let Some(irs) = m.get(i).cloned() { + return irs; + } + } + let mut irs = Vec::new(); + for ix in schema.indexes.iter() { + if ix.cols.len() == 1 && ix.cols[0].eq(i) { + let ixr = self.index_map.definitions.len() as IndexRef; + if let Some(With::Index(ixs)) = self.with { + if ixs.contains(&ix.name.0) { + self.with_indexes.push(ixr); + } + } + self.index_map.definitions.push(ix.clone()); + irs.push(ixr); + } + } + if let Some(e) = self.idioms_indexes.get_mut(t) { + e.insert(i.clone(), irs.clone()); + } else { + self.idioms_indexes.insert(t.clone(), HashMap::from([(i.clone(), irs.clone())])); + } + irs + } + + async fn resolve_record_field( + &mut self, + tx: &mut kvs::Transaction, + fields: &[DefineFieldStatement], + idiom: &Idiom, + ) -> Result, Error> { + for field in fields.iter() { + if let Some(Kind::Record(tables)) = &field.kind { + if idiom.starts_with(&field.name.0) { + let (local_field, remote_field) = idiom.0.split_at(field.name.0.len()); + if remote_field.is_empty() { + return Ok(None); + } + let local_field = Idiom::from(local_field); + self.lazy_load_schema_resolver(tx, self.table).await?; + let locals; + if let Some(shema) = self.schemas.get(self.table).cloned() { + locals = self.resolve_indexes(self.table, &local_field, &shema); + } else { + return Ok(None); + } + + let remote_field = Idiom::from(remote_field); + let mut remotes = vec![]; + for table in tables { + self.lazy_load_schema_resolver(tx, table).await?; + if let Some(shema) = self.schemas.get(table).cloned() { + let remote_irs = self.resolve_indexes(table, &remote_field, &shema); + remotes.push((remote_field.clone(), remote_irs)); + } else { + return Ok(None); + } + } + let ro = RecordOptions { + locals, + remotes: Arc::new(remotes), + }; + self.idioms_record_options.insert(idiom.clone(), ro.clone()); + return Ok(Some(ro)); + } + } + } + Ok(None) } async fn eval_expression(&mut self, group: GroupRef, e: &Expression) -> Result { @@ -210,23 +276,25 @@ impl<'a> TreeBuilder<'a> { let left = Arc::new(self.eval_value(group, l).await?); let right = Arc::new(self.eval_value(group, r).await?); let mut io = None; - if let Some((id, irs)) = left.is_indexed_field() { - io = self.lookup_index_option( - irs.as_slice(), + if let Some((id, local_irs, remote_irs)) = left.is_indexed_field() { + io = self.lookup_index_options( o, id, &right, &exp, IdiomPosition::Left, + local_irs, + remote_irs, )?; - } else if let Some((id, irs)) = right.is_indexed_field() { - io = self.lookup_index_option( - irs.as_slice(), + } else if let Some((id, local_irs, remote_irs)) = right.is_indexed_field() { + io = self.lookup_index_options( o, id, &left, &exp, IdiomPosition::Right, + local_irs, + remote_irs, )?; } else if let Some(id) = left.is_non_indexed_field() { self.eval_knn(id, &right, &exp)?; @@ -236,7 +304,7 @@ impl<'a> TreeBuilder<'a> { let re = ResolvedExpression { group, exp: exp.clone(), - io: io.clone(), + io, left: left.clone(), right: right.clone(), }; @@ -246,11 +314,41 @@ impl<'a> TreeBuilder<'a> { } } + #[allow(clippy::too_many_arguments)] + fn lookup_index_options( + &mut self, + o: &Operator, + id: &Idiom, + node: &Node, + exp: &Arc, + p: IdiomPosition, + local_irs: LocalIndexRefs, + remote_irs: Option, + ) -> Result, Error> { + if let Some(remote_irs) = remote_irs { + let mut remote_ios = Vec::with_capacity(remote_irs.len()); + for (id, irs) in remote_irs.iter() { + if let Some(io) = self.lookup_index_option(irs.as_slice(), o, id, node, exp, p)? { + remote_ios.push(io); + } else { + return Ok(None); + } + } + if let Some(ir) = self.lookup_join_index_ref(local_irs.as_slice()) { + let io = IndexOption::new(ir, id.clone(), p, IndexOperator::Join(remote_ios)); + return Ok(Some(io)); + } + return Ok(None); + } + let io = self.lookup_index_option(local_irs.as_slice(), o, id, node, exp, p)?; + Ok(io) + } + fn lookup_index_option( &mut self, irs: &[IndexRef], op: &Operator, - id: Arc, + id: &Idiom, n: &Node, e: &Arc, p: IdiomPosition, @@ -263,10 +361,10 @@ impl<'a> TreeBuilder<'a> { Index::Search { .. } => Self::eval_matches_operator(op, n), - Index::MTree(_) => self.eval_indexed_knn(e, op, n, id.clone())?, + Index::MTree(_) => self.eval_indexed_knn(e, op, n, id)?, }; if let Some(op) = op { - let io = IndexOption::new(*ir, id, op); + let io = IndexOption::new(*ir, id.clone(), p, op); self.index_map.options.push((e.clone(), io.clone())); return Ok(Some(io)); } @@ -274,6 +372,19 @@ impl<'a> TreeBuilder<'a> { } Ok(None) } + + fn lookup_join_index_ref(&self, irs: &[IndexRef]) -> Option { + for ir in irs { + if let Some(ix) = self.index_map.definitions.get(*ir as usize) { + match &ix.index { + Index::Idx | Index::Uniq => return Some(*ir), + _ => {} + }; + } + } + None + } + fn eval_matches_operator(op: &Operator, n: &Node) -> Option { if let Some(v) = n.is_computed() { if let Operator::Matches(mr) = op { @@ -288,14 +399,14 @@ impl<'a> TreeBuilder<'a> { exp: &Arc, op: &Operator, n: &Node, - id: Arc, + id: &Idiom, ) -> Result, Error> { if let Operator::Knn(k, d) = op { if let Node::Computed(v) = n { let vec: Vec = v.as_ref().try_into()?; self.knn_expressions.insert( exp.clone(), - (*k, id, Arc::new(vec), d.clone().unwrap_or(Distance::Euclidean)), + (*k, id.clone(), Arc::new(vec), d.clone().unwrap_or(Distance::Euclidean)), ); if let Value::Array(a) = v.as_ref() { match d { @@ -310,13 +421,13 @@ impl<'a> TreeBuilder<'a> { Ok(None) } - fn eval_knn(&mut self, id: Arc, val: &Node, exp: &Arc) -> Result<(), Error> { + fn eval_knn(&mut self, id: &Idiom, val: &Node, exp: &Arc) -> Result<(), Error> { if let Operator::Knn(k, d) = exp.operator() { if let Node::Computed(v) = val { let vec: Vec = v.as_ref().try_into()?; self.knn_expressions.insert( exp.clone(), - (*k, id, Arc::new(vec), d.clone().unwrap_or(Distance::Euclidean)), + (*k, id.clone(), Arc::new(vec), d.clone().unwrap_or(Distance::Euclidean)), ); } } @@ -367,6 +478,23 @@ pub(super) struct IndexesMap { pub(super) definitions: Vec, } +#[derive(Clone)] +struct SchemaCache { + indexes: Arc<[DefineIndexStatement]>, + fields: Arc<[DefineFieldStatement]>, +} + +impl SchemaCache { + async fn new(opt: &Options, table: &Table, tx: &mut kvs::Transaction) -> Result { + let indexes = tx.all_tb_indexes(opt.ns(), opt.db(), table).await?; + let fields = tx.all_tb_fields(opt.ns(), opt.db(), table).await?; + Ok(Self { + indexes, + fields, + }) + } +} + pub(super) type GroupRef = u16; #[derive(Debug, Clone, Eq, PartialEq, Hash)] @@ -378,8 +506,9 @@ pub(super) enum Node { right: Arc, exp: Arc, }, - IndexedField(Arc, Arc>), - NonIndexedField(Arc), + IndexedField(Idiom, Vec), + RecordField(Idiom, RecordOptions), + NonIndexedField(Idiom), Computed(Arc), Unsupported(String), } @@ -393,28 +522,31 @@ impl Node { } } - pub(super) fn is_indexed_field(&self) -> Option<(Arc, Arc>)> { - if let Node::IndexedField(id, irs) = self { - Some((id.clone(), irs.clone())) - } else { - None + pub(super) fn is_indexed_field( + &self, + ) -> Option<(&Idiom, LocalIndexRefs, Option)> { + match self { + Node::IndexedField(id, irs) => Some((id, irs.clone(), None)), + Node::RecordField(id, ro) => Some((id, ro.locals.clone(), Some(ro.remotes.clone()))), + _ => None, } } - pub(super) fn is_non_indexed_field(&self) -> Option> { + pub(super) fn is_non_indexed_field(&self) -> Option<&Idiom> { if let Node::NonIndexedField(id) = self { - Some(id.clone()) + Some(id) } else { None } } } -#[derive(Clone, Copy)] -enum IdiomPosition { +#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +pub(super) enum IdiomPosition { Left, Right, } + impl IdiomPosition { // Reverses the operator for non-commutative operators fn transform(&self, op: &Operator) -> Operator { diff --git a/core/src/sql/expression.rs b/core/src/sql/expression.rs index 996859ff..29d23283 100644 --- a/core/src/sql/expression.rs +++ b/core/src/sql/expression.rs @@ -185,7 +185,7 @@ impl Expression { Operator::NoneInside => fnc::operate::inside_none(&l, &r), Operator::Outside => fnc::operate::outside(&l, &r), Operator::Intersects => fnc::operate::intersects(&l, &r), - Operator::Matches(_) => fnc::operate::matches(ctx, txn, doc, self).await, + Operator::Matches(_) => fnc::operate::matches(ctx, opt, txn, doc, self, l, r).await, Operator::Knn(_, _) => fnc::operate::knn(ctx, opt, txn, doc, self).await, _ => unreachable!(), } diff --git a/lib/tests/matches.rs b/lib/tests/matches.rs index f1fccb61..b40707df 100644 --- a/lib/tests/matches.rs +++ b/lib/tests/matches.rs @@ -37,11 +37,11 @@ async fn select_where_matches_using_index() -> Result<(), Error> { }, operation: 'Iterate Index' }, - { - detail: { - type: 'Memory' - }, - operation: 'Collector' + { + detail: { + type: 'Memory' + }, + operation: 'Collector' } ]", ); diff --git a/lib/tests/planner.rs b/lib/tests/planner.rs index 487650fd..ccd51488 100644 --- a/lib/tests/planner.rs +++ b/lib/tests/planner.rs @@ -1914,3 +1914,367 @@ async fn select_with_in_operator_multiple_indexes() -> Result<(), Error> { assert_eq!(format!("{:#}", tmp), format!("{:#}", val)); Ok(()) } + +#[tokio::test] +async fn select_with_record_id_link_no_index() -> Result<(), Error> { + let dbs = new_ds().await?; + let ses = Session::owner().with_ns("test").with_db("test"); + // + let sql = " + DEFINE FIELD name ON TABLE t TYPE string; + DEFINE FIELD t ON TABLE i TYPE record(t); + CREATE t:1 SET name = 'h'; + CREATE t:2 SET name = 'h'; + CREATE i:A SET t = t:1; + CREATE i:B SET t = t:2; + SELECT * FROM i WHERE t.name = 'h'; + SELECT * FROM i WHERE t.name = 'h' EXPLAIN; + "; + let mut res = dbs.execute(&sql, &ses, None).await?; + // + assert_eq!(res.len(), 8); + skip_ok(&mut res, 6)?; + // + let tmp = res.remove(0).result?; + let val = Value::parse( + r#"[ + { "id": i:A, "t": t:1 }, + { "id": i:B, "t": t:2 } + ]"#, + ); + assert_eq!(format!("{:#}", tmp), format!("{:#}", val)); + // + let tmp = res.remove(0).result?; + let val = Value::parse( + r#"[ + { + detail: { + table: 'i' + }, + operation: 'Iterate Table' + }, + { + detail: { + reason: 'NO INDEX FOUND' + }, + operation: 'Fallback' + }, + { + detail: { + type: 'Memory' + }, + operation: 'Collector' + } + ]"#, + ); + assert_eq!(format!("{:#}", tmp), format!("{:#}", val)); + // + Ok(()) +} + +#[tokio::test] +async fn select_with_record_id_link_index() -> Result<(), Error> { + let dbs = new_ds().await?; + let ses = Session::owner().with_ns("test").with_db("test"); + // + let sql = " + DEFINE INDEX i_t_id ON TABLE i COLUMNS t; + DEFINE INDEX t_name_idx ON TABLE t COLUMNS name; + DEFINE FIELD name ON TABLE t TYPE string; + DEFINE FIELD t ON TABLE i TYPE record(t); + CREATE t:1 SET name = 'h'; + CREATE t:2 SET name = 'h'; + CREATE i:A SET t = t:1; + CREATE i:B SET t = t:2; + SELECT * FROM i WHERE t.name = 'h' EXPLAIN; + SELECT * FROM i WHERE t.name = 'h'; + "; + let mut res = dbs.execute(&sql, &ses, None).await?; + // + assert_eq!(res.len(), 10); + skip_ok(&mut res, 8)?; + // + let expected = Value::parse( + r#"[ + { "id": i:A, "t": t:1 }, + { "id": i:B, "t": t:2 } + ]"#, + ); + // + let tmp = res.remove(0).result?; + let val = Value::parse( + r#"[ + { + detail: { + plan: { + index: 'i_t_id', + joins: [ + { + index: 't_name_idx', + operator: '=', + value: 'h' + } + ], + operator: 'join' + }, + table: 'i' + }, + operation: 'Iterate Index' + }, + { + detail: { + type: 'Memory' + }, + operation: 'Collector' + } + ]"#, + ); + assert_eq!(format!("{:#}", tmp), format!("{:#}", val)); + // + let tmp = res.remove(0).result?; + assert_eq!(format!("{:#}", tmp), format!("{:#}", expected)); + // + Ok(()) +} + +#[tokio::test] +async fn select_with_record_id_link_unique_index() -> Result<(), Error> { + let dbs = new_ds().await?; + let ses = Session::owner().with_ns("test").with_db("test"); + // + let sql = " + DEFINE INDEX i_t_unique_id ON TABLE i COLUMNS t UNIQUE; + DEFINE INDEX t_name_idx ON TABLE t COLUMNS name; + DEFINE FIELD name ON TABLE t TYPE string; + DEFINE FIELD t ON TABLE i TYPE record(t); + CREATE t:1 SET name = 'h'; + CREATE t:2 SET name = 'h'; + CREATE i:A SET t = t:1; + CREATE i:B SET t = t:2; + SELECT * FROM i WHERE t.name = 'h' EXPLAIN; + SELECT * FROM i WHERE t.name = 'h'; + "; + let mut res = dbs.execute(&sql, &ses, None).await?; + // + assert_eq!(res.len(), 10); + skip_ok(&mut res, 8)?; + // + let expected = Value::parse( + r#"[ + { "id": i:A, "t": t:1 }, + { "id": i:B, "t": t:2 } + ]"#, + ); + // + let tmp = res.remove(0).result?; + let val = Value::parse( + r#"[ + { + detail: { + plan: { + index: 'i_t_unique_id', + joins: [ + { + index: 't_name_idx', + operator: '=', + value: 'h' + } + ], + operator: 'join' + }, + table: 'i' + }, + operation: 'Iterate Index' + }, + { + detail: { + type: 'Memory' + }, + operation: 'Collector' + } + ]"#, + ); + assert_eq!(format!("{:#}", tmp), format!("{:#}", val)); + // + let tmp = res.remove(0).result?; + assert_eq!(format!("{:#}", tmp), format!("{:#}", expected)); + // + Ok(()) +} +#[tokio::test] +async fn select_with_record_id_link_unique_remote_index() -> Result<(), Error> { + let dbs = new_ds().await?; + let ses = Session::owner().with_ns("test").with_db("test"); + // + let sql = " + DEFINE INDEX i_t_id ON TABLE i COLUMNS t; + DEFINE INDEX t_name_unique_idx ON TABLE t COLUMNS name UNIQUE; + DEFINE FIELD name ON TABLE t TYPE string; + DEFINE FIELD t ON TABLE i TYPE record(t); + CREATE t:1 SET name = 'a'; + CREATE t:2 SET name = 'b'; + CREATE i:A SET t = t:1; + CREATE i:B SET t = t:2; + SELECT * FROM i WHERE t.name IN ['a', 'b'] EXPLAIN; + SELECT * FROM i WHERE t.name IN ['a', 'b']; + "; + let mut res = dbs.execute(&sql, &ses, None).await?; + // + assert_eq!(res.len(), 10); + skip_ok(&mut res, 8)?; + // + let expected = Value::parse( + r#"[ + { "id": i:A, "t": t:1 }, + { "id": i:B, "t": t:2 } + ]"#, + ); + // + let tmp = res.remove(0).result?; + let val = Value::parse( + r#"[ + { + detail: { + plan: { + index: 'i_t_id', + joins: [ + { + index: 't_name_unique_idx', + operator: 'union', + value: [ + 'a', + 'b' + ] + } + ], + operator: 'join' + }, + table: 'i' + }, + operation: 'Iterate Index' + }, + { + detail: { + type: 'Memory' + }, + operation: 'Collector' + } + ]"#, + ); + assert_eq!(format!("{:#}", tmp), format!("{:#}", val)); + // + let tmp = res.remove(0).result?; + assert_eq!(format!("{:#}", tmp), format!("{:#}", expected)); + // + Ok(()) +} + +#[tokio::test] +async fn select_with_record_id_link_full_text_index() -> Result<(), Error> { + let dbs = new_ds().await?; + let ses = Session::owner().with_ns("test").with_db("test"); + // + let sql = " + DEFINE ANALYZER name TOKENIZERS class FILTERS lowercase,ngram(1,128); + DEFINE INDEX t_name_search_idx ON TABLE t COLUMNS name SEARCH ANALYZER name BM25 HIGHLIGHTS; + DEFINE INDEX i_t_id ON TABLE i COLUMNS t; + DEFINE FIELD name ON TABLE t TYPE string; + DEFINE FIELD t ON TABLE i TYPE record(t); + CREATE t:1 SET name = 'Hello World'; + CREATE i:A SET t = t:1; + SELECT * FROM i WHERE t.name @@ 'world' EXPLAIN; + SELECT * FROM i WHERE t.name @@ 'world'; + "; + let mut res = dbs.execute(&sql, &ses, None).await?; + + assert_eq!(res.len(), 9); + skip_ok(&mut res, 7)?; + // + let tmp = res.remove(0).result?; + let val = Value::parse( + r#"[ + { + detail: { + plan: { + index: 'i_t_id', + joins: [ + { + index: 't_name_search_idx', + operator: '@@', + value: 'world' + } + ], + operator: 'join' + }, + table: 'i' + }, + operation: 'Iterate Index' + }, + { + detail: { + type: 'Memory' + }, + operation: 'Collector' + } + ]"#, + ); + assert_eq!(format!("{:#}", tmp), format!("{:#}", val)); + // + let tmp = res.remove(0).result?; + let val = Value::parse(r#"[{ "id": i:A, "t": t:1}]"#); + assert_eq!(format!("{:#}", tmp), format!("{:#}", val)); + // + Ok(()) +} + +#[tokio::test] +async fn select_with_record_id_link_full_text_no_record_index() -> Result<(), Error> { + let dbs = new_ds().await?; + let ses = Session::owner().with_ns("test").with_db("test"); + // + let sql = " + DEFINE ANALYZER name TOKENIZERS class FILTERS lowercase,ngram(1,128); + DEFINE INDEX t_name_search_idx ON TABLE t COLUMNS name SEARCH ANALYZER name BM25 HIGHLIGHTS; + DEFINE FIELD name ON TABLE t TYPE string; + DEFINE FIELD t ON TABLE i TYPE record(t); + CREATE t:1 SET name = 'Hello World'; + CREATE i:A SET t = t:1; + SELECT * FROM i WHERE t.name @@ 'world' EXPLAIN; + SELECT * FROM i WHERE t.name @@ 'world'; + "; + let mut res = dbs.execute(&sql, &ses, None).await?; + + assert_eq!(res.len(), 8); + skip_ok(&mut res, 6)?; + // + let tmp = res.remove(0).result?; + let val = Value::parse( + r#"[ + { + detail: { + table: 'i' + }, + operation: 'Iterate Table' + }, + { + detail: { + reason: 'NO INDEX FOUND' + }, + operation: 'Fallback' + }, + { + detail: { + type: 'Memory' + }, + operation: 'Collector' + } + ]"#, + ); + assert_eq!(format!("{:#}", tmp), format!("{:#}", val)); + // + let tmp = res.remove(0).result?; + let val = Value::parse(r#"[{ "id": i:A, "t": t:1}]"#); + assert_eq!(format!("{:#}", tmp), format!("{:#}", val)); + // + Ok(()) +}