use crate::dbs::{Options, Transaction}; use crate::err::Error; use crate::idx::ft::docids::{DocId, NO_DOC_ID}; use crate::idx::ft::terms::TermId; use crate::idx::ft::{FtIndex, HitsIterator, MatchRef}; use crate::idx::planner::executor::QueryExecutor; use crate::idx::IndexKeyBase; use crate::key; use crate::kvs::Key; use crate::sql::index::Index; use crate::sql::scoring::Scoring; use crate::sql::statements::DefineIndexStatement; use crate::sql::{Array, Expression, Ident, Idiom, Object, Operator, Thing, Value}; use async_trait::async_trait; use roaring::RoaringTreemap; use std::collections::HashMap; use std::hash::Hash; use std::sync::Arc; #[derive(Default)] pub(super) struct PlanBuilder { indexes: Vec<(Expression, IndexOption)>, } impl PlanBuilder { pub(super) fn add_index_option(&mut self, e: Expression, i: IndexOption) { self.indexes.push((e, i)); } pub(super) fn build(mut self) -> Result { // TODO select the best option if there are several (cost based) if let Some((e, i)) = self.indexes.pop() { Ok(Plan::new(e, i)) } else { Err(Error::BypassQueryPlanner) } } } pub(crate) struct Plan { pub(super) e: Expression, pub(super) i: IndexOption, } impl Plan { pub(super) fn new(e: Expression, i: IndexOption) -> Self { Self { e, i, } } pub(crate) async fn new_iterator( &self, opt: &Options, txn: &Transaction, exe: &QueryExecutor, ) -> Result, Error> { self.i.new_iterator(opt, txn, exe).await } pub(crate) fn explain(&self) -> Value { Value::Object(Object::from(HashMap::from([ ("index", Value::from(self.i.ix().name.0.to_owned())), ("operator", Value::from(self.i.op().to_string())), ("value", self.i.value().clone()), ]))) } } #[derive(Debug, Clone, Eq, PartialEq, Hash)] pub(super) struct IndexOption(Arc); #[derive(Debug, Eq, PartialEq, Hash)] pub(super) struct Inner { ix: DefineIndexStatement, id: Idiom, v: Value, qs: Option, op: Operator, mr: Option, } impl IndexOption { pub(super) fn new( ix: DefineIndexStatement, id: Idiom, op: Operator, v: Value, qs: Option, mr: Option, ) -> Self { Self(Arc::new(Inner { ix, id, op, v, qs, mr, })) } pub(super) fn ix(&self) -> &DefineIndexStatement { &self.0.ix } pub(super) fn op(&self) -> &Operator { &self.0.op } pub(super) fn value(&self) -> &Value { &self.0.v } pub(super) fn qs(&self) -> Option<&String> { self.0.qs.as_ref() } pub(super) fn id(&self) -> &Idiom { &self.0.id } pub(super) fn match_ref(&self) -> Option<&MatchRef> { self.0.mr.as_ref() } async fn new_iterator( &self, opt: &Options, txn: &Transaction, exe: &QueryExecutor, ) -> Result, Error> { match &self.ix().index { Index::Idx => { if self.op() == &Operator::Equal { return Ok(Box::new(NonUniqueEqualThingIterator::new( opt, self.ix(), self.value(), )?)); } } Index::Uniq => { if self.op() == &Operator::Equal { return Ok(Box::new(UniqueEqualThingIterator::new( opt, self.ix(), self.value(), )?)); } } Index::Search { az, hl, sc, order, } => { if let Operator::Matches(_) = self.op() { let td = exe.pre_match_terms_docs(); return Ok(Box::new( MatchesThingIterator::new(opt, txn, self.ix(), az, *hl, sc, *order, td) .await?, )); } } } Err(Error::BypassQueryPlanner) } } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub(crate) trait ThingIterator: Send { async fn next_batch( &mut self, tx: &Transaction, size: u32, ) -> Result, Error>; } struct NonUniqueEqualThingIterator { beg: Vec, end: Vec, } impl NonUniqueEqualThingIterator { fn new(opt: &Options, ix: &DefineIndexStatement, v: &Value) -> Result { let v = Array::from(v.clone()); let beg = key::index::prefix_all_ids(opt.ns(), opt.db(), &ix.what, &ix.name, &v); let end = key::index::suffix_all_ids(opt.ns(), opt.db(), &ix.what, &ix.name, &v); Ok(Self { beg, end, }) } } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl ThingIterator for NonUniqueEqualThingIterator { async fn next_batch( &mut self, txn: &Transaction, limit: u32, ) -> Result, Error> { let min = self.beg.clone(); let max = self.end.clone(); let res = txn.lock().await.scan(min..max, limit).await?; if let Some((key, _)) = res.last() { self.beg = key.clone(); self.beg.push(0x00); } let res = res.iter().map(|(_, val)| (val.into(), NO_DOC_ID)).collect(); Ok(res) } } struct UniqueEqualThingIterator { key: Option, } impl UniqueEqualThingIterator { fn new(opt: &Options, ix: &DefineIndexStatement, v: &Value) -> Result { let v = Array::from(v.clone()); let key = key::index::new(opt.ns(), opt.db(), &ix.what, &ix.name, &v, None).into(); Ok(Self { key: Some(key), }) } } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl ThingIterator for UniqueEqualThingIterator { async fn next_batch( &mut self, txn: &Transaction, _limit: u32, ) -> Result, Error> { if let Some(key) = self.key.take() { if let Some(val) = txn.lock().await.get(key).await? { return Ok(vec![(val.into(), NO_DOC_ID)]); } } Ok(vec![]) } } struct MatchesThingIterator { hits: Option, } impl MatchesThingIterator { #[allow(clippy::too_many_arguments)] async fn new( opt: &Options, txn: &Transaction, ix: &DefineIndexStatement, az: &Ident, hl: bool, sc: &Scoring, order: u32, terms_docs: Option>>, ) -> Result { let ikb = IndexKeyBase::new(opt, ix); if let Scoring::Bm { .. } = sc { let mut run = txn.lock().await; let az = run.get_az(opt.ns(), opt.db(), az.as_str()).await?; let fti = FtIndex::new(&mut run, az, ikb, order, sc, hl).await?; if let Some(terms_docs) = terms_docs { let hits = fti.new_hits_iterator(&mut run, terms_docs).await?; Ok(Self { hits, }) } else { Ok(Self { hits: None, }) } } else { Err(Error::FeatureNotYetImplemented { feature: "Vector Search", }) } } } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl ThingIterator for MatchesThingIterator { async fn next_batch( &mut self, txn: &Transaction, mut limit: u32, ) -> Result, Error> { let mut res = vec![]; if let Some(hits) = &mut self.hits { let mut run = txn.lock().await; while limit > 0 { if let Some(hit) = hits.next(&mut run).await? { res.push(hit); } else { break; } limit -= 1; } } Ok(res) } } #[cfg(test)] mod tests { use crate::idx::planner::plan::IndexOption; use crate::sql::statements::DefineIndexStatement; use crate::sql::{Idiom, Operator, Value}; use std::collections::HashSet; #[test] fn test_hash_index_option() { let mut set = HashSet::new(); let io1 = IndexOption::new( DefineIndexStatement::default(), Idiom::from("a.b".to_string()), Operator::Equal, Value::from("test"), None, None, ); let io2 = IndexOption::new( DefineIndexStatement::default(), Idiom::from("a.b".to_string()), Operator::Equal, Value::from("test"), None, None, ); set.insert(io1); set.insert(io2.clone()); set.insert(io2); assert_eq!(set.len(), 1); } }