Add initial full-text search indexer implementation ()

Emmanuel Keller 2023-05-29 12:46:41 +01:00 committed by GitHub
parent 88e6dc274a
commit 8e48604e20
29 changed files with 5279 additions and 132 deletions

Cargo.lock (generated): 58 changed lines

@@ -631,6 +631,15 @@ dependencies = [
"serde",
]
[[package]]
name = "bincode"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.57.0"
@@ -1601,6 +1610,12 @@ dependencies = [
"bindgen 0.60.1",
]
[[package]]
name = "fst"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ab85b9b05e3978cc9a9cf8fea7f01b494e1a09ed3037e16ba39edc7a29eb61a"
[[package]]
name = "funty"
version = "2.0.0"
@@ -3358,6 +3373,7 @@ checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd"
dependencies = [
"endian-type",
"nibble_vec",
"serde",
]
[[package]]
@@ -3585,6 +3601,12 @@ dependencies = [
"winreg",
]
[[package]]
name = "retain_mut"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086"
[[package]]
name = "rexie"
version = "0.4.2"
@@ -3646,6 +3668,18 @@ dependencies = [
"serde",
]
[[package]]
name = "roaring"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef0fb5e826a8bde011ecae6a8539dd333884335c57ff0f003fbe27c25bbe8f71"
dependencies = [
"bytemuck",
"byteorder",
"retain_mut",
"serde",
]
[[package]]
name = "robust"
version = "0.2.3"
@@ -4101,6 +4135,12 @@ version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "snap"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831"
[[package]]
name = "socket2"
version = "0.4.9"
@@ -4228,9 +4268,11 @@ dependencies = [
"async-channel",
"async-executor",
"async-recursion",
"async-trait",
"base64 0.21.1",
"bcrypt",
"bigdecimal",
"bincode",
"bung",
"chrono",
"criterion",
@@ -4239,6 +4281,7 @@ dependencies = [
"env_logger 0.10.0",
"flume",
"foundationdb",
"fst",
"futures 0.3.28",
"futures-concurrency",
"fuzzy-matcher",
@@ -4256,9 +4299,11 @@ dependencies = [
"pharos",
"pin-project-lite",
"pprof",
"radix_trie",
"rand 0.8.5",
"regex",
"reqwest",
"roaring",
"rocksdb",
"rquickjs",
"rustls 0.20.8",
@@ -4268,9 +4313,11 @@ dependencies = [
"serde_json",
"sha-1",
"sha2",
"snap",
"storekey",
"surrealdb-derive",
"temp-dir",
"test-log",
"thiserror",
"tikv-client",
"time 0.3.21",
@@ -4401,6 +4448,17 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "test-log"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38f0c854faeb68a048f0f2dc410c5ddae3bf83854ef0e4977d58306a5edef50e"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "textwrap"
version = "0.16.0"

lib/Cargo.toml

@@ -53,11 +53,13 @@ targets = []
addr = { version = "0.15.6", default-features = false, features = ["std"] }
argon2 = "0.5.0"
ascii = { version = "0.3.2", package = "any_ascii" }
async-trait = "0.1.68"
async-recursion = "1.0.4"
base64_lib = { version = "0.21.1", package = "base64" }
bcrypt = "0.14.0"
bigdecimal = { version = "0.3.1", features = ["serde", "string-only"] }
bung = "0.1.0"
bincode = "1.3.3"
channel = { version = "1.8.0", package = "async-channel" }
chrono = { version = "0.4.24", features = ["serde"] }
derive = { version = "0.8.0", package = "surrealdb-derive" }
@@ -65,6 +67,7 @@ dmp = "0.2.0"
echodb = { version = "0.4.0", optional = true }
executor = { version = "1.5.1", package = "async-executor" }
flume = "0.10.14"
fst = "0.4.7"
foundationdb = { version = "0.7.0", default-features = false, features = ["embedded-fdb-include"], optional = true }
futures = "0.3.28"
futures-concurrency = "7.2.0"
@@ -82,11 +85,14 @@ nom = { version = "7.1.3", features = ["alloc"] }
once_cell = "1.17.1"
pbkdf2 = { version = "0.12.1", features = ["simple"] }
pin-project-lite = "0.2.9"
radix_trie = { version = "0.2.1", features = ["serde"] }
rand = "0.8.5"
regex = "1.8.2"
reqwest = { version = "0.11.18", default-features = false, features = ["json", "stream"], optional = true }
roaring = { version = "0.10.1", features = ["serde"] }
rocksdb = { version = "0.21.0", optional = true }
rustls = { version = "0.20.8", optional = true }
snap = "1.1.0"
scrypt = "0.11.0"
semver = { version = "1.0.17", features = ["serde"] }
serde = { version = "1.0.163", features = ["derive"] }
@@ -105,6 +111,7 @@ url = "2.3.1"
[dev-dependencies]
criterion = "0.4"
env_logger = "0.10.0"
test-log = "0.2.11"
pprof = { version = "0.11.1", features = [ "flamegraph", "criterion" ] }
temp-dir = "0.1.11"
time = { version = "0.3.21", features = ["serde"] }
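The new dependencies map directly onto the indexer internals introduced below: fst provides the immutable, sorted term maps stored inside B-tree nodes, radix_trie stages in-memory key additions and deletions, roaring supplies compressed bitmaps of 64-bit doc and term ids, bincode plus snap serialize and compress the trie-backed node payloads, async-trait enables the async traits used by the key and postings visitors, and test-log wires logging into the new tests. A minimal sketch of the two core crates (illustrative only, not part of the commit):

use fst::{Map, MapBuilder};
use roaring::RoaringTreemap;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // fst maps are immutable and must be built from keys in lexicographic order.
    let mut builder = MapBuilder::memory();
    builder.insert("apple", 1)?;
    builder.insert("banana", 2)?;
    let terms = Map::new(builder.into_inner()?)?;
    assert_eq!(terms.get("banana"), Some(2));

    // RoaringTreemap is a compressed bitmap over u64 values, used below for
    // doc-id and term-id sets.
    let mut docs = RoaringTreemap::new();
    docs.insert(42);
    assert!(docs.contains(42));
    Ok(())
}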

lib/src/doc/index.rs

@@ -4,7 +4,15 @@ use crate::dbs::Statement;
use crate::dbs::Transaction;
use crate::doc::Document;
use crate::err::Error;
// use crate::idx::ft::FtIndex;
use crate::idx::ft::FtIndex;
use crate::idx::IndexKeyBase;
use crate::sql::array::Array;
use crate::sql::index::Index;
use crate::sql::scoring::Scoring;
use crate::sql::statements::DefineIndexStatement;
use crate::sql::{Ident, Number, Thing, Value};
use crate::{key, kvs};
impl<'a> Document<'a> {
pub async fn index(
@@ -31,74 +39,169 @@ impl<'a> Document<'a> {
// Loop through all index statements
for ix in self.ix(opt, txn).await?.iter() {
// Calculate old values
let mut o = Array::with_capacity(ix.cols.len());
for i in ix.cols.iter() {
let v = i.compute(ctx, opt, txn, Some(&self.initial)).await?;
o.push(v);
}
let o = Self::build_opt_array(ctx, &txn, opt, ix, &self.initial).await?;
// Calculate new values
let mut n = Array::with_capacity(ix.cols.len());
for i in ix.cols.iter() {
let v = i.compute(ctx, opt, txn, Some(&self.current)).await?;
n.push(v);
}
// Clone transaction
let run = txn.clone();
// Claim transaction
let mut run = run.lock().await;
let n = Self::build_opt_array(ctx, &txn, opt, ix, &self.current).await?;
// Update the index entries
if opt.force || o != n {
match ix.uniq {
true => {
// Delete the old index data
if self.initial.is_some() {
#[rustfmt::skip]
let key = crate::key::index::new(opt.ns(), opt.db(), &ix.what, &ix.name, &o, None);
let _ = run.delc(key, Some(rid)).await; // Ignore this error
}
// Create the new index data
if self.current.is_some() {
#[rustfmt::skip]
let key = crate::key::index::new(opt.ns(), opt.db(), &ix.what, &ix.name, &n, None);
if run.putc(key, rid, None).await.is_err() {
return Err(Error::IndexExists {
thing: rid.to_string(),
index: ix.name.to_string(),
value: match n.len() {
1 => n.first().unwrap().to_string(),
_ => n.to_string(),
},
});
}
}
}
false => {
// Delete the old index data
if self.initial.is_some() {
#[rustfmt::skip]
let key = crate::key::index::new(opt.ns(), opt.db(), &ix.what, &ix.name, &o, Some(&rid.id));
let _ = run.delc(key, Some(rid)).await; // Ignore this error
}
// Create the new index data
if self.current.is_some() {
#[rustfmt::skip]
let key = crate::key::index::new(opt.ns(), opt.db(), &ix.what, &ix.name, &n, Some(&rid.id));
if run.putc(key, rid, None).await.is_err() {
return Err(Error::IndexExists {
thing: rid.to_string(),
index: ix.name.to_string(),
value: match n.len() {
1 => n.first().unwrap().to_string(),
_ => n.to_string(),
},
});
}
}
}
// Claim transaction
let mut run = txn.lock().await;
// Store all the variable and parameters required by the index operation
let mut ic = IndexOperation::new(opt, ix, o, n, rid);
// Index operation dispatching
match &ix.index {
Index::Uniq => ic.index_unique(&mut run).await?,
Index::Idx => ic.index_non_unique(&mut run).await?,
Index::Search {
az,
sc,
hl,
} => match sc {
Scoring::Bm {
k1,
b,
order,
} => ic.index_best_matching_search(&mut run, az, k1, b, order, *hl).await?,
Scoring::Vs => ic.index_vector_search(az, *hl).await?,
},
};
}
}
// Carry on
Ok(())
}
async fn build_opt_array(
ctx: &Context<'_>,
txn: &Transaction,
opt: &Options,
ix: &DefineIndexStatement,
value: &Value,
) -> Result<Option<Array>, Error> {
if !value.is_some() {
return Ok(None);
}
let mut o = Array::with_capacity(ix.cols.len());
for i in ix.cols.iter() {
let v = i.compute(ctx, opt, txn, Some(value)).await?;
o.push(v);
}
Ok(Some(o))
}
}
struct IndexOperation<'a> {
opt: &'a Options,
ix: &'a DefineIndexStatement,
/// The old value (if existing)
o: Option<Array>,
/// The new value (if existing)
n: Option<Array>,
rid: &'a Thing,
}
impl<'a> IndexOperation<'a> {
fn new(
opt: &'a Options,
ix: &'a DefineIndexStatement,
o: Option<Array>,
n: Option<Array>,
rid: &'a Thing,
) -> Self {
Self {
opt,
ix,
o,
n,
rid,
}
}
fn get_non_unique_index_key(&self, v: &Array) -> key::index::Index {
key::index::new(
self.opt.ns(),
self.opt.db(),
&self.ix.what,
&self.ix.name,
v,
Some(&self.rid.id),
)
}
async fn index_non_unique(&self, run: &mut kvs::Transaction) -> Result<(), Error> {
// Delete the old index data
if let Some(o) = &self.o {
let key = self.get_non_unique_index_key(o);
let _ = run.delc(key, Some(self.rid)).await; // Ignore this error
}
// Create the new index data
if let Some(n) = &self.n {
let key = self.get_non_unique_index_key(n);
if run.putc(key, self.rid, None).await.is_err() {
return self.err_index_exists(n);
}
}
Ok(())
}
fn get_unique_index_key(&self, v: &Array) -> key::index::Index {
key::index::new(self.opt.ns(), self.opt.db(), &self.ix.what, &self.ix.name, v, None)
}
async fn index_unique(&self, run: &mut kvs::Transaction) -> Result<(), Error> {
// Delete the old index data
if let Some(o) = &self.o {
let key = self.get_unique_index_key(o);
let _ = run.delc(key, Some(self.rid)).await; // Ignore this error
}
// Create the new index data
if let Some(n) = &self.n {
let key = self.get_unique_index_key(n);
if run.putc(key, self.rid, None).await.is_err() {
return self.err_index_exists(n);
}
}
Ok(())
}
fn err_index_exists(&self, n: &Array) -> Result<(), Error> {
Err(Error::IndexExists {
thing: self.rid.to_string(),
index: self.ix.name.to_string(),
value: match n.len() {
1 => n.first().unwrap().to_string(),
_ => n.to_string(),
},
})
}
async fn index_best_matching_search(
&self,
run: &mut kvs::Transaction,
_az: &Ident,
_k1: &Number,
_b: &Number,
order: &Number,
_hl: bool,
) -> Result<(), Error> {
let ikb = IndexKeyBase::new(self.opt, self.ix);
let mut ft = FtIndex::new(run, ikb, order.to_usize()).await?;
let doc_key = self.rid.into();
if let Some(n) = &self.n {
// TODO: Apply the analyzer
ft.index_document(run, doc_key, &n.to_string()).await
} else {
ft.remove_document(run, doc_key).await
}
}
async fn index_vector_search(&mut self, _az: &Ident, _hl: bool) -> Result<(), Error> {
Err(Error::FeatureNotYetImplemented {
feature: "VectorSearch indexing",
})
}
}
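The dispatch above implies the shape of the new Index and Scoring types; their definitions (in sql/index.rs and sql/scoring.rs) are not part of this excerpt, so the following is a sketch reconstructed from the match arms, and the field details are assumptions:

// Reconstructed from the match in Document::index; the real definitions
// live in sql/index.rs and sql/scoring.rs and may differ in detail.
pub enum Index {
    /// A regular (non-unique) index
    Idx,
    /// A unique index
    Uniq,
    /// A full-text search index with an analyzer, a scoring scheme,
    /// and a highlighting flag
    Search {
        az: Ident,
        sc: Scoring,
        hl: bool,
    },
}

pub enum Scoring {
    /// BM25 parameters, plus the B-tree order passed to FtIndex::new
    Bm {
        k1: Number,
        b: Number,
        order: Number,
    },
    /// Vector search; indexing currently returns FeatureNotYetImplemented
    Vs,
}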

lib/src/err/mod.rs

@@ -1,8 +1,11 @@
use crate::sql::idiom::Idiom;
use crate::sql::value::Value;
use bincode::Error as BincodeError;
use bung::encode::Error as SerdeError;
use fst::Error as FstError;
use serde::Serialize;
use std::borrow::Cow;
use std::string::FromUtf8Error;
use storekey::decode::Error as DecodeError;
use storekey::encode::Error as EncodeError;
use thiserror::Error;
@@ -390,6 +393,32 @@ pub enum Error {
/// Represents an error when decoding a key-value entry
#[error("Key decoding error: {0}")]
Decode(#[from] DecodeError),
/// Represents an error when an index is corrupted
#[error("Index is corrupted")]
CorruptedIndex,
/// Represents an error when analyzing a value
#[error("A string can't be analyzed: {0}")]
AnalyzerError(String),
/// Represents an underlying error with Bincode serializing / deserializing
#[error("Bincode error: {0}")]
Bincode(#[from] BincodeError),
/// Represents an underlying error with FST
#[error("FstError error: {0}")]
FstError(#[from] FstError),
/// Represents an underlying error while reading UTF8 characters
#[error("Utf8 error: {0}")]
Utf8Error(#[from] FromUtf8Error),
/// The feature has not yet been implemented
#[error("Feature not yet implemented: {feature}")]
FeatureNotYetImplemented {
feature: &'static str,
},
}
impl From<Error> for String {

lib/src/idx/bkeys.rs (new file, 704 lines)

@@ -0,0 +1,704 @@
use crate::err::Error;
use crate::idx::btree::Payload;
use crate::kvs::{Key, Transaction};
use async_trait::async_trait;
use fst::{IntoStreamer, Map, MapBuilder, Streamer};
use radix_trie::{Trie, TrieCommon};
use serde::{de, ser, Deserialize, Serialize};
use std::collections::VecDeque;
use std::fmt::{Display, Formatter};
use std::io;
#[async_trait]
pub(super) trait KeyVisitor {
async fn visit(
&mut self,
tx: &mut Transaction,
key: &Key,
payload: Payload,
) -> Result<(), Error>;
}
#[async_trait]
pub(super) trait BKeys: Display + Sized {
fn with_key_val(key: Key, payload: Payload) -> Result<Self, Error>;
fn len(&self) -> usize;
fn get(&self, key: &Key) -> Option<Payload>;
async fn collect_with_prefix<V>(
&self,
tx: &mut Transaction,
prefix_key: &Key,
visitor: &mut V,
) -> Result<bool, Error>
where
V: KeyVisitor + Send;
fn insert(&mut self, key: Key, payload: Payload);
fn append(&mut self, keys: Self);
fn remove(&mut self, key: &Key) -> Option<Payload>;
fn split_keys(&self) -> SplitKeys<Self>;
fn get_key(&self, idx: usize) -> Option<Key>;
fn get_child_idx(&self, searched_key: &Key) -> usize;
fn get_first_key(&self) -> Option<(Key, Payload)>;
fn get_last_key(&self) -> Option<(Key, Payload)>;
fn compile(&mut self) {}
fn debug<F>(&self, to_string: F) -> Result<(), Error>
where
F: Fn(Key) -> Result<String, Error>;
}
pub(super) struct SplitKeys<BK>
where
BK: BKeys,
{
pub(super) left: BK,
pub(super) right: BK,
pub(super) median_idx: usize,
pub(super) median_key: Key,
pub(super) median_payload: Payload,
}
#[derive(Default)]
pub(super) struct FstKeys {
map: Map<Vec<u8>>,
additions: Trie<Key, Payload>,
deletions: Trie<Key, bool>,
len: usize,
}
#[async_trait]
impl BKeys for FstKeys {
fn with_key_val(key: Key, payload: Payload) -> Result<Self, Error> {
let mut builder = MapBuilder::memory();
builder.insert(key, payload).unwrap();
Ok(Self::try_from(builder)?)
}
fn len(&self) -> usize {
self.len
}
fn get(&self, key: &Key) -> Option<Payload> {
if let Some(payload) = self.additions.get(key) {
Some(*payload)
} else {
self.map.get(key).filter(|_| self.deletions.get(key).is_none())
}
}
async fn collect_with_prefix<V>(
&self,
_tx: &mut Transaction,
_prefix_key: &Key,
_visitor: &mut V,
) -> Result<bool, Error>
where
V: KeyVisitor,
{
panic!("Not supported!")
}
fn insert(&mut self, key: Key, payload: Payload) {
self.deletions.remove(&key);
let existing_key = self.map.get(&key).is_some();
if self.additions.insert(key, payload).is_none() && !existing_key {
self.len += 1;
}
}
fn append(&mut self, mut keys: Self) {
keys.compile();
let mut s = keys.map.stream();
while let Some((key, payload)) = s.next() {
self.insert(key.to_vec(), payload);
}
}
fn remove(&mut self, key: &Key) -> Option<Payload> {
if self.deletions.get(key).is_some() {
return None;
}
if let Some(payload) = self.additions.remove(key) {
self.len -= 1;
return Some(payload);
}
self.get(key).map(|payload| {
if self.deletions.insert(key.clone(), true).is_none() {
self.len -= 1;
}
payload
})
}
fn split_keys(&self) -> SplitKeys<Self> {
let median_idx = self.map.len() / 2;
let mut s = self.map.stream();
let mut left = MapBuilder::memory();
let mut n = median_idx;
while n > 0 {
if let Some((key, payload)) = s.next() {
left.insert(key, payload).unwrap();
}
n -= 1;
}
let (median_key, median_payload) = s
.next()
.map_or_else(|| panic!("The median key/value should exist"), |(k, v)| (k.into(), v));
let mut right = MapBuilder::memory();
while let Some((key, value)) = s.next() {
right.insert(key, value).unwrap();
}
SplitKeys {
left: Self::try_from(left).unwrap(),
right: Self::try_from(right).unwrap(),
median_idx,
median_key,
median_payload,
}
}
fn get_key(&self, mut idx: usize) -> Option<Key> {
let mut s = self.map.keys().into_stream();
while let Some(key) = s.next() {
if idx == 0 {
return Some(key.to_vec());
}
idx -= 1;
}
None
}
fn get_child_idx(&self, searched_key: &Key) -> usize {
let searched_key = searched_key.as_slice();
let mut s = self.map.keys().into_stream();
let mut child_idx = 0;
while let Some(key) = s.next() {
if searched_key.le(key) {
break;
}
child_idx += 1;
}
child_idx
}
fn get_first_key(&self) -> Option<(Key, Payload)> {
self.map.stream().next().map(|(k, p)| (k.to_vec(), p))
}
fn get_last_key(&self) -> Option<(Key, Payload)> {
let mut last = None;
let mut s = self.map.stream();
while let Some((k, p)) = s.next() {
last = Some((k.to_vec(), p));
}
last
}
/// Rebuilds the FST by incorporating the pending changes (additions and deletions)
fn compile(&mut self) {
if self.additions.is_empty() && self.deletions.is_empty() {
return;
}
let mut existing_keys = self.map.stream();
let mut new_keys = self.additions.iter();
let mut current_existing = existing_keys.next();
let mut current_new = new_keys.next();
let mut builder = MapBuilder::memory();
// We use a double iterator because the map has to be filled with sorted terms
loop {
match current_new {
None => break,
Some((new_key_vec, new_value)) => match current_existing {
None => break,
Some((existing_key_vec, existing_value)) => {
if self.deletions.get(existing_key_vec).is_some()
|| self.additions.get(existing_key_vec).is_some()
{
current_existing = existing_keys.next();
} else if new_key_vec.as_slice().ge(existing_key_vec) {
builder.insert(existing_key_vec, existing_value).unwrap();
current_existing = existing_keys.next();
} else {
builder.insert(new_key_vec, *new_value).unwrap();
current_new = new_keys.next();
}
}
},
};
}
// Insert any existing term left over
while let Some((existing_key_vec, value)) = current_existing {
if self.deletions.get(existing_key_vec).is_none()
&& self.additions.get(existing_key_vec).is_none()
{
builder.insert(existing_key_vec, value).unwrap();
}
current_existing = existing_keys.next();
}
// Insert any new term left over
while let Some((new_key_vec, value)) = current_new {
builder.insert(new_key_vec, *value).unwrap();
current_new = new_keys.next();
}
self.map = Map::new(builder.into_inner().unwrap()).unwrap();
self.additions = Default::default();
self.deletions = Default::default();
}
fn debug<F>(&self, to_string: F) -> Result<(), Error>
where
F: Fn(Key) -> Result<String, Error>,
{
let mut s = String::new();
let mut iter = self.map.stream();
let mut start = true;
while let Some((k, p)) = iter.next() {
if !start {
s.push(',');
} else {
start = false;
}
s.push_str(&format!("{}={}", to_string(k.to_vec())?.as_str(), p));
}
debug!("FSTKeys[{}]", s);
Ok(())
}
}
impl TryFrom<MapBuilder<Vec<u8>>> for FstKeys {
type Error = fst::Error;
fn try_from(builder: MapBuilder<Vec<u8>>) -> Result<Self, Self::Error> {
Self::try_from(builder.into_inner()?)
}
}
impl TryFrom<Vec<u8>> for FstKeys {
type Error = fst::Error;
fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
let map = Map::new(bytes)?;
let len = map.len();
Ok(Self {
map,
len,
additions: Default::default(),
deletions: Default::default(),
})
}
}
impl Serialize for FstKeys {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if !self.deletions.is_empty() || !self.additions.is_empty() {
Err(ser::Error::custom("bkeys.compile() should be called prior to serializing"))
} else {
serializer.serialize_bytes(self.map.as_fst().as_bytes())
}
}
}
impl<'de> Deserialize<'de> for FstKeys {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let buf: Vec<u8> = Deserialize::deserialize(deserializer)?;
Self::try_from(buf).map_err(de::Error::custom)
}
}
impl Display for FstKeys {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut s = self.map.stream();
let mut start = true;
while let Some((key, val)) = s.next() {
let key = String::from_utf8_lossy(key);
if start {
start = false;
} else {
f.write_str(", ")?;
}
write!(f, "{}=>{}", key, val)?;
}
Ok(())
}
}
#[derive(Default)]
pub(super) struct TrieKeys {
keys: Trie<Key, Payload>,
}
impl Serialize for TrieKeys {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let uncompressed = bincode::serialize(&self.keys).unwrap();
let mut reader = uncompressed.as_slice();
let mut compressed: Vec<u8> = Vec::new();
{
let mut wtr = snap::write::FrameEncoder::new(&mut compressed);
io::copy(&mut reader, &mut wtr).expect("I/O operation failed");
}
serializer.serialize_bytes(&compressed)
}
}
impl<'de> Deserialize<'de> for TrieKeys {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let compressed: Vec<u8> = Deserialize::deserialize(deserializer)?;
let reader = compressed.as_slice();
let mut uncompressed: Vec<u8> = Vec::new();
{
let mut rdr = snap::read::FrameDecoder::new(reader);
io::copy(&mut rdr, &mut uncompressed).expect("I/O operation failed");
}
let keys: Trie<Vec<u8>, u64> = bincode::deserialize(&uncompressed).unwrap();
Ok(Self {
keys,
})
}
}
impl Display for TrieKeys {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut start = true;
for (key, val) in self.keys.iter() {
let key = String::from_utf8_lossy(key);
if start {
start = false;
} else {
f.write_str(", ")?;
}
write!(f, "{}=>{}", key, val)?;
}
Ok(())
}
}
#[async_trait]
impl BKeys for TrieKeys {
fn with_key_val(key: Key, payload: Payload) -> Result<Self, Error> {
let mut trie_keys = Self {
keys: Trie::default(),
};
trie_keys.insert(key, payload);
Ok(trie_keys)
}
fn len(&self) -> usize {
self.keys.len()
}
fn get(&self, key: &Key) -> Option<Payload> {
self.keys.get(key).copied()
}
async fn collect_with_prefix<V>(
&self,
tx: &mut Transaction,
prefix: &Key,
visitor: &mut V,
) -> Result<bool, Error>
where
V: KeyVisitor + Send,
{
let mut node_queue = VecDeque::new();
if let Some(node) = self.keys.get_raw_descendant(prefix) {
node_queue.push_front(node);
}
let mut found = false;
while let Some(node) = node_queue.pop_front() {
if let Some(value) = node.value() {
if let Some(node_key) = node.key() {
if node_key.starts_with(prefix) {
found = true;
visitor.visit(tx, node_key, *value).await?;
}
}
}
for children in node.children() {
node_queue.push_front(children);
}
}
Ok(found)
}
fn insert(&mut self, key: Key, payload: Payload) {
self.keys.insert(key, payload);
}
fn append(&mut self, keys: Self) {
for (k, p) in keys.keys.iter() {
self.insert(k.clone(), *p);
}
}
fn remove(&mut self, key: &Key) -> Option<Payload> {
self.keys.remove(key)
}
fn split_keys(&self) -> SplitKeys<Self> {
let median_idx = self.keys.len() / 2;
let mut s = self.keys.iter();
let mut left = Trie::default();
let mut n = median_idx;
while n > 0 {
if let Some((key, payload)) = s.next() {
left.insert(key.clone(), *payload);
}
n -= 1;
}
let (median_key, median_payload) = s
.next()
.map_or_else(|| panic!("The median key/value should exist"), |(k, v)| (k.clone(), *v));
let mut right = Trie::default();
for (key, val) in s {
right.insert(key.clone(), *val);
}
SplitKeys {
left: Self::from(left),
right: Self::from(right),
median_idx,
median_key,
median_payload,
}
}
fn get_key(&self, mut idx: usize) -> Option<Key> {
for key in self.keys.keys() {
if idx == 0 {
return Some(key.clone());
}
idx -= 1;
}
None
}
fn get_child_idx(&self, searched_key: &Key) -> usize {
let mut child_idx = 0;
for key in self.keys.keys() {
if searched_key.le(key) {
break;
}
child_idx += 1;
}
child_idx
}
fn get_first_key(&self) -> Option<(Key, Payload)> {
self.keys.iter().next().map(|(k, p)| (k.clone(), *p))
}
fn get_last_key(&self) -> Option<(Key, Payload)> {
self.keys.iter().last().map(|(k, p)| (k.clone(), *p))
}
fn debug<F>(&self, to_string: F) -> Result<(), Error>
where
F: Fn(Key) -> Result<String, Error>,
{
let mut s = String::new();
let mut start = true;
for (k, p) in self.keys.iter() {
if !start {
s.push(',');
} else {
start = false;
}
s.push_str(&format!("{}={}", to_string(k.to_vec())?.as_str(), p));
}
debug!("TrieKeys[{}]", s);
Ok(())
}
}
impl From<Trie<Vec<u8>, u64>> for TrieKeys {
fn from(keys: Trie<Vec<u8>, u64>) -> Self {
Self {
keys,
}
}
}
#[cfg(test)]
mod tests {
use crate::idx::bkeys::{BKeys, FstKeys, TrieKeys};
use crate::idx::tests::HashVisitor;
use crate::kvs::{Datastore, Key};
use std::collections::HashSet;
#[test]
fn test_fst_keys_serde() {
let key: Key = "a".as_bytes().into();
let keys = FstKeys::with_key_val(key.clone(), 130).unwrap();
let buf = bincode::serialize(&keys).unwrap();
let keys: FstKeys = bincode::deserialize(&buf).unwrap();
assert_eq!(keys.get(&key), Some(130));
}
#[test]
fn test_trie_keys_serde() {
let key: Key = "a".as_bytes().into();
let keys = TrieKeys::with_key_val(key.clone(), 130).unwrap();
let buf = bincode::serialize(&keys).unwrap();
let keys: TrieKeys = bincode::deserialize(&buf).unwrap();
assert_eq!(keys.get(&key), Some(130));
}
fn test_keys_additions<BK: BKeys>(mut keys: BK) {
let terms = [
"the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog", "the", "fast",
"fox", "jumped", "over", "the", "lazy", "dog",
];
let mut i = 1;
assert_eq!(keys.len(), 0);
let mut term_set = HashSet::new();
for term in terms {
term_set.insert(term.to_string());
let key: Key = term.into();
keys.insert(key.clone(), i);
keys.compile();
assert_eq!(keys.get(&key), Some(i));
assert_eq!(keys.len(), term_set.len());
i += 1;
}
}
#[test]
fn test_fst_keys_additions() {
test_keys_additions(FstKeys::default())
}
#[test]
fn test_trie_keys_additions() {
test_keys_additions(TrieKeys::default())
}
fn test_keys_deletions<BK: BKeys>(mut keys: BK) {
assert_eq!(keys.remove(&"dummy".into()), None);
assert_eq!(keys.len(), 0);
keys.insert("foo".into(), 1);
keys.insert("bar".into(), 2);
assert_eq!(keys.len(), 2);
assert_eq!(keys.remove(&"bar".into()), Some(2));
assert_eq!(keys.len(), 1);
assert_eq!(keys.remove(&"bar".into()), None);
assert_eq!(keys.len(), 1);
assert_eq!(keys.remove(&"foo".into()), Some(1));
assert_eq!(keys.len(), 0);
assert_eq!(keys.remove(&"foo".into()), None);
assert_eq!(keys.len(), 0);
}
#[test]
fn test_fst_keys_deletions() {
test_keys_deletions(FstKeys::default())
}
#[test]
fn test_trie_keys_deletions() {
test_keys_deletions(TrieKeys::default())
}
#[tokio::test]
async fn test_tries_keys_collect_with_prefix() {
let ds = Datastore::new("memory").await.unwrap();
let mut tx = ds.transaction(true, false).await.unwrap();
let mut keys = TrieKeys::default();
keys.insert("apple".into(), 1);
keys.insert("applicant".into(), 2);
keys.insert("application".into(), 3);
keys.insert("applicative".into(), 4);
keys.insert("banana".into(), 5);
keys.insert("blueberry".into(), 6);
keys.insert("the".into(), 7);
keys.insert("these".into(), 11);
keys.insert("theses".into(), 12);
keys.insert("their".into(), 8);
keys.insert("theirs".into(), 9);
keys.insert("there".into(), 10);
{
let mut visitor = HashVisitor::default();
keys.collect_with_prefix(&mut tx, &"appli".into(), &mut visitor).await.unwrap();
visitor.check(
vec![("applicant".into(), 2), ("application".into(), 3), ("applicative".into(), 4)],
"appli",
);
}
{
let mut visitor = HashVisitor::default();
keys.collect_with_prefix(&mut tx, &"the".into(), &mut visitor).await.unwrap();
visitor.check(
vec![
("the".into(), 7),
("their".into(), 8),
("theirs".into(), 9),
("there".into(), 10),
("these".into(), 11),
("theses".into(), 12),
],
"the",
);
}
{
let mut visitor = HashVisitor::default();
keys.collect_with_prefix(&mut tx, &"blue".into(), &mut visitor).await.unwrap();
visitor.check(vec![("blueberry".into(), 6)], "blue");
}
{
let mut visitor = HashVisitor::default();
keys.collect_with_prefix(&mut tx, &"apple".into(), &mut visitor).await.unwrap();
visitor.check(vec![("apple".into(), 1)], "apple");
}
{
let mut visitor = HashVisitor::default();
keys.collect_with_prefix(&mut tx, &"zz".into(), &mut visitor).await.unwrap();
visitor.check(vec![], "zz");
}
}
fn test_keys_split<BK: BKeys>(mut keys: BK) {
keys.insert("a".into(), 1);
keys.insert("b".into(), 2);
keys.insert("c".into(), 3);
keys.insert("d".into(), 4);
keys.insert("e".into(), 5);
keys.compile();
let r = keys.split_keys();
assert_eq!(r.median_payload, 3);
let c: Key = "c".into();
assert_eq!(r.median_key, c);
assert_eq!(r.median_idx, 2);
assert_eq!(r.left.len(), 2);
assert_eq!(r.left.get(&"a".into()), Some(1));
assert_eq!(r.left.get(&"b".into()), Some(2));
assert_eq!(r.right.len(), 2);
assert_eq!(r.right.get(&"d".into()), Some(4));
assert_eq!(r.right.get(&"e".into()), Some(5));
}
#[test]
fn test_fst_keys_split() {
test_keys_split(FstKeys::default());
}
#[test]
fn test_trie_keys_split() {
test_keys_split(TrieKeys::default());
}
}
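FstKeys stages every mutation in two radix_trie::Trie structures and only merges them into the FST when compile() runs. That two-phase design follows from a constraint of the fst crate: a MapBuilder only accepts keys in ascending lexicographic order, so the map cannot be updated in place, which is also why compile() walks the old map and the additions with a double iterator. A small sketch of the constraint (illustrative, not part of the commit):

use fst::{Map, MapBuilder};

fn main() -> Result<(), fst::Error> {
    let mut b = MapBuilder::memory();
    b.insert("apple", 1)?;
    b.insert("banana", 2)?;
    // b.insert("aardvark", 0)?; // would fail: keys must arrive in sorted order
    let map = Map::new(b.into_inner()?)?;
    assert_eq!(map.get("apple"), Some(1));
    Ok(())
}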

lib/src/idx/btree.rs (new file, 1490 lines)

File diff suppressed because it is too large.

lib/src/idx/ft/docids.rs (new file, 296 lines)

@@ -0,0 +1,296 @@
use crate::err::Error;
use crate::idx::bkeys::TrieKeys;
use crate::idx::btree::{BTree, KeyProvider, NodeId, Statistics};
use crate::idx::{btree, IndexKeyBase, SerdeState};
use crate::kvs::{Key, Transaction};
use roaring::RoaringTreemap;
use serde::{Deserialize, Serialize};
pub(crate) type DocId = u64;
pub(super) struct DocIds {
state_key: Key,
index_key_base: IndexKeyBase,
btree: BTree<DocIdsKeyProvider>,
available_ids: Option<RoaringTreemap>,
next_doc_id: DocId,
updated: bool,
}
impl DocIds {
pub(super) async fn new(
tx: &mut Transaction,
index_key_base: IndexKeyBase,
default_btree_order: usize,
) -> Result<Self, Error> {
let keys = DocIdsKeyProvider {
index_key_base: index_key_base.clone(),
};
let state_key: Key = keys.get_state_key();
let state: State = if let Some(val) = tx.get(state_key.clone()).await? {
State::try_from_val(val)?
} else {
State::new(default_btree_order)
};
Ok(Self {
state_key,
index_key_base,
btree: BTree::new(keys, state.btree),
available_ids: state.available_ids,
next_doc_id: state.next_doc_id,
updated: false,
})
}
fn get_next_doc_id(&mut self) -> DocId {
// We check first if there is any available id
if let Some(available_ids) = &mut self.available_ids {
if let Some(available_id) = available_ids.iter().next() {
available_ids.remove(available_id);
if available_ids.is_empty() {
self.available_ids = None;
}
return available_id;
}
}
// If not, we use the sequence
let doc_id = self.next_doc_id;
self.next_doc_id += 1;
doc_id
}
/// Returns the doc_id for the given doc_key.
/// If the doc_id does not exist, a new one is created and associated with the given key.
pub(super) async fn resolve_doc_id(
&mut self,
tx: &mut Transaction,
doc_key: Key,
) -> Result<Resolved, Error> {
if let Some(doc_id) = self.btree.search::<TrieKeys>(tx, &doc_key).await? {
Ok(Resolved::Existing(doc_id))
} else {
let doc_id = self.get_next_doc_id();
tx.set(self.index_key_base.new_bi_key(doc_id), doc_key.clone()).await?;
self.btree.insert::<TrieKeys>(tx, doc_key, doc_id).await?;
self.updated = true;
Ok(Resolved::New(doc_id))
}
}
pub(super) async fn remove_doc(
&mut self,
tx: &mut Transaction,
doc_key: Key,
) -> Result<Option<DocId>, Error> {
if let Some(doc_id) = self.btree.delete::<TrieKeys>(tx, doc_key).await? {
tx.del(self.index_key_base.new_bi_key(doc_id)).await?;
if let Some(available_ids) = &mut self.available_ids {
available_ids.insert(doc_id);
} else {
let mut available_ids = RoaringTreemap::new();
available_ids.insert(doc_id);
self.available_ids = Some(available_ids);
}
self.updated = true;
Ok(Some(doc_id))
} else {
Ok(None)
}
}
pub(super) async fn get_doc_key(
&self,
tx: &mut Transaction,
doc_id: DocId,
) -> Result<Option<Key>, Error> {
let doc_id_key = self.index_key_base.new_bi_key(doc_id);
if let Some(val) = tx.get(doc_id_key).await? {
Ok(Some(val))
} else {
Ok(None)
}
}
pub(super) async fn statistics(&self, tx: &mut Transaction) -> Result<Statistics, Error> {
self.btree.statistics::<TrieKeys>(tx).await
}
pub(super) async fn finish(self, tx: &mut Transaction) -> Result<(), Error> {
if self.updated || self.btree.is_updated() {
let state = State {
btree: self.btree.get_state().clone(),
available_ids: self.available_ids,
next_doc_id: self.next_doc_id,
};
tx.set(self.state_key, state.try_to_val()?).await?;
}
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct State {
btree: btree::State,
available_ids: Option<RoaringTreemap>,
next_doc_id: DocId,
}
impl SerdeState for State {}
impl State {
fn new(default_btree_order: usize) -> Self {
Self {
btree: btree::State::new(default_btree_order),
available_ids: None,
next_doc_id: 0,
}
}
}
#[derive(Debug, PartialEq)]
pub(super) enum Resolved {
New(DocId),
Existing(DocId),
}
impl Resolved {
pub(super) fn doc_id(&self) -> &DocId {
match self {
Resolved::New(doc_id) => doc_id,
Resolved::Existing(doc_id) => doc_id,
}
}
pub(super) fn was_existing(&self) -> bool {
match self {
Resolved::New(_) => false,
Resolved::Existing(_) => true,
}
}
}
struct DocIdsKeyProvider {
index_key_base: IndexKeyBase,
}
impl KeyProvider for DocIdsKeyProvider {
fn get_node_key(&self, node_id: NodeId) -> Key {
self.index_key_base.new_bd_key(Some(node_id))
}
fn get_state_key(&self) -> Key {
self.index_key_base.new_bd_key(None)
}
}
#[cfg(test)]
mod tests {
use crate::idx::ft::docids::{DocIds, Resolved};
use crate::idx::IndexKeyBase;
use crate::kvs::{Datastore, Transaction};
const BTREE_ORDER: usize = 7;
async fn get_doc_ids(ds: &Datastore) -> (Transaction, DocIds) {
let mut tx = ds.transaction(true, false).await.unwrap();
let d = DocIds::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER).await.unwrap();
(tx, d)
}
async fn finish(mut tx: Transaction, d: DocIds) {
d.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
}
#[tokio::test]
async fn test_resolve_doc_id() {
let ds = Datastore::new("memory").await.unwrap();
// Resolve a first doc key
let (mut tx, mut d) = get_doc_ids(&ds).await;
let doc_id = d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap();
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 1);
assert_eq!(d.get_doc_key(&mut tx, 0).await.unwrap(), Some("Foo".into()));
finish(tx, d).await;
assert_eq!(doc_id, Resolved::New(0));
// Resolve the same doc key
let (mut tx, mut d) = get_doc_ids(&ds).await;
let doc_id = d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap();
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 1);
assert_eq!(d.get_doc_key(&mut tx, 0).await.unwrap(), Some("Foo".into()));
finish(tx, d).await;
assert_eq!(doc_id, Resolved::Existing(0));
// Resolve another single doc key
let (mut tx, mut d) = get_doc_ids(&ds).await;
let doc_id = d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap();
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 2);
assert_eq!(d.get_doc_key(&mut tx, 1).await.unwrap(), Some("Bar".into()));
finish(tx, d).await;
assert_eq!(doc_id, Resolved::New(1));
// Resolve another two existing doc keys and two new doc keys (interlaced)
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(), Resolved::Existing(0));
assert_eq!(d.resolve_doc_id(&mut tx, "Hello".into()).await.unwrap(), Resolved::New(2));
assert_eq!(d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap(), Resolved::Existing(1));
assert_eq!(d.resolve_doc_id(&mut tx, "World".into()).await.unwrap(), Resolved::New(3));
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 4);
finish(tx, d).await;
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(), Resolved::Existing(0));
assert_eq!(d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap(), Resolved::Existing(1));
assert_eq!(d.resolve_doc_id(&mut tx, "Hello".into()).await.unwrap(), Resolved::Existing(2));
assert_eq!(d.resolve_doc_id(&mut tx, "World".into()).await.unwrap(), Resolved::Existing(3));
assert_eq!(d.get_doc_key(&mut tx, 0).await.unwrap(), Some("Foo".into()));
assert_eq!(d.get_doc_key(&mut tx, 1).await.unwrap(), Some("Bar".into()));
assert_eq!(d.get_doc_key(&mut tx, 2).await.unwrap(), Some("Hello".into()));
assert_eq!(d.get_doc_key(&mut tx, 3).await.unwrap(), Some("World".into()));
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 4);
finish(tx, d).await;
}
#[tokio::test]
async fn test_remove_doc() {
let ds = Datastore::new("memory").await.unwrap();
// Create two docs
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(), Resolved::New(0));
assert_eq!(d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap(), Resolved::New(1));
finish(tx, d).await;
// Remove doc 1
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.remove_doc(&mut tx, "Dummy".into()).await.unwrap(), None);
assert_eq!(d.remove_doc(&mut tx, "Foo".into()).await.unwrap(), Some(0));
finish(tx, d).await;
// Check 'Foo' has been removed
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.remove_doc(&mut tx, "Foo".into()).await.unwrap(), None);
finish(tx, d).await;
// Insert a new doc - should take the available id 0
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.resolve_doc_id(&mut tx, "Hello".into()).await.unwrap(), Resolved::New(0));
finish(tx, d).await;
// Remove doc 2
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.remove_doc(&mut tx, "Dummy".into()).await.unwrap(), None);
assert_eq!(d.remove_doc(&mut tx, "Bar".into()).await.unwrap(), Some(1));
finish(tx, d).await;
// Check 'Bar' has been removed
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.remove_doc(&mut tx, "Foo".into()).await.unwrap(), None);
finish(tx, d).await;
// Insert a new doc - should take the available id 1
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.resolve_doc_id(&mut tx, "World".into()).await.unwrap(), Resolved::New(1));
finish(tx, d).await;
}
}
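DocIds hands out ids from a monotonic sequence but recycles the ids of removed documents through the available_ids bitmap, keeping the id space dense; that matters because doc ids key the doc-length and postings structures. The allocation scheme in isolation, as a sketch with hypothetical names mirroring get_next_doc_id and remove_doc above:

use roaring::RoaringTreemap;

struct IdAllocator {
    available: RoaringTreemap, // ids freed by removed documents
    next: u64,                 // high-water mark of the sequence
}

impl IdAllocator {
    fn next_id(&mut self) -> u64 {
        // Prefer recycling the smallest freed id...
        if let Some(id) = self.available.iter().next() {
            self.available.remove(id);
            return id;
        }
        // ...otherwise extend the sequence.
        let id = self.next;
        self.next += 1;
        id
    }

    fn release(&mut self, id: u64) {
        self.available.insert(id);
    }
}

fn main() {
    let mut a = IdAllocator { available: RoaringTreemap::new(), next: 0 };
    assert_eq!(a.next_id(), 0);
    assert_eq!(a.next_id(), 1);
    a.release(0);
    assert_eq!(a.next_id(), 0); // the freed id is reused first
}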

lib/src/idx/ft/doclength.rs (new file, 128 lines)

@@ -0,0 +1,128 @@
use crate::err::Error;
use crate::idx::bkeys::TrieKeys;
use crate::idx::btree::{BTree, KeyProvider, NodeId, Payload, Statistics};
use crate::idx::ft::docids::DocId;
use crate::idx::{btree, IndexKeyBase, SerdeState};
use crate::kvs::{Key, Transaction};
pub(super) type DocLength = u64;
pub(super) struct DocLengths {
state_key: Key,
btree: BTree<DocLengthsKeyProvider>,
}
impl DocLengths {
pub(super) async fn new(
tx: &mut Transaction,
index_key_base: IndexKeyBase,
default_btree_order: usize,
) -> Result<Self, Error> {
let keys = DocLengthsKeyProvider {
index_key_base,
};
let state_key: Key = keys.get_state_key();
let state: btree::State = if let Some(val) = tx.get(state_key.clone()).await? {
btree::State::try_from_val(val)?
} else {
btree::State::new(default_btree_order)
};
Ok(Self {
state_key,
btree: BTree::new(keys, state),
})
}
pub(super) async fn get_doc_length(
&self,
tx: &mut Transaction,
doc_id: DocId,
) -> Result<Option<DocLength>, Error> {
self.btree.search::<TrieKeys>(tx, &doc_id.to_be_bytes().to_vec()).await
}
pub(super) async fn set_doc_length(
&mut self,
tx: &mut Transaction,
doc_id: DocId,
doc_length: DocLength,
) -> Result<(), Error> {
self.btree.insert::<TrieKeys>(tx, doc_id.to_be_bytes().to_vec(), doc_length).await
}
pub(super) async fn remove_doc_length(
&mut self,
tx: &mut Transaction,
doc_id: DocId,
) -> Result<Option<Payload>, Error> {
self.btree.delete::<TrieKeys>(tx, doc_id.to_be_bytes().to_vec()).await
}
pub(super) async fn statistics(&self, tx: &mut Transaction) -> Result<Statistics, Error> {
self.btree.statistics::<TrieKeys>(tx).await
}
pub(super) async fn finish(self, tx: &mut Transaction) -> Result<(), Error> {
if self.btree.is_updated() {
tx.set(self.state_key, self.btree.get_state().try_to_val()?).await?;
}
Ok(())
}
}
struct DocLengthsKeyProvider {
index_key_base: IndexKeyBase,
}
impl KeyProvider for DocLengthsKeyProvider {
fn get_node_key(&self, node_id: NodeId) -> Key {
self.index_key_base.new_bl_key(Some(node_id))
}
fn get_state_key(&self) -> Key {
self.index_key_base.new_bl_key(None)
}
}
#[cfg(test)]
mod tests {
use crate::idx::ft::doclength::DocLengths;
use crate::idx::IndexKeyBase;
use crate::kvs::Datastore;
#[tokio::test]
async fn test_doc_lengths() {
const BTREE_ORDER: usize = 7;
let ds = Datastore::new("memory").await.unwrap();
// Check empty state
let mut tx = ds.transaction(true, false).await.unwrap();
let l = DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER).await.unwrap();
assert_eq!(l.statistics(&mut tx).await.unwrap().keys_count, 0);
let dl = l.get_doc_length(&mut tx, 99).await.unwrap();
l.finish(&mut tx).await.unwrap();
assert_eq!(dl, None);
// Set a doc length
let mut l = DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER).await.unwrap();
l.set_doc_length(&mut tx, 99, 199).await.unwrap();
assert_eq!(l.statistics(&mut tx).await.unwrap().keys_count, 1);
let dl = l.get_doc_length(&mut tx, 99).await.unwrap();
l.finish(&mut tx).await.unwrap();
assert_eq!(dl, Some(199));
// Update doc length
let mut l = DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER).await.unwrap();
l.set_doc_length(&mut tx, 99, 299).await.unwrap();
assert_eq!(l.statistics(&mut tx).await.unwrap().keys_count, 1);
let dl = l.get_doc_length(&mut tx, 99).await.unwrap();
l.finish(&mut tx).await.unwrap();
assert_eq!(dl, Some(299));
// Remove doc lengths
let mut l = DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER).await.unwrap();
assert_eq!(l.remove_doc_length(&mut tx, 99).await.unwrap(), Some(299));
assert_eq!(l.remove_doc_length(&mut tx, 99).await.unwrap(), None);
}
}
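Note that doc ids are encoded with to_be_bytes() before being used as B-tree keys: big-endian byte order is what keeps numeric order consistent with the lexicographic byte comparison the key structures rely on. A one-line illustration (assuming keys compare as byte slices):

fn main() {
    // Big-endian byte order agrees with numeric order...
    assert!(1u64.to_be_bytes() < 256u64.to_be_bytes());
    // ...little-endian byte order does not.
    assert!(1u64.to_le_bytes() > 256u64.to_le_bytes());
}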

lib/src/idx/ft/mod.rs (new file, 607 lines)

@@ -0,0 +1,607 @@
pub(crate) mod docids;
mod doclength;
mod postings;
pub(crate) mod terms;
use crate::err::Error;
use crate::error::Db::AnalyzerError;
use crate::idx::ft::docids::{DocId, DocIds};
use crate::idx::ft::doclength::{DocLength, DocLengths};
use crate::idx::ft::postings::{Postings, PostingsVisitor, TermFrequency};
use crate::idx::ft::terms::Terms;
use crate::idx::{btree, IndexKeyBase, SerdeState};
use crate::kvs::{Key, Transaction};
use crate::sql::error::IResult;
use async_trait::async_trait;
use nom::bytes::complete::take_while;
use nom::character::complete::multispace0;
use roaring::RoaringTreemap;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
pub(crate) struct FtIndex {
state_key: Key,
index_key_base: IndexKeyBase,
state: State,
bm25: Bm25Params,
btree_default_order: usize,
}
pub(crate) trait HitVisitor {
fn visit(&mut self, tx: &mut Transaction, doc_key: Key, score: Score);
}
#[derive(Clone)]
struct Bm25Params {
k1: f32,
b: f32,
}
impl Default for Bm25Params {
fn default() -> Self {
Self {
k1: 1.2,
b: 0.75,
}
}
}
pub(super) struct Statistics {
doc_ids: btree::Statistics,
terms: btree::Statistics,
doc_lengths: btree::Statistics,
postings: btree::Statistics,
}
#[derive(Default, Serialize, Deserialize)]
struct State {
total_docs_lengths: u128,
doc_count: u64,
}
impl SerdeState for State {}
type Score = f32;
impl FtIndex {
pub(crate) async fn new(
tx: &mut Transaction,
index_key_base: IndexKeyBase,
btree_default_order: usize,
) -> Result<Self, Error> {
let state_key: Key = index_key_base.new_bs_key();
let state: State = if let Some(val) = tx.get(state_key.clone()).await? {
State::try_from_val(val)?
} else {
State::default()
};
Ok(Self {
state,
state_key,
index_key_base,
bm25: Bm25Params::default(),
btree_default_order,
})
}
async fn doc_ids(&self, tx: &mut Transaction) -> Result<DocIds, Error> {
DocIds::new(tx, self.index_key_base.clone(), self.btree_default_order).await
}
async fn terms(&self, tx: &mut Transaction) -> Result<Terms, Error> {
Terms::new(tx, self.index_key_base.clone(), self.btree_default_order).await
}
async fn doc_lengths(&self, tx: &mut Transaction) -> Result<DocLengths, Error> {
DocLengths::new(tx, self.index_key_base.clone(), self.btree_default_order).await
}
async fn postings(&self, tx: &mut Transaction) -> Result<Postings, Error> {
Postings::new(tx, self.index_key_base.clone(), self.btree_default_order).await
}
pub(crate) async fn remove_document(
&mut self,
tx: &mut Transaction,
doc_key: Key,
) -> Result<(), Error> {
// Extract and remove the doc_id (if any)
let mut d = self.doc_ids(tx).await?;
if let Some(doc_id) = d.remove_doc(tx, doc_key).await? {
self.state.doc_count -= 1;
// Remove the doc length
let mut l = self.doc_lengths(tx).await?;
if let Some(doc_lengths) = l.remove_doc_length(tx, doc_id).await? {
self.state.total_docs_lengths -= doc_lengths as u128;
l.finish(tx).await?;
}
// Get the term list
if let Some(term_list_vec) = tx.get(self.index_key_base.new_bk_key(doc_id)).await? {
let term_list = RoaringTreemap::try_from_val(term_list_vec)?;
// Remove the postings
let mut p = self.postings(tx).await?;
let mut t = self.terms(tx).await?;
for term_id in term_list {
p.remove_posting(tx, term_id, doc_id).await?;
// If the term is not present in any document in the index, we can remove it
if p.count_postings(tx, term_id).await? == 0 {
t.remove_term_id(tx, term_id).await?;
}
}
t.finish(tx).await?;
p.finish(tx).await?;
}
d.finish(tx).await?;
}
Ok(())
}
pub(crate) async fn index_document(
&mut self,
tx: &mut Transaction,
doc_key: Key,
field_content: &str,
) -> Result<(), Error> {
// Resolve the doc_id
let mut d = self.doc_ids(tx).await?;
let resolved = d.resolve_doc_id(tx, doc_key).await?;
let doc_id = *resolved.doc_id();
// Extract the doc lengths, terms and frequencies
let mut t = self.terms(tx).await?;
let (doc_length, terms_and_frequencies) =
Self::extract_sorted_terms_with_frequencies(field_content)?;
// Set the doc length
let mut l = self.doc_lengths(tx).await?;
if resolved.was_existing() {
if let Some(old_doc_length) = l.get_doc_length(tx, doc_id).await? {
self.state.total_docs_lengths -= old_doc_length as u128;
}
}
l.set_doc_length(tx, doc_id, doc_length).await?;
// Retrieve the existing terms for this document (if any)
let term_ids_key = self.index_key_base.new_bk_key(doc_id);
let mut old_term_ids = if let Some(val) = tx.get(term_ids_key.clone()).await? {
Some(RoaringTreemap::try_from_val(val)?)
} else {
None
};
// Set the terms postings
let terms = t.resolve_term_ids(tx, terms_and_frequencies).await?;
let mut terms_ids = RoaringTreemap::default();
let mut p = self.postings(tx).await?;
for (term_id, term_freq) in terms {
p.update_posting(tx, term_id, doc_id, term_freq).await?;
if let Some(old_term_ids) = &mut old_term_ids {
old_term_ids.remove(term_id);
}
terms_ids.insert(term_id);
}
// Remove any remaining postings
if let Some(old_term_ids) = old_term_ids {
for old_term_id in old_term_ids {
p.remove_posting(tx, old_term_id, doc_id).await?;
// If the term has no postings left, we can remove the term
if p.count_postings(tx, old_term_id).await? == 0 {
t.remove_term_id(tx, old_term_id).await?;
}
}
}
// Stores the term list for this doc_id
tx.set(term_ids_key, terms_ids.try_to_val()?).await?;
// Update the index state
self.state.total_docs_lengths += doc_length as u128;
if !resolved.was_existing() {
self.state.doc_count += 1;
}
// Update the states
tx.set(self.state_key.clone(), self.state.try_to_val()?).await?;
d.finish(tx).await?;
l.finish(tx).await?;
p.finish(tx).await?;
t.finish(tx).await?;
Ok(())
}
// TODO: This is currently a placeholder. It has to be replaced by the analyzer/token/filter logic.
fn extract_sorted_terms_with_frequencies(
input: &str,
) -> Result<(DocLength, HashMap<&str, TermFrequency>), Error> {
let mut doc_length = 0;
let mut terms = HashMap::new();
let mut rest = input;
while !rest.is_empty() {
// Extract the next token
match Self::next_token(rest) {
Ok((remaining_input, token)) => {
// Skip empty tokens (e.g. from trailing whitespace)
if !token.is_empty() {
doc_length += 1;
match terms.entry(token) {
Entry::Vacant(e) => {
e.insert(1);
}
Entry::Occupied(mut e) => {
e.insert(*e.get() + 1);
}
}
}
rest = remaining_input;
}
Err(e) => return Err(AnalyzerError(e.to_string())),
}
}
Ok((doc_length, terms))
}
/// Extracts the next token. The input is left-trimmed first.
fn next_token(i: &str) -> IResult<&str, &str> {
let (i, _) = multispace0(i)?;
take_while(|c| c != ' ' && c != '\n' && c != '\t')(i)
}
pub(super) async fn search<V>(
&self,
tx: &mut Transaction,
term: &str,
visitor: &mut V,
) -> Result<(), Error>
where
V: HitVisitor + Send,
{
let terms = self.terms(tx).await?;
if let Some(term_id) = terms.get_term_id(tx, term).await? {
let postings = self.postings(tx).await?;
let term_doc_count = postings.get_doc_count(tx, term_id).await?;
let doc_lengths = self.doc_lengths(tx).await?;
let doc_ids = self.doc_ids(tx).await?;
if term_doc_count > 0 {
let mut scorer = BM25Scorer::new(
visitor,
doc_lengths,
doc_ids,
self.state.total_docs_lengths,
self.state.doc_count,
term_doc_count,
self.bm25.clone(),
);
postings.collect_postings(tx, term_id, &mut scorer).await?;
}
}
Ok(())
}
pub(super) async fn statistics(&self, tx: &mut Transaction) -> Result<Statistics, Error> {
// TODO: run these lookups in parallel
Ok(Statistics {
doc_ids: self.doc_ids(tx).await?.statistics(tx).await?,
terms: self.terms(tx).await?.statistics(tx).await?,
doc_lengths: self.doc_lengths(tx).await?.statistics(tx).await?,
postings: self.postings(tx).await?.statistics(tx).await?,
})
}
}
struct BM25Scorer<'a, V>
where
V: HitVisitor,
{
visitor: &'a mut V,
doc_lengths: DocLengths,
doc_ids: DocIds,
average_doc_length: f32,
doc_count: f32,
term_doc_count: f32,
bm25: Bm25Params,
}
#[async_trait]
impl<'a, V> PostingsVisitor for BM25Scorer<'a, V>
where
V: HitVisitor + Send,
{
async fn visit(
&mut self,
tx: &mut Transaction,
doc_id: DocId,
term_frequency: TermFrequency,
) -> Result<(), Error> {
if let Some(doc_key) = self.doc_ids.get_doc_key(tx, doc_id).await? {
let doc_length = self.doc_lengths.get_doc_length(tx, doc_id).await?.unwrap_or(0);
let bm25_score = self.compute_bm25_score(
term_frequency as f32,
self.term_doc_count,
doc_length as f32,
);
self.visitor.visit(tx, doc_key, bm25_score);
}
Ok(())
}
}
impl<'a, V> BM25Scorer<'a, V>
where
V: HitVisitor,
{
fn new(
visitor: &'a mut V,
doc_lengths: DocLengths,
doc_ids: DocIds,
total_docs_length: u128,
doc_count: u64,
term_doc_count: u64,
bm25: Bm25Params,
) -> Self {
Self {
visitor,
doc_lengths,
doc_ids,
average_doc_length: (total_docs_length as f32) / (doc_count as f32),
doc_count: doc_count as f32,
term_doc_count: term_doc_count as f32,
bm25,
}
}
// https://en.wikipedia.org/wiki/Okapi_BM25
// Including the lower-bounding term frequency normalization (2011 CIKM)
fn compute_bm25_score(&self, term_freq: f32, term_doc_count: f32, doc_length: f32) -> f32 {
// (n(qi) + 0.5)
let denominator = term_doc_count + 0.5;
// (N - n(qi) + 0.5)
let numerator = self.doc_count - term_doc_count + 0.5;
let idf = (numerator / denominator).ln();
if idf.is_nan() || idf <= 0.0 {
return 0.0;
}
let tf_prim = 1.0 + term_freq.ln();
// idf * (k1 + 1)
let numerator = idf * (self.bm25.k1 + 1.0) * tf_prim;
// 1 - b + b * (|D| / avgDL)
let denominator = 1.0 - self.bm25.b + self.bm25.b * (doc_length / self.average_doc_length);
// numerator / (k1 * denominator + 1)
numerator / (self.bm25.k1 * denominator + 1.0)
}
}
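In formula form, compute_bm25_score implements the classic Okapi BM25 weight with the logarithmic term frequency and the extra +1 in the denominator from the lower-bounding normalization the comment cites, and clamps non-positive idf values to zero. With N = doc_count, n(q) = term_doc_count, tf the term frequency, |D| the document length and avgdl the average document length:

$$\mathrm{idf}(q) = \ln\left(\frac{N - n(q) + 0.5}{n(q) + 0.5}\right), \qquad \mathrm{tf}' = 1 + \ln(\mathrm{tf})$$

$$\mathrm{score}(q, D) = \mathrm{idf}(q) \cdot \frac{(k_1 + 1)\,\mathrm{tf}'}{k_1\left(1 - b + b\,\frac{|D|}{\mathrm{avgdl}}\right) + 1}, \qquad \text{with } \mathrm{score} = 0 \text{ when } \mathrm{idf}(q) \le 0$$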
#[cfg(test)]
mod tests {
use crate::idx::ft::{FtIndex, HitVisitor, Score};
use crate::idx::IndexKeyBase;
use crate::kvs::{Datastore, Key, Transaction};
use std::collections::HashMap;
use test_log::test;
#[test(tokio::test)]
async fn test_ft_index() {
let ds = Datastore::new("memory").await.unwrap();
let default_btree_order = 5;
{
// Add one document
let mut tx = ds.transaction(true, false).await.unwrap();
let mut fti =
FtIndex::new(&mut tx, IndexKeyBase::default(), default_btree_order).await.unwrap();
fti.index_document(&mut tx, "doc1".into(), "hello the world").await.unwrap();
tx.commit().await.unwrap();
}
{
// Add two documents
let mut tx = ds.transaction(true, false).await.unwrap();
let mut fti =
FtIndex::new(&mut tx, IndexKeyBase::default(), default_btree_order).await.unwrap();
fti.index_document(&mut tx, "doc2".into(), "a yellow hello").await.unwrap();
fti.index_document(&mut tx, "doc3".into(), "foo bar").await.unwrap();
tx.commit().await.unwrap();
}
{
let mut tx = ds.transaction(true, false).await.unwrap();
let fti =
FtIndex::new(&mut tx, IndexKeyBase::default(), default_btree_order).await.unwrap();
// Check the statistics
let statistics = fti.statistics(&mut tx).await.unwrap();
assert_eq!(statistics.terms.keys_count, 7);
assert_eq!(statistics.postings.keys_count, 8);
assert_eq!(statistics.doc_ids.keys_count, 3);
assert_eq!(statistics.doc_lengths.keys_count, 3);
// Search & score
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "hello", &mut visitor).await.unwrap();
visitor.check(vec![("doc1".into(), 0.0), ("doc2".into(), 0.0)]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "world", &mut visitor).await.unwrap();
visitor.check(vec![("doc1".into(), 0.4859746)]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "yellow", &mut visitor).await.unwrap();
visitor.check(vec![("doc2".into(), 0.4859746)]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "foo", &mut visitor).await.unwrap();
visitor.check(vec![("doc3".into(), 0.56902087)]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "bar", &mut visitor).await.unwrap();
visitor.check(vec![("doc3".into(), 0.56902087)]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "dummy", &mut visitor).await.unwrap();
visitor.check(Vec::<(Key, f32)>::new());
}
{
// Reindex one document
let mut tx = ds.transaction(true, false).await.unwrap();
let mut fti =
FtIndex::new(&mut tx, IndexKeyBase::default(), default_btree_order).await.unwrap();
fti.index_document(&mut tx, "doc3".into(), "nobar foo").await.unwrap();
tx.commit().await.unwrap();
// We can still find 'foo'
let mut tx = ds.transaction(false, false).await.unwrap();
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "foo", &mut visitor).await.unwrap();
visitor.check(vec![("doc3".into(), 0.56902087)]);
// We can't anymore find 'bar'
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "bar", &mut visitor).await.unwrap();
visitor.check(vec![]);
// We can now find 'nobar'
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "nobar", &mut visitor).await.unwrap();
visitor.check(vec![("doc3".into(), 0.56902087)]);
}
{
// Remove documents
let mut tx = ds.transaction(true, false).await.unwrap();
let mut fti =
FtIndex::new(&mut tx, IndexKeyBase::default(), default_btree_order).await.unwrap();
fti.remove_document(&mut tx, "doc1".into()).await.unwrap();
fti.remove_document(&mut tx, "doc2".into()).await.unwrap();
fti.remove_document(&mut tx, "doc3".into()).await.unwrap();
tx.commit().await.unwrap();
let mut tx = ds.transaction(false, false).await.unwrap();
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "hello", &mut visitor).await.unwrap();
visitor.check(vec![]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "foo", &mut visitor).await.unwrap();
visitor.check(vec![]);
}
}
#[test(tokio::test)]
async fn test_ft_index_bm_25() {
// The function `extract_sorted_terms_with_frequencies` is non-deterministic:
// the inner structures (BTrees) are built with the same terms and frequencies,
// but in a different insertion order, which can produce different BTree shapes.
// Therefore it makes sense to do multiple runs.
for _ in 0..10 {
let ds = Datastore::new("memory").await.unwrap();
let default_btree_order = 5;
{
let mut tx = ds.transaction(true, false).await.unwrap();
let mut fti = FtIndex::new(&mut tx, IndexKeyBase::default(), default_btree_order)
.await
.unwrap();
fti.index_document(
&mut tx,
"doc1".into(),
"the quick brown fox jumped over the lazy dog",
)
.await
.unwrap();
fti.index_document(&mut tx, "doc2".into(), "the fast fox jumped over the lazy dog")
.await
.unwrap();
fti.index_document(&mut tx, "doc3".into(), "the dog sat there and did nothing")
.await
.unwrap();
fti.index_document(&mut tx, "doc4".into(), "the other animals sat there watching")
.await
.unwrap();
tx.commit().await.unwrap();
}
{
let mut tx = ds.transaction(true, false).await.unwrap();
let fti = FtIndex::new(&mut tx, IndexKeyBase::default(), default_btree_order)
.await
.unwrap();
let statistics = fti.statistics(&mut tx).await.unwrap();
assert_eq!(statistics.terms.keys_count, 17);
assert_eq!(statistics.postings.keys_count, 28);
assert_eq!(statistics.doc_ids.keys_count, 4);
assert_eq!(statistics.doc_lengths.keys_count, 4);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "the", &mut visitor).await.unwrap();
visitor.check(vec![
("doc1".into(), 0.0),
("doc2".into(), 0.0),
("doc3".into(), 0.0),
("doc4".into(), 0.0),
]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "dog", &mut visitor).await.unwrap();
visitor.check(vec![
("doc1".into(), 0.0),
("doc2".into(), 0.0),
("doc3".into(), 0.0),
]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "fox", &mut visitor).await.unwrap();
visitor.check(vec![("doc1".into(), 0.0), ("doc2".into(), 0.0)]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "over", &mut visitor).await.unwrap();
visitor.check(vec![("doc1".into(), 0.0), ("doc2".into(), 0.0)]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "lazy", &mut visitor).await.unwrap();
visitor.check(vec![("doc1".into(), 0.0), ("doc2".into(), 0.0)]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "jumped", &mut visitor).await.unwrap();
visitor.check(vec![("doc1".into(), 0.0), ("doc2".into(), 0.0)]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "nothing", &mut visitor).await.unwrap();
visitor.check(vec![("doc3".into(), 0.87105393)]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "animals", &mut visitor).await.unwrap();
visitor.check(vec![("doc4".into(), 0.92279965)]);
let mut visitor = HashHitVisitor::default();
fti.search(&mut tx, "dummy", &mut visitor).await.unwrap();
visitor.check(Vec::<(Key, f32)>::new());
}
}
}
#[derive(Default)]
pub(super) struct HashHitVisitor {
map: HashMap<Key, Score>,
}
impl HitVisitor for HashHitVisitor {
fn visit(&mut self, _tx: &mut Transaction, doc_key: Key, score: Score) {
self.map.insert(doc_key, score);
}
}
impl HashHitVisitor {
pub(super) fn check(&self, res: Vec<(Key, Score)>) {
assert_eq!(res.len(), self.map.len(), "{:?}", self.map);
for (k, p) in res {
assert_eq!(self.map.get(&k), Some(&p));
}
}
}
}

276
lib/src/idx/ft/postings.rs Normal file

@@ -0,0 +1,276 @@
use crate::err::Error;
use crate::idx::bkeys::{KeyVisitor, TrieKeys};
use crate::idx::btree::{BTree, KeyProvider, NodeId, Payload, Statistics};
use crate::idx::ft::docids::DocId;
use crate::idx::ft::terms::TermId;
use crate::idx::{btree, IndexKeyBase, SerdeState};
use crate::key::bf::Bf;
use crate::kvs::{Key, Transaction};
use async_trait::async_trait;
pub(super) type TermFrequency = u64;
pub(super) struct Postings {
state_key: Key,
index_key_base: IndexKeyBase,
btree: BTree<PostingsKeyProvider>,
}
#[async_trait]
pub(super) trait PostingsVisitor {
async fn visit(
&mut self,
tx: &mut Transaction,
doc_id: DocId,
term_frequency: TermFrequency,
) -> Result<(), Error>;
}
impl Postings {
pub(super) async fn new(
tx: &mut Transaction,
index_key_base: IndexKeyBase,
default_btree_order: usize,
) -> Result<Self, Error> {
let keys = PostingsKeyProvider {
index_key_base: index_key_base.clone(),
};
let state_key: Key = keys.get_state_key();
let state: btree::State = if let Some(val) = tx.get(state_key.clone()).await? {
btree::State::try_from_val(val)?
} else {
btree::State::new(default_btree_order)
};
Ok(Self {
state_key,
index_key_base,
btree: BTree::new(keys, state),
})
}
pub(super) async fn update_posting(
&mut self,
tx: &mut Transaction,
term_id: TermId,
doc_id: DocId,
term_freq: TermFrequency,
) -> Result<(), Error> {
let key = self.index_key_base.new_bf_key(term_id, doc_id);
self.btree.insert::<TrieKeys>(tx, key, term_freq).await
}
pub(super) async fn remove_posting(
&mut self,
tx: &mut Transaction,
term_id: TermId,
doc_id: DocId,
) -> Result<Option<TermFrequency>, Error> {
let key = self.index_key_base.new_bf_key(term_id, doc_id);
self.btree.delete::<TrieKeys>(tx, key).await
}
pub(super) async fn get_doc_count(
&self,
tx: &mut Transaction,
term_id: TermId,
) -> Result<u64, Error> {
let prefix_key = self.index_key_base.new_bf_prefix_key(term_id);
let mut counter = PostingsDocCount::default();
self.btree.search_by_prefix::<TrieKeys, _>(tx, &prefix_key, &mut counter).await?;
Ok(counter.doc_count)
}
pub(super) async fn collect_postings<V>(
&self,
tx: &mut Transaction,
term_id: TermId,
visitor: &mut V,
) -> Result<(), Error>
where
V: PostingsVisitor + Send,
{
let prefix_key = self.index_key_base.new_bf_prefix_key(term_id);
let mut key_visitor = PostingsAdapter {
visitor,
};
self.btree.search_by_prefix::<TrieKeys, _>(tx, &prefix_key, &mut key_visitor).await
}
pub(super) async fn count_postings(
&self,
tx: &mut Transaction,
term_id: TermId,
) -> Result<usize, Error> {
let mut counter = PostingCounter::default();
self.collect_postings(tx, term_id, &mut counter).await?;
Ok(counter.count)
}
pub(super) async fn statistics(&self, tx: &mut Transaction) -> Result<Statistics, Error> {
self.btree.statistics::<TrieKeys>(tx).await
}
pub(super) async fn finish(self, tx: &mut Transaction) -> Result<(), Error> {
if self.btree.is_updated() {
tx.set(self.state_key, self.btree.get_state().try_to_val()?).await?;
}
Ok(())
}
}
#[derive(Default)]
struct PostingCounter {
count: usize,
}
#[async_trait]
impl PostingsVisitor for PostingCounter {
async fn visit(
&mut self,
_tx: &mut Transaction,
_doc_id: DocId,
_term_frequency: TermFrequency,
) -> Result<(), Error> {
self.count += 1;
Ok(())
}
}
struct PostingsKeyProvider {
index_key_base: IndexKeyBase,
}
impl KeyProvider for PostingsKeyProvider {
fn get_node_key(&self, node_id: NodeId) -> Key {
self.index_key_base.new_bp_key(Some(node_id))
}
fn get_state_key(&self) -> Key {
self.index_key_base.new_bp_key(None)
}
}
struct PostingsAdapter<'a, V>
where
V: PostingsVisitor,
{
visitor: &'a mut V,
}
#[async_trait]
impl<'a, V> KeyVisitor for PostingsAdapter<'a, V>
where
V: PostingsVisitor + Send,
{
async fn visit(
&mut self,
tx: &mut Transaction,
key: &Key,
payload: Payload,
) -> Result<(), Error> {
let posting_key: Bf = key.into();
self.visitor.visit(tx, posting_key.doc_id, payload).await
}
}
#[derive(Default)]
struct PostingsDocCount {
doc_count: u64,
}
#[async_trait]
impl KeyVisitor for PostingsDocCount {
async fn visit(
&mut self,
_tx: &mut Transaction,
_key: &Key,
_payload: Payload,
) -> Result<(), Error> {
self.doc_count += 1;
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::err::Error;
use crate::idx::ft::docids::DocId;
use crate::idx::ft::postings::{Postings, PostingsVisitor, TermFrequency};
use crate::idx::IndexKeyBase;
use crate::kvs::{Datastore, Transaction};
use async_trait::async_trait;
use std::collections::HashMap;
use test_log::test;
#[test(tokio::test)]
async fn test_postings() {
const DEFAULT_BTREE_ORDER: usize = 5;
let ds = Datastore::new("memory").await.unwrap();
let mut tx = ds.transaction(true, false).await.unwrap();
// Check empty state
let mut p =
Postings::new(&mut tx, IndexKeyBase::default(), DEFAULT_BTREE_ORDER).await.unwrap();
assert_eq!(p.statistics(&mut tx).await.unwrap().keys_count, 0);
p.update_posting(&mut tx, 1, 2, 3).await.unwrap();
p.update_posting(&mut tx, 1, 4, 5).await.unwrap();
p.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
let mut tx = ds.transaction(true, false).await.unwrap();
let mut p =
Postings::new(&mut tx, IndexKeyBase::default(), DEFAULT_BTREE_ORDER).await.unwrap();
assert_eq!(p.statistics(&mut tx).await.unwrap().keys_count, 2);
let mut v = TestPostingVisitor::default();
p.collect_postings(&mut tx, 1, &mut v).await.unwrap();
v.check_len(2, "Postings");
v.check(vec![(2, 3), (4, 5)], "Postings");
// Check removal of doc 2
assert_eq!(p.remove_posting(&mut tx, 1, 2).await.unwrap(), Some(3));
assert_eq!(p.count_postings(&mut tx, 1).await.unwrap(), 1);
// Removing the same posting again returns None
assert_eq!(p.remove_posting(&mut tx, 1, 2).await.unwrap(), None);
assert_eq!(p.count_postings(&mut tx, 1).await.unwrap(), 1);
// Remove doc 4
assert_eq!(p.remove_posting(&mut tx, 1, 4).await.unwrap(), Some(5));
assert_eq!(p.count_postings(&mut tx, 1).await.unwrap(), 0);
// The underlying b-tree should be empty now
assert_eq!(p.statistics(&mut tx).await.unwrap().keys_count, 0);
}
#[derive(Default)]
pub(super) struct TestPostingVisitor {
map: HashMap<DocId, TermFrequency>,
}
#[async_trait]
impl PostingsVisitor for TestPostingVisitor {
async fn visit(
&mut self,
_tx: &mut Transaction,
doc_id: DocId,
term_frequency: TermFrequency,
) -> Result<(), Error> {
assert_eq!(self.map.insert(doc_id, term_frequency), None);
Ok(())
}
}
impl TestPostingVisitor {
pub(super) fn check_len(&self, len: usize, info: &str) {
assert_eq!(self.map.len(), len, "len issue: {}", info);
}
pub(super) fn check(&self, res: Vec<(DocId, TermFrequency)>, info: &str) {
self.check_len(res.len(), info);
for (d, f) in res {
assert_eq!(self.map.get(&d), Some(&f));
}
}
}
}
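
A minimal sketch of a custom visitor against the PostingsVisitor trait above; FrequencySum is a hypothetical name used purely for illustration:

use async_trait::async_trait;

// Hypothetical visitor: sums the term frequency over every document that
// contains a given term, driven by Postings::collect_postings.
#[derive(Default)]
struct FrequencySum {
    total: TermFrequency,
}

#[async_trait]
impl PostingsVisitor for FrequencySum {
    async fn visit(
        &mut self,
        _tx: &mut Transaction,
        _doc_id: DocId,
        term_frequency: TermFrequency,
    ) -> Result<(), Error> {
        self.total += term_frequency;
        Ok(())
    }
}

Driving it mirrors count_postings: p.collect_postings(&mut tx, term_id, &mut visitor).await?.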

333
lib/src/idx/ft/terms.rs Normal file

@@ -0,0 +1,333 @@
use crate::err::Error;
use crate::idx::bkeys::FstKeys;
use crate::idx::btree::{BTree, KeyProvider, NodeId, Statistics};
use crate::idx::ft::postings::TermFrequency;
use crate::idx::{btree, IndexKeyBase, SerdeState};
use crate::kvs::{Key, Transaction};
use roaring::RoaringTreemap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub(crate) type TermId = u64;
pub(super) struct Terms {
state_key: Key,
index_key_base: IndexKeyBase,
btree: BTree<TermsKeyProvider>,
available_ids: Option<RoaringTreemap>,
next_term_id: TermId,
updated: bool,
}
impl Terms {
pub(super) async fn new(
tx: &mut Transaction,
index_key_base: IndexKeyBase,
default_btree_order: usize,
) -> Result<Self, Error> {
let keys = TermsKeyProvider {
index_key_base: index_key_base.clone(),
};
let state_key: Key = keys.get_state_key();
let state: State = if let Some(val) = tx.get(state_key.clone()).await? {
State::try_from_val(val)?
} else {
State::new(default_btree_order)
};
Ok(Self {
state_key,
index_key_base,
btree: BTree::new(keys, state.btree),
available_ids: state.available_ids,
next_term_id: state.next_term_id,
updated: false,
})
}
fn get_next_term_id(&mut self) -> TermId {
// We check first if there is any available id
if let Some(available_ids) = &mut self.available_ids {
if let Some(available_id) = available_ids.iter().next() {
available_ids.remove(available_id);
if available_ids.is_empty() {
self.available_ids = None;
}
return available_id;
}
}
// If not, we use the sequence
let term_id = self.next_term_id;
self.next_term_id += 1;
term_id
}
pub(super) async fn resolve_term_ids(
&mut self,
tx: &mut Transaction,
terms_frequencies: HashMap<&str, TermFrequency>,
) -> Result<HashMap<TermId, TermFrequency>, Error> {
let mut res = HashMap::with_capacity(terms_frequencies.len());
for (term, freq) in terms_frequencies {
res.insert(self.resolve_term_id(tx, term).await?, freq);
}
Ok(res)
}
async fn resolve_term_id(&mut self, tx: &mut Transaction, term: &str) -> Result<TermId, Error> {
let term_key = term.into();
if let Some(term_id) = self.btree.search::<FstKeys>(tx, &term_key).await? {
Ok(term_id)
} else {
let term_id = self.get_next_term_id();
tx.set(self.index_key_base.new_bu_key(term_id), term_key.clone()).await?;
self.btree.insert::<FstKeys>(tx, term_key, term_id).await?;
self.updated = true;
Ok(term_id)
}
}
pub(super) async fn get_term_id(
&self,
tx: &mut Transaction,
term: &str,
) -> Result<Option<TermId>, Error> {
self.btree.search::<FstKeys>(tx, &term.into()).await
}
pub(super) async fn remove_term_id(
&mut self,
tx: &mut Transaction,
term_id: TermId,
) -> Result<(), Error> {
let term_id_key = self.index_key_base.new_bu_key(term_id);
if let Some(term_key) = tx.get(term_id_key.clone()).await? {
debug!("Delete In {}", String::from_utf8(term_key.clone()).unwrap());
self.btree.delete::<FstKeys>(tx, term_key.clone()).await?;
debug!("Delete Out {}", String::from_utf8(term_key.clone()).unwrap());
tx.del(term_id_key).await?;
if let Some(available_ids) = &mut self.available_ids {
available_ids.insert(term_id);
} else {
let mut available_ids = RoaringTreemap::new();
available_ids.insert(term_id);
self.available_ids = Some(available_ids);
}
self.updated = true;
}
Ok(())
}
pub(super) async fn statistics(&self, tx: &mut Transaction) -> Result<Statistics, Error> {
self.btree.statistics::<FstKeys>(tx).await
}
pub(super) async fn finish(self, tx: &mut Transaction) -> Result<(), Error> {
if self.updated || self.btree.is_updated() {
let state = State {
btree: self.btree.get_state().clone(),
available_ids: self.available_ids,
next_term_id: self.next_term_id,
};
tx.set(self.state_key, state.try_to_val()?).await?;
}
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct State {
btree: btree::State,
available_ids: Option<RoaringTreemap>,
next_term_id: TermId,
}
impl SerdeState for State {}
impl State {
fn new(default_btree_order: usize) -> Self {
Self {
btree: btree::State::new(default_btree_order),
available_ids: None,
next_term_id: 0,
}
}
}
struct TermsKeyProvider {
index_key_base: IndexKeyBase,
}
impl KeyProvider for TermsKeyProvider {
fn get_node_key(&self, node_id: NodeId) -> Key {
self.index_key_base.new_bt_key(Some(node_id))
}
fn get_state_key(&self) -> Key {
self.index_key_base.new_bt_key(None)
}
}
#[cfg(test)]
mod tests {
use crate::idx::ft::postings::TermFrequency;
use crate::idx::ft::terms::Terms;
use crate::idx::IndexKeyBase;
use crate::kvs::Datastore;
use rand::{thread_rng, Rng};
use std::collections::{HashMap, HashSet};
fn random_term(key_length: usize) -> String {
thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(key_length)
.map(char::from)
.collect()
}
fn unique_terms(key_length: usize, count: usize) -> HashSet<String> {
let mut set = HashSet::new();
while set.len() < count {
set.insert(random_term(key_length));
}
set
}
#[tokio::test]
async fn test_resolve_terms() {
const BTREE_ORDER: usize = 7;
let idx = IndexKeyBase::default();
let ds = Datastore::new("memory").await.unwrap();
let mut tx = ds.transaction(true, false).await.unwrap();
let t = Terms::new(&mut tx, idx.clone(), BTREE_ORDER).await.unwrap();
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
// Resolve a first term
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, idx.clone(), BTREE_ORDER).await.unwrap();
let res = t.resolve_term_ids(&mut tx, HashMap::from([("C", 103)])).await.unwrap();
assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 1);
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
assert_eq!(res, HashMap::from([(0, 103)]));
// Resolve a second term
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, idx.clone(), BTREE_ORDER).await.unwrap();
let res = t.resolve_term_ids(&mut tx, HashMap::from([("D", 104)])).await.unwrap();
assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 2);
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
assert_eq!(res, HashMap::from([(1, 104)]));
// Resolve two existing terms with new frequencies
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, idx.clone(), BTREE_ORDER).await.unwrap();
let res =
t.resolve_term_ids(&mut tx, HashMap::from([("C", 113), ("D", 114)])).await.unwrap();
assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 2);
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
assert_eq!(res, HashMap::from([(0, 113), (1, 114)]));
// Resolve one existing term and two new terms
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, idx.clone(), BTREE_ORDER).await.unwrap();
let res = t
.resolve_term_ids(&mut tx, HashMap::from([("A", 101), ("C", 123), ("E", 105)]))
.await
.unwrap();
assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 4);
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
assert!(
res.eq(&HashMap::from([(3, 101), (0, 123), (2, 105)]))
|| res.eq(&HashMap::from([(2, 101), (0, 123), (3, 105)]))
);
}
#[tokio::test]
async fn test_deletion() {
const BTREE_ORDER: usize = 7;
let idx = IndexKeyBase::default();
let ds = Datastore::new("memory").await.unwrap();
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, idx.clone(), BTREE_ORDER).await.unwrap();
// Check that removing a non-existing term id succeeds
assert!(t.remove_term_id(&mut tx, 0).await.is_ok());
// Create a few terms
t.resolve_term_ids(&mut tx, HashMap::from([("A", 101), ("C", 123), ("E", 105)]))
.await
.unwrap();
for term in ["A", "C", "E"] {
let term_id = t.get_term_id(&mut tx, term).await.unwrap();
if let Some(term_id) = term_id {
t.remove_term_id(&mut tx, term_id).await.unwrap();
assert_eq!(t.get_term_id(&mut tx, term).await.unwrap(), None);
} else {
panic!("Term ID not found: {}", term);
}
}
// Check id recycling
let res =
t.resolve_term_ids(&mut tx, HashMap::from([("B", 102), ("D", 104)])).await.unwrap();
assert!(
res.eq(&HashMap::from([(0, 102), (1, 104)]))
|| res.eq(&HashMap::from([(0, 104), (1, 102)]))
);
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
}
fn random_term_freq_vec(term_count: usize) -> Vec<(String, TermFrequency)> {
let mut i = 1;
let mut vec = Vec::with_capacity(term_count);
for term in unique_terms(5, term_count) {
vec.push((term, i));
i += 1;
}
vec
}
#[tokio::test]
async fn test_resolve_100_docs_with_50_words_one_by_one() {
let ds = Datastore::new("memory").await.unwrap();
for _ in 0..100 {
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, IndexKeyBase::default(), 100).await.unwrap();
let terms_string = random_term_freq_vec(50);
let terms_str: HashMap<&str, TermFrequency> =
terms_string.iter().map(|(t, f)| (t.as_str(), *f)).collect();
t.resolve_term_ids(&mut tx, terms_str).await.unwrap();
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
}
}
#[tokio::test]
async fn test_resolve_100_docs_with_50_words_batch_of_10() {
let ds = Datastore::new("memory").await.unwrap();
for _ in 0..10 {
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, IndexKeyBase::default(), 100).await.unwrap();
for _ in 0..10 {
let terms_string = random_term_freq_vec(50);
let terms_str: HashMap<&str, TermFrequency> =
terms_string.iter().map(|(t, f)| (t.as_str(), *f)).collect();
t.resolve_term_ids(&mut tx, terms_str).await.unwrap();
}
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
}
}
}
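
The id allocation in Terms::get_next_term_id (hand out recycled ids before advancing the sequence) can be modelled in isolation; IdAllocator below is a hypothetical illustration, not part of the commit:

use roaring::RoaringTreemap;

// Simplified model of the available_ids / next_term_id pair held by Terms.
struct IdAllocator {
    available: Option<RoaringTreemap>,
    next: u64,
}

impl IdAllocator {
    fn next_id(&mut self) -> u64 {
        // Prefer recycled ids, smallest first.
        if let Some(available) = &mut self.available {
            if let Some(id) = available.iter().next() {
                available.remove(id);
                if available.is_empty() {
                    self.available = None;
                }
                return id;
            }
        }
        // Otherwise advance the monotonic sequence.
        let id = self.next;
        self.next += 1;
        id
    }

    fn release(&mut self, id: u64) {
        self.available.get_or_insert_with(RoaringTreemap::new).insert(id);
    }
}

This is the behaviour test_deletion asserts above: after the terms holding ids 0, 1 and 2 are removed, the next two resolutions reuse ids 0 and 1.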

160
lib/src/idx/mod.rs Normal file

@@ -0,0 +1,160 @@
mod bkeys;
pub(crate) mod btree;
pub(crate) mod ft;
use crate::dbs::Options;
use crate::err::Error;
use crate::idx::btree::NodeId;
use crate::idx::ft::docids::DocId;
use crate::idx::ft::terms::TermId;
use crate::key::bd::Bd;
use crate::key::bf::{Bf, BfPrefix};
use crate::key::bi::Bi;
use crate::key::bk::Bk;
use crate::key::bl::Bl;
use crate::key::bp::Bp;
use crate::key::bs::Bs;
use crate::key::bt::Bt;
use crate::key::bu::Bu;
use crate::kvs::{Key, Val};
use crate::sql::statements::DefineIndexStatement;
use roaring::RoaringTreemap;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct IndexKeyBase {
ns: String,
db: String,
tb: String,
ix: String,
}
impl IndexKeyBase {
pub(crate) fn new(opt: &Options, ix: &DefineIndexStatement) -> Self {
Self {
ns: opt.ns().to_string(),
db: opt.db().to_string(),
tb: ix.what.to_string(),
ix: ix.name.to_string(),
}
}
fn new_bd_key(&self, node_id: Option<NodeId>) -> Key {
Bd::new(self.ns.as_str(), self.db.as_str(), self.tb.as_str(), self.ix.as_str(), node_id)
.into()
}
fn new_bi_key(&self, doc_id: DocId) -> Key {
Bi::new(self.ns.as_str(), self.db.as_str(), self.tb.as_str(), self.ix.as_str(), doc_id)
.into()
}
fn new_bk_key(&self, doc_id: DocId) -> Key {
Bk::new(self.ns.as_str(), self.db.as_str(), self.tb.as_str(), self.ix.as_str(), doc_id)
.into()
}
fn new_bl_key(&self, node_id: Option<NodeId>) -> Key {
Bl::new(self.ns.as_str(), self.db.as_str(), self.tb.as_str(), self.ix.as_str(), node_id)
.into()
}
fn new_bp_key(&self, node_id: Option<NodeId>) -> Key {
Bp::new(self.ns.as_str(), self.db.as_str(), self.tb.as_str(), self.ix.as_str(), node_id)
.into()
}
fn new_bf_key(&self, term_id: TermId, doc_id: DocId) -> Key {
Bf::new(
self.ns.as_str(),
self.db.as_str(),
self.tb.as_str(),
self.ix.as_str(),
term_id,
doc_id,
)
.into()
}
fn new_bf_prefix_key(&self, term_id: TermId) -> Key {
BfPrefix::new(
self.ns.as_str(),
self.db.as_str(),
self.tb.as_str(),
self.ix.as_str(),
term_id,
)
.into()
}
fn new_bt_key(&self, node_id: Option<NodeId>) -> Key {
Bt::new(self.ns.as_str(), self.db.as_str(), self.tb.as_str(), self.ix.as_str(), node_id)
.into()
}
fn new_bs_key(&self) -> Key {
Bs::new(self.ns.as_str(), self.db.as_str(), self.tb.as_str(), self.ix.as_str()).into()
}
fn new_bu_key(&self, term_id: TermId) -> Key {
Bu::new(self.ns.as_str(), self.db.as_str(), self.tb.as_str(), self.ix.as_str(), term_id)
.into()
}
}
/// This trait provides `bincode` based default implementations for serialization/deserialization
trait SerdeState
where
Self: Sized + Serialize + DeserializeOwned,
{
fn try_to_val(&self) -> Result<Val, Error> {
Ok(bincode::serialize(self)?)
}
fn try_from_val(val: Val) -> Result<Self, Error> {
Ok(bincode::deserialize(&val)?)
}
}
impl SerdeState for RoaringTreemap {}
#[cfg(test)]
mod tests {
use crate::err::Error;
use crate::idx::bkeys::KeyVisitor;
use crate::idx::btree::Payload;
use crate::kvs::{Key, Transaction};
use async_trait::async_trait;
use std::collections::HashMap;
#[derive(Default)]
pub(super) struct HashVisitor {
map: HashMap<Key, Payload>,
}
#[async_trait]
impl KeyVisitor for HashVisitor {
async fn visit(
&mut self,
_tx: &mut Transaction,
key: &Key,
payload: Payload,
) -> Result<(), Error> {
self.map.insert(key.clone(), payload);
Ok(())
}
}
impl HashVisitor {
pub(super) fn check_len(&self, len: usize, info: &str) {
assert_eq!(self.map.len(), len, "len issue: {}", info);
}
pub(super) fn check(&self, res: Vec<(Key, Payload)>, info: &str) {
self.check_len(res.len(), info);
for (k, p) in res {
assert_eq!(self.map.get(&k), Some(&p));
}
}
}
}
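
A sketch of how SerdeState is meant to be consumed: a state struct opts in with an empty impl and inherits the bincode round-trip helpers. ExampleState is hypothetical, not part of the commit:

use serde::{Deserialize, Serialize};

// Hypothetical state type; any Sized + Serialize + DeserializeOwned struct qualifies.
#[derive(Serialize, Deserialize)]
struct ExampleState {
    next_id: u64,
}

impl SerdeState for ExampleState {}

// Round trip through the KV value type:
// let val: Val = ExampleState { next_id: 7 }.try_to_val()?;
// let state = ExampleState::try_from_val(val)?;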

65
lib/src/key/bd.rs Normal file

@@ -0,0 +1,65 @@
use crate::idx::btree::NodeId;
use derive::Key;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Bd<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_c: u8,
pub tb: &'a str,
_d: u8,
_e: u8,
_f: u8,
pub ix: &'a str,
_g: u8,
pub node_id: Option<NodeId>,
}
impl<'a> Bd<'a> {
pub fn new(
ns: &'a str,
db: &'a str,
tb: &'a str,
ix: &'a str,
node_id: Option<NodeId>,
) -> Self {
Self {
__: 0x2f, // /
_a: 0x2a, // *
ns,
_b: 0x2a, // *
db,
_c: 0x2a, // *
tb,
_d: 0x21, // !
_e: 0x62, // b
_f: 0x64, // d
ix,
_g: 0x2a, // *
node_id,
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Bd::new(
"test",
"test",
"test",
"test",
Some(7)
);
let enc = Bd::encode(&val).unwrap();
let dec = Bd::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}
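
Worth noting for the four node-key types (bd, bl, bp, bt): the Option<NodeId> doubles as a discriminator, a convention visible in the KeyProvider implementations above:

// Convention used by the KeyProvider impls (e.g. PostingsKeyProvider):
//   new_bd_key(None)     -> the BTree state key
//   new_bd_key(Some(id)) -> the key of BTree node `id`
// so a tree's state and its nodes live under a single key prefix.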

123
lib/src/key/bf.rs Normal file

@@ -0,0 +1,123 @@
use crate::idx::ft::docids::DocId;
use crate::idx::ft::terms::TermId;
use derive::Key;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Bf<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_c: u8,
pub tb: &'a str,
_d: u8,
_e: u8,
_f: u8,
pub ix: &'a str,
_g: u8,
pub term_id: TermId,
pub doc_id: DocId,
}
impl<'a> Bf<'a> {
pub fn new(
ns: &'a str,
db: &'a str,
tb: &'a str,
ix: &'a str,
term_id: TermId,
doc_id: DocId,
) -> Self {
Self {
__: 0x2f, // /
_a: 0x2a, // *
ns,
_b: 0x2a, // *
db,
_c: 0x2a, // *
tb,
_d: 0x21, // !
_e: 0x62, // b
_f: 0x78, // x
ix,
_g: 0x2a, // *
term_id,
doc_id,
}
}
}
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct BfPrefix<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_c: u8,
pub tb: &'a str,
_d: u8,
_e: u8,
_f: u8,
pub ix: &'a str,
_g: u8,
term_id: TermId,
}
impl<'a> BfPrefix<'a> {
pub fn new(ns: &'a str, db: &'a str, tb: &'a str, ix: &'a str, term_id: TermId) -> Self {
Self {
__: 0x2f, // /
_a: 0x2a, // *
ns,
_b: 0x2a, // *
db,
_c: 0x2a, // *
tb,
_d: 0x21, // !
_e: 0x62, // b
_f: 0x78, // x
ix,
_g: 0x2a, // *
term_id,
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Bf::new(
"test",
"test",
"test",
"test",
1,
2
);
let enc = Bf::encode(&val).unwrap();
let dec = Bf::decode(&enc).unwrap();
assert_eq!(val, dec);
}
#[test]
fn key_prefix() {
use super::*;
#[rustfmt::skip]
let val = BfPrefix::new(
"test",
"test",
"test",
"test",
3
);
let enc = BfPrefix::encode(&val).unwrap();
let dec = BfPrefix::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}
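
BfPrefix serialises the same leading fields as Bf minus doc_id, so its encoding should be a byte prefix of every Bf key sharing the same term_id; that is what new_bf_prefix_key relies on for the prefix scans in postings.rs. A hedged check, assuming the derived Key encoding is field-sequential:

// Sketch: the prefix key's bytes lead every full posting key for term 1.
let full = Bf::new("test", "test", "test", "test", 1, 2);
let pre = BfPrefix::new("test", "test", "test", "test", 1);
assert!(Bf::encode(&full).unwrap().starts_with(&BfPrefix::encode(&pre).unwrap()));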

59
lib/src/key/bi.rs Normal file

@@ -0,0 +1,59 @@
use crate::idx::btree::NodeId;
use derive::Key;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Bi<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_c: u8,
pub tb: &'a str,
_d: u8,
_e: u8,
_f: u8,
pub ix: &'a str,
_g: u8,
pub node_id: NodeId,
}
impl<'a> Bi<'a> {
pub fn new(ns: &'a str, db: &'a str, tb: &'a str, ix: &'a str, node_id: NodeId) -> Self {
Bi {
__: 0x2f, // /
_a: 0x2a, // *
ns,
_b: 0x2a, // *
db,
_c: 0x2a, // *
tb,
_d: 0x21, // !
_e: 0x62, // b
_f: 0x69, // i
ix,
_g: 0x2a, // *
node_id,
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Bi::new(
"test",
"test",
"test",
"test",
7
);
let enc = Bi::encode(&val).unwrap();
let dec = Bi::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

59
lib/src/key/bk.rs Normal file

@@ -0,0 +1,59 @@
use crate::idx::ft::docids::DocId;
use derive::Key;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Bk<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_c: u8,
pub tb: &'a str,
_d: u8,
_e: u8,
_f: u8,
pub ix: &'a str,
_g: u8,
pub doc_id: DocId,
}
impl<'a> Bk<'a> {
pub fn new(ns: &'a str, db: &'a str, tb: &'a str, ix: &'a str, doc_id: DocId) -> Self {
Self {
__: 0x2f, // /
_a: 0x2a, // *
ns,
_b: 0x2a, // *
db,
_c: 0x2a, // *
tb,
_d: 0x21, // !
_e: 0x62, // b
_f: 0x6b, // k
ix,
_g: 0x2a, // *
doc_id,
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Bk::new(
"test",
"test",
"test",
"test",
7
);
let enc = Bk::encode(&val).unwrap();
let dec = Bk::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

65
lib/src/key/bl.rs Normal file

@@ -0,0 +1,65 @@
use crate::idx::btree::NodeId;
use derive::Key;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Bl<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_c: u8,
pub tb: &'a str,
_d: u8,
_e: u8,
_f: u8,
pub ix: &'a str,
_g: u8,
pub node_id: Option<NodeId>,
}
impl<'a> Bl<'a> {
pub fn new(
ns: &'a str,
db: &'a str,
tb: &'a str,
ix: &'a str,
node_id: Option<NodeId>,
) -> Self {
Self {
__: 0x2f, // /
_a: 0x2a, // *
ns,
_b: 0x2a, // *
db,
_c: 0x2a, // *
tb,
_d: 0x21, // !
_e: 0x62, // b
_f: 0x6c, // l
ix,
_g: 0x2a, // *
node_id,
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Bl::new(
"test",
"test",
"test",
"test",
Some(7)
);
let enc = Bl::encode(&val).unwrap();
let dec = Bl::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

65
lib/src/key/bp.rs Normal file

@@ -0,0 +1,65 @@
use crate::idx::btree::NodeId;
use derive::Key;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Bp<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_c: u8,
pub tb: &'a str,
_d: u8,
_e: u8,
_f: u8,
pub ix: &'a str,
_g: u8,
pub node_id: Option<NodeId>,
}
impl<'a> Bp<'a> {
pub fn new(
ns: &'a str,
db: &'a str,
tb: &'a str,
ix: &'a str,
node_id: Option<NodeId>,
) -> Self {
Self {
__: 0x2f, // /
_a: 0x2a, // *
ns,
_b: 0x2a, // *
db,
_c: 0x2a, // *
tb,
_d: 0x21, // !
_e: 0x62, // b
_f: 0x70, // p
ix,
_g: 0x2a, // *
node_id,
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Bp::new(
"test",
"test",
"test",
"test",
Some(7)
);
let enc = Bp::encode(&val).unwrap();
let dec = Bp::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

53
lib/src/key/bs.rs Normal file

@@ -0,0 +1,53 @@
use derive::Key;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Bs<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_c: u8,
pub tb: &'a str,
_d: u8,
_e: u8,
_f: u8,
pub ix: &'a str,
}
impl<'a> Bs<'a> {
pub fn new(ns: &'a str, db: &'a str, tb: &'a str, ix: &'a str) -> Self {
Bs {
__: 0x2f, // /
_a: 0x2a, // *
ns,
_b: 0x2a, // *
db,
_c: 0x2a, // *
tb,
_d: 0x21, // !
_e: 0x62, // b
_f: 0x73, // s
ix,
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Bs::new(
"test",
"test",
"test",
"test",
);
let enc = Bs::encode(&val).unwrap();
let dec = Bs::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

65
lib/src/key/bt.rs Normal file

@@ -0,0 +1,65 @@
use crate::idx::btree::NodeId;
use derive::Key;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Bt<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_c: u8,
pub tb: &'a str,
_d: u8,
_e: u8,
_f: u8,
pub ix: &'a str,
_g: u8,
pub node_id: Option<NodeId>,
}
impl<'a> Bt<'a> {
pub fn new(
ns: &'a str,
db: &'a str,
tb: &'a str,
ix: &'a str,
node_id: Option<NodeId>,
) -> Self {
Self {
__: 0x2f, // /
_a: 0x2a, // *
ns,
_b: 0x2a, // *
db,
_c: 0x2a, // *
tb,
_d: 0x21, // !
_e: 0x62, // b
_f: 0x74, // t
ix,
_g: 0x2a, // *
node_id,
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Bt::new(
"test",
"test",
"test",
"test",
Some(7)
);
let enc = Bt::encode(&val).unwrap();
let dec = Bt::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

59
lib/src/key/bu.rs Normal file

@@ -0,0 +1,59 @@
use crate::idx::ft::terms::TermId;
use derive::Key;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Bu<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_c: u8,
pub tb: &'a str,
_d: u8,
_e: u8,
_f: u8,
pub ix: &'a str,
_g: u8,
pub term_id: TermId,
}
impl<'a> Bu<'a> {
pub fn new(ns: &'a str, db: &'a str, tb: &'a str, ix: &'a str, term_id: TermId) -> Self {
Self {
__: 0x2f, // /
_a: 0x2a, // *
ns,
_b: 0x2a, // *
db,
_c: 0x2a, // *
tb,
_d: 0x21, // !
_e: 0x62, // b
_f: 0x75, // u
ix,
_g: 0x2a, // *
term_id,
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Bu::new(
"test",
"test",
"test",
"test",
7
);
let enc = Bu::encode(&val).unwrap();
let dec = Bu::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}
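
Together with the Bt keys, Bu forms the reverse side of the term dictionary; this is what lets Terms::remove_term_id recover the term bytes for an id before deleting the BTree entry:

// Bt (BTree over FstKeys): term bytes -> TermId
// Bu (plain KV pair):      TermId     -> term bytes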

lib/src/key/mod.rs

@@ -19,6 +19,8 @@
/// Scope /*{ns}*{db}±{sc}
/// ST /*{ns}*{db}±{sc}!st{tk}
///
/// AZ /*{ns}*{db}!az{az}
///
/// Table /*{ns}*{db}*{tb}
/// EV /*{ns}*{db}*{tb}!ev{ev}
/// FD /*{ns}*{db}*{tb}!fd{fd}
@@ -32,7 +34,20 @@
///
/// Index /*{ns}*{db}*{tb}¤{ix}{fd}{id}
///
/// BD /*{ns}*{db}*{tb}¤{ix}{bd}{id}
/// BL /*{ns}*{db}*{tb}¤{ix}{bl}{id}
/// BP /*{ns}*{db}*{tb}¤{ix}{bp}{id}
/// BT /*{ns}*{db}*{tb}¤{ix}{bt}{id}
pub mod az; // Stores a DEFINE ANALYZER config definition
pub mod bd; // Stores BTree nodes for doc ids
pub mod bf; // Stores Term/Doc frequency
pub mod bi; // Stores doc keys for doc_ids
pub mod bk; // Stores the term list for doc_ids
pub mod bl; // Stores BTree nodes for doc lengths
pub mod bp; // Stores BTree nodes for postings
pub mod bs; // Stores FullText index states
pub mod bt; // Stores BTree nodes for terms
pub mod bu; // Stores terms for term_ids
pub mod database; // Stores the key prefix for all keys under a database
pub mod db; // Stores a DEFINE DATABASE config definition
pub mod dl; // Stores a DEFINE LOGIN ON DATABASE config definition
@@ -57,7 +72,7 @@ pub mod scope; // Stores the key prefix for all keys under a scope
pub mod st; // Stores a DEFINE TOKEN ON SCOPE config definition
pub mod table; // Stores the key prefix for all keys under a table
pub mod tb; // Stores a DEFINE TABLE config definition
pub mod thing; // Stores a record id
pub mod thing;
const CHAR_PATH: u8 = 0xb1; // ±
const CHAR_INDEX: u8 = 0xa4; // ¤

lib/src/lib.rs

@@ -123,6 +123,8 @@ pub mod env;
#[doc(hidden)]
pub mod err;
#[doc(hidden)]
pub mod idx;
#[doc(hidden)]
pub mod kvs;
#[doc(inline)]

85
lib/src/sql/index.rs Normal file

@@ -0,0 +1,85 @@
use crate::sql::comment::{mightbespace, shouldbespace};
use crate::sql::error::IResult;
use crate::sql::ident::{ident, Ident};
use crate::sql::scoring::{scoring, Scoring};
use nom::branch::alt;
use nom::bytes::complete::{tag, tag_no_case};
use nom::combinator::map;
use serde::{Deserialize, Serialize};
use std::fmt;
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum Index {
/// (Basic) non-unique
Idx,
/// Unique index
Uniq,
/// Index with Full-Text search capabilities
Search {
az: Ident,
hl: bool,
sc: Scoring,
},
}
impl Default for Index {
fn default() -> Self {
Self::Idx
}
}
impl fmt::Display for Index {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Idx => Ok(()),
Self::Uniq => f.write_str("UNIQUE"),
Self::Search {
az,
hl,
sc,
} => {
write!(f, "SEARCH {} {}", az, sc)?;
if *hl {
f.write_str(" HIGHLIGHTS")?
}
Ok(())
}
}
}
}
pub fn index(i: &str) -> IResult<&str, Index> {
alt((unique, search, non_unique))(i)
}
pub fn non_unique(i: &str) -> IResult<&str, Index> {
let (i, _) = tag("")(i)?;
Ok((i, Index::Idx))
}
pub fn unique(i: &str) -> IResult<&str, Index> {
let (i, _) = tag_no_case("UNIQUE")(i)?;
Ok((i, Index::Uniq))
}
pub fn highlights(i: &str) -> IResult<&str, bool> {
let (i, _) = mightbespace(i)?;
alt((map(tag("HIGHLIGHTS"), |_| true), map(tag(""), |_| false)))(i)
}
pub fn search(i: &str) -> IResult<&str, Index> {
let (i, _) = tag_no_case("SEARCH")(i)?;
let (i, _) = shouldbespace(i)?;
let (i, az) = ident(i)?;
let (i, _) = shouldbespace(i)?;
let (i, sc) = scoring(i)?;
let (i, hl) = highlights(i)?;
Ok((
i,
Index::Search {
az,
sc,
hl,
},
))
}
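
A quick usage sketch of the parser above; like every nom parser here, it returns the remaining input alongside the parsed value, and an empty string falls through non_unique to Index::Idx:

let (rest, idx) = index("SEARCH my_analyzer BM25(1.2,0.75,1000) HIGHLIGHTS").unwrap();
assert!(rest.is_empty());
assert!(matches!(idx, Index::Search { hl: true, .. }));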

lib/src/sql/mod.rs

@@ -30,6 +30,7 @@ pub(crate) mod group;
pub(crate) mod id;
pub(crate) mod ident;
pub(crate) mod idiom;
pub(crate) mod index;
pub(crate) mod kind;
pub(crate) mod language;
pub(crate) mod limit;
@@ -48,6 +49,7 @@ pub(crate) mod permission;
pub(crate) mod query;
pub(crate) mod range;
pub(crate) mod regex;
pub(crate) mod scoring;
pub(crate) mod script;
pub(crate) mod special;
pub(crate) mod split;

lib/src/sql/operator.rs

@@ -5,9 +5,11 @@ use nom::branch::alt;
use nom::bytes::complete::tag;
use nom::bytes::complete::tag_no_case;
use nom::character::complete::char;
use nom::combinator::map;
use nom::character::complete::u8 as uint8;
use nom::combinator::{map, opt};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fmt::Write;
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
pub enum Operator {
@@ -32,10 +34,11 @@ pub enum Operator {
AllEqual, // *=
AnyEqual, // ?=
//
Like, // ~
NotLike, // !~
AllLike, // *~
AnyLike, // ?~
Like, // ~
NotLike, // !~
AllLike, // *~
AnyLike, // ?~
Matches(Option<u8>), // @{ref}@
//
LessThan, // <
LessThanOrEqual, // <=
@@ -82,45 +85,52 @@ impl Operator {
impl fmt::Display for Operator {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(match self {
Self::Or => "OR",
Self::And => "AND",
Self::Tco => "?:",
Self::Nco => "??",
Self::Add => "+",
Self::Sub => "-",
Self::Mul => "*",
Self::Div => "/",
Self::Pow => "**",
Self::Inc => "+=",
Self::Dec => "-=",
Self::Ext => "+?=",
Self::Equal => "=",
Self::Exact => "==",
Self::NotEqual => "!=",
Self::AllEqual => "*=",
Self::AnyEqual => "?=",
Self::Like => "~",
Self::NotLike => "!~",
Self::AllLike => "*~",
Self::AnyLike => "?~",
Self::LessThan => "<",
Self::LessThanOrEqual => "<=",
Self::MoreThan => ">",
Self::MoreThanOrEqual => ">=",
Self::Contain => "CONTAINS",
Self::NotContain => "CONTAINSNOT",
Self::ContainAll => "CONTAINSALL",
Self::ContainAny => "CONTAINSANY",
Self::ContainNone => "CONTAINSNONE",
Self::Inside => "INSIDE",
Self::NotInside => "NOTINSIDE",
Self::AllInside => "ALLINSIDE",
Self::AnyInside => "ANYINSIDE",
Self::NoneInside => "NONEINSIDE",
Self::Outside => "OUTSIDE",
Self::Intersects => "INTERSECTS",
})
match self {
Self::Or => f.write_str("OR"),
Self::And => f.write_str("AND"),
Self::Tco => f.write_str("?:"),
Self::Nco => f.write_str("??"),
Self::Add => f.write_str("+"),
Self::Sub => f.write_char('-'),
Self::Mul => f.write_char('*'),
Self::Div => f.write_char('/'),
Self::Pow => f.write_str("**"),
Self::Inc => f.write_str("+="),
Self::Dec => f.write_str("-="),
Self::Ext => f.write_str("+?="),
Self::Equal => f.write_char('='),
Self::Exact => f.write_str("=="),
Self::NotEqual => f.write_str("!="),
Self::AllEqual => f.write_str("*="),
Self::AnyEqual => f.write_str("?="),
Self::Like => f.write_char('~'),
Self::NotLike => f.write_str("!~"),
Self::AllLike => f.write_str("*~"),
Self::AnyLike => f.write_str("?~"),
Self::LessThan => f.write_char('<'),
Self::LessThanOrEqual => f.write_str("<="),
Self::MoreThan => f.write_char('>'),
Self::MoreThanOrEqual => f.write_str(">="),
Self::Contain => f.write_str("CONTAINS"),
Self::NotContain => f.write_str("CONTAINSNOT"),
Self::ContainAll => f.write_str("CONTAINSALL"),
Self::ContainAny => f.write_str("CONTAINSANY"),
Self::ContainNone => f.write_str("CONTAINSNONE"),
Self::Inside => f.write_str("INSIDE"),
Self::NotInside => f.write_str("NOTINSIDE"),
Self::AllInside => f.write_str("ALLINSIDE"),
Self::AnyInside => f.write_str("ANYINSIDE"),
Self::NoneInside => f.write_str("NONEINSIDE"),
Self::Outside => f.write_str("OUTSIDE"),
Self::Intersects => f.write_str("INTERSECTS"),
Self::Matches(reference) => {
if let Some(r) = reference {
write!(f, "@{}@", r)
} else {
f.write_str("@@")
}
}
}
}
}
@@ -158,6 +168,7 @@ pub fn symbols(i: &str) -> IResult<&str, Operator> {
map(tag("*~"), |_| Operator::AllLike),
map(tag("?~"), |_| Operator::AnyLike),
map(char('~'), |_| Operator::Like),
matches,
)),
alt((
map(tag("<="), |_| Operator::LessThanOrEqual),
@@ -221,3 +232,39 @@ pub fn phrases(i: &str) -> IResult<&str, Operator> {
let (i, _) = shouldbespace(i)?;
Ok((i, v))
}
pub fn matches(i: &str) -> IResult<&str, Operator> {
let (i, _) = char('@')(i)?;
let (i, reference) = opt(|i| uint8(i))(i)?;
let (i, _) = char('@')(i)?;
Ok((i, Operator::Matches(reference)))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn matches_without_reference() {
let res = matches("@@");
assert!(res.is_ok());
let out = res.unwrap().1;
assert_eq!("@@", format!("{}", out));
assert_eq!(out, Operator::Matches(None));
}
#[test]
fn matches_with_reference() {
let res = matches("@12@");
assert!(res.is_ok());
let out = res.unwrap().1;
assert_eq!("@12@", format!("{}", out));
assert_eq!(out, Operator::Matches(Some(12u8)));
}
#[test]
fn matches_with_invalid_reference() {
let res = matches("@256@");
assert!(res.is_err());
}
}

86
lib/src/sql/scoring.rs Normal file

@@ -0,0 +1,86 @@
use crate::sql::common::{closeparentheses, commas, openparentheses};
use crate::sql::error::IResult;
use crate::sql::number::number;
use crate::sql::Number;
use nom::branch::alt;
use nom::bytes::complete::tag_no_case;
use nom::combinator::map;
use serde::{Deserialize, Serialize};
use std::fmt;
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum Scoring {
Bm {
k1: Number,
b: Number,
order: Number,
}, // Best Match 25 (BM25)
Vs, // VectorSearch
}
impl Default for Scoring {
fn default() -> Self {
Self::Bm {
k1: Number::Float(1.2),
b: Number::Float(0.75),
order: Number::Int(1000),
}
}
}
impl fmt::Display for Scoring {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Bm {
k1,
b,
order,
} => write!(f, "BM25({},{},{})", k1, b, order),
Self::Vs => f.write_str("VS"),
}
}
}
pub fn scoring(i: &str) -> IResult<&str, Scoring> {
alt((map(tag_no_case("VS"), |_| Scoring::Vs), |i| {
let (i, _) = tag_no_case("BM25")(i)?;
let (i, _) = openparentheses(i)?;
let (i, k1) = number(i)?;
let (i, _) = commas(i)?;
let (i, b) = number(i)?;
let (i, _) = commas(i)?;
let (i, order) = number(i)?;
let (i, _) = closeparentheses(i)?;
Ok((
i,
Scoring::Bm {
k1,
b,
order,
},
))
}))(i)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn scoring_bm_25() {
let sql = "BM25(1.0,0.6,100)";
let res = scoring(sql);
assert!(res.is_ok());
let out = res.unwrap().1;
assert_eq!("BM25(1.0,0.6,100)", format!("{}", out))
}
#[test]
fn scoring_vs() {
let sql = "VS";
let res = scoring(sql);
assert!(res.is_ok());
let out = res.unwrap().1;
assert_eq!("VS", format!("{}", out))
}
}
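
For context, k1 and b are the standard BM25 knobs: k1 caps term-frequency saturation and b scales document-length normalisation. A textbook per-term score is sketched below; this is one common variant, not necessarily the indexer's exact code, though its non-negative IDF is consistent with the zero scores asserted in test_ft_index_bm_25 above:

// Hypothetical helper: textbook BM25 term score with a non-negative IDF.
fn bm25_term_score(
    tf: f64,        // term frequency in this document
    df: f64,        // number of documents containing the term
    doc_count: f64, // total number of documents
    doc_len: f64,   // length of this document
    avg_len: f64,   // average document length
    k1: f64,
    b: f64,
) -> f64 {
    // Terms present in (almost) every document contribute nothing.
    let idf = ((doc_count - df + 0.5) / (df + 0.5)).ln().max(0.0);
    idf * (tf * (k1 + 1.0)) / (tf + k1 * (1.0 - b + b * doc_len / avg_len))
}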

lib/src/sql/statements/define.rs

@@ -14,10 +14,10 @@ use crate::sql::escape::escape_str;
use crate::sql::filter::{filters, Filter};
use crate::sql::fmt::is_pretty;
use crate::sql::fmt::pretty_indent;
use crate::sql::ident;
use crate::sql::ident::{ident, Ident};
use crate::sql::idiom;
use crate::sql::idiom::{Idiom, Idioms};
use crate::sql::index::Index;
use crate::sql::kind::{kind, Kind};
use crate::sql::permission::{permissions, Permissions};
use crate::sql::statements::UpdateStatement;
@@ -25,6 +25,7 @@ use crate::sql::strand::strand_raw;
use crate::sql::tokenizer::{tokenizers, Tokenizer};
use crate::sql::value::{value, values, Value, Values};
use crate::sql::view::{view, View};
use crate::sql::{ident, index};
use argon2::password_hash::{PasswordHasher, SaltString};
use argon2::Argon2;
use derive::Store;
@@ -1267,7 +1268,7 @@ pub struct DefineIndexStatement {
pub name: Ident,
pub what: Ident,
pub cols: Idioms,
pub uniq: bool,
pub index: Index,
}
impl DefineIndexStatement {
@@ -1324,8 +1325,8 @@ impl DefineIndexStatement {
impl Display for DefineIndexStatement {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "DEFINE INDEX {} ON {} FIELDS {}", self.name, self.what, self.cols)?;
if self.uniq {
write!(f, " UNIQUE")?
if Index::Idx != self.index {
write!(f, " {}", self.index)?;
}
Ok(())
}
@@ -1346,22 +1347,24 @@ fn index(i: &str) -> IResult<&str, DefineIndexStatement> {
let (i, _) = alt((tag_no_case("COLUMNS"), tag_no_case("FIELDS")))(i)?;
let (i, _) = shouldbespace(i)?;
let (i, cols) = idiom::locals(i)?;
let (i, uniq) = opt(tuple((shouldbespace, tag_no_case("UNIQUE"))))(i)?;
let (i, _) = mightbespace(i)?;
let (i, index) = index::index(i)?;
Ok((
i,
DefineIndexStatement {
name,
what,
cols,
uniq: uniq.is_some(),
index,
},
))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sql::scoring::Scoring;
use crate::sql::{Number, Part};
#[test]
fn check_define_serialize() {
@@ -1370,4 +1373,83 @@ mod tests {
});
assert_eq!(22, stm.to_vec().len());
}
#[test]
fn check_create_non_unique_index() {
let sql = "DEFINE INDEX my_index ON TABLE my_table COLUMNS my_col";
let (_, idx) = index(sql).unwrap();
assert_eq!(
idx,
DefineIndexStatement {
name: Ident("my_index".to_string()),
what: Ident("my_table".to_string()),
cols: Idioms(vec![Idiom(vec![Part::Field(Ident("my_col".to_string()))])]),
index: Index::Idx,
}
);
assert_eq!(idx.to_string(), "DEFINE INDEX my_index ON my_table FIELDS my_col");
}
#[test]
fn check_create_unique_index() {
let sql = "DEFINE INDEX my_index ON TABLE my_table COLUMNS my_col UNIQUE";
let (_, idx) = index(sql).unwrap();
assert_eq!(
idx,
DefineIndexStatement {
name: Ident("my_index".to_string()),
what: Ident("my_table".to_string()),
cols: Idioms(vec![Idiom(vec![Part::Field(Ident("my_col".to_string()))])]),
index: Index::Uniq,
}
);
assert_eq!(idx.to_string(), "DEFINE INDEX my_index ON my_table FIELDS my_col UNIQUE");
}
#[test]
fn check_create_search_index_with_highlights() {
let sql = "DEFINE INDEX my_index ON TABLE my_table COLUMNS my_col SEARCH my_analyzer BM25(1.2,0.75,1000) HIGHLIGHTS";
let (_, idx) = index(sql).unwrap();
assert_eq!(
idx,
DefineIndexStatement {
name: Ident("my_index".to_string()),
what: Ident("my_table".to_string()),
cols: Idioms(vec![Idiom(vec![Part::Field(Ident("my_col".to_string()))])]),
index: Index::Search {
az: Ident("my_analyzer".to_string()),
hl: true,
sc: Scoring::Bm {
k1: Number::Float(1.2),
b: Number::Float(0.75),
order: Number::Int(1000)
},
},
}
);
assert_eq!(idx.to_string(), "DEFINE INDEX my_index ON my_table FIELDS my_col SEARCH my_analyzer BM25(1.2,0.75,1000) HIGHLIGHTS");
}
#[test]
fn check_create_search_index() {
let sql = "DEFINE INDEX my_index ON TABLE my_table COLUMNS my_col SEARCH my_analyzer VS";
let (_, idx) = index(sql).unwrap();
assert_eq!(
idx,
DefineIndexStatement {
name: Ident("my_index".to_string()),
what: Ident("my_table".to_string()),
cols: Idioms(vec![Idiom(vec![Part::Field(Ident("my_col".to_string()))])]),
index: Index::Search {
az: Ident("my_analyzer".to_string()),
hl: false,
sc: Scoring::Vs,
},
}
);
assert_eq!(
idx.to_string(),
"DEFINE INDEX my_index ON my_table FIELDS my_col SEARCH my_analyzer VS"
);
}
}

lib/tests/define.rs

@@ -844,14 +844,10 @@ async fn define_statement_index_single_unique_existing() -> Result<(), Error> {
let res = &mut dbs.execute(&sql, &ses, None, false).await?;
assert_eq!(res.len(), 6);
//
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
//
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
//
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
for _ in 0..3 {
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
}
//
let tmp = res.remove(0).result;
assert!(matches!(
@@ -895,17 +891,10 @@ async fn define_statement_index_multiple_unique_existing() -> Result<(), Error>
let res = &mut dbs.execute(&sql, &ses, None, false).await?;
assert_eq!(res.len(), 7);
//
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
//
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
//
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
//
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
for _ in 0..4 {
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
}
//
let tmp = res.remove(0).result;
assert!(matches!(
@@ -970,3 +959,38 @@ async fn define_statement_analyzer() -> Result<(), Error> {
assert_eq!(tmp, val);
Ok(())
}
#[tokio::test]
async fn define_statement_search_index() -> Result<(), Error> {
let sql = r#"
CREATE blog:1 SET title = 'Understanding SurrealQL and how it is different from PostgreSQL';
CREATE blog:3 SET title = 'This blog is going to be deleted';
DEFINE ANALYZER english TOKENIZERS space,case FILTERS lowercase,snowball(english);
DEFINE INDEX blog_title ON blog FIELDS title SEARCH english BM25(1.2,0.75,100) HIGHLIGHTS;
CREATE blog:2 SET title = 'Behind the scenes of the exciting beta 9 release';
DELETE blog:3;
INFO FOR TABLE blog;
"#;
let dbs = Datastore::new("memory").await?;
let ses = Session::for_kv().with_ns("test").with_db("test");
let res = &mut dbs.execute(&sql, &ses, None, false).await?;
assert_eq!(res.len(), 7);
//
for _ in 0..6 {
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
}
let tmp = res.remove(0).result?;
let val = Value::parse(
"{
ev: {},
fd: {},
ft: {},
ix: { blog_title: 'DEFINE INDEX blog_title ON blog FIELDS title SEARCH english BM25(1.2,0.75,100) HIGHLIGHTS' },
}",
);
assert_eq!(tmp, val);
Ok(())
}