Improve index BTree insertion & search performance ()

This commit is contained in:
Emmanuel Keller 2023-07-11 19:22:31 +01:00 committed by GitHub
parent 676327f781
commit 1d68fd5622
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 1401 additions and 881 deletions

10
Cargo.lock generated
View file

@ -2629,6 +2629,15 @@ version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
[[package]]
name = "lru"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "718e8fae447df0c7e1ba7f5189829e63fd536945c8988d61444c19039f16b670"
dependencies = [
"hashbrown 0.13.2",
]
[[package]]
name = "lz4-sys"
version = "1.9.4"
@ -4663,6 +4672,7 @@ dependencies = [
"indxdb",
"jsonwebtoken",
"lexicmp",
"lru",
"md-5",
"nanoid",
"native-tls",

View file

@ -78,6 +78,7 @@ indxdb = { version = "0.3.0", optional = true }
js = { version = "0.3.1" , package = "rquickjs", features = ["array-buffer", "bindgen", "classes", "futures", "loader", "macro", "parallel", "properties","rust-alloc"], optional = true }
jsonwebtoken = "8.3.0"
lexicmp = "0.1.0"
lru = "0.10.1"
md-5 = "0.10.5"
nanoid = "0.4.0"
native-tls = { version = "0.2.11", optional = true }
@ -150,4 +151,8 @@ harness = false
[[bench]]
name = "processor"
harness = false
[[bench]]
name = "index_btree"
harness = false

View file

@ -0,0 +1,72 @@
use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use rand::prelude::SliceRandom;
use rand::thread_rng;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::time::Duration;
use surrealdb::idx::bkeys::{BKeys, FstKeys, TrieKeys};
use surrealdb::idx::btree::store::{BTreeNodeStore, BTreeStoreType, KeyProvider};
use surrealdb::idx::btree::{BTree, Payload, State};
use surrealdb::kvs::{Datastore, Key};
// Builds a (key, payload) sample from an index: the key is the decimal
// string form of the index, and the payload is the index times ten.
macro_rules! get_key_value {
	($idx:expr) => {{
		(format!("{}", $idx).into(), ($idx * 10) as Payload)
	}};
}
/// Benchmarks BTree insertion followed by an immediate search, once for
/// each key implementation (FST-backed and Trie-backed).
fn bench_index_btree(c: &mut Criterion) {
	// One shuffled sample set, shared by both benchmark functions so the
	// two implementations are measured on identical input.
	let (samples_len, samples) = setup();
	let mut group = c.benchmark_group("index_btree");
	group.throughput(Throughput::Elements(1));
	group.sample_size(10);
	group.measurement_time(Duration::from_secs(30));
	group.bench_function("btree-insertion-fst", |b| {
		b.to_async(FuturesExecutor)
			.iter(|| bench::<_, FstKeys>(samples_len, |i| get_key_value!(samples[i])))
	});
	group.bench_function("btree-insertion-trie", |b| {
		b.to_async(FuturesExecutor)
			.iter(|| bench::<_, TrieKeys>(samples_len, |i| get_key_value!(samples[i])))
	});
	group.finish();
}
/// Produces the benchmark workload: the count of samples and the values
/// `0..count` in a random order, so insertions hit the tree unordered.
fn setup() -> (usize, Vec<usize>) {
	// Debug builds run the BTree far more slowly, so shrink the workload.
	let count = if cfg!(debug_assertions) {
		1000
	} else {
		100_000
	};
	let mut values: Vec<usize> = Vec::with_capacity(count);
	values.extend(0..count);
	values.shuffle(&mut thread_rng());
	(count, values)
}
/// Runs one benchmark pass: inserts `samples_size` key/payload pairs into
/// a fresh in-memory BTree and searches each key right after inserting it.
async fn bench<F, BK>(samples_size: usize, sample_provider: F)
where
	F: Fn(usize) -> (Key, Payload),
	BK: BKeys + Serialize + DeserializeOwned + Default,
{
	// Fresh datastore and transaction per pass so runs are independent.
	let ds = Datastore::new("memory").await.unwrap();
	let mut tx = ds.transaction(true, false).await.unwrap();
	let mut btree = BTree::<BK>::new(State::new(100));
	let store = BTreeNodeStore::new(KeyProvider::Debug, BTreeStoreType::Write, 20);
	let mut store = store.lock().await;
	for i in 0..samples_size {
		let (key, payload) = sample_provider(i);
		// Insert the sample, then look it straight back up; black_box
		// keeps the optimizer from discarding the search result.
		btree.insert(&mut tx, &mut store, key.clone(), payload).await.unwrap();
		black_box(btree.search(&mut tx, &mut store, &key).await.unwrap());
	}
}
// Criterion harness entry points.
criterion_group!(benches, bench_index_btree);
criterion_main!(benches);

View file

@ -3,6 +3,7 @@ use crate::dbs::Statement;
use crate::dbs::{Options, Transaction};
use crate::doc::{CursorDoc, Document};
use crate::err::Error;
use crate::idx::btree::store::BTreeStoreType;
use crate::idx::ft::FtIndex;
use crate::idx::IndexKeyBase;
use crate::sql::array::Array;
@ -192,12 +193,12 @@ impl<'a> IndexOperation<'a> {
) -> Result<(), Error> {
let ikb = IndexKeyBase::new(self.opt, self.ix);
let az = run.get_az(self.opt.ns(), self.opt.db(), az.as_str()).await?;
let mut ft = FtIndex::new(run, az, ikb, order, scoring, hl).await?;
let mut ft = FtIndex::new(run, az, ikb, order, scoring, hl, BTreeStoreType::Write).await?;
if let Some(n) = &self.n {
// TODO: Apply the analyzer
ft.index_document(run, self.rid, n).await
ft.index_document(run, self.rid, n).await?;
} else {
ft.remove_document(run, self.rid).await
ft.remove_document(run, self.rid).await?;
}
ft.finish(run).await
}
}

View file

@ -8,19 +8,20 @@ use std::collections::VecDeque;
use std::fmt::{Display, Formatter};
use std::io;
pub(super) trait BKeys: Display + Sized {
pub trait BKeys: Display + Sized {
fn with_key_val(key: Key, payload: Payload) -> Result<Self, Error>;
fn len(&self) -> u32;
fn is_empty(&self) -> bool;
fn get(&self, key: &Key) -> Option<Payload>;
// It is okay to return an owned Vec rather than an iterator,
// because BKeys are intended to be stored as a Node in the BTree.
// The size of the Node should be small, therefore one instance of
// BKeys would never store a large volume of keys.
fn collect_with_prefix(&self, prefix_key: &Key) -> VecDeque<(Key, Payload)>;
fn collect_with_prefix(&self, prefix_key: &Key) -> Result<VecDeque<(Key, Payload)>, Error>;
fn insert(&mut self, key: Key, payload: Payload);
fn append(&mut self, keys: Self);
fn remove(&mut self, key: &Key) -> Option<Payload>;
fn split_keys(&self) -> SplitKeys<Self>;
fn split_keys(self) -> Result<SplitKeys<Self>, Error>;
fn get_key(&self, idx: usize) -> Option<Key>;
fn get_child_idx(&self, searched_key: &Key) -> usize;
fn get_first_key(&self) -> Option<(Key, Payload)>;
@ -31,7 +32,7 @@ pub(super) trait BKeys: Display + Sized {
F: Fn(Key) -> Result<String, Error>;
}
pub(super) struct SplitKeys<BK>
pub struct SplitKeys<BK>
where
BK: BKeys,
{
@ -42,204 +43,209 @@ where
pub(super) median_payload: Payload,
}
#[derive(Default)]
pub(super) struct FstKeys {
map: Map<Vec<u8>>,
additions: Trie<Key, Payload>,
deletions: Trie<Key, bool>,
len: u32,
pub struct FstKeys {
i: Inner,
}
enum Inner {
Map(Map<Vec<u8>>),
Trie(TrieKeys),
}
impl FstKeys {
fn edit(&mut self) {
if let Inner::Map(m) = &self.i {
let t: TrieKeys = m.into();
self.i = Inner::Trie(t);
}
}
}
impl Default for FstKeys {
fn default() -> Self {
Self {
i: Inner::Trie(TrieKeys::default()),
}
}
}
impl BKeys for FstKeys {
fn with_key_val(key: Key, payload: Payload) -> Result<Self, Error> {
let mut builder = MapBuilder::memory();
builder.insert(key, payload).unwrap();
Ok(Self::try_from(builder)?)
let i = Inner::Trie(TrieKeys::with_key_val(key, payload)?);
Ok(Self {
i,
})
}
fn len(&self) -> u32 {
self.len
match &self.i {
Inner::Map(m) => m.len() as u32,
Inner::Trie(t) => t.len(),
}
}
fn is_empty(&self) -> bool {
match &self.i {
Inner::Map(m) => m.is_empty(),
Inner::Trie(t) => t.is_empty(),
}
}
fn get(&self, key: &Key) -> Option<Payload> {
if let Some(payload) = self.additions.get(key) {
Some(*payload)
} else {
self.map.get(key).filter(|_| self.deletions.get(key).is_none())
match &self.i {
Inner::Map(m) => m.get(key),
Inner::Trie(t) => t.get(key),
}
}
fn collect_with_prefix(&self, _prefix_key: &Key) -> VecDeque<(Key, Payload)> {
panic!("Not supported!")
fn collect_with_prefix(&self, _prefix_key: &Key) -> Result<VecDeque<(Key, Payload)>, Error> {
Err(Error::Unreachable)
}
fn insert(&mut self, key: Key, payload: Payload) {
self.deletions.remove(&key);
let existing_key = self.map.get(&key).is_some();
if self.additions.insert(key, payload).is_none() && !existing_key {
self.len += 1;
self.edit();
if let Inner::Trie(t) = &mut self.i {
t.insert(key, payload);
}
}
fn append(&mut self, mut keys: Self) {
keys.compile();
let mut s = keys.map.stream();
while let Some((key, payload)) = s.next() {
self.insert(key.to_vec(), payload);
fn append(&mut self, keys: Self) {
if keys.is_empty() {
return;
}
self.edit();
match keys.i {
Inner::Map(other) => {
let mut s = other.stream();
while let Some((key, payload)) = s.next() {
self.insert(key.to_vec(), payload);
}
}
Inner::Trie(other) => {
if let Inner::Trie(t) = &mut self.i {
t.append(other)
}
}
}
}
fn remove(&mut self, key: &Key) -> Option<Payload> {
if self.deletions.get(key).is_some() {
return None;
self.edit();
if let Inner::Trie(t) = &mut self.i {
t.remove(key)
} else {
None
}
if let Some(payload) = self.additions.remove(key) {
self.len -= 1;
return Some(payload);
}
self.get(key).map(|payload| {
if self.deletions.insert(key.clone(), true).is_none() {
self.len -= 1;
}
payload
})
}
fn split_keys(&self) -> SplitKeys<Self> {
let median_idx = self.map.len() / 2;
let mut s = self.map.stream();
let mut left = MapBuilder::memory();
let mut n = median_idx;
while n > 0 {
if let Some((key, payload)) = s.next() {
left.insert(key, payload).unwrap();
}
n -= 1;
}
let (median_key, median_payload) = s
.next()
.map_or_else(|| panic!("The median key/value should exist"), |(k, v)| (k.into(), v));
let mut right = MapBuilder::memory();
while let Some((key, value)) = s.next() {
right.insert(key, value).unwrap();
}
SplitKeys {
left: Self::try_from(left).unwrap(),
right: Self::try_from(right).unwrap(),
median_idx,
median_key,
median_payload,
fn split_keys(mut self) -> Result<SplitKeys<Self>, Error> {
self.edit();
if let Inner::Trie(t) = self.i {
let s = t.split_keys()?;
Ok(SplitKeys {
left: Self {
i: Inner::Trie(s.left),
},
right: Self {
i: Inner::Trie(s.right),
},
median_idx: s.median_idx,
median_key: s.median_key,
median_payload: s.median_payload,
})
} else {
Err(Error::Unreachable)
}
}
fn get_key(&self, mut idx: usize) -> Option<Key> {
let mut s = self.map.keys().into_stream();
while let Some(key) = s.next() {
if idx == 0 {
return Some(key.to_vec());
match &self.i {
Inner::Map(m) => {
let mut s = m.keys().into_stream();
while let Some(key) = s.next() {
if idx == 0 {
return Some(key.to_vec());
}
idx -= 1;
}
None
}
idx -= 1;
Inner::Trie(t) => t.get_key(idx),
}
None
}
fn get_child_idx(&self, searched_key: &Key) -> usize {
let searched_key = searched_key.as_slice();
let mut s = self.map.keys().into_stream();
let mut child_idx = 0;
while let Some(key) = s.next() {
if searched_key.le(key) {
break;
match &self.i {
Inner::Map(m) => {
let searched_key = searched_key.as_slice();
let mut s = m.keys().into_stream();
let mut child_idx = 0;
while let Some(key) = s.next() {
if searched_key.le(key) {
break;
}
child_idx += 1;
}
child_idx
}
child_idx += 1;
Inner::Trie(t) => t.get_child_idx(searched_key),
}
child_idx
}
fn get_first_key(&self) -> Option<(Key, Payload)> {
self.map.stream().next().map(|(k, p)| (k.to_vec(), p))
match &self.i {
Inner::Map(m) => m.stream().next().map(|(k, p)| (k.to_vec(), p)),
Inner::Trie(t) => t.get_first_key(),
}
}
fn get_last_key(&self) -> Option<(Key, Payload)> {
let mut last = None;
let mut s = self.map.stream();
while let Some((k, p)) = s.next() {
last = Some((k.to_vec(), p));
match &self.i {
Inner::Map(m) => {
let mut last = None;
let mut s = m.stream();
while let Some((k, p)) = s.next() {
last = Some((k.to_vec(), p));
}
last
}
Inner::Trie(t) => t.get_last_key(),
}
last
}
/// Rebuild the FST by incorporating the changes (additions and deletions)
fn compile(&mut self) {
if self.additions.is_empty() && self.deletions.is_empty() {
return;
}
let mut existing_keys = self.map.stream();
let mut new_keys = self.additions.iter();
let mut current_existing = existing_keys.next();
let mut current_new = new_keys.next();
let mut builder = MapBuilder::memory();
// We use a double iterator because the map as to be filled with sorted terms
loop {
match current_new {
None => break,
Some((new_key_vec, new_value)) => match current_existing {
None => break,
Some((existing_key_vec, existing_value)) => {
if self.deletions.get(existing_key_vec).is_some()
|| self.additions.get(existing_key_vec).is_some()
{
current_existing = existing_keys.next();
} else if new_key_vec.as_slice().ge(existing_key_vec) {
builder.insert(existing_key_vec, existing_value).unwrap();
current_existing = existing_keys.next();
} else {
builder.insert(new_key_vec, *new_value).unwrap();
current_new = new_keys.next();
}
}
},
};
}
// Insert any existing term left over
while let Some((existing_key_vec, value)) = current_existing {
if self.deletions.get(existing_key_vec).is_none()
&& self.additions.get(existing_key_vec).is_none()
{
builder.insert(existing_key_vec, value).unwrap();
if let Inner::Trie(t) = &self.i {
let mut builder = MapBuilder::memory();
for (key, payload) in t.keys.iter() {
builder.insert(key, *payload).unwrap();
}
current_existing = existing_keys.next();
let m = Map::new(builder.into_inner().unwrap()).unwrap();
self.i = Inner::Map(m);
}
// Insert any new term left over
while let Some((new_key_vec, value)) = current_new {
builder.insert(new_key_vec, *value).unwrap();
current_new = new_keys.next();
}
self.map = Map::new(builder.into_inner().unwrap()).unwrap();
self.additions = Default::default();
self.deletions = Default::default();
}
fn debug<F>(&self, to_string: F) -> Result<(), Error>
where
F: Fn(Key) -> Result<String, Error>,
{
let mut s = String::new();
let mut iter = self.map.stream();
let mut start = true;
while let Some((k, p)) = iter.next() {
if !start {
s.push(',');
} else {
start = false;
match &self.i {
Inner::Map(m) => {
let mut s = String::new();
let mut iter = m.stream();
let mut start = true;
while let Some((k, p)) = iter.next() {
if !start {
s.push(',');
} else {
start = false;
}
s.push_str(&format!("{}={}", to_string(k.to_vec())?.as_str(), p));
}
debug!("FSTKeys[{}]", s);
Ok(())
}
s.push_str(&format!("{}={}", to_string(k.to_vec())?.as_str(), p));
Inner::Trie(t) => t.debug(to_string),
}
debug!("FSTKeys[{}]", s);
Ok(())
}
}
@ -254,12 +260,8 @@ impl TryFrom<Vec<u8>> for FstKeys {
type Error = fst::Error;
fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
let map = Map::new(bytes)?;
let len = map.len() as u32;
Ok(Self {
map,
len,
additions: Default::default(),
deletions: Default::default(),
i: Inner::Map(map),
})
}
}
@ -269,10 +271,10 @@ impl Serialize for FstKeys {
where
S: serde::Serializer,
{
if !self.deletions.is_empty() || !self.additions.is_empty() {
Err(ser::Error::custom("bkeys.compile() should be called prior serializing"))
if let Inner::Map(m) = &self.i {
serializer.serialize_bytes(m.as_fst().as_bytes())
} else {
serializer.serialize_bytes(self.map.as_fst().as_bytes())
Err(ser::Error::custom("bkeys.to_map() should be called prior serializing"))
}
}
}
@ -289,23 +291,28 @@ impl<'de> Deserialize<'de> for FstKeys {
impl Display for FstKeys {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut s = self.map.stream();
let mut start = true;
while let Some((key, val)) = s.next() {
let key = String::from_utf8_lossy(key);
if start {
start = false;
} else {
f.write_str(", ")?;
match &self.i {
Inner::Map(m) => {
let mut s = m.stream();
let mut start = true;
while let Some((key, val)) = s.next() {
let key = String::from_utf8_lossy(key);
if start {
start = false;
} else {
f.write_str(", ")?;
}
write!(f, "{}=>{}", key, val)?;
}
Ok(())
}
write!(f, "{}=>{}", key, val)?;
Inner::Trie(t) => t.fmt(f),
}
Ok(())
}
}
#[derive(Default)]
pub(super) struct TrieKeys {
pub struct TrieKeys {
keys: Trie<Key, Payload>,
}
@ -360,6 +367,17 @@ impl Display for TrieKeys {
}
}
impl From<&Map<Vec<u8>>> for TrieKeys {
fn from(m: &Map<Vec<u8>>) -> Self {
let mut keys = TrieKeys::default();
let mut s = m.stream();
while let Some((key, payload)) = s.next() {
keys.insert(key.to_vec(), payload);
}
keys
}
}
impl BKeys for TrieKeys {
fn with_key_val(key: Key, payload: Payload) -> Result<Self, Error> {
let mut trie_keys = Self {
@ -373,17 +391,21 @@ impl BKeys for TrieKeys {
self.keys.len() as u32
}
fn is_empty(&self) -> bool {
self.keys.is_empty()
}
fn get(&self, key: &Key) -> Option<Payload> {
self.keys.get(key).copied()
}
fn collect_with_prefix(&self, prefix: &Key) -> VecDeque<(Key, Payload)> {
fn collect_with_prefix(&self, prefix: &Key) -> Result<VecDeque<(Key, Payload)>, Error> {
let mut i = KeysIterator::new(prefix, &self.keys);
let mut r = VecDeque::new();
while let Some((k, p)) = i.next() {
r.push_back((k.clone(), p))
}
r
Ok(r)
}
fn insert(&mut self, key: Key, payload: Payload) {
@ -400,7 +422,7 @@ impl BKeys for TrieKeys {
self.keys.remove(key)
}
fn split_keys(&self) -> SplitKeys<Self> {
fn split_keys(self) -> Result<SplitKeys<Self>, Error> {
let median_idx = self.keys.len() / 2;
let mut s = self.keys.iter();
let mut left = Trie::default();
@ -411,20 +433,22 @@ impl BKeys for TrieKeys {
}
n -= 1;
}
let (median_key, median_payload) = s
.next()
.map_or_else(|| panic!("The median key/value should exist"), |(k, v)| (k.clone(), *v));
let (median_key, median_payload) = if let Some((k, v)) = s.next() {
(k.clone(), *v)
} else {
return Err(Error::Unreachable);
};
let mut right = Trie::default();
for (key, val) in s {
right.insert(key.clone(), *val);
}
SplitKeys {
Ok(SplitKeys {
left: Self::from(left),
right: Self::from(right),
median_idx,
median_key,
median_payload,
}
})
}
fn get_key(&self, mut idx: usize) -> Option<Key> {
@ -530,7 +554,8 @@ mod tests {
#[test]
fn test_fst_keys_serde() {
let key: Key = "a".as_bytes().into();
let keys = FstKeys::with_key_val(key.clone(), 130).unwrap();
let mut keys = FstKeys::with_key_val(key.clone(), 130).unwrap();
keys.compile();
let buf = bincode::serialize(&keys).unwrap();
let keys: FstKeys = bincode::deserialize(&buf).unwrap();
assert_eq!(keys.get(&key), Some(130));
@ -539,7 +564,8 @@ mod tests {
#[test]
fn test_trie_keys_serde() {
let key: Key = "a".as_bytes().into();
let keys = TrieKeys::with_key_val(key.clone(), 130).unwrap();
let mut keys = TrieKeys::with_key_val(key.clone(), 130).unwrap();
keys.compile();
let buf = bincode::serialize(&keys).unwrap();
let keys: TrieKeys = bincode::deserialize(&buf).unwrap();
assert_eq!(keys.get(&key), Some(130));
@ -628,7 +654,7 @@ mod tests {
keys.insert("there".into(), 10);
{
let r = keys.collect_with_prefix(&"appli".into());
let r = keys.collect_with_prefix(&"appli".into()).unwrap();
check_keys(
r,
vec![("applicant".into(), 2), ("application".into(), 3), ("applicative".into(), 4)],
@ -636,7 +662,7 @@ mod tests {
}
{
let r = keys.collect_with_prefix(&"the".into());
let r = keys.collect_with_prefix(&"the".into()).unwrap();
check_keys(
r,
vec![
@ -651,17 +677,17 @@ mod tests {
}
{
let r = keys.collect_with_prefix(&"blue".into());
let r = keys.collect_with_prefix(&"blue".into()).unwrap();
check_keys(r, vec![("blueberry".into(), 6)]);
}
{
let r = keys.collect_with_prefix(&"apple".into());
let r = keys.collect_with_prefix(&"apple".into()).unwrap();
check_keys(r, vec![("apple".into(), 1)]);
}
{
let r = keys.collect_with_prefix(&"zz".into());
let r = keys.collect_with_prefix(&"zz".into()).unwrap();
check_keys(r, vec![]);
}
}
@ -673,7 +699,7 @@ mod tests {
keys.insert("d".into(), 4);
keys.insert("e".into(), 5);
keys.compile();
let r = keys.split_keys();
let r = keys.split_keys().unwrap();
assert_eq!(r.median_payload, 3);
let c: Key = "c".into();
assert_eq!(r.median_key, c);

File diff suppressed because it is too large Load diff

280
lib/src/idx/btree/store.rs Normal file
View file

@ -0,0 +1,280 @@
use crate::err::Error;
use crate::idx::bkeys::BKeys;
use crate::idx::btree::{Node, NodeId};
use crate::idx::IndexKeyBase;
use crate::kvs::{Key, Transaction};
use lru::LruCache;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Selects which caching strategy a `BTreeNodeStore` uses.
#[derive(Clone, Copy)]
pub enum BTreeStoreType {
	// Write-back cache tracking created, updated and removed nodes.
	Write,
	// LRU cache for repeated read-only lookups.
	Read,
	// No caching; every node is read straight from the KV store.
	Traversal,
}
/// Node access layer for the BTree, dispatching to the cache variant
/// chosen at construction time via `BTreeStoreType`.
pub enum BTreeNodeStore<BK>
where
	BK: BKeys + Serialize + DeserializeOwned,
{
	/// Caches every node read, and keeps track of updated and created nodes
	Write(BTreeWriteCache<BK>),
	/// Uses an LRU cache to keep in memory the last nodes read
	Read(BTreeReadCache<BK>),
	/// Reads the nodes from the KV store without any cache
	Traversal(KeyProvider),
}
impl<BK> BTreeNodeStore<BK>
where
BK: BKeys + Serialize + DeserializeOwned,
{
pub fn new(
keys: KeyProvider,
store_type: BTreeStoreType,
read_size: usize,
) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(match store_type {
BTreeStoreType::Write => Self::Write(BTreeWriteCache::new(keys)),
BTreeStoreType::Read => Self::Read(BTreeReadCache::new(keys, read_size)),
BTreeStoreType::Traversal => Self::Traversal(keys),
}))
}
pub(super) async fn get_node(
&mut self,
tx: &mut Transaction,
node_id: NodeId,
) -> Result<StoredNode<BK>, Error> {
match self {
BTreeNodeStore::Write(w) => w.get_node(tx, node_id).await,
BTreeNodeStore::Read(r) => r.get_node(tx, node_id).await,
BTreeNodeStore::Traversal(keys) => keys.load_node::<BK>(tx, node_id).await,
}
}
pub(super) fn set_node(&mut self, node: StoredNode<BK>, updated: bool) -> Result<(), Error> {
match self {
BTreeNodeStore::Write(w) => w.set_node(node, updated),
BTreeNodeStore::Read(r) => {
if updated {
Err(Error::Unreachable)
} else {
r.set_node(node);
Ok(())
}
}
BTreeNodeStore::Traversal(_) => Ok(()),
}
}
pub(super) fn new_node(&mut self, id: NodeId, node: Node<BK>) -> Result<StoredNode<BK>, Error> {
match self {
BTreeNodeStore::Write(w) => Ok(w.new_node(id, node)),
_ => Err(Error::Unreachable),
}
}
pub(super) fn remove_node(&mut self, node_id: NodeId, node_key: Key) -> Result<(), Error> {
match self {
BTreeNodeStore::Write(w) => w.remove_node(node_id, node_key),
_ => Err(Error::Unreachable),
}
}
pub(in crate::idx) async fn finish(&mut self, tx: &mut Transaction) -> Result<bool, Error> {
if let BTreeNodeStore::Write(w) = self {
w.finish(tx).await
} else {
Err(Error::Unreachable)
}
}
}
/// Write-back cache used while mutating the tree: nodes are kept in
/// memory and persisted in a single batch by `finish`.
pub struct BTreeWriteCache<BK>
where
	BK: BKeys + Serialize + DeserializeOwned,
{
	keys: KeyProvider,
	// Nodes currently held in memory, keyed by their node id.
	nodes: HashMap<NodeId, StoredNode<BK>>,
	// Ids of nodes that must be written back on `finish`.
	updated: HashSet<NodeId>,
	// Nodes to delete from the KV store on `finish`, with their KV key.
	removed: HashMap<NodeId, Key>,
	// Debug-only: ids handed out by `get_node` and not yet returned.
	#[cfg(debug_assertions)]
	out: HashSet<NodeId>,
}
impl<BK> BTreeWriteCache<BK>
where
	BK: BKeys + Serialize + DeserializeOwned,
{
	fn new(keys: KeyProvider) -> Self {
		Self {
			keys,
			nodes: HashMap::new(),
			updated: HashSet::new(),
			removed: HashMap::new(),
			#[cfg(debug_assertions)]
			out: HashSet::new(),
		}
	}

	/// Takes the node out of the cache (or loads it from the KV store).
	/// The caller owns it until it is handed back via `set_node` or
	/// scheduled for deletion via `remove_node`.
	async fn get_node(
		&mut self,
		tx: &mut Transaction,
		node_id: NodeId,
	) -> Result<StoredNode<BK>, Error> {
		#[cfg(debug_assertions)]
		self.out.insert(node_id);
		if let Some(n) = self.nodes.remove(&node_id) {
			return Ok(n);
		}
		self.keys.load_node::<BK>(tx, node_id).await
	}

	/// Hands a node back to the cache; `updated` flags it for write-back.
	fn set_node(&mut self, node: StoredNode<BK>, updated: bool) -> Result<(), Error> {
		// Validate before mutating any state: previously the node id was
		// added to `updated` before this check, leaving a stale entry
		// behind when the error was returned.
		if self.removed.contains_key(&node.id) {
			return Err(Error::Unreachable);
		}
		#[cfg(debug_assertions)]
		self.out.remove(&node.id);
		if updated {
			self.updated.insert(node.id);
		}
		self.nodes.insert(node.id, node);
		Ok(())
	}

	/// Registers a freshly created node; its size is 0 until it is written.
	fn new_node(&mut self, id: NodeId, node: Node<BK>) -> StoredNode<BK> {
		#[cfg(debug_assertions)]
		self.out.insert(id);
		StoredNode {
			node,
			id,
			key: self.keys.get_node_key(id),
			size: 0,
		}
	}

	/// Schedules a node for deletion on `finish`.
	fn remove_node(&mut self, node_id: NodeId, node_key: Key) -> Result<(), Error> {
		#[cfg(debug_assertions)]
		{
			// A node still present in the cache must not be removed.
			if self.nodes.contains_key(&node_id) {
				return Err(Error::Unreachable);
			}
			self.out.remove(&node_id);
		}
		self.updated.remove(&node_id);
		self.removed.insert(node_id, node_key);
		Ok(())
	}

	/// Persists updated nodes and deletions to the KV store.
	/// Returns `true` if anything was written or deleted.
	async fn finish(&mut self, tx: &mut Transaction) -> Result<bool, Error> {
		let update = !self.updated.is_empty() || !self.removed.is_empty();
		#[cfg(debug_assertions)]
		{
			// Every node taken with `get_node` must have been returned.
			if !self.out.is_empty() {
				return Err(Error::Unreachable);
			}
		}
		for node_id in &self.updated {
			if let Some(mut node) = self.nodes.remove(node_id) {
				node.node.write(tx, node.key).await?;
			} else {
				// An updated node must still be present in the cache.
				return Err(Error::Unreachable);
			}
		}
		self.updated.clear();
		// Drain the map in one pass instead of collecting the keys and
		// re-looking each one up.
		for (_, node_key) in std::mem::take(&mut self.removed) {
			tx.del(node_key).await?;
		}
		Ok(update)
	}
}
/// Read-only LRU cache keeping the most recently read nodes in memory.
pub struct BTreeReadCache<BK>
where
	BK: BKeys + Serialize + DeserializeOwned,
{
	keys: KeyProvider,
	// Bounded LRU of nodes, keyed by node id.
	nodes: LruCache<NodeId, StoredNode<BK>>,
}
impl<BK> BTreeReadCache<BK>
where
	BK: BKeys + Serialize + DeserializeOwned,
{
	/// Creates an LRU cache holding at most `size` nodes.
	///
	/// # Panics
	/// Panics if `size` is zero: an LRU cache needs a non-zero capacity.
	fn new(keys: KeyProvider, size: usize) -> Self {
		Self {
			keys,
			// `expect` states the precondition instead of a bare unwrap.
			nodes: LruCache::new(
				NonZeroUsize::new(size).expect("the BTree read cache size must be non-zero"),
			),
		}
	}

	/// Pops the node from the cache, or loads it from the KV store.
	/// Ownership moves to the caller until `set_node` returns it.
	async fn get_node(
		&mut self,
		tx: &mut Transaction,
		node_id: NodeId,
	) -> Result<StoredNode<BK>, Error> {
		if let Some(n) = self.nodes.pop(&node_id) {
			return Ok(n);
		}
		self.keys.load_node::<BK>(tx, node_id).await
	}

	/// Puts a node back, evicting the least recently used entry if full.
	fn set_node(&mut self, node: StoredNode<BK>) {
		self.nodes.put(node.id, node);
	}
}
/// Identifies which index component a BTree belongs to, and therefore
/// how the KV keys for its nodes are constructed.
#[derive(Clone)]
pub enum KeyProvider {
	DocIds(IndexKeyBase),
	DocLengths(IndexKeyBase),
	Postings(IndexKeyBase),
	Terms(IndexKeyBase),
	// Test/benchmark provider: the node id itself (big-endian) is the key.
	Debug,
}
impl KeyProvider {
	/// Computes the KV key under which the given node is persisted,
	/// according to the index component this provider represents.
	pub(in crate::idx) fn get_node_key(&self, node_id: NodeId) -> Key {
		match self {
			Self::DocIds(ikb) => ikb.new_bd_key(Some(node_id)),
			Self::DocLengths(ikb) => ikb.new_bl_key(Some(node_id)),
			Self::Postings(ikb) => ikb.new_bp_key(Some(node_id)),
			Self::Terms(ikb) => ikb.new_bt_key(Some(node_id)),
			Self::Debug => node_id.to_be_bytes().to_vec(),
		}
	}

	/// Reads a node from the KV store and wraps it together with its id,
	/// its KV key and the size reported by the read.
	async fn load_node<BK>(&self, tx: &mut Transaction, id: NodeId) -> Result<StoredNode<BK>, Error>
	where
		BK: BKeys + Serialize + DeserializeOwned,
	{
		let key = self.get_node_key(id);
		let (node, size) = Node::<BK>::read(tx, key.clone()).await?;
		let stored = StoredNode {
			node,
			id,
			key,
			size,
		};
		Ok(stored)
	}
}
/// A BTree node together with the information required to persist it.
pub(super) struct StoredNode<BK>
where
	BK: BKeys,
{
	pub(super) node: Node<BK>,
	pub(super) id: NodeId,
	// The KV store key under which the node is stored.
	pub(super) key: Key,
	// Size reported when the node was read from the KV store (0 for new nodes).
	pub(super) size: u32,
}

View file

@ -1,10 +1,13 @@
use crate::err::Error;
use crate::idx::bkeys::TrieKeys;
use crate::idx::btree::{BTree, KeyProvider, Statistics};
use crate::idx::btree::store::{BTreeNodeStore, BTreeStoreType, KeyProvider};
use crate::idx::btree::{BTree, Statistics};
use crate::idx::{btree, IndexKeyBase, SerdeState};
use crate::kvs::{Key, Transaction};
use roaring::RoaringTreemap;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;
pub(crate) type DocId = u64;
@ -13,7 +16,8 @@ pub(crate) const NO_DOC_ID: u64 = u64::MAX;
pub(crate) struct DocIds {
state_key: Key,
index_key_base: IndexKeyBase,
btree: BTree,
btree: BTree<TrieKeys>,
store: Arc<Mutex<BTreeNodeStore<TrieKeys>>>,
available_ids: Option<RoaringTreemap>,
next_doc_id: DocId,
updated: bool,
@ -24,18 +28,21 @@ impl DocIds {
tx: &mut Transaction,
index_key_base: IndexKeyBase,
default_btree_order: u32,
store_type: BTreeStoreType,
) -> Result<Self, Error> {
let keys = KeyProvider::DocIds(index_key_base.clone());
let state_key: Key = keys.get_state_key();
let state_key: Key = index_key_base.new_bd_key(None);
let state: State = if let Some(val) = tx.get(state_key.clone()).await? {
State::try_from_val(val)?
} else {
State::new(default_btree_order)
};
let store =
BTreeNodeStore::new(KeyProvider::DocIds(index_key_base.clone()), store_type, 20);
Ok(Self {
state_key,
index_key_base,
btree: BTree::new(keys, state.btree),
btree: BTree::new(state.btree),
store,
available_ids: state.available_ids,
next_doc_id: state.next_doc_id,
updated: false,
@ -64,7 +71,8 @@ impl DocIds {
tx: &mut Transaction,
doc_key: Key,
) -> Result<Option<DocId>, Error> {
self.btree.search::<TrieKeys>(tx, &doc_key).await
let mut store = self.store.lock().await;
self.btree.search(tx, &mut store, &doc_key).await
}
/// Returns the doc_id for the given doc_key.
@ -74,15 +82,18 @@ impl DocIds {
tx: &mut Transaction,
doc_key: Key,
) -> Result<Resolved, Error> {
if let Some(doc_id) = self.btree.search::<TrieKeys>(tx, &doc_key).await? {
Ok(Resolved::Existing(doc_id))
} else {
let doc_id = self.get_next_doc_id();
tx.set(self.index_key_base.new_bi_key(doc_id), doc_key.clone()).await?;
self.btree.insert::<TrieKeys>(tx, doc_key, doc_id).await?;
self.updated = true;
Ok(Resolved::New(doc_id))
{
let mut store = self.store.lock().await;
if let Some(doc_id) = self.btree.search(tx, &mut store, &doc_key).await? {
return Ok(Resolved::Existing(doc_id));
}
}
let doc_id = self.get_next_doc_id();
tx.set(self.index_key_base.new_bi_key(doc_id), doc_key.clone()).await?;
let mut store = self.store.lock().await;
self.btree.insert(tx, &mut store, doc_key, doc_id).await?;
self.updated = true;
Ok(Resolved::New(doc_id))
}
pub(super) async fn remove_doc(
@ -90,7 +101,8 @@ impl DocIds {
tx: &mut Transaction,
doc_key: Key,
) -> Result<Option<DocId>, Error> {
if let Some(doc_id) = self.btree.delete::<TrieKeys>(tx, doc_key).await? {
let mut store = self.store.lock().await;
if let Some(doc_id) = self.btree.delete(tx, &mut store, doc_key).await? {
tx.del(self.index_key_base.new_bi_key(doc_id)).await?;
if let Some(available_ids) = &mut self.available_ids {
available_ids.insert(doc_id);
@ -120,17 +132,19 @@ impl DocIds {
}
pub(super) async fn statistics(&self, tx: &mut Transaction) -> Result<Statistics, Error> {
self.btree.statistics::<TrieKeys>(tx).await
let mut store = self.store.lock().await;
self.btree.statistics(tx, &mut store).await
}
pub(super) async fn finish(self, tx: &mut Transaction) -> Result<(), Error> {
if self.updated || self.btree.is_updated() {
pub(super) async fn finish(&mut self, tx: &mut Transaction) -> Result<(), Error> {
let updated = self.store.lock().await.finish(tx).await?;
if self.updated || updated {
let state = State {
btree: self.btree.get_state().clone(),
available_ids: self.available_ids,
available_ids: self.available_ids.take(),
next_doc_id: self.next_doc_id,
};
tx.set(self.state_key, state.try_to_val()?).await?;
tx.set(self.state_key.clone(), state.try_to_val()?).await?;
}
Ok(())
}
@ -179,19 +193,21 @@ impl Resolved {
#[cfg(test)]
mod tests {
use crate::idx::btree::store::BTreeStoreType;
use crate::idx::ft::docids::{DocIds, Resolved};
use crate::idx::IndexKeyBase;
use crate::kvs::{Datastore, Transaction};
const BTREE_ORDER: u32 = 7;
async fn get_doc_ids(ds: &Datastore) -> (Transaction, DocIds) {
async fn get_doc_ids(ds: &Datastore, store_type: BTreeStoreType) -> (Transaction, DocIds) {
let mut tx = ds.transaction(true, false).await.unwrap();
let d = DocIds::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER).await.unwrap();
let d =
DocIds::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER, store_type).await.unwrap();
(tx, d)
}
async fn finish(mut tx: Transaction, d: DocIds) {
async fn finish(mut tx: Transaction, mut d: DocIds) {
d.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
}
@ -201,49 +217,77 @@ mod tests {
let ds = Datastore::new("memory").await.unwrap();
// Resolve a first doc key
let (mut tx, mut d) = get_doc_ids(&ds).await;
let doc_id = d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap();
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 1);
assert_eq!(d.get_doc_key(&mut tx, 0).await.unwrap(), Some("Foo".into()));
finish(tx, d).await;
assert_eq!(doc_id, Resolved::New(0));
{
let (mut tx, mut d) = get_doc_ids(&ds, BTreeStoreType::Write).await;
let doc_id = d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap();
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 1);
assert_eq!(d.get_doc_key(&mut tx, 0).await.unwrap(), Some("Foo".into()));
finish(tx, d).await;
assert_eq!(doc_id, Resolved::New(0));
}
// Resolve the same doc key
let (mut tx, mut d) = get_doc_ids(&ds).await;
let doc_id = d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap();
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 1);
assert_eq!(d.get_doc_key(&mut tx, 0).await.unwrap(), Some("Foo".into()));
finish(tx, d).await;
assert_eq!(doc_id, Resolved::Existing(0));
{
let (mut tx, mut d) = get_doc_ids(&ds, BTreeStoreType::Write).await;
let doc_id = d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap();
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 1);
assert_eq!(d.get_doc_key(&mut tx, 0).await.unwrap(), Some("Foo".into()));
finish(tx, d).await;
assert_eq!(doc_id, Resolved::Existing(0));
}
// Resolve another single doc key
let (mut tx, mut d) = get_doc_ids(&ds).await;
let doc_id = d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap();
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 2);
assert_eq!(d.get_doc_key(&mut tx, 1).await.unwrap(), Some("Bar".into()));
finish(tx, d).await;
assert_eq!(doc_id, Resolved::New(1));
{
let (mut tx, mut d) = get_doc_ids(&ds, BTreeStoreType::Write).await;
let doc_id = d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap();
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 2);
assert_eq!(d.get_doc_key(&mut tx, 1).await.unwrap(), Some("Bar".into()));
finish(tx, d).await;
assert_eq!(doc_id, Resolved::New(1));
}
// Resolve another two existing doc keys and two new doc keys (interlaced)
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(), Resolved::Existing(0));
assert_eq!(d.resolve_doc_id(&mut tx, "Hello".into()).await.unwrap(), Resolved::New(2));
assert_eq!(d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap(), Resolved::Existing(1));
assert_eq!(d.resolve_doc_id(&mut tx, "World".into()).await.unwrap(), Resolved::New(3));
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 4);
finish(tx, d).await;
{
let (mut tx, mut d) = get_doc_ids(&ds, BTreeStoreType::Write).await;
assert_eq!(
d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(),
Resolved::Existing(0)
);
assert_eq!(d.resolve_doc_id(&mut tx, "Hello".into()).await.unwrap(), Resolved::New(2));
assert_eq!(
d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap(),
Resolved::Existing(1)
);
assert_eq!(d.resolve_doc_id(&mut tx, "World".into()).await.unwrap(), Resolved::New(3));
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 4);
finish(tx, d).await;
}
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(), Resolved::Existing(0));
assert_eq!(d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap(), Resolved::Existing(1));
assert_eq!(d.resolve_doc_id(&mut tx, "Hello".into()).await.unwrap(), Resolved::Existing(2));
assert_eq!(d.resolve_doc_id(&mut tx, "World".into()).await.unwrap(), Resolved::Existing(3));
assert_eq!(d.get_doc_key(&mut tx, 0).await.unwrap(), Some("Foo".into()));
assert_eq!(d.get_doc_key(&mut tx, 1).await.unwrap(), Some("Bar".into()));
assert_eq!(d.get_doc_key(&mut tx, 2).await.unwrap(), Some("Hello".into()));
assert_eq!(d.get_doc_key(&mut tx, 3).await.unwrap(), Some("World".into()));
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 4);
finish(tx, d).await;
{
let (mut tx, mut d) = get_doc_ids(&ds, BTreeStoreType::Write).await;
assert_eq!(
d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(),
Resolved::Existing(0)
);
assert_eq!(
d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap(),
Resolved::Existing(1)
);
assert_eq!(
d.resolve_doc_id(&mut tx, "Hello".into()).await.unwrap(),
Resolved::Existing(2)
);
assert_eq!(
d.resolve_doc_id(&mut tx, "World".into()).await.unwrap(),
Resolved::Existing(3)
);
assert_eq!(d.get_doc_key(&mut tx, 0).await.unwrap(), Some("Foo".into()));
assert_eq!(d.get_doc_key(&mut tx, 1).await.unwrap(), Some("Bar".into()));
assert_eq!(d.get_doc_key(&mut tx, 2).await.unwrap(), Some("Hello".into()));
assert_eq!(d.get_doc_key(&mut tx, 3).await.unwrap(), Some("World".into()));
assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 4);
finish(tx, d).await;
}
}
#[tokio::test]
@ -251,41 +295,55 @@ mod tests {
let ds = Datastore::new("memory").await.unwrap();
// Create two docs
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(), Resolved::New(0));
assert_eq!(d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap(), Resolved::New(1));
finish(tx, d).await;
{
let (mut tx, mut d) = get_doc_ids(&ds, BTreeStoreType::Write).await;
assert_eq!(d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(), Resolved::New(0));
assert_eq!(d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap(), Resolved::New(1));
finish(tx, d).await;
}
// Remove doc 1
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.remove_doc(&mut tx, "Dummy".into()).await.unwrap(), None);
assert_eq!(d.remove_doc(&mut tx, "Foo".into()).await.unwrap(), Some(0));
finish(tx, d).await;
{
let (mut tx, mut d) = get_doc_ids(&ds, BTreeStoreType::Write).await;
assert_eq!(d.remove_doc(&mut tx, "Dummy".into()).await.unwrap(), None);
assert_eq!(d.remove_doc(&mut tx, "Foo".into()).await.unwrap(), Some(0));
finish(tx, d).await;
}
// Check 'Foo' has been removed
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.remove_doc(&mut tx, "Foo".into()).await.unwrap(), None);
finish(tx, d).await;
{
let (mut tx, mut d) = get_doc_ids(&ds, BTreeStoreType::Write).await;
assert_eq!(d.remove_doc(&mut tx, "Foo".into()).await.unwrap(), None);
finish(tx, d).await;
}
// Insert a new doc - should take the available id 1
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.resolve_doc_id(&mut tx, "Hello".into()).await.unwrap(), Resolved::New(0));
finish(tx, d).await;
{
let (mut tx, mut d) = get_doc_ids(&ds, BTreeStoreType::Write).await;
assert_eq!(d.resolve_doc_id(&mut tx, "Hello".into()).await.unwrap(), Resolved::New(0));
finish(tx, d).await;
}
// Remove doc 2
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.remove_doc(&mut tx, "Dummy".into()).await.unwrap(), None);
assert_eq!(d.remove_doc(&mut tx, "Bar".into()).await.unwrap(), Some(1));
finish(tx, d).await;
{
let (mut tx, mut d) = get_doc_ids(&ds, BTreeStoreType::Write).await;
assert_eq!(d.remove_doc(&mut tx, "Dummy".into()).await.unwrap(), None);
assert_eq!(d.remove_doc(&mut tx, "Bar".into()).await.unwrap(), Some(1));
finish(tx, d).await;
}
// Check 'Bar' has been removed
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.remove_doc(&mut tx, "Foo".into()).await.unwrap(), None);
finish(tx, d).await;
{
let (mut tx, mut d) = get_doc_ids(&ds, BTreeStoreType::Write).await;
assert_eq!(d.remove_doc(&mut tx, "Foo".into()).await.unwrap(), None);
finish(tx, d).await;
}
// Insert a new doc - should take the available id 2
let (mut tx, mut d) = get_doc_ids(&ds).await;
assert_eq!(d.resolve_doc_id(&mut tx, "World".into()).await.unwrap(), Resolved::New(1));
finish(tx, d).await;
{
let (mut tx, mut d) = get_doc_ids(&ds, BTreeStoreType::Write).await;
assert_eq!(d.resolve_doc_id(&mut tx, "World".into()).await.unwrap(), Resolved::New(1));
finish(tx, d).await;
}
}
}

View file

@ -1,15 +1,19 @@
use crate::err::Error;
use crate::idx::bkeys::TrieKeys;
use crate::idx::btree::{BTree, KeyProvider, Payload, Statistics};
use crate::idx::btree::store::{BTreeNodeStore, BTreeStoreType, KeyProvider};
use crate::idx::btree::{BTree, Payload, Statistics};
use crate::idx::ft::docids::DocId;
use crate::idx::{btree, IndexKeyBase, SerdeState};
use crate::kvs::{Key, Transaction};
use std::sync::Arc;
use tokio::sync::Mutex;
pub(super) type DocLength = u64;
pub(super) struct DocLengths {
state_key: Key,
btree: BTree,
btree: BTree<TrieKeys>,
store: Arc<Mutex<BTreeNodeStore<TrieKeys>>>,
}
impl DocLengths {
@ -17,17 +21,19 @@ impl DocLengths {
tx: &mut Transaction,
index_key_base: IndexKeyBase,
default_btree_order: u32,
store_type: BTreeStoreType,
) -> Result<Self, Error> {
let keys = KeyProvider::DocLengths(index_key_base);
let state_key: Key = keys.get_state_key();
let state_key: Key = index_key_base.new_bl_key(None);
let state: btree::State = if let Some(val) = tx.get(state_key.clone()).await? {
btree::State::try_from_val(val)?
} else {
btree::State::new(default_btree_order)
};
let store = BTreeNodeStore::new(KeyProvider::DocLengths(index_key_base), store_type, 20);
Ok(Self {
state_key,
btree: BTree::new(keys, state),
btree: BTree::new(state),
store,
})
}
@ -36,7 +42,8 @@ impl DocLengths {
tx: &mut Transaction,
doc_id: DocId,
) -> Result<Option<DocLength>, Error> {
self.btree.search::<TrieKeys>(tx, &doc_id.to_be_bytes().to_vec()).await
let mut store = self.store.lock().await;
self.btree.search(tx, &mut store, &doc_id.to_be_bytes().to_vec()).await
}
pub(super) async fn set_doc_length(
@ -45,7 +52,8 @@ impl DocLengths {
doc_id: DocId,
doc_length: DocLength,
) -> Result<(), Error> {
self.btree.insert::<TrieKeys>(tx, doc_id.to_be_bytes().to_vec(), doc_length).await
let mut store = self.store.lock().await;
self.btree.insert(tx, &mut store, doc_id.to_be_bytes().to_vec(), doc_length).await
}
pub(super) async fn remove_doc_length(
@ -53,16 +61,18 @@ impl DocLengths {
tx: &mut Transaction,
doc_id: DocId,
) -> Result<Option<Payload>, Error> {
self.btree.delete::<TrieKeys>(tx, doc_id.to_be_bytes().to_vec()).await
let mut store = self.store.lock().await;
self.btree.delete(tx, &mut store, doc_id.to_be_bytes().to_vec()).await
}
pub(super) async fn statistics(&self, tx: &mut Transaction) -> Result<Statistics, Error> {
self.btree.statistics::<TrieKeys>(tx).await
let mut store = self.store.lock().await;
self.btree.statistics(tx, &mut store).await
}
pub(super) async fn finish(self, tx: &mut Transaction) -> Result<(), Error> {
if self.btree.is_updated() {
tx.set(self.state_key, self.btree.get_state().try_to_val()?).await?;
pub(super) async fn finish(&self, tx: &mut Transaction) -> Result<(), Error> {
if self.store.lock().await.finish(tx).await? {
tx.set(self.state_key.clone(), self.btree.get_state().try_to_val()?).await?;
}
Ok(())
}
@ -70,6 +80,7 @@ impl DocLengths {
#[cfg(test)]
mod tests {
use crate::idx::btree::store::BTreeStoreType;
use crate::idx::ft::doclength::DocLengths;
use crate::idx::IndexKeyBase;
use crate::kvs::Datastore;
@ -82,14 +93,23 @@ mod tests {
// Check empty state
let mut tx = ds.transaction(true, false).await.unwrap();
let l = DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER).await.unwrap();
let l = DocLengths::new(
&mut tx,
IndexKeyBase::default(),
BTREE_ORDER,
BTreeStoreType::Traversal,
)
.await
.unwrap();
assert_eq!(l.statistics(&mut tx).await.unwrap().keys_count, 0);
let dl = l.get_doc_length(&mut tx, 99).await.unwrap();
l.finish(&mut tx).await.unwrap();
assert_eq!(dl, None);
// Set a doc length
let mut l = DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER).await.unwrap();
let mut l =
DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER, BTreeStoreType::Write)
.await
.unwrap();
l.set_doc_length(&mut tx, 99, 199).await.unwrap();
assert_eq!(l.statistics(&mut tx).await.unwrap().keys_count, 1);
let dl = l.get_doc_length(&mut tx, 99).await.unwrap();
@ -97,7 +117,10 @@ mod tests {
assert_eq!(dl, Some(199));
// Update doc length
let mut l = DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER).await.unwrap();
let mut l =
DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER, BTreeStoreType::Write)
.await
.unwrap();
l.set_doc_length(&mut tx, 99, 299).await.unwrap();
assert_eq!(l.statistics(&mut tx).await.unwrap().keys_count, 1);
let dl = l.get_doc_length(&mut tx, 99).await.unwrap();
@ -105,7 +128,10 @@ mod tests {
assert_eq!(dl, Some(299));
// Remove doc lengths
let mut l = DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER).await.unwrap();
let mut l =
DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER, BTreeStoreType::Write)
.await
.unwrap();
assert_eq!(l.remove_doc_length(&mut tx, 99).await.unwrap(), Some(299));
assert_eq!(l.remove_doc_length(&mut tx, 99).await.unwrap(), None);
}

View file

@ -9,6 +9,7 @@ pub(super) mod termdocs;
pub(crate) mod terms;
use crate::err::Error;
use crate::idx::btree::store::BTreeStoreType;
use crate::idx::ft::analyzer::Analyzer;
use crate::idx::ft::docids::{DocId, DocIds};
use crate::idx::ft::doclength::DocLengths;
@ -28,6 +29,7 @@ use roaring::RoaringTreemap;
use serde::{Deserialize, Serialize};
use std::ops::BitAnd;
use std::sync::Arc;
use tokio::sync::RwLock;
pub(crate) type MatchRef = u8;
@ -37,8 +39,13 @@ pub(crate) struct FtIndex {
index_key_base: IndexKeyBase,
state: State,
bm25: Option<Bm25Params>,
order: u32,
highlighting: bool,
doc_ids: Arc<RwLock<DocIds>>,
doc_lengths: Arc<RwLock<DocLengths>>,
postings: Arc<RwLock<Postings>>,
terms: Arc<RwLock<Terms>>,
offsets: Offsets,
term_docs: TermDocs,
}
#[derive(Clone)]
@ -90,6 +97,7 @@ impl FtIndex {
order: u32,
scoring: &Scoring,
hl: bool,
store_type: BTreeStoreType,
) -> Result<Self, Error> {
let state_key: Key = index_key_base.new_bs_key();
let state: State = if let Some(val) = tx.get(state_key.clone()).await? {
@ -97,6 +105,19 @@ impl FtIndex {
} else {
State::default()
};
let doc_ids = Arc::new(RwLock::new(
DocIds::new(tx, index_key_base.clone(), order, store_type).await?,
));
let doc_lengths = Arc::new(RwLock::new(
DocLengths::new(tx, index_key_base.clone(), order, store_type).await?,
));
let postings = Arc::new(RwLock::new(
Postings::new(tx, index_key_base.clone(), order, store_type).await?,
));
let terms =
Arc::new(RwLock::new(Terms::new(tx, index_key_base.clone(), order, store_type).await?));
let termdocs = TermDocs::new(index_key_base.clone());
let offsets = Offsets::new(index_key_base.clone());
let mut bm25 = None;
if let Scoring::Bm {
k1,
@ -113,34 +134,19 @@ impl FtIndex {
state_key,
index_key_base,
bm25,
order,
highlighting: hl,
analyzer: az.into(),
doc_ids,
doc_lengths,
postings,
terms,
term_docs: termdocs,
offsets,
})
}
pub(crate) async fn doc_ids(&self, tx: &mut Transaction) -> Result<DocIds, Error> {
DocIds::new(tx, self.index_key_base.clone(), self.order).await
}
async fn terms(&self, tx: &mut Transaction) -> Result<Terms, Error> {
Terms::new(tx, self.index_key_base.clone(), self.order).await
}
fn term_docs(&self) -> TermDocs {
TermDocs::new(self.index_key_base.clone())
}
async fn doc_lengths(&self, tx: &mut Transaction) -> Result<DocLengths, Error> {
DocLengths::new(tx, self.index_key_base.clone(), self.order).await
}
async fn postings(&self, tx: &mut Transaction) -> Result<Postings, Error> {
Postings::new(tx, self.index_key_base.clone(), self.order).await
}
fn offsets(&self) -> Offsets {
Offsets::new(self.index_key_base.clone())
pub(super) fn doc_ids(&self) -> Arc<RwLock<DocIds>> {
self.doc_ids.clone()
}
pub(crate) async fn remove_document(
@ -149,45 +155,38 @@ impl FtIndex {
rid: &Thing,
) -> Result<(), Error> {
// Extract and remove the doc_id (if any)
let mut d = self.doc_ids(tx).await?;
if let Some(doc_id) = d.remove_doc(tx, rid.into()).await? {
if let Some(doc_id) = self.doc_ids.write().await.remove_doc(tx, rid.into()).await? {
self.state.doc_count -= 1;
// Remove the doc length
let mut l = self.doc_lengths(tx).await?;
if let Some(doc_lengths) = l.remove_doc_length(tx, doc_id).await? {
if let Some(doc_lengths) =
self.doc_lengths.write().await.remove_doc_length(tx, doc_id).await?
{
self.state.total_docs_lengths -= doc_lengths as u128;
l.finish(tx).await?;
}
// Get the term list
if let Some(term_list_vec) = tx.get(self.index_key_base.new_bk_key(doc_id)).await? {
let term_list = RoaringTreemap::try_from_val(term_list_vec)?;
// Remove the postings
let mut p = self.postings(tx).await?;
let mut t = self.terms(tx).await?;
let td = self.term_docs();
let mut p = self.postings.write().await;
let mut t = self.terms.write().await;
for term_id in &term_list {
p.remove_posting(tx, term_id, doc_id).await?;
// if the term is not present in any document in the index, we can remove it
let doc_count = td.remove_doc(tx, term_id, doc_id).await?;
let doc_count = self.term_docs.remove_doc(tx, term_id, doc_id).await?;
if doc_count == 0 {
t.remove_term_id(tx, term_id).await?;
}
}
// Remove the offsets if any
if self.highlighting {
let o = self.offsets();
for term_id in term_list {
// TODO?: Removal can be done with a prefix on doc_id
o.remove_offsets(tx, doc_id, term_id).await?;
self.offsets.remove_offsets(tx, doc_id, term_id).await?;
}
}
t.finish(tx).await?;
p.finish(tx).await?;
}
d.finish(tx).await?;
}
Ok(())
}
@ -199,12 +198,11 @@ impl FtIndex {
field_content: &Array,
) -> Result<(), Error> {
// Resolve the doc_id
let mut d = self.doc_ids(tx).await?;
let resolved = d.resolve_doc_id(tx, rid.into()).await?;
let resolved = self.doc_ids.write().await.resolve_doc_id(tx, rid.into()).await?;
let doc_id = *resolved.doc_id();
// Extract the doc_lengths, terms en frequencies (and offset)
let mut t = self.terms(tx).await?;
let mut t = self.terms.write().await;
let (doc_length, terms_and_frequencies, offsets) = if self.highlighting {
let (dl, tf, ofs) = self
.analyzer
@ -218,13 +216,13 @@ impl FtIndex {
};
// Set the doc length
let mut l = self.doc_lengths(tx).await?;
let mut dl = self.doc_lengths.write().await;
if resolved.was_existing() {
if let Some(old_doc_length) = l.get_doc_length(tx, doc_id).await? {
if let Some(old_doc_length) = dl.get_doc_length(tx, doc_id).await? {
self.state.total_docs_lengths -= old_doc_length as u128;
}
}
l.set_doc_length(tx, doc_id, doc_length).await?;
dl.set_doc_length(tx, doc_id, doc_length).await?;
// Retrieve the existing terms for this document (if any)
let term_ids_key = self.index_key_base.new_bk_key(doc_id);
@ -235,15 +233,14 @@ impl FtIndex {
};
// Set the terms postings and term docs
let term_docs = self.term_docs();
let mut terms_ids = RoaringTreemap::default();
let mut p = self.postings(tx).await?;
let mut p = self.postings.write().await;
for (term_id, term_freq) in terms_and_frequencies {
p.update_posting(tx, term_id, doc_id, term_freq).await?;
if let Some(old_term_ids) = &mut old_term_ids {
old_term_ids.remove(term_id);
}
term_docs.set_doc(tx, term_id, doc_id).await?;
self.term_docs.set_doc(tx, term_id, doc_id).await?;
terms_ids.insert(term_id);
}
@ -251,7 +248,7 @@ impl FtIndex {
if let Some(old_term_ids) = &old_term_ids {
for old_term_id in old_term_ids {
p.remove_posting(tx, old_term_id, doc_id).await?;
let doc_count = term_docs.remove_doc(tx, old_term_id, doc_id).await?;
let doc_count = self.term_docs.remove_doc(tx, old_term_id, doc_id).await?;
// if the term does not have anymore postings, we can remove the term
if doc_count == 0 {
t.remove_term_id(tx, old_term_id).await?;
@ -260,19 +257,18 @@ impl FtIndex {
}
if self.highlighting {
let o = self.offsets();
// Set the offset if any
if let Some(ofs) = offsets {
if !ofs.is_empty() {
for (tid, or) in ofs {
o.set_offsets(tx, doc_id, tid, or).await?;
self.offsets.set_offsets(tx, doc_id, tid, or).await?;
}
}
}
// In case of an update, w remove the offset for the terms that does not exist anymore
if let Some(old_term_ids) = old_term_ids {
for old_term_id in old_term_ids {
o.remove_offsets(tx, doc_id, old_term_id).await?;
self.offsets.remove_offsets(tx, doc_id, old_term_id).await?;
}
}
}
@ -288,10 +284,6 @@ impl FtIndex {
// Update the states
tx.set(self.state_key.clone(), self.state.try_to_val()?).await?;
d.finish(tx).await?;
l.finish(tx).await?;
p.finish(tx).await?;
t.finish(tx).await?;
Ok(())
}
@ -300,7 +292,7 @@ impl FtIndex {
tx: &mut Transaction,
query_string: String,
) -> Result<Vec<Option<TermId>>, Error> {
let t = self.terms(tx).await?;
let t = self.terms.read().await;
let terms = self.analyzer.extract_terms(&t, tx, query_string).await?;
Ok(terms)
}
@ -311,10 +303,9 @@ impl FtIndex {
terms: &Vec<Option<TermId>>,
) -> Result<Vec<Option<(TermId, RoaringTreemap)>>, Error> {
let mut terms_docs = Vec::with_capacity(terms.len());
let td = self.term_docs();
for opt_term_id in terms {
if let Some(term_id) = opt_term_id {
let docs = td.get_docs(tx, *term_id).await?;
let docs = self.term_docs.get_docs(tx, *term_id).await?;
if let Some(docs) = docs {
terms_docs.push(Some((*term_id, docs)));
} else {
@ -327,9 +318,8 @@ impl FtIndex {
Ok(terms_docs)
}
pub(super) async fn new_hits_iterator(
pub(super) fn new_hits_iterator(
&self,
tx: &mut Transaction,
terms_docs: Arc<Vec<Option<(TermId, RoaringTreemap)>>>,
) -> Result<Option<HitsIterator>, Error> {
let mut hits: Option<RoaringTreemap> = None;
@ -346,23 +336,21 @@ impl FtIndex {
}
if let Some(hits) = hits {
if !hits.is_empty() {
let doc_ids = self.doc_ids(tx).await?;
return Ok(Some(HitsIterator::new(doc_ids, hits)));
return Ok(Some(HitsIterator::new(self.doc_ids.clone(), hits)));
}
}
Ok(None)
}
pub(super) async fn new_scorer(
pub(super) fn new_scorer(
&self,
tx: &mut Transaction,
terms_docs: Arc<Vec<Option<(TermId, RoaringTreemap)>>>,
) -> Result<Option<BM25Scorer>, Error> {
if let Some(bm25) = &self.bm25 {
return Ok(Some(BM25Scorer::new(
self.postings(tx).await?,
self.postings.clone(),
terms_docs,
self.doc_lengths(tx).await?,
self.doc_lengths.clone(),
self.state.total_docs_lengths,
self.state.doc_count,
bm25.clone(),
@ -383,12 +371,10 @@ impl FtIndex {
doc: &Value,
) -> Result<Value, Error> {
let doc_key: Key = thg.into();
let doc_ids = self.doc_ids(tx).await?;
if let Some(doc_id) = doc_ids.get_doc_id(tx, doc_key).await? {
let o = self.offsets();
if let Some(doc_id) = self.doc_ids.read().await.get_doc_id(tx, doc_key).await? {
let mut hl = Highlighter::new(prefix, suffix, idiom, doc);
for term_id in terms.iter().flatten() {
let o = o.get_offsets(tx, doc_id, *term_id).await?;
let o = self.offsets.get_offsets(tx, doc_id, *term_id).await?;
if let Some(o) = o {
hl.highlight(o.0);
}
@ -405,12 +391,10 @@ impl FtIndex {
terms: &[Option<TermId>],
) -> Result<Value, Error> {
let doc_key: Key = thg.into();
let doc_ids = self.doc_ids(tx).await?;
if let Some(doc_id) = doc_ids.get_doc_id(tx, doc_key).await? {
let o = self.offsets();
if let Some(doc_id) = self.doc_ids.read().await.get_doc_id(tx, doc_key).await? {
let mut or = Offseter::default();
for term_id in terms.iter().flatten() {
let o = o.get_offsets(tx, doc_id, *term_id).await?;
let o = self.offsets.get_offsets(tx, doc_id, *term_id).await?;
if let Some(o) = o {
or.highlight(o.0);
}
@ -423,21 +407,29 @@ impl FtIndex {
pub(crate) async fn statistics(&self, tx: &mut Transaction) -> Result<Statistics, Error> {
// TODO do parallel execution
Ok(Statistics {
doc_ids: self.doc_ids(tx).await?.statistics(tx).await?,
terms: self.terms(tx).await?.statistics(tx).await?,
doc_lengths: self.doc_lengths(tx).await?.statistics(tx).await?,
postings: self.postings(tx).await?.statistics(tx).await?,
doc_ids: self.doc_ids.read().await.statistics(tx).await?,
terms: self.terms.read().await.statistics(tx).await?,
doc_lengths: self.doc_lengths.read().await.statistics(tx).await?,
postings: self.postings.read().await.statistics(tx).await?,
})
}
pub(crate) async fn finish(self, tx: &mut Transaction) -> Result<(), Error> {
self.doc_ids.write().await.finish(tx).await?;
self.doc_lengths.write().await.finish(tx).await?;
self.postings.write().await.finish(tx).await?;
self.terms.write().await.finish(tx).await?;
Ok(())
}
}
pub(crate) struct HitsIterator {
doc_ids: DocIds,
doc_ids: Arc<RwLock<DocIds>>,
iter: IntoIter,
}
impl HitsIterator {
fn new(doc_ids: DocIds, hits: RoaringTreemap) -> Self {
fn new(doc_ids: Arc<RwLock<DocIds>>, hits: RoaringTreemap) -> Self {
Self {
doc_ids,
iter: hits.into_iter(),
@ -449,7 +441,7 @@ impl HitsIterator {
tx: &mut Transaction,
) -> Result<Option<(Thing, DocId)>, Error> {
for doc_id in self.iter.by_ref() {
if let Some(doc_key) = self.doc_ids.get_doc_key(tx, doc_id).await? {
if let Some(doc_key) = self.doc_ids.read().await.get_doc_key(tx, doc_id).await? {
return Ok(Some((doc_key.into(), doc_id)));
}
}
@ -459,12 +451,14 @@ impl HitsIterator {
#[cfg(test)]
mod tests {
use crate::idx::btree::store::BTreeStoreType;
use crate::idx::ft::scorer::{BM25Scorer, Score};
use crate::idx::ft::{FtIndex, HitsIterator};
use crate::idx::IndexKeyBase;
use crate::kvs::{Datastore, Transaction};
use crate::sql::scoring::Scoring;
use crate::sql::statements::define::analyzer;
use crate::sql::statements::DefineAnalyzerStatement;
use crate::sql::{Array, Thing};
use std::collections::HashMap;
use std::sync::Arc;
@ -498,17 +492,45 @@ mod tests {
) -> (Option<HitsIterator>, BM25Scorer) {
let t = fti.extract_terms(tx, qs.to_string()).await.unwrap();
let td = Arc::new(fti.get_terms_docs(tx, &t).await.unwrap());
let scr = fti.new_scorer(tx, td.clone()).await.unwrap().unwrap();
let hits = fti.new_hits_iterator(tx, td).await.unwrap();
let scr = fti.new_scorer(td.clone()).unwrap().unwrap();
let hits = fti.new_hits_iterator(td).unwrap();
(hits, scr)
}
pub(super) async fn tx_fti(
ds: &Datastore,
store_type: BTreeStoreType,
az: &DefineAnalyzerStatement,
order: u32,
hl: bool,
) -> (Transaction, FtIndex) {
let write = matches!(store_type, BTreeStoreType::Write);
let mut tx = ds.transaction(write, false).await.unwrap();
let fti = FtIndex::new(
&mut tx,
az.clone(),
IndexKeyBase::default(),
order,
&Scoring::bm25(),
hl,
BTreeStoreType::Write,
)
.await
.unwrap();
(tx, fti)
}
pub(super) async fn finish(mut tx: Transaction, fti: FtIndex) {
fti.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
}
#[test(tokio::test)]
async fn test_ft_index() {
let ds = Datastore::new("memory").await.unwrap();
let (_, az) = analyzer("DEFINE ANALYZER test TOKENIZERS blank;").unwrap();
let default_btree_order = 5;
let btree_order = 5;
let doc1: Thing = ("t", "doc1").into();
let doc2: Thing = ("t", "doc2").into();
@ -516,54 +538,25 @@ mod tests {
{
// Add one document
let mut tx = ds.transaction(true, false).await.unwrap();
let mut fti = FtIndex::new(
&mut tx,
az.clone(),
IndexKeyBase::default(),
default_btree_order,
&Scoring::bm25(),
false,
)
.await
.unwrap();
let (mut tx, mut fti) =
tx_fti(&ds, BTreeStoreType::Write, &az, btree_order, false).await;
fti.index_document(&mut tx, &doc1, &Array::from(vec!["hello the world"]))
.await
.unwrap();
tx.commit().await.unwrap();
finish(tx, fti).await;
}
{
// Add two documents
let mut tx = ds.transaction(true, false).await.unwrap();
let mut fti = FtIndex::new(
&mut tx,
az.clone(),
IndexKeyBase::default(),
default_btree_order,
&Scoring::bm25(),
false,
)
.await
.unwrap();
let (mut tx, mut fti) =
tx_fti(&ds, BTreeStoreType::Write, &az, btree_order, false).await;
fti.index_document(&mut tx, &doc2, &Array::from(vec!["a yellow hello"])).await.unwrap();
fti.index_document(&mut tx, &doc3, &Array::from(vec!["foo bar"])).await.unwrap();
tx.commit().await.unwrap();
finish(tx, fti).await;
}
{
let mut tx = ds.transaction(true, false).await.unwrap();
let fti = FtIndex::new(
&mut tx,
az.clone(),
IndexKeyBase::default(),
default_btree_order,
&Scoring::bm25(),
false,
)
.await
.unwrap();
let (mut tx, fti) = tx_fti(&ds, BTreeStoreType::Read, &az, btree_order, false).await;
// Check the statistics
let statistics = fti.statistics(&mut tx).await.unwrap();
assert_eq!(statistics.terms.keys_count, 7);
@ -593,22 +586,14 @@ mod tests {
{
// Reindex one document
let mut tx = ds.transaction(true, false).await.unwrap();
let mut fti = FtIndex::new(
&mut tx,
az.clone(),
IndexKeyBase::default(),
default_btree_order,
&Scoring::bm25(),
false,
)
.await
.unwrap();
let (mut tx, mut fti) =
tx_fti(&ds, BTreeStoreType::Write, &az, btree_order, false).await;
fti.index_document(&mut tx, &doc3, &Array::from(vec!["nobar foo"])).await.unwrap();
tx.commit().await.unwrap();
finish(tx, fti).await;
let (mut tx, fti) = tx_fti(&ds, BTreeStoreType::Read, &az, btree_order, false).await;
// We can still find 'foo'
let mut tx = ds.transaction(false, false).await.unwrap();
let (hits, scr) = search(&mut tx, &fti, "foo").await;
check_hits(&mut tx, hits, scr, vec![(&doc3, Some(0.56902087))]).await;
@ -623,26 +608,18 @@ mod tests {
{
// Remove documents
let mut tx = ds.transaction(true, false).await.unwrap();
let mut fti = FtIndex::new(
&mut tx,
az.clone(),
IndexKeyBase::default(),
default_btree_order,
&Scoring::bm25(),
false,
)
.await
.unwrap();
let (mut tx, mut fti) =
tx_fti(&ds, BTreeStoreType::Write, &az, btree_order, false).await;
fti.remove_document(&mut tx, &doc1).await.unwrap();
fti.remove_document(&mut tx, &doc2).await.unwrap();
fti.remove_document(&mut tx, &doc3).await.unwrap();
tx.commit().await.unwrap();
finish(tx, fti).await;
}
let mut tx = ds.transaction(false, false).await.unwrap();
{
let (mut tx, fti) = tx_fti(&ds, BTreeStoreType::Read, &az, btree_order, false).await;
let (hits, _) = search(&mut tx, &fti, "hello").await;
assert!(hits.is_none());
let (hits, _) = search(&mut tx, &fti, "foo").await;
assert!(hits.is_none());
}
@ -662,19 +639,10 @@ mod tests {
let doc3: Thing = ("t", "doc3").into();
let doc4: Thing = ("t", "doc4").into();
let default_btree_order = 5;
let btree_order = 5;
{
let mut tx = ds.transaction(true, false).await.unwrap();
let mut fti = FtIndex::new(
&mut tx,
az.clone(),
IndexKeyBase::default(),
default_btree_order,
&Scoring::bm25(),
hl,
)
.await
.unwrap();
let (mut tx, mut fti) =
tx_fti(&ds, BTreeStoreType::Write, &az, btree_order, hl).await;
fti.index_document(
&mut tx,
&doc1,
@ -703,21 +671,11 @@ mod tests {
)
.await
.unwrap();
tx.commit().await.unwrap();
finish(tx, fti).await;
}
{
let mut tx = ds.transaction(true, false).await.unwrap();
let fti = FtIndex::new(
&mut tx,
az.clone(),
IndexKeyBase::default(),
default_btree_order,
&Scoring::bm25(),
hl,
)
.await
.unwrap();
let (mut tx, fti) = tx_fti(&ds, BTreeStoreType::Read, &az, btree_order, hl).await;
let statistics = fti.statistics(&mut tx).await.unwrap();
assert_eq!(statistics.terms.keys_count, 17);

View file

@ -1,17 +1,21 @@
use crate::err::Error;
use crate::idx::bkeys::TrieKeys;
use crate::idx::btree::{BTree, KeyProvider, Statistics};
use crate::idx::btree::store::{BTreeNodeStore, BTreeStoreType, KeyProvider};
use crate::idx::btree::{BTree, Statistics};
use crate::idx::ft::docids::DocId;
use crate::idx::ft::terms::TermId;
use crate::idx::{btree, IndexKeyBase, SerdeState};
use crate::kvs::{Key, Transaction};
use std::sync::Arc;
use tokio::sync::Mutex;
pub(super) type TermFrequency = u64;
pub(super) struct Postings {
state_key: Key,
index_key_base: IndexKeyBase,
btree: BTree,
btree: BTree<TrieKeys>,
store: Arc<Mutex<BTreeNodeStore<TrieKeys>>>,
}
impl Postings {
@ -19,18 +23,21 @@ impl Postings {
tx: &mut Transaction,
index_key_base: IndexKeyBase,
order: u32,
store_type: BTreeStoreType,
) -> Result<Self, Error> {
let keys = KeyProvider::Postings(index_key_base.clone());
let state_key: Key = keys.get_state_key();
let state_key: Key = index_key_base.new_bp_key(None);
let state: btree::State = if let Some(val) = tx.get(state_key.clone()).await? {
btree::State::try_from_val(val)?
} else {
btree::State::new(order)
};
let store =
BTreeNodeStore::new(KeyProvider::Postings(index_key_base.clone()), store_type, 20);
Ok(Self {
state_key,
index_key_base,
btree: BTree::new(keys, state),
btree: BTree::new(state),
store,
})
}
@ -42,7 +49,8 @@ impl Postings {
term_freq: TermFrequency,
) -> Result<(), Error> {
let key = self.index_key_base.new_bf_key(term_id, doc_id);
self.btree.insert::<TrieKeys>(tx, key, term_freq).await
let mut store = self.store.lock().await;
self.btree.insert(tx, &mut store, key, term_freq).await
}
pub(super) async fn get_term_frequency(
@ -52,7 +60,8 @@ impl Postings {
doc_id: DocId,
) -> Result<Option<TermFrequency>, Error> {
let key = self.index_key_base.new_bf_key(term_id, doc_id);
self.btree.search::<TrieKeys>(tx, &key).await
let mut store = self.store.lock().await;
self.btree.search(tx, &mut store, &key).await
}
pub(super) async fn remove_posting(
@ -62,16 +71,19 @@ impl Postings {
doc_id: DocId,
) -> Result<Option<TermFrequency>, Error> {
let key = self.index_key_base.new_bf_key(term_id, doc_id);
self.btree.delete::<TrieKeys>(tx, key).await
let mut store = self.store.lock().await;
self.btree.delete(tx, &mut store, key).await
}
pub(super) async fn statistics(&self, tx: &mut Transaction) -> Result<Statistics, Error> {
self.btree.statistics::<TrieKeys>(tx).await
let mut store = self.store.lock().await;
self.btree.statistics(tx, &mut store).await
}
pub(super) async fn finish(self, tx: &mut Transaction) -> Result<(), Error> {
if self.btree.is_updated() {
tx.set(self.state_key, self.btree.get_state().try_to_val()?).await?;
pub(super) async fn finish(&self, tx: &mut Transaction) -> Result<(), Error> {
let updated = self.store.lock().await.finish(tx).await?;
if self.btree.is_updated() || updated {
tx.set(self.state_key.clone(), self.btree.get_state().try_to_val()?).await?;
}
Ok(())
}
@ -79,6 +91,7 @@ impl Postings {
#[cfg(test)]
mod tests {
use crate::idx::btree::store::BTreeStoreType;
use crate::idx::ft::postings::Postings;
use crate::idx::IndexKeyBase;
use crate::kvs::Datastore;
@ -90,10 +103,15 @@ mod tests {
let ds = Datastore::new("memory").await.unwrap();
let mut tx = ds.transaction(true, false).await.unwrap();
// Check empty state
let mut p =
Postings::new(&mut tx, IndexKeyBase::default(), DEFAULT_BTREE_ORDER).await.unwrap();
let mut p = Postings::new(
&mut tx,
IndexKeyBase::default(),
DEFAULT_BTREE_ORDER,
BTreeStoreType::Write,
)
.await
.unwrap();
assert_eq!(p.statistics(&mut tx).await.unwrap().keys_count, 0);
@ -104,8 +122,14 @@ mod tests {
tx.commit().await.unwrap();
let mut tx = ds.transaction(true, false).await.unwrap();
let mut p =
Postings::new(&mut tx, IndexKeyBase::default(), DEFAULT_BTREE_ORDER).await.unwrap();
let mut p = Postings::new(
&mut tx,
IndexKeyBase::default(),
DEFAULT_BTREE_ORDER,
BTreeStoreType::Write,
)
.await
.unwrap();
assert_eq!(p.statistics(&mut tx).await.unwrap().keys_count, 2);
assert_eq!(p.get_term_frequency(&mut tx, 1, 2).await.unwrap(), Some(3));

View file

@ -7,13 +7,14 @@ use crate::idx::ft::Bm25Params;
use crate::kvs::Transaction;
use roaring::RoaringTreemap;
use std::sync::Arc;
use tokio::sync::RwLock;
pub(super) type Score = f32;
pub(crate) struct BM25Scorer {
postings: Postings,
postings: Arc<RwLock<Postings>>,
terms_docs: Arc<Vec<Option<(TermId, RoaringTreemap)>>>,
doc_lengths: DocLengths,
doc_lengths: Arc<RwLock<DocLengths>>,
average_doc_length: f32,
doc_count: f32,
bm25: Bm25Params,
@ -21,9 +22,9 @@ pub(crate) struct BM25Scorer {
impl BM25Scorer {
pub(super) fn new(
postings: Postings,
postings: Arc<RwLock<Postings>>,
terms_docs: Arc<Vec<Option<(TermId, RoaringTreemap)>>>,
doc_lengths: DocLengths,
doc_lengths: Arc<RwLock<DocLengths>>,
total_docs_length: u128,
doc_count: u64,
bm25: Bm25Params,
@ -45,7 +46,8 @@ impl BM25Scorer {
term_doc_count: DocLength,
term_frequency: TermFrequency,
) -> Result<Score, Error> {
let doc_length = self.doc_lengths.get_doc_length(tx, doc_id).await?.unwrap_or(0);
let doc_length =
self.doc_lengths.read().await.get_doc_length(tx, doc_id).await?.unwrap_or(0);
Ok(self.compute_bm25_score(term_frequency as f32, term_doc_count as f32, doc_length as f32))
}
@ -58,7 +60,7 @@ impl BM25Scorer {
for (term_id, docs) in self.terms_docs.iter().flatten() {
if docs.contains(doc_id) {
if let Some(term_freq) =
self.postings.get_term_frequency(tx, *term_id, doc_id).await?
self.postings.read().await.get_term_frequency(tx, *term_id, doc_id).await?
{
sc += self.term_score(tx, doc_id, docs.len(), term_freq).await?;
}

View file

@ -1,17 +1,21 @@
use crate::err::Error;
use crate::idx::bkeys::FstKeys;
use crate::idx::btree::{BTree, KeyProvider, Statistics};
use crate::idx::btree::store::{BTreeNodeStore, BTreeStoreType, KeyProvider};
use crate::idx::btree::{BTree, Statistics};
use crate::idx::{btree, IndexKeyBase, SerdeState};
use crate::kvs::{Key, Transaction};
use roaring::RoaringTreemap;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;
pub(crate) type TermId = u64;
pub(super) struct Terms {
state_key: Key,
index_key_base: IndexKeyBase,
btree: BTree,
btree: BTree<FstKeys>,
store: Arc<Mutex<BTreeNodeStore<FstKeys>>>,
available_ids: Option<RoaringTreemap>,
next_term_id: TermId,
updated: bool,
@ -22,18 +26,20 @@ impl Terms {
tx: &mut Transaction,
index_key_base: IndexKeyBase,
default_btree_order: u32,
store_type: BTreeStoreType,
) -> Result<Self, Error> {
let keys = KeyProvider::Terms(index_key_base.clone());
let state_key: Key = keys.get_state_key();
let state_key: Key = index_key_base.new_bt_key(None);
let state: State = if let Some(val) = tx.get(state_key.clone()).await? {
State::try_from_val(val)?
} else {
State::new(default_btree_order)
};
let store = BTreeNodeStore::new(KeyProvider::Terms(index_key_base.clone()), store_type, 20);
Ok(Self {
state_key,
index_key_base,
btree: BTree::new(keys, state.btree),
btree: BTree::new(state.btree),
store,
available_ids: state.available_ids,
next_term_id: state.next_term_id,
updated: false,
@ -63,15 +69,18 @@ impl Terms {
term: &str,
) -> Result<TermId, Error> {
let term_key = term.into();
if let Some(term_id) = self.btree.search::<FstKeys>(tx, &term_key).await? {
Ok(term_id)
} else {
let term_id = self.get_next_term_id();
tx.set(self.index_key_base.new_bu_key(term_id), term_key.clone()).await?;
self.btree.insert::<FstKeys>(tx, term_key, term_id).await?;
self.updated = true;
Ok(term_id)
{
let mut store = self.store.lock().await;
if let Some(term_id) = self.btree.search(tx, &mut store, &term_key).await? {
return Ok(term_id);
}
}
let term_id = self.get_next_term_id();
tx.set(self.index_key_base.new_bu_key(term_id), term_key.clone()).await?;
let mut store = self.store.lock().await;
self.btree.insert(tx, &mut store, term_key, term_id).await?;
self.updated = true;
Ok(term_id)
}
pub(super) async fn get_term_id(
@ -79,7 +88,8 @@ impl Terms {
tx: &mut Transaction,
term: &str,
) -> Result<Option<TermId>, Error> {
self.btree.search::<FstKeys>(tx, &term.into()).await
let mut store = self.store.lock().await;
self.btree.search(tx, &mut store, &term.into()).await
}
pub(super) async fn remove_term_id(
@ -89,9 +99,8 @@ impl Terms {
) -> Result<(), Error> {
let term_id_key = self.index_key_base.new_bu_key(term_id);
if let Some(term_key) = tx.get(term_id_key.clone()).await? {
debug!("Delete In {}", String::from_utf8(term_key.clone()).unwrap());
self.btree.delete::<FstKeys>(tx, term_key.clone()).await?;
debug!("Delete Out {}", String::from_utf8(term_key.clone()).unwrap());
let mut store = self.store.lock().await;
self.btree.delete(tx, &mut store, term_key.clone()).await?;
tx.del(term_id_key).await?;
if let Some(available_ids) = &mut self.available_ids {
available_ids.insert(term_id);
@ -106,17 +115,19 @@ impl Terms {
}
pub(super) async fn statistics(&self, tx: &mut Transaction) -> Result<Statistics, Error> {
self.btree.statistics::<FstKeys>(tx).await
let mut store = self.store.lock().await;
self.btree.statistics(tx, &mut store).await
}
pub(super) async fn finish(self, tx: &mut Transaction) -> Result<(), Error> {
if self.updated || self.btree.is_updated() {
pub(super) async fn finish(&mut self, tx: &mut Transaction) -> Result<(), Error> {
let updated = self.store.lock().await.finish(tx).await?;
if self.updated || updated {
let state = State {
btree: self.btree.get_state().clone(),
available_ids: self.available_ids,
available_ids: self.available_ids.take(),
next_term_id: self.next_term_id,
};
tx.set(self.state_key, state.try_to_val()?).await?;
tx.set(self.state_key.clone(), state.try_to_val()?).await?;
}
Ok(())
}
@ -143,6 +154,7 @@ impl State {
#[cfg(test)]
mod tests {
use crate::idx::btree::store::BTreeStoreType;
use crate::idx::ft::postings::TermFrequency;
use crate::idx::ft::terms::Terms;
use crate::idx::IndexKeyBase;
@ -174,48 +186,63 @@ mod tests {
let ds = Datastore::new("memory").await.unwrap();
let mut tx = ds.transaction(true, false).await.unwrap();
let t = Terms::new(&mut tx, idx.clone(), BTREE_ORDER).await.unwrap();
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
{
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t =
Terms::new(&mut tx, idx.clone(), BTREE_ORDER, BTreeStoreType::Write).await.unwrap();
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
}
// Resolve a first term
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, idx.clone(), BTREE_ORDER).await.unwrap();
assert_eq!(t.resolve_term_id(&mut tx, "C").await.unwrap(), 0);
assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 1);
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
{
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t =
Terms::new(&mut tx, idx.clone(), BTREE_ORDER, BTreeStoreType::Write).await.unwrap();
assert_eq!(t.resolve_term_id(&mut tx, "C").await.unwrap(), 0);
assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 1);
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
}
// Resolve a second term
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, idx.clone(), BTREE_ORDER).await.unwrap();
assert_eq!(t.resolve_term_id(&mut tx, "D").await.unwrap(), 1);
assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 2);
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
{
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t =
Terms::new(&mut tx, idx.clone(), BTREE_ORDER, BTreeStoreType::Write).await.unwrap();
assert_eq!(t.resolve_term_id(&mut tx, "D").await.unwrap(), 1);
assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 2);
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
}
// Resolve two existing terms with new frequencies
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, idx.clone(), BTREE_ORDER).await.unwrap();
assert_eq!(t.resolve_term_id(&mut tx, "C").await.unwrap(), 0);
assert_eq!(t.resolve_term_id(&mut tx, "D").await.unwrap(), 1);
{
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t =
Terms::new(&mut tx, idx.clone(), BTREE_ORDER, BTreeStoreType::Write).await.unwrap();
assert_eq!(t.resolve_term_id(&mut tx, "C").await.unwrap(), 0);
assert_eq!(t.resolve_term_id(&mut tx, "D").await.unwrap(), 1);
assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 2);
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 2);
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
}
// Resolve one existing terms and two new terms
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, idx.clone(), BTREE_ORDER).await.unwrap();
{
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t =
Terms::new(&mut tx, idx.clone(), BTREE_ORDER, BTreeStoreType::Write).await.unwrap();
assert_eq!(t.resolve_term_id(&mut tx, "A").await.unwrap(), 2);
assert_eq!(t.resolve_term_id(&mut tx, "C").await.unwrap(), 0);
assert_eq!(t.resolve_term_id(&mut tx, "E").await.unwrap(), 3);
assert_eq!(t.resolve_term_id(&mut tx, "A").await.unwrap(), 2);
assert_eq!(t.resolve_term_id(&mut tx, "C").await.unwrap(), 0);
assert_eq!(t.resolve_term_id(&mut tx, "E").await.unwrap(), 3);
assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 4);
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 4);
t.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
}
}
#[tokio::test]
@ -227,7 +254,8 @@ mod tests {
let ds = Datastore::new("memory").await.unwrap();
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, idx.clone(), BTREE_ORDER).await.unwrap();
let mut t =
Terms::new(&mut tx, idx.clone(), BTREE_ORDER, BTreeStoreType::Write).await.unwrap();
// Check removing an non-existing term id returns None
assert!(t.remove_term_id(&mut tx, 0).await.is_ok());
@ -270,7 +298,9 @@ mod tests {
let ds = Datastore::new("memory").await.unwrap();
for _ in 0..100 {
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, IndexKeyBase::default(), 100).await.unwrap();
let mut t = Terms::new(&mut tx, IndexKeyBase::default(), 100, BTreeStoreType::Write)
.await
.unwrap();
let terms_string = random_term_freq_vec(50);
for (term, _) in terms_string {
t.resolve_term_id(&mut tx, &term).await.unwrap();
@ -285,7 +315,9 @@ mod tests {
let ds = Datastore::new("memory").await.unwrap();
for _ in 0..10 {
let mut tx = ds.transaction(true, false).await.unwrap();
let mut t = Terms::new(&mut tx, IndexKeyBase::default(), 100).await.unwrap();
let mut t = Terms::new(&mut tx, IndexKeyBase::default(), 100, BTreeStoreType::Write)
.await
.unwrap();
for _ in 0..10 {
let terms_string = random_term_freq_vec(50);
for (term, _) in terms_string {

View file

@ -1,5 +1,5 @@
mod bkeys;
pub(crate) mod btree;
pub mod bkeys;
pub mod btree;
pub(crate) mod ft;
pub(crate) mod planner;
@ -27,7 +27,7 @@ use serde::Serialize;
use std::sync::Arc;
#[derive(Debug, Clone, Default)]
pub(crate) struct IndexKeyBase {
pub struct IndexKeyBase {
inner: Arc<Inner>,
}

View file

@ -1,5 +1,6 @@
use crate::dbs::{Options, Transaction};
use crate::err::Error;
use crate::idx::btree::store::BTreeStoreType;
use crate::idx::ft::docids::{DocId, DocIds};
use crate::idx::ft::scorer::BM25Scorer;
use crate::idx::ft::termdocs::TermsDocs;
@ -15,6 +16,7 @@ use crate::sql::{Expression, Table, Thing, Value};
use roaring::RoaringTreemap;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
pub(crate) struct QueryExecutor {
table: String,
@ -58,7 +60,8 @@ impl QueryExecutor {
} else {
let ikb = IndexKeyBase::new(opt, io.ix());
let az = run.get_az(opt.ns(), opt.db(), az.as_str()).await?;
let ft = FtIndex::new(&mut run, az, ikb, *order, sc, *hl).await?;
let ft = FtIndex::new(&mut run, az, ikb, *order, sc, *hl, BTreeStoreType::Read)
.await?;
let ixn = ixn.to_owned();
if entry.is_none() {
entry = FtEntry::new(&mut run, &ft, io).await?;
@ -130,7 +133,9 @@ impl QueryExecutor {
if let Some(ft) = self.exp_entries.get(exp) {
let mut run = txn.lock().await;
let doc_key: Key = thg.into();
if let Some(doc_id) = ft.0.doc_ids.get_doc_id(&mut run, doc_key).await? {
if let Some(doc_id) =
ft.0.doc_ids.read().await.get_doc_id(&mut run, doc_key).await?
{
let term_goals = ft.0.terms_docs.len();
// If there is no terms, it can't be a match
if term_goals == 0 {
@ -218,7 +223,7 @@ impl QueryExecutor {
let mut run = txn.lock().await;
if doc_id.is_none() {
let key: Key = rid.into();
doc_id = e.0.doc_ids.get_doc_id(&mut run, key).await?;
doc_id = e.0.doc_ids.read().await.get_doc_id(&mut run, key).await?;
};
if let Some(doc_id) = doc_id {
let score = scorer.score(&mut run, doc_id).await?;
@ -237,7 +242,7 @@ struct FtEntry(Arc<Inner>);
struct Inner {
index_option: IndexOption,
doc_ids: DocIds,
doc_ids: Arc<RwLock<DocIds>>,
terms: Vec<Option<TermId>>,
terms_docs: Arc<Vec<Option<(TermId, RoaringTreemap)>>>,
scorer: Option<BM25Scorer>,
@ -254,8 +259,8 @@ impl FtEntry {
let terms_docs = Arc::new(ft.get_terms_docs(tx, &terms).await?);
Ok(Some(Self(Arc::new(Inner {
index_option: io,
doc_ids: ft.doc_ids(tx).await?,
scorer: ft.new_scorer(tx, terms_docs.clone()).await?,
doc_ids: ft.doc_ids(),
scorer: ft.new_scorer(terms_docs.clone())?,
terms,
terms_docs,
}))))

View file

@ -1,5 +1,6 @@
use crate::dbs::{Options, Transaction};
use crate::err::Error;
use crate::idx::btree::store::BTreeStoreType;
use crate::idx::ft::docids::{DocId, NO_DOC_ID};
use crate::idx::ft::termdocs::TermsDocs;
use crate::idx::ft::{FtIndex, HitsIterator, MatchRef};
@ -273,9 +274,9 @@ impl MatchesThingIterator {
{
let mut run = txn.lock().await;
let az = run.get_az(opt.ns(), opt.db(), az.as_str()).await?;
let fti = FtIndex::new(&mut run, az, ikb, order, sc, hl).await?;
let fti = FtIndex::new(&mut run, az, ikb, order, sc, hl, BTreeStoreType::Read).await?;
if let Some(terms_docs) = terms_docs {
let hits = fti.new_hits_iterator(&mut run, terms_docs).await?;
let hits = fti.new_hits_iterator(terms_docs)?;
Ok(Self {
hits,
})

View file

@ -3,6 +3,7 @@ use crate::dbs::Options;
use crate::dbs::{Level, Transaction};
use crate::doc::CursorDoc;
use crate::err::Error;
use crate::idx::btree::store::BTreeStoreType;
use crate::idx::ft::FtIndex;
use crate::idx::IndexKeyBase;
use crate::sql::comment::shouldbespace;
@ -51,7 +52,16 @@ impl AnalyzeStatement {
hl,
} => {
let az = run.get_az(opt.ns(), opt.db(), az.as_str()).await?;
let ft = FtIndex::new(&mut run, az, ikb, *order, sc, *hl).await?;
let ft = FtIndex::new(
&mut run,
az,
ikb,
*order,
sc,
*hl,
BTreeStoreType::Traversal,
)
.await?;
ft.statistics(&mut run).await?
}
_ => {