Feature - Full-text search scoring function (#2158)

This commit is contained in:
Emmanuel Keller 2023-06-23 21:26:19 +01:00 committed by GitHub
parent 484571361d
commit 6d5dcfadd9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 678 additions and 397 deletions

View file

@ -2,6 +2,7 @@ use crate::ctx::canceller::Canceller;
use crate::ctx::reason::Reason; use crate::ctx::reason::Reason;
use crate::dbs::Transaction; use crate::dbs::Transaction;
use crate::err::Error; use crate::err::Error;
use crate::idx::ft::docids::DocId;
use crate::idx::planner::executor::QueryExecutor; use crate::idx::planner::executor::QueryExecutor;
use crate::sql::value::Value; use crate::sql::value::Value;
use crate::sql::Thing; use crate::sql::Thing;
@ -39,6 +40,8 @@ pub struct Context<'a> {
query_executors: Option<Arc<HashMap<String, QueryExecutor>>>, query_executors: Option<Arc<HashMap<String, QueryExecutor>>>,
// An optional record id // An optional record id
thing: Option<&'a Thing>, thing: Option<&'a Thing>,
// An optional doc id
doc_id: Option<DocId>,
// An optional cursor document // An optional cursor document
cursor_doc: Option<&'a Value>, cursor_doc: Option<&'a Value>,
} }
@ -73,6 +76,7 @@ impl<'a> Context<'a> {
transaction: None, transaction: None,
query_executors: None, query_executors: None,
thing: None, thing: None,
doc_id: None,
cursor_doc: None, cursor_doc: None,
} }
} }
@ -87,6 +91,7 @@ impl<'a> Context<'a> {
transaction: parent.transaction.clone(), transaction: parent.transaction.clone(),
query_executors: parent.query_executors.clone(), query_executors: parent.query_executors.clone(),
thing: parent.thing, thing: parent.thing,
doc_id: parent.doc_id,
cursor_doc: parent.cursor_doc, cursor_doc: parent.cursor_doc,
} }
} }
@ -123,6 +128,10 @@ impl<'a> Context<'a> {
self.thing = Some(thing); self.thing = Some(thing);
} }
/// Add a doc id to this context.
/// Any doc id previously stored on this context is overwritten.
pub fn add_doc_id(&mut self, doc_id: DocId) {
self.doc_id = Some(doc_id);
}
/// Add a cursor document to this context. /// Add a cursor document to this context.
/// Usage: A new child context is created by an iterator for each document. /// Usage: A new child context is created by an iterator for each document.
/// The iterator sets the value of the current document (known as cursor document). /// The iterator sets the value of the current document (known as cursor document).
@ -165,6 +174,10 @@ impl<'a> Context<'a> {
self.thing self.thing
} }
/// Get the doc id stored on this context, if any.
/// Returns `None` when no doc id has been set.
pub fn doc_id(&self) -> Option<DocId> {
self.doc_id
}
pub fn doc(&self) -> Option<&Value> { pub fn doc(&self) -> Option<&Value> {
self.cursor_doc self.cursor_doc
} }

View file

@ -449,43 +449,51 @@ impl Iterable {
let txn = ctx.try_clone_transaction()?; let txn = ctx.try_clone_transaction()?;
// Check that the table exists // Check that the table exists
txn.lock().await.check_ns_db_tb(opt.ns(), opt.db(), &table.0, opt.strict).await?; txn.lock().await.check_ns_db_tb(opt.ns(), opt.db(), &table.0, opt.strict).await?;
let mut iterator = plan.new_iterator(opt, &txn).await?; let exe = ctx.get_query_executor(&table.0);
let mut things = iterator.next_batch(&txn, 1000).await?; if let Some(exe) = exe {
while !things.is_empty() { let mut iterator = plan.new_iterator(opt, &txn, exe).await?;
// Check if the context is finished let mut things = iterator.next_batch(&txn, 1000).await?;
if ctx.is_done() { while !things.is_empty() {
break; // Check if the context is finished
}
for thing in things {
// Check the context
if ctx.is_done() { if ctx.is_done() {
break; break;
} }
// If the record is from another table we can skip for (thing, doc_id) in things {
if !thing.tb.eq(table.as_str()) { // Check the context
continue; if ctx.is_done() {
break;
}
// If the record is from another table we can skip
if !thing.tb.eq(table.as_str()) {
continue;
}
// Fetch the data from the store
let key = thing::new(opt.ns(), opt.db(), &table.0, &thing.id);
let val = txn.lock().await.get(key.clone()).await?;
let rid = Thing::from((key.tb, key.id));
let mut ctx = Context::new(ctx);
ctx.add_thing(&rid);
ctx.add_doc_id(doc_id);
// Parse the data from the store
let val = Operable::Value(match val {
Some(v) => Value::from(v),
None => Value::None,
});
// Process the document record
ite.process(&ctx, opt, stm, val).await;
} }
// Fetch the data from the store // Collect the next batch of ids
let key = thing::new(opt.ns(), opt.db(), &table.0, &thing.id); things = iterator.next_batch(&txn, 1000).await?;
let val = txn.lock().await.get(key.clone()).await?;
let rid = Thing::from((key.tb, key.id));
let mut ctx = Context::new(ctx);
ctx.add_thing(&rid);
// Parse the data from the store
let val = Operable::Value(match val {
Some(v) => Value::from(v),
None => Value::None,
});
// Process the document record
ite.process(&ctx, opt, stm, val).await;
} }
Ok(())
// Collect the next batch of ids } else {
things = iterator.next_batch(&txn, 1000).await?; Err(Error::QueryNotExecutedDetail {
message: "The QueryExecutor has not been found.".to_string(),
})
} }
Ok(())
} }
} }

View file

@ -1,3 +1,4 @@
use crate::idx::ft::MatchRef;
use crate::sql::idiom::Idiom; use crate::sql::idiom::Idiom;
use crate::sql::value::Value; use crate::sql::value::Value;
use base64_lib::DecodeError as Base64Error; use base64_lib::DecodeError as Base64Error;
@ -497,6 +498,12 @@ pub enum Error {
#[doc(hidden)] #[doc(hidden)]
#[error("Bypass the query planner")] #[error("Bypass the query planner")]
BypassQueryPlanner, BypassQueryPlanner,
/// Duplicated match references are not allowed
#[error("Duplicated Match reference: {mr}")]
DuplicatedMatchRef {
mr: MatchRef,
},
} }
impl From<Error> for String { impl From<Error> for String {

View file

@ -302,6 +302,7 @@ pub async fn asynchronous(ctx: &Context<'_>, name: &str, args: Vec<Value>) -> Re
"http::patch" => http::patch(ctx).await, "http::patch" => http::patch(ctx).await,
"http::delete" => http::delete(ctx).await, "http::delete" => http::delete(ctx).await,
// //
"search::score" => search::score(ctx).await,
"search::highlight" => search::highlight(ctx).await, "search::highlight" => search::highlight(ctx).await,
"search::offsets" => search::offsets(ctx).await, "search::offsets" => search::offsets(ctx).await,
// //

View file

@ -8,5 +8,6 @@ impl_module_def!(
Package, Package,
"search", "search",
"highlight" => fut Async, "highlight" => fut Async,
"offsets" => fut Async "offsets" => fut Async,
"score" => fut Async
); );

View file

@ -2,6 +2,16 @@ use crate::ctx::Context;
use crate::err::Error; use crate::err::Error;
use crate::sql::Value; use crate::sql::Value;
/// Implementation of the `search::score` function.
///
/// Looks up the query executor registered for the table of the current
/// record and delegates the scoring to it, forwarding the match reference
/// and the doc id carried by the context.
///
/// Returns `Value::None` when the context has no current record, or when
/// no query executor exists for that record's table.
pub async fn score(ctx: &Context<'_>, (match_ref,): (Value,)) -> Result<Value, Error> {
    match ctx.thing() {
        Some(thg) => match ctx.get_query_executor(&thg.tb) {
            Some(exe) => {
                // The transaction is only cloned once an executor is known to exist
                let txn = ctx.try_clone_transaction()?;
                exe.score(txn, &match_ref, thg, ctx.doc_id()).await
            }
            None => Ok(Value::None),
        },
        None => Ok(Value::None),
    }
}
pub async fn highlight( pub async fn highlight(
ctx: &Context<'_>, ctx: &Context<'_>,
(prefix, suffix, match_ref): (Value, Value, Value), (prefix, suffix, match_ref): (Value, Value, Value),
@ -10,7 +20,7 @@ pub async fn highlight(
if let Some(thg) = ctx.thing() { if let Some(thg) = ctx.thing() {
if let Some(exe) = ctx.get_query_executor(&thg.tb) { if let Some(exe) = ctx.get_query_executor(&thg.tb) {
let txn = ctx.try_clone_transaction()?; let txn = ctx.try_clone_transaction()?;
return exe.highlight(txn, thg, prefix, suffix, match_ref.clone(), doc).await; return exe.highlight(txn, thg, prefix, suffix, &match_ref, doc).await;
} }
} }
} }
@ -21,7 +31,7 @@ pub async fn offsets(ctx: &Context<'_>, (match_ref,): (Value,)) -> Result<Value,
if let Some(thg) = ctx.thing() { if let Some(thg) = ctx.thing() {
if let Some(exe) = ctx.get_query_executor(&thg.tb) { if let Some(exe) = ctx.get_query_executor(&thg.tb) {
let txn = ctx.try_clone_transaction()?; let txn = ctx.try_clone_transaction()?;
return exe.offsets(txn, thg, match_ref.clone()).await; return exe.offsets(txn, thg, &match_ref).await;
} }
} }
Ok(Value::None) Ok(Value::None)

View file

@ -8,7 +8,9 @@ use serde::{Deserialize, Serialize};
pub(crate) type DocId = u64; pub(crate) type DocId = u64;
pub(super) struct DocIds { pub(crate) const NO_DOC_ID: u64 = u64::MAX;
pub(crate) struct DocIds {
state_key: Key, state_key: Key,
index_key_base: IndexKeyBase, index_key_base: IndexKeyBase,
btree: BTree<DocIdsKeyProvider>, btree: BTree<DocIdsKeyProvider>,
@ -59,7 +61,7 @@ impl DocIds {
doc_id doc_id
} }
pub(super) async fn get_doc_id( pub(crate) async fn get_doc_id(
&self, &self,
tx: &mut Transaction, tx: &mut Transaction,
doc_key: Key, doc_key: Key,

View file

@ -4,18 +4,18 @@ mod doclength;
mod highlighter; mod highlighter;
mod offsets; mod offsets;
mod postings; mod postings;
mod scorer; pub(super) mod scorer;
mod termdocs; mod termdocs;
pub(crate) mod terms; pub(crate) mod terms;
use crate::err::Error; use crate::err::Error;
use crate::idx::ft::analyzer::Analyzer; use crate::idx::ft::analyzer::Analyzer;
use crate::idx::ft::docids::DocIds; use crate::idx::ft::docids::{DocId, DocIds};
use crate::idx::ft::doclength::DocLengths; use crate::idx::ft::doclength::DocLengths;
use crate::idx::ft::highlighter::{Highlighter, Offseter}; use crate::idx::ft::highlighter::{Highlighter, Offseter};
use crate::idx::ft::offsets::Offsets; use crate::idx::ft::offsets::Offsets;
use crate::idx::ft::postings::Postings; use crate::idx::ft::postings::Postings;
use crate::idx::ft::scorer::{BM25Scorer, Score}; use crate::idx::ft::scorer::BM25Scorer;
use crate::idx::ft::termdocs::TermDocs; use crate::idx::ft::termdocs::TermDocs;
use crate::idx::ft::terms::{TermId, Terms}; use crate::idx::ft::terms::{TermId, Terms};
use crate::idx::{btree, IndexKeyBase, SerdeState}; use crate::idx::{btree, IndexKeyBase, SerdeState};
@ -27,6 +27,7 @@ use roaring::treemap::IntoIter;
use roaring::RoaringTreemap; use roaring::RoaringTreemap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::ops::BitAnd; use std::ops::BitAnd;
use std::sync::Arc;
pub(crate) type MatchRef = u8; pub(crate) type MatchRef = u8;
@ -118,7 +119,7 @@ impl FtIndex {
}) })
} }
async fn doc_ids(&self, tx: &mut Transaction) -> Result<DocIds, Error> { pub(crate) async fn doc_ids(&self, tx: &mut Transaction) -> Result<DocIds, Error> {
DocIds::new(tx, self.index_key_base.clone(), self.order).await DocIds::new(tx, self.index_key_base.clone(), self.order).await
} }
@ -304,75 +305,59 @@ impl FtIndex {
Ok(terms) Ok(terms)
} }
pub(super) async fn search( pub(super) async fn get_terms_docs(
&self, &self,
tx: &mut Transaction, tx: &mut Transaction,
query_string: String, terms: &Vec<TermId>,
) -> Result<(Vec<TermId>, Option<HitsIterator>), Error> { ) -> Result<Vec<(TermId, RoaringTreemap)>, Error> {
let t = self.terms(tx).await?;
let td = self.term_docs();
let (terms, missing) = self.analyzer.extract_terms(&t, tx, query_string).await?;
if missing {
// If any term does not exists, as we are doing an AND query,
// we can return an empty results set
return Ok((terms, None));
}
let mut hits: Option<RoaringTreemap> = None;
let mut terms_docs = Vec::with_capacity(terms.len()); let mut terms_docs = Vec::with_capacity(terms.len());
for term_id in &terms { let td = self.term_docs();
for term_id in terms {
if let Some(term_docs) = td.get_docs(tx, *term_id).await? { if let Some(term_docs) = td.get_docs(tx, *term_id).await? {
if let Some(h) = hits {
hits = Some(h.bitand(&term_docs));
} else {
hits = Some(term_docs.clone());
}
terms_docs.push((*term_id, term_docs)); terms_docs.push((*term_id, term_docs));
} }
} }
Ok(terms_docs)
}
pub(super) async fn new_hits_iterator(
&self,
tx: &mut Transaction,
terms_docs: Arc<Vec<(TermId, RoaringTreemap)>>,
) -> Result<Option<HitsIterator>, Error> {
let mut hits: Option<RoaringTreemap> = None;
for (_, term_docs) in terms_docs.iter() {
if let Some(h) = hits {
hits = Some(h.bitand(term_docs));
} else {
hits = Some(term_docs.clone());
}
}
if let Some(hits) = hits { if let Some(hits) = hits {
if !hits.is_empty() { if !hits.is_empty() {
let postings = self.postings(tx).await?;
let doc_lengths = self.doc_lengths(tx).await?;
let mut scorer = None;
if let Some(bm25) = &self.bm25 {
scorer = Some(BM25Scorer::new(
doc_lengths,
self.state.total_docs_lengths,
self.state.doc_count,
bm25.clone(),
));
}
let doc_ids = self.doc_ids(tx).await?; let doc_ids = self.doc_ids(tx).await?;
return Ok(( return Ok(Some(HitsIterator::new(doc_ids, hits)));
terms,
Some(HitsIterator::new(doc_ids, postings, hits, terms_docs, scorer)),
));
} }
} }
Ok((terms, None)) Ok(None)
} }
pub(super) async fn match_id_value( pub(super) async fn new_scorer(
&self, &self,
tx: &mut Transaction, tx: &mut Transaction,
thg: &Thing, terms_docs: Arc<Vec<(TermId, RoaringTreemap)>>,
term: &str, ) -> Result<Option<BM25Scorer>, Error> {
) -> Result<bool, Error> { if let Some(bm25) = &self.bm25 {
let doc_key: Key = thg.into(); return Ok(Some(BM25Scorer::new(
let doc_ids = self.doc_ids(tx).await?; self.postings(tx).await?,
if let Some(doc_id) = doc_ids.get_doc_id(tx, doc_key).await? { terms_docs,
let terms = self.terms(tx).await?; self.doc_lengths(tx).await?,
if let Some(term_id) = terms.get_term_id(tx, term).await? { self.state.total_docs_lengths,
let postings = self.postings(tx).await?; self.state.doc_count,
if let Some(term_freq) = postings.get_term_frequency(tx, term_id, doc_id).await? { bm25.clone(),
if term_freq > 0 { )));
return Ok(true);
}
}
}
} }
Ok(false) Ok(None)
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
@ -437,51 +422,24 @@ impl FtIndex {
pub(crate) struct HitsIterator { pub(crate) struct HitsIterator {
doc_ids: DocIds, doc_ids: DocIds,
postings: Postings,
iter: IntoIter, iter: IntoIter,
terms_docs: Vec<(TermId, RoaringTreemap)>,
scorer: Option<BM25Scorer>,
} }
impl HitsIterator { impl HitsIterator {
fn new( fn new(doc_ids: DocIds, hits: RoaringTreemap) -> Self {
doc_ids: DocIds,
postings: Postings,
hits: RoaringTreemap,
terms_docs: Vec<(TermId, RoaringTreemap)>,
scorer: Option<BM25Scorer>,
) -> Self {
Self { Self {
doc_ids, doc_ids,
postings,
iter: hits.into_iter(), iter: hits.into_iter(),
terms_docs,
scorer,
} }
} }
pub(crate) async fn next( pub(crate) async fn next(
&mut self, &mut self,
tx: &mut Transaction, tx: &mut Transaction,
) -> Result<Option<(Thing, Option<Score>)>, Error> { ) -> Result<Option<(Thing, DocId)>, Error> {
for doc_id in self.iter.by_ref() { for doc_id in self.iter.by_ref() {
if let Some(doc_key) = self.doc_ids.get_doc_key(tx, doc_id).await? { if let Some(doc_key) = self.doc_ids.get_doc_key(tx, doc_id).await? {
let score = if let Some(scorer) = &self.scorer { return Ok(Some((doc_key.into(), doc_id)));
let mut sc = 0.0;
for (term_id, docs) in &self.terms_docs {
if docs.contains(doc_id) {
if let Some(term_freq) =
self.postings.get_term_frequency(tx, *term_id, doc_id).await?
{
sc += scorer.score(tx, doc_id, docs.len(), term_freq).await?;
}
}
}
Some(sc)
} else {
None
};
return Ok(Some((doc_key.into(), score)));
} }
} }
Ok(None) Ok(None)
@ -490,23 +448,27 @@ impl HitsIterator {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::idx::ft::{FtIndex, HitsIterator, Score}; use crate::idx::ft::scorer::{BM25Scorer, Score};
use crate::idx::ft::{FtIndex, HitsIterator};
use crate::idx::IndexKeyBase; use crate::idx::IndexKeyBase;
use crate::kvs::{Datastore, Transaction}; use crate::kvs::{Datastore, Transaction};
use crate::sql::scoring::Scoring; use crate::sql::scoring::Scoring;
use crate::sql::statements::define::analyzer; use crate::sql::statements::define::analyzer;
use crate::sql::{Array, Thing}; use crate::sql::{Array, Thing};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use test_log::test; use test_log::test;
async fn check_hits( async fn check_hits(
i: Option<HitsIterator>,
tx: &mut Transaction, tx: &mut Transaction,
hits: Option<HitsIterator>,
scr: BM25Scorer,
e: Vec<(&Thing, Option<Score>)>, e: Vec<(&Thing, Option<Score>)>,
) { ) {
if let Some(mut i) = i { if let Some(mut hits) = hits {
let mut map = HashMap::new(); let mut map = HashMap::new();
while let Some((k, s)) = i.next(tx).await.unwrap() { while let Some((k, d)) = hits.next(tx).await.unwrap() {
let s = scr.score(tx, d).await.unwrap();
map.insert(k, s); map.insert(k, s);
} }
assert_eq!(map.len(), e.len()); assert_eq!(map.len(), e.len());
@ -518,6 +480,18 @@ mod tests {
} }
} }
/// Test helper: resolves the terms of `qs` against `fti`, then builds both
/// the hits iterator and the scorer from the same shared term documents.
///
/// Panics if any index call fails, or if the index yields no scorer.
async fn search(
    tx: &mut Transaction,
    fti: &FtIndex,
    qs: &str,
) -> (Option<HitsIterator>, BM25Scorer) {
    let term_ids = fti.extract_terms(tx, qs.to_string()).await.unwrap();
    let terms_docs = Arc::new(fti.get_terms_docs(tx, &term_ids).await.unwrap());
    // The scorer gets its own handle on the term documents; the iterator takes the other
    let scorer = fti.new_scorer(tx, Arc::clone(&terms_docs)).await.unwrap().unwrap();
    let hits = fti.new_hits_iterator(tx, terms_docs).await.unwrap();
    (hits, scorer)
}
#[test(tokio::test)] #[test(tokio::test)]
async fn test_ft_index() { async fn test_ft_index() {
let ds = Datastore::new("memory").await.unwrap(); let ds = Datastore::new("memory").await.unwrap();
@ -587,23 +561,23 @@ mod tests {
assert_eq!(statistics.doc_lengths.keys_count, 3); assert_eq!(statistics.doc_lengths.keys_count, 3);
// Search & score // Search & score
let (_, i) = fti.search(&mut tx, "hello".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "hello").await;
check_hits(i, &mut tx, vec![(&doc1, Some(0.0)), (&doc2, Some(0.0))]).await; check_hits(&mut tx, hits, scr, vec![(&doc1, Some(0.0)), (&doc2, Some(0.0))]).await;
let (_, i) = fti.search(&mut tx, "world".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "world").await;
check_hits(i, &mut tx, vec![(&doc1, Some(0.4859746))]).await; check_hits(&mut tx, hits, scr, vec![(&doc1, Some(0.4859746))]).await;
let (_, i) = fti.search(&mut tx, "yellow".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "yellow").await;
check_hits(i, &mut tx, vec![(&doc2, Some(0.4859746))]).await; check_hits(&mut tx, hits, scr, vec![(&doc2, Some(0.4859746))]).await;
let (_, i) = fti.search(&mut tx, "foo".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "foo").await;
check_hits(i, &mut tx, vec![(&doc3, Some(0.56902087))]).await; check_hits(&mut tx, hits, scr, vec![(&doc3, Some(0.56902087))]).await;
let (_, i) = fti.search(&mut tx, "bar".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "bar").await;
check_hits(i, &mut tx, vec![(&doc3, Some(0.56902087))]).await; check_hits(&mut tx, hits, scr, vec![(&doc3, Some(0.56902087))]).await;
let (_, i) = fti.search(&mut tx, "dummy".to_string()).await.unwrap(); let (hits, _) = search(&mut tx, &fti, "dummy").await;
assert!(i.is_none()); assert!(hits.is_none());
} }
{ {
@ -624,16 +598,16 @@ mod tests {
// We can still find 'foo' // We can still find 'foo'
let mut tx = ds.transaction(false, false).await.unwrap(); let mut tx = ds.transaction(false, false).await.unwrap();
let (_, i) = fti.search(&mut tx, "foo".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "foo").await;
check_hits(i, &mut tx, vec![(&doc3, Some(0.56902087))]).await; check_hits(&mut tx, hits, scr, vec![(&doc3, Some(0.56902087))]).await;
// We can't anymore find 'bar' // We can't anymore find 'bar'
let (_, i) = fti.search(&mut tx, "bar".to_string()).await.unwrap(); let (hits, _) = search(&mut tx, &fti, "bar").await;
assert!(i.is_none()); assert!(hits.is_none());
// We can now find 'nobar' // We can now find 'nobar'
let (_, i) = fti.search(&mut tx, "nobar".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "nobar").await;
check_hits(i, &mut tx, vec![(&doc3, Some(0.56902087))]).await; check_hits(&mut tx, hits, scr, vec![(&doc3, Some(0.56902087))]).await;
} }
{ {
@ -655,13 +629,11 @@ mod tests {
tx.commit().await.unwrap(); tx.commit().await.unwrap();
let mut tx = ds.transaction(false, false).await.unwrap(); let mut tx = ds.transaction(false, false).await.unwrap();
let (v, h) = fti.search(&mut tx, "hello".to_string()).await.unwrap(); let (hits, _) = search(&mut tx, &fti, "hello").await;
assert!(v.is_empty()); assert!(hits.is_none());
assert!(h.is_none());
let (v, h) = fti.search(&mut tx, "foo".to_string()).await.unwrap(); let (hits, _) = search(&mut tx, &fti, "foo").await;
assert!(v.is_empty()); assert!(hits.is_none());
assert!(h.is_none());
} }
} }
@ -742,10 +714,11 @@ mod tests {
assert_eq!(statistics.doc_ids.keys_count, 4); assert_eq!(statistics.doc_ids.keys_count, 4);
assert_eq!(statistics.doc_lengths.keys_count, 4); assert_eq!(statistics.doc_lengths.keys_count, 4);
let (_, i) = fti.search(&mut tx, "the".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "the").await;
check_hits( check_hits(
i,
&mut tx, &mut tx,
hits,
scr,
vec![ vec![
(&doc1, Some(0.0)), (&doc1, Some(0.0)),
(&doc2, Some(0.0)), (&doc2, Some(0.0)),
@ -755,34 +728,35 @@ mod tests {
) )
.await; .await;
let (_, i) = fti.search(&mut tx, "dog".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "dog").await;
check_hits( check_hits(
i,
&mut tx, &mut tx,
hits,
scr,
vec![(&doc1, Some(0.0)), (&doc2, Some(0.0)), (&doc3, Some(0.0))], vec![(&doc1, Some(0.0)), (&doc2, Some(0.0)), (&doc3, Some(0.0))],
) )
.await; .await;
let (_, i) = fti.search(&mut tx, "fox".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "fox").await;
check_hits(i, &mut tx, vec![(&doc1, Some(0.0)), (&doc2, Some(0.0))]).await; check_hits(&mut tx, hits, scr, vec![(&doc1, Some(0.0)), (&doc2, Some(0.0))]).await;
let (_, i) = fti.search(&mut tx, "over".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "over").await;
check_hits(i, &mut tx, vec![(&doc1, Some(0.0)), (&doc2, Some(0.0))]).await; check_hits(&mut tx, hits, scr, vec![(&doc1, Some(0.0)), (&doc2, Some(0.0))]).await;
let (_, i) = fti.search(&mut tx, "lazy".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "lazy").await;
check_hits(i, &mut tx, vec![(&doc1, Some(0.0)), (&doc2, Some(0.0))]).await; check_hits(&mut tx, hits, scr, vec![(&doc1, Some(0.0)), (&doc2, Some(0.0))]).await;
let (_, i) = fti.search(&mut tx, "jumped".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "jumped").await;
check_hits(i, &mut tx, vec![(&doc1, Some(0.0)), (&doc2, Some(0.0))]).await; check_hits(&mut tx, hits, scr, vec![(&doc1, Some(0.0)), (&doc2, Some(0.0))]).await;
let (_, i) = fti.search(&mut tx, "nothing".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "nothing").await;
check_hits(i, &mut tx, vec![(&doc3, Some(0.87105393))]).await; check_hits(&mut tx, hits, scr, vec![(&doc3, Some(0.87105393))]).await;
let (_, i) = fti.search(&mut tx, "animals".to_string()).await.unwrap(); let (hits, scr) = search(&mut tx, &fti, "animals").await;
check_hits(i, &mut tx, vec![(&doc4, Some(0.92279965))]).await; check_hits(&mut tx, hits, scr, vec![(&doc4, Some(0.92279965))]).await;
let (_, i) = fti.search(&mut tx, "dummy".to_string()).await.unwrap(); let (hits, _) = search(&mut tx, &fti, "dummy").await;
assert!(i.is_none()); assert!(hits.is_none());
} }
} }
} }

View file

@ -1,13 +1,18 @@
use crate::err::Error; use crate::err::Error;
use crate::idx::ft::docids::DocId; use crate::idx::ft::docids::DocId;
use crate::idx::ft::doclength::{DocLength, DocLengths}; use crate::idx::ft::doclength::{DocLength, DocLengths};
use crate::idx::ft::postings::TermFrequency; use crate::idx::ft::postings::{Postings, TermFrequency};
use crate::idx::ft::terms::TermId;
use crate::idx::ft::Bm25Params; use crate::idx::ft::Bm25Params;
use crate::kvs::Transaction; use crate::kvs::Transaction;
use roaring::RoaringTreemap;
use std::sync::Arc;
pub(super) type Score = f32; pub(super) type Score = f32;
pub(super) struct BM25Scorer { pub(crate) struct BM25Scorer {
postings: Postings,
terms_docs: Arc<Vec<(TermId, RoaringTreemap)>>,
doc_lengths: DocLengths, doc_lengths: DocLengths,
average_doc_length: f32, average_doc_length: f32,
doc_count: f32, doc_count: f32,
@ -16,12 +21,16 @@ pub(super) struct BM25Scorer {
impl BM25Scorer { impl BM25Scorer {
pub(super) fn new( pub(super) fn new(
postings: Postings,
terms_docs: Arc<Vec<(TermId, RoaringTreemap)>>,
doc_lengths: DocLengths, doc_lengths: DocLengths,
total_docs_length: u128, total_docs_length: u128,
doc_count: u64, doc_count: u64,
bm25: Bm25Params, bm25: Bm25Params,
) -> Self { ) -> Self {
Self { Self {
postings,
terms_docs,
doc_lengths, doc_lengths,
average_doc_length: (total_docs_length as f32) / (doc_count as f32), average_doc_length: (total_docs_length as f32) / (doc_count as f32),
doc_count: doc_count as f32, doc_count: doc_count as f32,
@ -29,7 +38,7 @@ impl BM25Scorer {
} }
} }
pub(super) async fn score( async fn term_score(
&self, &self,
tx: &mut Transaction, tx: &mut Transaction,
doc_id: DocId, doc_id: DocId,
@ -40,6 +49,24 @@ impl BM25Scorer {
Ok(self.compute_bm25_score(term_frequency as f32, term_doc_count as f32, doc_length as f32)) Ok(self.compute_bm25_score(term_frequency as f32, term_doc_count as f32, doc_length as f32))
} }
/// Computes the score of the document `doc_id` by summing the per-term
/// score of every term whose document set contains it.
///
/// Terms for which no term frequency is stored for this document do not
/// contribute. The result is always `Some`, and is `Some(0.0)` when no
/// term contributes.
pub(crate) async fn score(
    &self,
    tx: &mut Transaction,
    doc_id: DocId,
) -> Result<Option<Score>, Error> {
    let mut total = 0.0;
    for (term_id, term_docs) in self.terms_docs.iter() {
        // Skip the terms that are not present in this document
        if !term_docs.contains(doc_id) {
            continue;
        }
        let frequency = self.postings.get_term_frequency(tx, *term_id, doc_id).await?;
        if let Some(term_freq) = frequency {
            total += self.term_score(tx, doc_id, term_docs.len(), term_freq).await?;
        }
    }
    Ok(Some(total))
}
// https://en.wikipedia.org/wiki/Okapi_BM25 // https://en.wikipedia.org/wiki/Okapi_BM25
// Including the lower-bounding term frequency normalization (2011 CIKM) // Including the lower-bounding term frequency normalization (2011 CIKM)
fn compute_bm25_score(&self, term_freq: f32, term_doc_count: f32, doc_length: f32) -> f32 { fn compute_bm25_score(&self, term_freq: f32, term_doc_count: f32, doc_length: f32) -> f32 {

View file

@ -1,32 +1,27 @@
use crate::dbs::{Options, Transaction}; use crate::dbs::{Options, Transaction};
use crate::err::Error; use crate::err::Error;
use crate::idx::ft::docids::{DocId, DocIds};
use crate::idx::ft::scorer::BM25Scorer;
use crate::idx::ft::terms::TermId; use crate::idx::ft::terms::TermId;
use crate::idx::ft::{FtIndex, MatchRef}; use crate::idx::ft::{FtIndex, MatchRef};
use crate::idx::planner::plan::IndexOption; use crate::idx::planner::plan::IndexOption;
use crate::idx::planner::tree::IndexMap; use crate::idx::planner::tree::IndexMap;
use crate::idx::IndexKeyBase; use crate::idx::IndexKeyBase;
use crate::kvs;
use crate::kvs::Key;
use crate::sql::index::Index; use crate::sql::index::Index;
use crate::sql::{Expression, Idiom, Table, Thing, Value}; use crate::sql::{Expression, Table, Thing, Value};
use std::collections::{HashMap, HashSet}; use roaring::RoaringTreemap;
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
#[derive(Clone)]
pub(crate) struct QueryExecutor { pub(crate) struct QueryExecutor {
inner: Arc<Inner>,
}
struct Inner {
table: String, table: String,
index: HashMap<Expression, HashSet<IndexOption>>, pre_match_expression: Option<Expression>,
pre_match: Option<Expression>, pre_match_entry: Option<FtEntry>,
ft_map: HashMap<String, FtIndex>, ft_map: HashMap<String, FtIndex>,
terms: HashMap<MatchRef, IndexFieldTerms>, mr_entries: HashMap<MatchRef, FtEntry>,
} exp_entries: HashMap<Expression, FtEntry>,
struct IndexFieldTerms {
ix: String,
id: Idiom,
t: Vec<TermId>,
} }
impl QueryExecutor { impl QueryExecutor {
@ -35,83 +30,114 @@ impl QueryExecutor {
txn: &Transaction, txn: &Transaction,
table: &Table, table: &Table,
index_map: IndexMap, index_map: IndexMap,
pre_match: Option<Expression>, pre_match_expression: Option<Expression>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let mut run = txn.lock().await; let mut run = txn.lock().await;
let mut ft_map = HashMap::new();
for ios in index_map.index.values() { let mut mr_entries = HashMap::default();
for io in ios { let mut exp_entries = HashMap::default();
if let Index::Search { let mut ft_map = HashMap::default();
az,
order, // Create all the instances of FtIndex
sc, // Build the FtEntries and map them to Expressions and MatchRef
hl, for (exp, io) in index_map.consume() {
} = &io.ix.index let mut entry = None;
{ if let Index::Search {
if !ft_map.contains_key(&io.ix.name.0) { az,
let ikb = IndexKeyBase::new(opt, &io.ix); order,
let az = run.get_az(opt.ns(), opt.db(), az.as_str()).await?; sc,
let ft = FtIndex::new(&mut run, az, ikb, *order, sc, *hl).await?; hl,
ft_map.insert(io.ix.name.0.clone(), ft); } = &io.ix().index
{
let ixn = &io.ix().name.0;
if let Some(ft) = ft_map.get(ixn) {
if entry.is_none() {
entry = FtEntry::new(&mut run, ft, io).await?;
} }
} else {
let ikb = IndexKeyBase::new(opt, io.ix());
let az = run.get_az(opt.ns(), opt.db(), az.as_str()).await?;
let ft = FtIndex::new(&mut run, az, ikb, *order, sc, *hl).await?;
let ixn = ixn.to_owned();
if entry.is_none() {
entry = FtEntry::new(&mut run, &ft, io).await?;
}
ft_map.insert(ixn, ft);
} }
} }
}
let mut terms = HashMap::with_capacity(index_map.terms.len()); if let Some(e) = entry {
for (mr, ifv) in index_map.terms { if let Some(mr) = e.0.index_option.match_ref() {
if let Some(ft) = ft_map.get(&ifv.ix) { if mr_entries.insert(*mr, e.clone()).is_some() {
let term_ids = ft.extract_terms(&mut run, ifv.val.clone()).await?; return Err(Error::DuplicatedMatchRef {
terms.insert( mr: *mr,
mr, });
IndexFieldTerms { }
ix: ifv.ix, }
id: ifv.id, exp_entries.insert(exp, e);
t: term_ids,
},
);
} }
} }
let mut pre_match_entry = None;
if let Some(exp) = &pre_match_expression {
pre_match_entry = exp_entries.get(exp).cloned();
}
Ok(Self { Ok(Self {
inner: Arc::new(Inner { table: table.0.clone(),
table: table.0.clone(), pre_match_expression,
index: index_map.index, pre_match_entry,
pre_match, ft_map,
ft_map, mr_entries,
terms, exp_entries,
}),
}) })
} }
/// Returns a shared handle on the term documents of the pre-match entry,
/// or `None` when this executor has no pre-match entry.
pub(super) fn pre_match_terms_docs(&self) -> Option<Arc<Vec<(TermId, RoaringTreemap)>>> {
    self.pre_match_entry.as_ref().map(|entry| entry.0.terms_docs.clone())
}
fn get_match_ref(match_ref: &Value) -> Option<MatchRef> {
if let Value::Number(n) = match_ref {
let m = n.to_int() as u8;
Some(m)
} else {
None
}
}
pub(crate) async fn matches( pub(crate) async fn matches(
&self, &self,
txn: &Transaction, txn: &Transaction,
thg: &Thing, thg: &Thing,
exp: &Expression, exp: &Expression,
) -> Result<Value, Error> { ) -> Result<Value, Error> {
// If we find the expression in `pre_match`, // If we find the expression in `pre_match_expression`,
// it means that we are using an Iterator::Index // it means that we are using an Iterator::Index
// and we are iterating over document that already matches the expression. // and we are iterating over document that already matches the expression.
if let Some(pre_match) = &self.inner.pre_match { if let Some(pme) = &self.pre_match_expression {
if pre_match.eq(exp) { if pme.eq(exp) {
return Ok(Value::Bool(true)); return Ok(Value::Bool(true));
} }
} }
// Otherwise, we look for the first possible index options, and evaluate the expression // Otherwise, we look for the first possible index options, and evaluate the expression
// Does the record id match this executor's table? // Does the record id match this executor's table?
// Does the record id match this executor's table? if thg.tb.eq(&self.table) {
if thg.tb.eq(&self.inner.table) { if let Some(ft) = self.exp_entries.get(exp) {
if let Some(ios) = self.inner.index.get(exp) { let mut run = txn.lock().await;
for io in ios { let doc_key: Key = thg.into();
if let Some(fti) = self.inner.ft_map.get(&io.ix.name.0) { if let Some(doc_id) = ft.0.doc_ids.get_doc_id(&mut run, doc_key).await? {
let mut run = txn.lock().await; for (_, docs) in ft.0.terms_docs.iter() {
// TODO The query string could be extracted when IndexOptions are created if !docs.contains(doc_id) {
let query_string = io.v.clone().convert_to_string()?; return Ok(Value::Bool(false));
return Ok(Value::Bool( }
fti.match_id_value(&mut run, thg, &query_string).await?,
));
} }
return Ok(Value::Bool(true));
} }
return Ok(Value::Bool(false));
} }
} }
@ -121,27 +147,37 @@ impl QueryExecutor {
}) })
} }
/// Finds the full-text entry registered under the given match reference.
///
/// Returns `None` when the value yields no match reference, or when no
/// entry is registered for it.
fn get_ft_entry(&self, match_ref: &Value) -> Option<&FtEntry> {
    Self::get_match_ref(match_ref).and_then(|mr| self.mr_entries.get(&mr))
}
/// Resolves both the full-text entry for `match_ref` and the `FtIndex`
/// stored in `ft_map` under the name of the entry's index.
///
/// Returns `None` if either lookup fails.
fn get_ft_entry_and_index(&self, match_ref: &Value) -> Option<(&FtEntry, &FtIndex)> {
    let entry = self.get_ft_entry(match_ref)?;
    // The index map is keyed by the index name taken from the entry's index option.
    let index = self.ft_map.get(&entry.0.index_option.ix().name.0)?;
    Some((entry, index))
}
pub(crate) async fn highlight( pub(crate) async fn highlight(
&self, &self,
txn: Transaction, txn: Transaction,
thg: &Thing, thg: &Thing,
prefix: Value, prefix: Value,
suffix: Value, suffix: Value,
match_ref: Value, match_ref: &Value,
doc: &Value, doc: &Value,
) -> Result<Value, Error> { ) -> Result<Value, Error> {
let mut tx = txn.lock().await; if let Some((e, ft)) = self.get_ft_entry_and_index(match_ref) {
// We have to make the connection between the match ref from the highlight function... let mut run = txn.lock().await;
if let Value::Number(n) = match_ref { return ft
let m = n.as_int() as u8; .highlight(&mut run, thg, &e.0.terms, prefix, suffix, e.0.index_option.id(), doc)
// ... and from the match operator (@{matchref}@) .await;
if let Some(ift) = self.inner.terms.get(&m) {
// Check we have an index?
if let Some(ft) = self.inner.ft_map.get(&ift.ix) {
// All good, we can do the highlight
return ft.highlight(&mut tx, thg, &ift.t, prefix, suffix, &ift.id, doc).await;
}
}
} }
Ok(Value::None) Ok(Value::None)
} }
@ -150,21 +186,70 @@ impl QueryExecutor {
&self, &self,
txn: Transaction, txn: Transaction,
thg: &Thing, thg: &Thing,
match_ref: Value, match_ref: &Value,
) -> Result<Value, Error> { ) -> Result<Value, Error> {
let mut tx = txn.lock().await; if let Some((e, ft)) = self.get_ft_entry_and_index(match_ref) {
// We have to make the connection between the match ref from the highlight function... let mut run = txn.lock().await;
if let Value::Number(n) = match_ref { return ft.extract_offsets(&mut run, thg, &e.0.terms).await;
let m = n.as_int() as u8; }
// ... and from the match operator (@{matchref}@) Ok(Value::None)
if let Some(ift) = self.inner.terms.get(&m) { }
// Check we have an index?
if let Some(ft) = self.inner.ft_map.get(&ift.ix) { pub(crate) async fn score(
// All good, we can extract the offsets &self,
return ft.extract_offsets(&mut tx, thg, &ift.t).await; txn: Transaction,
match_ref: &Value,
rid: &Thing,
mut doc_id: Option<DocId>,
) -> Result<Value, Error> {
if let Some(e) = self.get_ft_entry(match_ref) {
if let Some(scorer) = &e.0.scorer {
let mut run = txn.lock().await;
if doc_id.is_none() {
let key: Key = rid.into();
doc_id = e.0.doc_ids.get_doc_id(&mut run, key).await?;
};
if let Some(doc_id) = doc_id {
let score = scorer.score(&mut run, doc_id).await?;
if let Some(score) = score {
return Ok(Value::from(score));
}
} }
} }
} }
Ok(Value::None) Ok(Value::None)
} }
} }
/// Shared handle on the data gathered for one full-text index option.
///
/// Cloning only clones the inner `Arc`.
#[derive(Clone)]
struct FtEntry(Arc<Inner>);

/// Data held behind an [`FtEntry`]; every field is filled in by `FtEntry::new`.
struct Inner {
    // The index option this entry was built from.
    index_option: IndexOption,
    // Obtained from `FtIndex::doc_ids`.
    doc_ids: DocIds,
    // Terms extracted from the index option's query string.
    terms: Vec<TermId>,
    // Result of `FtIndex::get_terms_docs` for `terms`
    // (presumably the set of matching documents per term; confirm against FtIndex).
    terms_docs: Arc<Vec<(TermId, RoaringTreemap)>>,
    // Scorer built by `FtIndex::new_scorer`, if it returned one.
    scorer: Option<BM25Scorer>,
}

impl FtEntry {
    /// Builds the entry for `io` against the full-text index `ft`.
    ///
    /// Returns `Ok(None)` when the index option carries no query string.
    /// Any error raised by the index calls is propagated.
    async fn new(
        tx: &mut kvs::Transaction,
        ft: &FtIndex,
        io: IndexOption,
    ) -> Result<Option<Self>, Error> {
        // Without a query string there is nothing to extract terms from.
        let query = match io.qs() {
            Some(qs) => qs.to_owned(),
            None => return Ok(None),
        };
        let terms = ft.extract_terms(tx, query).await?;
        let terms_docs = Arc::new(ft.get_terms_docs(tx, &terms).await?);
        let doc_ids = ft.doc_ids(tx).await?;
        // The scorer shares the same terms/docs list through the Arc.
        let scorer = ft.new_scorer(tx, terms_docs.clone()).await?;
        let inner = Inner {
            index_option: io,
            doc_ids,
            terms,
            terms_docs,
            scorer,
        };
        Ok(Some(Self(Arc::new(inner))))
    }
}

View file

@ -37,7 +37,7 @@ impl<'a> QueryPlanner<'a> {
let res = Tree::build(self.opt, &txn, &t, self.cond).await?; let res = Tree::build(self.opt, &txn, &t, self.cond).await?;
if let Some((node, im)) = res { if let Some((node, im)) = res {
if let Some(plan) = AllAndStrategy::build(&node)? { if let Some(plan) = AllAndStrategy::build(&node)? {
let e = plan.i.new_query_executor(opt, &txn, &t, im).await?; let e = QueryExecutor::new(opt, &txn, &t, im, Some(plan.e.clone())).await?;
self.executors.insert(t.0.clone(), e); self.executors.insert(t.0.clone(), e);
return Ok(Iterable::Index(t, plan)); return Ok(Iterable::Index(t, plan));
} }
@ -81,15 +81,15 @@ impl AllAndStrategy {
fn eval_node(&mut self, node: &Node) -> Result<(), Error> { fn eval_node(&mut self, node: &Node) -> Result<(), Error> {
match node { match node {
Node::Expression { Node::Expression {
index_option, io: index_option,
left, left,
right, right,
operator, exp: expression,
} => { } => {
if let Some(io) = index_option { if let Some(io) = index_option {
self.b.add(io.clone()); self.b.add_index_option(expression.clone(), io.clone());
} }
self.eval_expression(left, right, operator) self.eval_expression(left, right, expression.operator())
} }
Node::Unsupported => Err(Error::BypassQueryPlanner), Node::Unsupported => Err(Error::BypassQueryPlanner),
_ => Ok(()), _ => Ok(()),

View file

@ -1,33 +1,36 @@
use crate::dbs::{Options, Transaction}; use crate::dbs::{Options, Transaction};
use crate::err::Error; use crate::err::Error;
use crate::idx::ft::docids::{DocId, NO_DOC_ID};
use crate::idx::ft::terms::TermId; use crate::idx::ft::terms::TermId;
use crate::idx::ft::{FtIndex, HitsIterator, MatchRef}; use crate::idx::ft::{FtIndex, HitsIterator, MatchRef};
use crate::idx::planner::executor::QueryExecutor; use crate::idx::planner::executor::QueryExecutor;
use crate::idx::planner::tree::IndexMap;
use crate::idx::IndexKeyBase; use crate::idx::IndexKeyBase;
use crate::key; use crate::key;
use crate::kvs::Key; use crate::kvs::Key;
use crate::sql::index::Index; use crate::sql::index::Index;
use crate::sql::scoring::Scoring; use crate::sql::scoring::Scoring;
use crate::sql::statements::DefineIndexStatement; use crate::sql::statements::DefineIndexStatement;
use crate::sql::{Array, Expression, Ident, Object, Operator, Table, Thing, Value}; use crate::sql::{Array, Expression, Ident, Idiom, Object, Operator, Thing, Value};
use async_trait::async_trait; use async_trait::async_trait;
use roaring::RoaringTreemap;
use std::collections::HashMap; use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
#[derive(Default)] #[derive(Default)]
pub(super) struct PlanBuilder { pub(super) struct PlanBuilder {
indexes: Vec<IndexOption>, indexes: Vec<(Expression, IndexOption)>,
} }
impl PlanBuilder { impl PlanBuilder {
pub(super) fn add(&mut self, i: IndexOption) { pub(super) fn add_index_option(&mut self, e: Expression, i: IndexOption) {
self.indexes.push(i); self.indexes.push((e, i));
} }
pub(super) fn build(mut self) -> Result<Plan, Error> { pub(super) fn build(mut self) -> Result<Plan, Error> {
// TODO select the best option if there are several (cost based) // TODO select the best option if there are several (cost based)
if let Some(index) = self.indexes.pop() { if let Some((e, i)) = self.indexes.pop() {
Ok(index.into()) Ok(Plan::new(e, i))
} else { } else {
Err(Error::BypassQueryPlanner) Err(Error::BypassQueryPlanner)
} }
@ -35,108 +38,144 @@ impl PlanBuilder {
} }
pub(crate) struct Plan { pub(crate) struct Plan {
pub(super) e: Expression,
pub(super) i: IndexOption, pub(super) i: IndexOption,
} }
impl Plan { impl Plan {
pub(super) fn new(e: Expression, i: IndexOption) -> Self {
Self {
e,
i,
}
}
pub(crate) async fn new_iterator( pub(crate) async fn new_iterator(
&self, &self,
opt: &Options, opt: &Options,
txn: &Transaction, txn: &Transaction,
exe: &QueryExecutor,
) -> Result<Box<dyn ThingIterator>, Error> { ) -> Result<Box<dyn ThingIterator>, Error> {
self.i.new_iterator(opt, txn).await self.i.new_iterator(opt, txn, exe).await
} }
pub(crate) fn explain(&self) -> Value { pub(crate) fn explain(&self) -> Value {
let IndexOption {
ix,
v,
op,
..
} = &self.i;
Value::Object(Object::from(HashMap::from([ Value::Object(Object::from(HashMap::from([
("index", Value::from(ix.name.0.to_owned())), ("index", Value::from(self.i.ix().name.0.to_owned())),
("operator", Value::from(op.to_string())), ("operator", Value::from(self.i.op().to_string())),
("value", v.clone()), ("value", self.i.value().clone()),
]))) ])))
} }
} }
impl From<IndexOption> for Plan {
fn from(i: IndexOption) -> Self {
Self {
i,
}
}
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)] #[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub(super) struct IndexOption { pub(super) struct IndexOption(Arc<Inner>);
pub(super) ix: DefineIndexStatement,
pub(super) v: Value, #[derive(Debug, Eq, PartialEq, Hash)]
pub(super) op: Operator, pub(super) struct Inner {
ep: Expression, ix: DefineIndexStatement,
id: Idiom,
v: Value,
qs: Option<String>,
op: Operator,
mr: Option<MatchRef>,
} }
impl IndexOption { impl IndexOption {
pub(super) fn new(ix: DefineIndexStatement, op: Operator, v: Value, ep: Expression) -> Self { pub(super) fn new(
Self { ix: DefineIndexStatement,
id: Idiom,
op: Operator,
v: Value,
qs: Option<String>,
mr: Option<MatchRef>,
) -> Self {
Self(Arc::new(Inner {
ix, ix,
id,
op, op,
v, v,
ep, qs,
} mr,
}))
} }
pub(super) async fn new_query_executor( pub(super) fn ix(&self) -> &DefineIndexStatement {
&self, &self.0.ix
opt: &Options, }
txn: &Transaction,
t: &Table, pub(super) fn op(&self) -> &Operator {
i: IndexMap, &self.0.op
) -> Result<QueryExecutor, Error> { }
QueryExecutor::new(opt, txn, t, i, Some(self.ep.clone())).await
pub(super) fn value(&self) -> &Value {
&self.0.v
}
pub(super) fn qs(&self) -> Option<&String> {
self.0.qs.as_ref()
}
pub(super) fn id(&self) -> &Idiom {
&self.0.id
}
pub(super) fn match_ref(&self) -> Option<&MatchRef> {
self.0.mr.as_ref()
} }
async fn new_iterator( async fn new_iterator(
&self, &self,
opt: &Options, opt: &Options,
txn: &Transaction, txn: &Transaction,
exe: &QueryExecutor,
) -> Result<Box<dyn ThingIterator>, Error> { ) -> Result<Box<dyn ThingIterator>, Error> {
match &self.ix.index { match &self.ix().index {
Index::Idx => match self.op { Index::Idx => {
Operator::Equal => { if self.op() == &Operator::Equal {
Ok(Box::new(NonUniqueEqualThingIterator::new(opt, &self.ix, &self.v)?)) return Ok(Box::new(NonUniqueEqualThingIterator::new(
opt,
self.ix(),
self.value(),
)?));
} }
_ => Err(Error::BypassQueryPlanner), }
}, Index::Uniq => {
Index::Uniq => match self.op { if self.op() == &Operator::Equal {
Operator::Equal => { return Ok(Box::new(UniqueEqualThingIterator::new(
Ok(Box::new(UniqueEqualThingIterator::new(opt, &self.ix, &self.v)?)) opt,
self.ix(),
self.value(),
)?));
} }
_ => Err(Error::BypassQueryPlanner), }
},
Index::Search { Index::Search {
az, az,
hl, hl,
sc, sc,
order, order,
} => match self.op { } => {
Operator::Matches(mr) => Ok(Box::new( if let Operator::Matches(_) = self.op() {
MatchesThingIterator::new(opt, txn, &self.ix, az, *hl, sc, *order, mr, &self.v) let td = exe.pre_match_terms_docs();
.await?, return Ok(Box::new(
)), MatchesThingIterator::new(opt, txn, self.ix(), az, *hl, sc, *order, td)
_ => Err(Error::BypassQueryPlanner), .await?,
}, ));
}
}
} }
Err(Error::BypassQueryPlanner)
} }
} }
#[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub(crate) trait ThingIterator: Send { pub(crate) trait ThingIterator: Send {
async fn next_batch(&mut self, tx: &Transaction, size: u32) -> Result<Vec<Thing>, Error>; async fn next_batch(
&mut self,
tx: &Transaction,
size: u32,
) -> Result<Vec<(Thing, DocId)>, Error>;
} }
struct NonUniqueEqualThingIterator { struct NonUniqueEqualThingIterator {
@ -159,7 +198,11 @@ impl NonUniqueEqualThingIterator {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl ThingIterator for NonUniqueEqualThingIterator { impl ThingIterator for NonUniqueEqualThingIterator {
async fn next_batch(&mut self, txn: &Transaction, limit: u32) -> Result<Vec<Thing>, Error> { async fn next_batch(
&mut self,
txn: &Transaction,
limit: u32,
) -> Result<Vec<(Thing, DocId)>, Error> {
let min = self.beg.clone(); let min = self.beg.clone();
let max = self.end.clone(); let max = self.end.clone();
let res = txn.lock().await.scan(min..max, limit).await?; let res = txn.lock().await.scan(min..max, limit).await?;
@ -167,7 +210,7 @@ impl ThingIterator for NonUniqueEqualThingIterator {
self.beg = key.clone(); self.beg = key.clone();
self.beg.push(0x00); self.beg.push(0x00);
} }
let res = res.iter().map(|(_, val)| val.into()).collect(); let res = res.iter().map(|(_, val)| (val.into(), NO_DOC_ID)).collect();
Ok(res) Ok(res)
} }
} }
@ -189,10 +232,14 @@ impl UniqueEqualThingIterator {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl ThingIterator for UniqueEqualThingIterator { impl ThingIterator for UniqueEqualThingIterator {
async fn next_batch(&mut self, txn: &Transaction, _limit: u32) -> Result<Vec<Thing>, Error> { async fn next_batch(
&mut self,
txn: &Transaction,
_limit: u32,
) -> Result<Vec<(Thing, DocId)>, Error> {
if let Some(key) = self.key.take() { if let Some(key) = self.key.take() {
if let Some(val) = txn.lock().await.get(key).await? { if let Some(val) = txn.lock().await.get(key).await? {
return Ok(vec![val.into()]); return Ok(vec![(val.into(), NO_DOC_ID)]);
} }
} }
Ok(vec![]) Ok(vec![])
@ -200,7 +247,6 @@ impl ThingIterator for UniqueEqualThingIterator {
} }
struct MatchesThingIterator { struct MatchesThingIterator {
_terms: Option<(MatchRef, Vec<TermId>)>,
hits: Option<HitsIterator>, hits: Option<HitsIterator>,
} }
@ -214,23 +260,26 @@ impl MatchesThingIterator {
hl: bool, hl: bool,
sc: &Scoring, sc: &Scoring,
order: u32, order: u32,
mr: Option<MatchRef>, terms_docs: Option<Arc<Vec<(TermId, RoaringTreemap)>>>,
v: &Value,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let ikb = IndexKeyBase::new(opt, ix); let ikb = IndexKeyBase::new(opt, ix);
let mut run = txn.lock().await;
if let Scoring::Bm { if let Scoring::Bm {
.. ..
} = sc } = sc
{ {
let query_string = v.clone().convert_to_string()?; let mut run = txn.lock().await;
let az = run.get_az(opt.ns(), opt.db(), az.as_str()).await?; let az = run.get_az(opt.ns(), opt.db(), az.as_str()).await?;
let fti = FtIndex::new(&mut run, az, ikb, order, sc, hl).await?; let fti = FtIndex::new(&mut run, az, ikb, order, sc, hl).await?;
let (terms, hits) = fti.search(&mut run, query_string).await?; if let Some(terms_docs) = terms_docs {
Ok(Self { let hits = fti.new_hits_iterator(&mut run, terms_docs).await?;
hits, Ok(Self {
_terms: mr.map(|mr| (mr, terms)), hits,
}) })
} else {
Ok(Self {
hits: None,
})
}
} else { } else {
Err(Error::FeatureNotYetImplemented { Err(Error::FeatureNotYetImplemented {
feature: "Vector Search", feature: "Vector Search",
@ -242,12 +291,16 @@ impl MatchesThingIterator {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl ThingIterator for MatchesThingIterator { impl ThingIterator for MatchesThingIterator {
async fn next_batch(&mut self, txn: &Transaction, mut limit: u32) -> Result<Vec<Thing>, Error> { async fn next_batch(
&mut self,
txn: &Transaction,
mut limit: u32,
) -> Result<Vec<(Thing, DocId)>, Error> {
let mut res = vec![]; let mut res = vec![];
if let Some(hits) = &mut self.hits { if let Some(hits) = &mut self.hits {
let mut run = txn.lock().await; let mut run = txn.lock().await;
while limit > 0 { while limit > 0 {
if let Some((hit, _)) = hits.next(&mut run).await? { if let Some(hit) = hits.next(&mut run).await? {
res.push(hit); res.push(hit);
} else { } else {
break; break;
@ -258,3 +311,39 @@ impl ThingIterator for MatchesThingIterator {
Ok(res) Ok(res)
} }
} }
#[cfg(test)]
mod tests {
    use crate::idx::planner::plan::IndexOption;
    use crate::sql::statements::DefineIndexStatement;
    use crate::sql::{Idiom, Operator, Value};
    use std::collections::HashSet;

    /// Two separately built but identical `IndexOption`s, plus a clone of one
    /// of them, must collapse to a single element in a `HashSet`.
    #[test]
    fn test_hash_index_option() {
        // Every call builds a fresh option from the same inputs.
        let build = || {
            IndexOption::new(
                DefineIndexStatement::default(),
                Idiom::from("a.b".to_string()),
                Operator::Equal,
                Value::from("test"),
                None,
                None,
            )
        };
        let first = build();
        let second = build();

        let mut options = HashSet::new();
        options.insert(first);
        options.insert(second.clone());
        options.insert(second);

        assert_eq!(options.len(), 1);
    }
}

View file

@ -1,30 +1,18 @@
use crate::dbs::{Options, Transaction}; use crate::dbs::{Options, Transaction};
use crate::err::Error; use crate::err::Error;
use crate::idx::ft::MatchRef;
use crate::idx::planner::plan::IndexOption; use crate::idx::planner::plan::IndexOption;
use crate::sql::index::Index; use crate::sql::index::Index;
use crate::sql::statements::DefineIndexStatement; use crate::sql::statements::DefineIndexStatement;
use crate::sql::{Cond, Expression, Idiom, Operator, Subquery, Table, Value}; use crate::sql::{Cond, Expression, Idiom, Operator, Subquery, Table, Value};
use async_recursion::async_recursion; use async_recursion::async_recursion;
use std::collections::hash_map::Entry; use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc; use std::sync::Arc;
#[derive(Default)]
pub(super) struct IndexMap {
pub(super) index: HashMap<Expression, HashSet<IndexOption>>,
pub(super) terms: HashMap<MatchRef, IndexFieldValue>,
}
pub(super) struct IndexFieldValue {
pub(super) ix: String,
pub(super) id: Idiom,
pub(super) val: String,
}
pub(super) struct Tree {} pub(super) struct Tree {}
impl Tree { impl Tree {
/// Traverse all the conditions and extract every expression
/// that can be resolved by an index.
pub(super) async fn build<'a>( pub(super) async fn build<'a>(
opt: &'a Options, opt: &'a Options,
txn: &'a Transaction, txn: &'a Transaction,
@ -112,17 +100,25 @@ impl<'a> TreeBuilder<'a> {
} => { } => {
let left = self.eval_value(l).await?; let left = self.eval_value(l).await?;
let right = self.eval_value(r).await?; let right = self.eval_value(r).await?;
let mut index_option = None; if let Some(io) = self.index_map.0.get(e) {
return Ok(Node::Expression {
io: Some(io.clone()),
left: Box::new(left),
right: Box::new(right),
exp: e.clone(),
});
}
let mut io = None;
if let Some((id, ix)) = left.is_indexed_field() { if let Some((id, ix)) = left.is_indexed_field() {
index_option = self.lookup_index_option(ix, o, id, &right, e); io = self.lookup_index_option(ix, o, id, &right, e);
} else if let Some((id, ix)) = right.is_indexed_field() { } else if let Some((id, ix)) = right.is_indexed_field() {
index_option = self.lookup_index_option(ix, o, id, &left, e); io = self.lookup_index_option(ix, o, id, &left, e);
}; };
Ok(Node::Expression { Ok(Node::Expression {
index_option, io,
left: Box::new(left), left: Box::new(left),
right: Box::new(right), right: Box::new(right),
operator: o.to_owned(), exp: e.clone(),
}) })
} }
} }
@ -134,51 +130,31 @@ impl<'a> TreeBuilder<'a> {
op: &Operator, op: &Operator,
id: &Idiom, id: &Idiom,
v: &Node, v: &Node,
ep: &Expression, e: &Expression,
) -> Option<IndexOption> { ) -> Option<IndexOption> {
if let Some(v) = v.is_scalar() { if let Some(v) = v.is_scalar() {
if match &ix.index { let (found, mr, qs) = match &ix.index {
Index::Idx => Operator::Equal.eq(op), Index::Idx => (Operator::Equal.eq(op), None, None),
Index::Uniq => Operator::Equal.eq(op), Index::Uniq => (Operator::Equal.eq(op), None, None),
Index::Search { Index::Search {
.. ..
} => { } => {
if let Operator::Matches(mr) = op { if let Operator::Matches(mr) = op {
if let Some(mr) = mr { (true, *mr, Some(v.clone().to_raw_string()))
self.index_map.terms.insert(
*mr,
IndexFieldValue {
ix: ix.name.0.to_owned(),
id: id.to_owned(),
val: v.to_raw_string(),
},
);
}
true
} else { } else {
false (false, None, None)
} }
} }
} { };
let io = IndexOption::new(ix.clone(), op.to_owned(), v.clone(), ep.clone()); if found {
self.add_index(ep, io.clone()); let io = IndexOption::new(ix.clone(), id.clone(), op.to_owned(), v.clone(), qs, mr);
self.index_map.0.insert(e.clone(), io.clone());
return Some(io); return Some(io);
} }
} }
None None
} }
fn add_index(&mut self, e: &Expression, io: IndexOption) {
match self.index_map.index.entry(e.clone()) {
Entry::Occupied(mut e) => {
e.get_mut().insert(io);
}
Entry::Vacant(e) => {
e.insert(HashSet::from([io]));
}
}
}
async fn eval_subquery(&mut self, s: &Subquery) -> Result<Node, Error> { async fn eval_subquery(&mut self, s: &Subquery) -> Result<Node, Error> {
Ok(match s { Ok(match s {
Subquery::Value(v) => self.eval_value(v).await?, Subquery::Value(v) => self.eval_value(v).await?,
@ -187,13 +163,23 @@ impl<'a> TreeBuilder<'a> {
} }
} }
/// Associates an expression with the index option recorded for it.
#[derive(Default)]
pub(super) struct IndexMap(HashMap<Expression, IndexOption>);

impl IndexMap {
    /// Consumes the wrapper and hands back the underlying map.
    pub(super) fn consume(self) -> HashMap<Expression, IndexOption> {
        let Self(map) = self;
        map
    }
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)] #[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub(super) enum Node { pub(super) enum Node {
Expression { Expression {
index_option: Option<IndexOption>, io: Option<IndexOption>,
left: Box<Node>, left: Box<Node>,
right: Box<Node>, right: Box<Node>,
operator: Operator, exp: Expression,
}, },
IndexedField(Idiom, DefineIndexStatement), IndexedField(Idiom, DefineIndexStatement),
NonIndexedField, NonIndexedField,

View file

@ -86,6 +86,20 @@ impl Expression {
} }
} }
/// Returns a reference to the operator of this expression,
/// whether it is a unary or a binary expression.
pub(crate) fn operator(&self) -> &Operator {
    match self {
        Expression::Unary {
            o,
            ..
        }
        | Expression::Binary {
            o,
            ..
        } => o,
    }
}
/// Process this type returning a computed simple Value /// Process this type returning a computed simple Value
pub(crate) async fn compute(&self, ctx: &Context<'_>, opt: &Options) -> Result<Value, Error> { pub(crate) async fn compute(&self, ctx: &Context<'_>, opt: &Options) -> Result<Value, Error> {
let (l, o, r) = match self { let (l, o, r) = match self {

View file

@ -455,7 +455,7 @@ fn function_rand(i: &str) -> IResult<&str, &str> {
} }
fn function_search(i: &str) -> IResult<&str, &str> { fn function_search(i: &str) -> IResult<&str, &str> {
alt((tag("highlight"), tag("offsets")))(i) alt((tag("score"), tag("highlight"), tag("offsets")))(i)
} }
fn function_session(i: &str) -> IResult<&str, &str> { fn function_session(i: &str) -> IResult<&str, &str> {

View file

@ -57,7 +57,7 @@ async fn select_where_matches_without_using_index_iterator() -> Result<(), Error
CREATE blog:2 SET title = 'Foo Bar!'; CREATE blog:2 SET title = 'Foo Bar!';
DEFINE ANALYZER simple TOKENIZERS blank,class FILTERS lowercase; DEFINE ANALYZER simple TOKENIZERS blank,class FILTERS lowercase;
DEFINE INDEX blog_title ON blog FIELDS title SEARCH ANALYZER simple BM25(1.2,0.75) HIGHLIGHTS; DEFINE INDEX blog_title ON blog FIELDS title SEARCH ANALYZER simple BM25(1.2,0.75) HIGHLIGHTS;
SELECT id,search::highlight('<em>', '</em>', 1) AS title FROM blog WHERE (title @0@ 'hello' AND id>0) OR (title @1@ 'world' AND id<99) EXPLAIN; SELECT id,search::highlight('<em>', '</em>', 1) AS title FROM blog WHERE (title @0@ 'hello' AND identifier > 0) OR (title @1@ 'world' AND identifier < 99) EXPLAIN;
"; ";
let dbs = Datastore::new("memory").await?; let dbs = Datastore::new("memory").await?;
let ses = Session::for_kv().with_ns("test").with_db("test"); let ses = Session::for_kv().with_ns("test").with_db("test");
@ -192,3 +192,67 @@ async fn select_where_matches_using_index_offsets() -> Result<(), Error> {
assert_eq!(tmp, val); assert_eq!(tmp, val);
Ok(()) Ok(())
} }
#[tokio::test]
async fn select_where_matches_using_index_and_score() -> Result<(), Error> {
let sql = r"
CREATE blog:1 SET title = 'the quick brown fox jumped over the lazy dog';
CREATE blog:2 SET title = 'the fast fox jumped over the lazy dog';
CREATE blog:3 SET title = 'the other animals sat there watching';
CREATE blog:4 SET title = 'the dog sat there and did nothing';
DEFINE ANALYZER simple TOKENIZERS blank,class;
DEFINE INDEX blog_title ON blog FIELDS title SEARCH ANALYZER simple BM25(1.2,0.75) HIGHLIGHTS;
SELECT id,search::score(1) AS score FROM blog WHERE title @1@ 'animals';
";
let dbs = Datastore::new("memory").await?;
let ses = Session::for_kv().with_ns("test").with_db("test");
let res = &mut dbs.execute(&sql, &ses, None, false).await?;
assert_eq!(res.len(), 7);
//
for _ in 0..6 {
let _ = res.remove(0).result?;
}
let tmp = res.remove(0).result?;
let val = Value::parse(
"[
{
id: blog:3,
score: 0.9227996468544006
}
]",
);
assert_eq!(tmp, val);
Ok(())
}
#[tokio::test]
async fn select_where_matches_without_using_index_and_score() -> Result<(), Error> {
let sql = r"
CREATE blog:1 SET title = 'the quick brown fox jumped over the lazy dog';
CREATE blog:2 SET title = 'the fast fox jumped over the lazy dog';
CREATE blog:3 SET title = 'the other animals sat there watching';
CREATE blog:4 SET title = 'the dog sat there and did nothing';
DEFINE ANALYZER simple TOKENIZERS blank,class;
DEFINE INDEX blog_title ON blog FIELDS title SEARCH ANALYZER simple BM25(1.2,0.75) HIGHLIGHTS;
SELECT id,search::score(1) AS score FROM blog WHERE (title @1@ 'animals' AND id>0) OR (title @1@ 'animals' AND id<99);
";
let dbs = Datastore::new("memory").await?;
let ses = Session::for_kv().with_ns("test").with_db("test");
let res = &mut dbs.execute(&sql, &ses, None, false).await?;
assert_eq!(res.len(), 7);
//
for _ in 0..6 {
let _ = res.remove(0).result?;
}
let tmp = res.remove(0).result?;
let val = Value::parse(
"[
{
id: blog:3,
score: 0.9227996468544006
}
]",
);
assert_eq!(tmp, val);
Ok(())
}