From a6c50cb5f58fc984831ddd8ac4ca3d58b1b574f7 Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Wed, 13 Dec 2023 13:37:24 +0000 Subject: [PATCH] Feat: In Memory index store (#3020) --- Cargo.lock | 23 +- lib/Cargo.toml | 2 +- lib/benches/index_btree.rs | 28 +- lib/benches/index_mtree.rs | 72 +- lib/src/ctx/context.rs | 32 +- lib/src/dbs/options.rs | 2 +- lib/src/doc/index.rs | 26 +- lib/src/err/mod.rs | 8 +- lib/src/idx/docids.rs | 109 +- lib/src/idx/ft/doclength.rs | 169 +- lib/src/idx/ft/mod.rs | 149 +- lib/src/idx/ft/offsets.rs | 8 +- lib/src/idx/ft/postings.rs | 127 +- lib/src/idx/ft/terms.rs | 171 +- lib/src/idx/planner/executor.rs | 26 +- lib/src/idx/trees/bkeys.rs | 74 +- lib/src/idx/trees/btree.rs | 1582 +++++++++++------ lib/src/idx/trees/mtree.rs | 721 ++++---- lib/src/idx/trees/store.rs | 322 ---- lib/src/idx/trees/store/cache.rs | 217 +++ lib/src/idx/trees/store/mod.rs | 306 ++++ lib/src/idx/trees/store/tree.rs | 177 ++ lib/src/idx/trees/vector.rs | 6 +- lib/src/kvs/ds.rs | 19 +- lib/src/kvs/tx.rs | 13 +- lib/src/sql/index.rs | 28 +- lib/src/sql/regex.rs | 29 +- lib/src/sql/statements/analyze.rs | 26 +- lib/src/sql/statements/remove/index.rs | 4 +- lib/src/sql/statements/remove/namespace.rs | 3 +- lib/src/sql/statements/remove/table.rs | 4 +- lib/src/sql/value/serde/ser/index/mod.rs | 12 +- .../sql/value/serde/ser/index/mtreeparams.rs | 12 + .../sql/value/serde/ser/index/searchparams.rs | 32 +- lib/src/syn/v1/part/index.rs | 33 + lib/src/syn/v1/stmt/define/index.rs | 30 +- lib/tests/define.rs | 5 +- lib/tests/matches.rs | 22 +- lib/tests/remove.rs | 3 + 39 files changed, 3009 insertions(+), 1623 deletions(-) delete mode 100644 lib/src/idx/trees/store.rs create mode 100644 lib/src/idx/trees/store/cache.rs create mode 100644 lib/src/idx/trees/store/mod.rs create mode 100644 lib/src/idx/trees/store/tree.rs diff --git a/Cargo.lock b/Cargo.lock index 1b852d09..6366a67e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3014,15 +3014,6 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" -[[package]] -name = "lru" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2994eeba8ed550fd9b47a0b38f0242bc3344e496483c6180b69139cc2fa5d1d7" -dependencies = [ - "hashbrown 0.14.3", -] - [[package]] name = "lz4-sys" version = "1.9.4" @@ -3994,6 +3985,18 @@ dependencies = [ "memchr", ] +[[package]] +name = "quick_cache" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f69f8d22fa3f34f3083d9a4375c038732c7a7e964de1beb81c544da92dfc40b8" +dependencies = [ + "ahash 0.8.6", + "equivalent", + "hashbrown 0.14.3", + "parking_lot", +] + [[package]] name = "quote" version = "1.0.33" @@ -5311,7 +5314,6 @@ dependencies = [ "indxdb", "ipnet", "lexicmp", - "lru", "md-5", "nanoid", "native-tls", @@ -5325,6 +5327,7 @@ dependencies = [ "pharos", "pin-project-lite", "pprof", + "quick_cache", "radix_trie", "rand 0.8.5", "regex", diff --git a/lib/Cargo.toml b/lib/Cargo.toml index fdf1b3f4..1b4867cb 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -82,7 +82,6 @@ ipnet = "2.9.0" js = { version = "=0.4.0-beta.4", package = "rquickjs", features = ["array-buffer", "bindgen", "classes", "futures", "loader", "macro", "parallel", "properties","rust-alloc"], optional = true } jsonwebtoken = { version = "8.3.0-surreal.1", package = "surrealdb-jsonwebtoken" } lexicmp = "0.1.0" -lru = "0.12.1" md-5 = "0.10.6" nanoid = "0.4.0" 
native-tls = { version = "0.2.11", optional = true } @@ -94,6 +93,7 @@ once_cell = "1.18.0" path-clean = "1.0.1" pbkdf2 = { version = "0.12.2", features = ["simple"] } pin-project-lite = "0.2.13" +quick_cache = "0.4.0" radix_trie = { version = "0.2.1", features = ["serde"] } rand = "0.8.5" regex = "1.10.2" diff --git a/lib/benches/index_btree.rs b/lib/benches/index_btree.rs index 375cb1d9..fff79531 100644 --- a/lib/benches/index_btree.rs +++ b/lib/benches/index_btree.rs @@ -5,7 +5,8 @@ use std::fmt::Debug; use std::time::Duration; use surrealdb::idx::trees::bkeys::{BKeys, FstKeys, TrieKeys}; use surrealdb::idx::trees::btree::{BState, BTree, Payload}; -use surrealdb::idx::trees::store::{TreeNodeProvider, TreeNodeStore, TreeStoreType}; +use surrealdb::idx::trees::store::cache::TreeCache; +use surrealdb::idx::trees::store::{TreeNodeProvider, TreeStore}; use surrealdb::kvs::{Datastore, Key, LockType::*, TransactionType::*}; use tokio::runtime::Runtime; macro_rules! get_key_value { @@ -24,12 +25,22 @@ fn bench_index_btree(c: &mut Criterion) { group.bench_function("trees-insertion-fst", |b| { b.to_async(Runtime::new().unwrap()) - .iter(|| bench::<_, FstKeys>(samples_len, |i| get_key_value!(samples[i]))) + .iter(|| bench::<_, FstKeys>(samples_len, 100, |i| get_key_value!(samples[i]))) }); group.bench_function("trees-insertion-trie", |b| { b.to_async(Runtime::new().unwrap()) - .iter(|| bench::<_, TrieKeys>(samples_len, |i| get_key_value!(samples[i]))) + .iter(|| bench::<_, TrieKeys>(samples_len, 100, |i| get_key_value!(samples[i]))) + }); + + group.bench_function("trees-insertion-fst-fullcache", |b| { + b.to_async(Runtime::new().unwrap()) + .iter(|| bench::<_, FstKeys>(samples_len, 0, |i| get_key_value!(samples[i]))) + }); + + group.bench_function("trees-insertion-trie-fullcache", |b| { + b.to_async(Runtime::new().unwrap()) + .iter(|| bench::<_, TrieKeys>(samples_len, 0, |i| get_key_value!(samples[i]))) }); group.finish(); @@ -47,23 +58,24 @@ fn setup() -> (usize, Vec) { (samples_len, samples) } -async fn bench(samples_size: usize, sample_provider: F) +async fn bench(samples_size: usize, cache_size: usize, sample_provider: F) where F: Fn(usize) -> (Key, Payload), - BK: BKeys + Default + Debug, + BK: BKeys + Clone + Default + Debug, { let ds = Datastore::new("memory").await.unwrap(); let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); let mut t = BTree::::new(BState::new(100)); - let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Write, 20); - let mut s = s.lock().await; + let c = TreeCache::new(0, TreeNodeProvider::Debug, cache_size); + let mut s = TreeStore::new(TreeNodeProvider::Debug, c, Write).await; for i in 0..samples_size { let (key, payload) = sample_provider(i); // Insert the sample t.insert(&mut tx, &mut s, key.clone(), payload).await.unwrap(); // Search for it - black_box(t.search(&mut tx, &mut s, &key).await.unwrap()); + black_box(t.search_mut(&mut tx, &mut s, &key).await.unwrap()); } + s.finish(&mut tx).await.unwrap(); tx.commit().await.unwrap(); } diff --git a/lib/benches/index_mtree.rs b/lib/benches/index_mtree.rs index 4de58950..864f4c2b 100644 --- a/lib/benches/index_mtree.rs +++ b/lib/benches/index_mtree.rs @@ -7,7 +7,8 @@ use std::sync::Arc; use std::time::Duration; use surrealdb::idx::docids::DocId; use surrealdb::idx::trees::mtree::{MState, MTree}; -use surrealdb::idx::trees::store::{TreeNodeProvider, TreeNodeStore, TreeStoreType}; +use surrealdb::idx::trees::store::cache::TreeCache; +use surrealdb::idx::trees::store::{TreeNodeProvider, TreeStore}; 
use surrealdb::idx::trees::vector::Vector; use surrealdb::kvs::Datastore; use surrealdb::kvs::LockType::Optimistic; @@ -16,19 +17,35 @@ use surrealdb::sql::index::Distance; use tokio::runtime::Runtime; fn bench_index_mtree_dim_3(c: &mut Criterion) { - bench_index_mtree(c, 1_000, 100_000, 3, 120); + bench_index_mtree(c, 1_000, 100_000, 3, 120, 100); +} + +fn bench_index_mtree_dim_3_full_cache(c: &mut Criterion) { + bench_index_mtree(c, 1_000, 100_000, 3, 120, 0); } fn bench_index_mtree_dim_50(c: &mut Criterion) { - bench_index_mtree(c, 100, 10_000, 50, 20); + bench_index_mtree(c, 100, 10_000, 50, 20, 100); +} + +fn bench_index_mtree_dim_50_full_cache(c: &mut Criterion) { + bench_index_mtree(c, 100, 10_000, 50, 20, 0); } fn bench_index_mtree_dim_300(c: &mut Criterion) { - bench_index_mtree(c, 50, 5_000, 300, 40); + bench_index_mtree(c, 50, 5_000, 300, 40, 100); +} + +fn bench_index_mtree_dim_300_full_cache(c: &mut Criterion) { + bench_index_mtree(c, 50, 5_000, 300, 40, 0); } fn bench_index_mtree_dim_2048(c: &mut Criterion) { - bench_index_mtree(c, 10, 1_000, 2048, 60); + bench_index_mtree(c, 10, 1_000, 2048, 60, 100); +} + +fn bench_index_mtree_dim_2048_full_cache(c: &mut Criterion) { + bench_index_mtree(c, 10, 1_000, 2048, 60, 0); } fn bench_index_mtree( @@ -37,6 +54,7 @@ fn bench_index_mtree( release_samples_len: usize, vector_dimension: usize, measurement_secs: u64, + cache_size: usize, ) { let samples_len = if cfg!(debug_assertions) { debug_samples_len // Debug is slow @@ -50,22 +68,26 @@ fn bench_index_mtree( // Indexing benchmark group { let mut group = get_group(c, "index_mtree_insert", samples_len, measurement_secs); - let id = format!("len_{}_dim_{}", samples_len, vector_dimension); + let id = format!("len_{}_dim_{}_cache_{}", samples_len, vector_dimension, cache_size); group.bench_function(id, |b| { b.to_async(Runtime::new().unwrap()) - .iter(|| insert_objects(&ds, samples_len, vector_dimension)); + .iter(|| insert_objects(&ds, samples_len, vector_dimension, cache_size)); }); group.finish(); } // Knn lookup benchmark group { - let mut group = get_group(c, "index_mtree_lookup", 100_000, 10); + let mut group = get_group(c, "index_mtree_lookup", samples_len, 10); for knn in [1, 10] { - let id = format!("knn_{}_len_{}_dim_{}", knn, samples_len, vector_dimension); + let id = format!( + "knn_{}_len_{}_dim_{}_cache_{}", + knn, samples_len, vector_dimension, cache_size + ); group.bench_function(id, |b| { - b.to_async(Runtime::new().unwrap()) - .iter(|| knn_lookup_objects(&ds, 100_000, vector_dimension, knn)); + b.to_async(Runtime::new().unwrap()).iter(|| { + knn_lookup_objects(&ds, samples_len, vector_dimension, knn, cache_size) + }); }); } group.finish(); @@ -96,26 +118,38 @@ fn mtree() -> MTree { MTree::new(MState::new(40), Distance::Euclidean) } -async fn insert_objects(ds: &Datastore, samples_size: usize, vector_size: usize) { +async fn insert_objects( + ds: &Datastore, + samples_size: usize, + vector_size: usize, + cache_size: usize, +) { let mut rng = thread_rng(); let mut t = mtree(); let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Write, 20); - let mut s = s.lock().await; + let c = TreeCache::new(0, TreeNodeProvider::Debug, cache_size); + let mut s = TreeStore::new(TreeNodeProvider::Debug, c.clone(), Write).await; for i in 0..samples_size { let object = random_object(&mut rng, vector_size); // Insert the sample t.insert(&mut tx, &mut s, object, i as DocId).await.unwrap(); } + s.finish(&mut 
tx).await.unwrap(); tx.commit().await.unwrap(); } -async fn knn_lookup_objects(ds: &Datastore, samples_size: usize, vector_size: usize, knn: usize) { +async fn knn_lookup_objects( + ds: &Datastore, + samples_size: usize, + vector_size: usize, + knn: usize, + cache_size: usize, +) { let mut rng = thread_rng(); let t = mtree(); let mut tx = ds.transaction(Read, Optimistic).await.unwrap(); - let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Read, 20); - let mut s = s.lock().await; + let c = TreeCache::new(0, TreeNodeProvider::Debug, cache_size); + let mut s = TreeStore::new(TreeNodeProvider::Debug, c, Read).await; for _ in 0..samples_size { let object = Arc::new(random_object(&mut rng, vector_size)); // Insert the sample @@ -127,8 +161,12 @@ async fn knn_lookup_objects(ds: &Datastore, samples_size: usize, vector_size: us criterion_group!( benches, bench_index_mtree_dim_3, + bench_index_mtree_dim_3_full_cache, bench_index_mtree_dim_50, + bench_index_mtree_dim_50_full_cache, bench_index_mtree_dim_300, + bench_index_mtree_dim_300_full_cache, bench_index_mtree_dim_2048, + bench_index_mtree_dim_2048_full_cache ); criterion_main!(benches); diff --git a/lib/src/ctx/context.rs b/lib/src/ctx/context.rs index 22c855c7..10d86cfd 100644 --- a/lib/src/ctx/context.rs +++ b/lib/src/ctx/context.rs @@ -6,6 +6,7 @@ use crate::dbs::capabilities::NetTarget; use crate::dbs::{Capabilities, Notification}; use crate::err::Error; use crate::idx::planner::QueryPlanner; +use crate::idx::trees::store::IndexStores; use crate::sql::value::Value; use channel::Sender; use std::borrow::Cow; @@ -43,6 +44,8 @@ pub struct Context<'a> { notifications: Option>, // An optional query planner query_planner: Option<&'a QueryPlanner<'a>>, + // The index store + index_stores: IndexStores, // Capabilities capabilities: Arc, } @@ -65,9 +68,29 @@ impl<'a> Debug for Context<'a> { } impl<'a> Context<'a> { + pub(crate) fn from_ds( + time_out: Option, + capabilities: Capabilities, + index_stores: IndexStores, + ) -> Context<'a> { + let mut ctx = Self { + values: HashMap::default(), + parent: None, + deadline: None, + cancelled: Arc::new(AtomicBool::new(false)), + notifications: None, + query_planner: None, + capabilities: Arc::new(capabilities), + index_stores, + }; + if let Some(timeout) = time_out { + ctx.add_timeout(timeout); + } + ctx + } /// Create an empty background context. pub fn background() -> Self { - Context { + Self { values: HashMap::default(), parent: None, deadline: None, @@ -75,6 +98,7 @@ impl<'a> Context<'a> { notifications: None, query_planner: None, capabilities: Arc::new(Capabilities::default()), + index_stores: IndexStores::default(), } } @@ -88,6 +112,7 @@ impl<'a> Context<'a> { notifications: parent.notifications.clone(), query_planner: parent.query_planner, capabilities: parent.capabilities.clone(), + index_stores: parent.index_stores.clone(), } } @@ -148,6 +173,11 @@ impl<'a> Context<'a> { self.query_planner } + /// Get the index_store for this context/ds + pub(crate) fn get_index_stores(&self) -> &IndexStores { + &self.index_stores + } + /// Check if the context is done. If it returns `None` the operation may /// proceed, otherwise the operation should be stopped. 
pub fn done(&self) -> Option { diff --git a/lib/src/dbs/options.rs b/lib/src/dbs/options.rs index de9de6ba..048df6ae 100644 --- a/lib/src/dbs/options.rs +++ b/lib/src/dbs/options.rs @@ -409,7 +409,7 @@ impl Options { /// Get current Node ID pub fn id(&self) -> Result { - self.id.ok_or(Error::Unreachable) + self.id.ok_or(Error::Unreachable("Options::id")) } /// Get currently selected NS diff --git a/lib/src/doc/index.rs b/lib/src/doc/index.rs index 271ac1d0..afae5f34 100644 --- a/lib/src/doc/index.rs +++ b/lib/src/doc/index.rs @@ -5,9 +5,9 @@ use crate::doc::{CursorDoc, Document}; use crate::err::Error; use crate::idx::ft::FtIndex; use crate::idx::trees::mtree::MTreeIndex; -use crate::idx::trees::store::TreeStoreType; use crate::idx::IndexKeyBase; use crate::key; +use crate::kvs::TransactionType; use crate::sql::array::Array; use crate::sql::index::{Index, MTreeParams, SearchParams}; use crate::sql::statements::DefineIndexStatement; @@ -53,7 +53,7 @@ impl<'a> Document<'a> { Index::Uniq => ic.index_unique(txn).await?, Index::Idx => ic.index_non_unique(txn).await?, Index::Search(p) => ic.index_full_text(ctx, txn, p).await?, - Index::MTree(p) => ic.index_mtree(txn, p).await?, + Index::MTree(p) => ic.index_mtree(ctx, txn, p).await?, }; } } @@ -335,7 +335,16 @@ impl<'a> IndexOperation<'a> { ) -> Result<(), Error> { let ikb = IndexKeyBase::new(self.opt, self.ix); - let mut ft = FtIndex::new(self.opt, txn, &p.az, ikb, p, TreeStoreType::Write).await?; + let mut ft = FtIndex::new( + ctx.get_index_stores(), + self.opt, + txn, + &p.az, + ikb, + p, + TransactionType::Write, + ) + .await?; if let Some(n) = self.n.take() { ft.index_document(ctx, self.opt, txn, self.rid, n).await?; @@ -345,10 +354,17 @@ impl<'a> IndexOperation<'a> { ft.finish(txn).await } - async fn index_mtree(&mut self, txn: &Transaction, p: &MTreeParams) -> Result<(), Error> { + async fn index_mtree( + &mut self, + ctx: &Context<'_>, + txn: &Transaction, + p: &MTreeParams, + ) -> Result<(), Error> { let mut tx = txn.lock().await; let ikb = IndexKeyBase::new(self.opt, self.ix); - let mut mt = MTreeIndex::new(&mut tx, ikb, p, TreeStoreType::Write).await?; + let mut mt = + MTreeIndex::new(ctx.get_index_stores(), &mut tx, ikb, p, TransactionType::Write) + .await?; // Delete the old index data if let Some(o) = self.o.take() { mt.remove_document(&mut tx, self.rid, o).await?; diff --git a/lib/src/err/mod.rs b/lib/src/err/mod.rs index 56725aa3..843d90df 100644 --- a/lib/src/err/mod.rs +++ b/lib/src/err/mod.rs @@ -46,8 +46,8 @@ pub enum Error { RetryWithId(Thing), /// The database encountered unreachable logic - #[error("The database encountered unreachable logic")] - Unreachable, + #[error("The database encountered unreachable logic: {0}")] + Unreachable(&'static str), /// Statement has been deprecated #[error("{0}")] @@ -619,8 +619,8 @@ pub enum Error { Revision(#[from] RevisionError), /// The index has been found to be inconsistent - #[error("Index is corrupted")] - CorruptedIndex, + #[error("Index is corrupted: {0}")] + CorruptedIndex(&'static str), /// The query planner did not find an index able to support the match @@ or knn <> operator for a given expression #[error("There was no suitable index supporting the expression '{value}'")] diff --git a/lib/src/idx/docids.rs b/lib/src/idx/docids.rs index 9e04276a..0793f13c 100644 --- a/lib/src/idx/docids.rs +++ b/lib/src/idx/docids.rs @@ -1,14 +1,12 @@ use crate::err::Error; use crate::idx::trees::bkeys::TrieKeys; -use crate::idx::trees::btree::{BStatistics, BTree, BTreeNodeStore}; -use 
crate::idx::trees::store::{TreeNodeProvider, TreeNodeStore, TreeStoreType}; +use crate::idx::trees::btree::{BStatistics, BTree, BTreeStore}; +use crate::idx::trees::store::{IndexStores, TreeNodeProvider}; use crate::idx::{trees, IndexKeyBase, VersionedSerdeState}; -use crate::kvs::{Key, Transaction}; +use crate::kvs::{Key, Transaction, TransactionType}; use revision::revisioned; use roaring::RoaringTreemap; use serde::{Deserialize, Serialize}; -use std::sync::Arc; -use tokio::sync::Mutex; pub type DocId = u64; @@ -18,35 +16,41 @@ pub(crate) struct DocIds { state_key: Key, index_key_base: IndexKeyBase, btree: BTree, - store: Arc>>, + store: BTreeStore, available_ids: Option, next_doc_id: DocId, - updated: bool, } impl DocIds { pub(in crate::idx) async fn new( + ixs: &IndexStores, tx: &mut Transaction, - index_key_base: IndexKeyBase, + tt: TransactionType, + ikb: IndexKeyBase, default_btree_order: u32, - store_type: TreeStoreType, + cache_size: u32, ) -> Result { - let state_key: Key = index_key_base.new_bd_key(None); + let state_key: Key = ikb.new_bd_key(None); let state: State = if let Some(val) = tx.get(state_key.clone()).await? { State::try_from_val(val)? } else { State::new(default_btree_order) }; - let store = - TreeNodeStore::new(TreeNodeProvider::DocIds(index_key_base.clone()), store_type, 20); + let store = ixs + .get_store_btree_trie( + TreeNodeProvider::DocIds(ikb.clone()), + state.btree.generation(), + tt, + cache_size as usize, + ) + .await; Ok(Self { state_key, - index_key_base, + index_key_base: ikb, btree: BTree::new(state.btree), store, available_ids: state.available_ids, next_doc_id: state.next_doc_id, - updated: false, }) } @@ -72,8 +76,7 @@ impl DocIds { tx: &mut Transaction, doc_key: Key, ) -> Result, Error> { - let mut store = self.store.lock().await; - self.btree.search(tx, &mut store, &doc_key).await + self.btree.search(tx, &self.store, &doc_key).await } /// Returns the doc_id for the given doc_key. @@ -84,16 +87,13 @@ impl DocIds { doc_key: Key, ) -> Result { { - let mut store = self.store.lock().await; - if let Some(doc_id) = self.btree.search(tx, &mut store, &doc_key).await? { + if let Some(doc_id) = self.btree.search_mut(tx, &mut self.store, &doc_key).await? { return Ok(Resolved::Existing(doc_id)); } } let doc_id = self.get_next_doc_id(); tx.set(self.index_key_base.new_bi_key(doc_id), doc_key.clone()).await?; - let mut store = self.store.lock().await; - self.btree.insert(tx, &mut store, doc_key, doc_id).await?; - self.updated = true; + self.btree.insert(tx, &mut self.store, doc_key, doc_id).await?; Ok(Resolved::New(doc_id)) } @@ -102,8 +102,7 @@ impl DocIds { tx: &mut Transaction, doc_key: Key, ) -> Result, Error> { - let mut store = self.store.lock().await; - if let Some(doc_id) = self.btree.delete(tx, &mut store, doc_key).await? { + if let Some(doc_id) = self.btree.delete(tx, &mut self.store, doc_key).await? 
{ tx.del(self.index_key_base.new_bi_key(doc_id)).await?; if let Some(available_ids) = &mut self.available_ids { available_ids.insert(doc_id); @@ -112,7 +111,6 @@ impl DocIds { available_ids.insert(doc_id); self.available_ids = Some(available_ids); } - self.updated = true; Ok(Some(doc_id)) } else { Ok(None) @@ -136,15 +134,14 @@ impl DocIds { &self, tx: &mut Transaction, ) -> Result { - let mut store = self.store.lock().await; - self.btree.statistics(tx, &mut store).await + self.btree.statistics(tx, &self.store).await } pub(in crate::idx) async fn finish(&mut self, tx: &mut Transaction) -> Result<(), Error> { - let updated = self.store.lock().await.finish(tx).await?; - if self.updated || updated { + if self.store.finish(tx).await? { + let btree = self.btree.inc_generation().clone(); let state = State { - btree: self.btree.get_state().clone(), + btree, available_ids: self.available_ids.take(), next_doc_id: self.next_doc_id, }; @@ -199,16 +196,18 @@ impl Resolved { #[cfg(test)] mod tests { use crate::idx::docids::{DocIds, Resolved}; - use crate::idx::trees::store::TreeStoreType; use crate::idx::IndexKeyBase; - use crate::kvs::{Datastore, LockType::*, Transaction, TransactionType::*}; + use crate::kvs::TransactionType::*; + use crate::kvs::{Datastore, LockType::*, Transaction, TransactionType}; const BTREE_ORDER: u32 = 7; - async fn get_doc_ids(ds: &Datastore, store_type: TreeStoreType) -> (Transaction, DocIds) { - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); + async fn new_operation(ds: &Datastore, tt: TransactionType) -> (Transaction, DocIds) { + let mut tx = ds.transaction(tt, Optimistic).await.unwrap(); let d = - DocIds::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER, store_type).await.unwrap(); + DocIds::new(ds.index_store(), &mut tx, tt, IndexKeyBase::default(), BTREE_ORDER, 100) + .await + .unwrap(); (tx, d) } @@ -223,37 +222,43 @@ mod tests { // Resolve a first doc key { - let (mut tx, mut d) = get_doc_ids(&ds, TreeStoreType::Write).await; + let (mut tx, mut d) = new_operation(&ds, Write).await; let doc_id = d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(); + finish(tx, d).await; + + let (mut tx, d) = new_operation(&ds, Read).await; assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 1); assert_eq!(d.get_doc_key(&mut tx, 0).await.unwrap(), Some("Foo".into())); - finish(tx, d).await; assert_eq!(doc_id, Resolved::New(0)); } // Resolve the same doc key { - let (mut tx, mut d) = get_doc_ids(&ds, TreeStoreType::Write).await; + let (mut tx, mut d) = new_operation(&ds, Write).await; let doc_id = d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(); + finish(tx, d).await; + + let (mut tx, d) = new_operation(&ds, Read).await; assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 1); assert_eq!(d.get_doc_key(&mut tx, 0).await.unwrap(), Some("Foo".into())); - finish(tx, d).await; assert_eq!(doc_id, Resolved::Existing(0)); } // Resolve another single doc key { - let (mut tx, mut d) = get_doc_ids(&ds, TreeStoreType::Write).await; + let (mut tx, mut d) = new_operation(&ds, Write).await; let doc_id = d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap(); + finish(tx, d).await; + + let (mut tx, d) = new_operation(&ds, Read).await; assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 2); assert_eq!(d.get_doc_key(&mut tx, 1).await.unwrap(), Some("Bar".into())); - finish(tx, d).await; assert_eq!(doc_id, Resolved::New(1)); } // Resolve another two existing doc keys and two new doc keys (interlaced) { - let (mut tx, mut d) = get_doc_ids(&ds, 
TreeStoreType::Write).await; + let (mut tx, mut d) = new_operation(&ds, Write).await; assert_eq!( d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(), Resolved::Existing(0) @@ -264,12 +269,13 @@ mod tests { Resolved::Existing(1) ); assert_eq!(d.resolve_doc_id(&mut tx, "World".into()).await.unwrap(), Resolved::New(3)); - assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 4); finish(tx, d).await; + let (mut tx, d) = new_operation(&ds, Read).await; + assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 4); } { - let (mut tx, mut d) = get_doc_ids(&ds, TreeStoreType::Write).await; + let (mut tx, mut d) = new_operation(&ds, Write).await; assert_eq!( d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(), Resolved::Existing(0) @@ -286,12 +292,13 @@ mod tests { d.resolve_doc_id(&mut tx, "World".into()).await.unwrap(), Resolved::Existing(3) ); + finish(tx, d).await; + let (mut tx, d) = new_operation(&ds, Read).await; assert_eq!(d.get_doc_key(&mut tx, 0).await.unwrap(), Some("Foo".into())); assert_eq!(d.get_doc_key(&mut tx, 1).await.unwrap(), Some("Bar".into())); assert_eq!(d.get_doc_key(&mut tx, 2).await.unwrap(), Some("Hello".into())); assert_eq!(d.get_doc_key(&mut tx, 3).await.unwrap(), Some("World".into())); assert_eq!(d.statistics(&mut tx).await.unwrap().keys_count, 4); - finish(tx, d).await; } } @@ -301,7 +308,7 @@ mod tests { // Create two docs { - let (mut tx, mut d) = get_doc_ids(&ds, TreeStoreType::Write).await; + let (mut tx, mut d) = new_operation(&ds, Write).await; assert_eq!(d.resolve_doc_id(&mut tx, "Foo".into()).await.unwrap(), Resolved::New(0)); assert_eq!(d.resolve_doc_id(&mut tx, "Bar".into()).await.unwrap(), Resolved::New(1)); finish(tx, d).await; @@ -309,7 +316,7 @@ mod tests { // Remove doc 1 { - let (mut tx, mut d) = get_doc_ids(&ds, TreeStoreType::Write).await; + let (mut tx, mut d) = new_operation(&ds, Write).await; assert_eq!(d.remove_doc(&mut tx, "Dummy".into()).await.unwrap(), None); assert_eq!(d.remove_doc(&mut tx, "Foo".into()).await.unwrap(), Some(0)); finish(tx, d).await; @@ -317,21 +324,21 @@ mod tests { // Check 'Foo' has been removed { - let (mut tx, mut d) = get_doc_ids(&ds, TreeStoreType::Write).await; + let (mut tx, mut d) = new_operation(&ds, Write).await; assert_eq!(d.remove_doc(&mut tx, "Foo".into()).await.unwrap(), None); finish(tx, d).await; } // Insert a new doc - should take the available id 1 { - let (mut tx, mut d) = get_doc_ids(&ds, TreeStoreType::Write).await; + let (mut tx, mut d) = new_operation(&ds, Write).await; assert_eq!(d.resolve_doc_id(&mut tx, "Hello".into()).await.unwrap(), Resolved::New(0)); finish(tx, d).await; } // Remove doc 2 { - let (mut tx, mut d) = get_doc_ids(&ds, TreeStoreType::Write).await; + let (mut tx, mut d) = new_operation(&ds, Write).await; assert_eq!(d.remove_doc(&mut tx, "Dummy".into()).await.unwrap(), None); assert_eq!(d.remove_doc(&mut tx, "Bar".into()).await.unwrap(), Some(1)); finish(tx, d).await; @@ -339,14 +346,14 @@ mod tests { // Check 'Bar' has been removed { - let (mut tx, mut d) = get_doc_ids(&ds, TreeStoreType::Write).await; + let (mut tx, mut d) = new_operation(&ds, Write).await; assert_eq!(d.remove_doc(&mut tx, "Foo".into()).await.unwrap(), None); finish(tx, d).await; } // Insert a new doc - should take the available id 2 { - let (mut tx, mut d) = get_doc_ids(&ds, TreeStoreType::Write).await; + let (mut tx, mut d) = new_operation(&ds, Write).await; assert_eq!(d.resolve_doc_id(&mut tx, "World".into()).await.unwrap(), Resolved::New(1)); finish(tx, d).await; } diff --git 
a/lib/src/idx/ft/doclength.rs b/lib/src/idx/ft/doclength.rs index 08cb4d78..cd30b8c3 100644 --- a/lib/src/idx/ft/doclength.rs +++ b/lib/src/idx/ft/doclength.rs @@ -1,36 +1,42 @@ use crate::err::Error; use crate::idx::docids::DocId; use crate::idx::trees::bkeys::TrieKeys; -use crate::idx::trees::btree::{BState, BStatistics, BTree, BTreeNodeStore, Payload}; -use crate::idx::trees::store::{TreeNodeProvider, TreeNodeStore, TreeStoreType}; +use crate::idx::trees::btree::{BState, BStatistics, BTree, BTreeStore, Payload}; +use crate::idx::trees::store::{IndexStores, TreeNodeProvider}; use crate::idx::{IndexKeyBase, VersionedSerdeState}; -use crate::kvs::{Key, Transaction}; -use std::sync::Arc; -use tokio::sync::Mutex; +use crate::kvs::{Key, Transaction, TransactionType}; pub(super) type DocLength = u64; pub(super) struct DocLengths { state_key: Key, btree: BTree, - store: Arc>>, + store: BTreeStore, } impl DocLengths { pub(super) async fn new( + ixs: &IndexStores, tx: &mut Transaction, - index_key_base: IndexKeyBase, + ikb: IndexKeyBase, default_btree_order: u32, - store_type: TreeStoreType, + tt: TransactionType, + cache_size: u32, ) -> Result { - let state_key: Key = index_key_base.new_bl_key(None); + let state_key: Key = ikb.new_bl_key(None); let state: BState = if let Some(val) = tx.get(state_key.clone()).await? { BState::try_from_val(val)? } else { BState::new(default_btree_order) }; - let store = - TreeNodeStore::new(TreeNodeProvider::DocLengths(index_key_base), store_type, 20); + let store = ixs + .get_store_btree_trie( + TreeNodeProvider::DocLengths(ikb), + state.generation(), + tt, + cache_size as usize, + ) + .await; Ok(Self { state_key, btree: BTree::new(state), @@ -43,8 +49,15 @@ impl DocLengths { tx: &mut Transaction, doc_id: DocId, ) -> Result, Error> { - let mut store = self.store.lock().await; - self.btree.search(tx, &mut store, &doc_id.to_be_bytes().to_vec()).await + self.btree.search(tx, &self.store, &doc_id.to_be_bytes().to_vec()).await + } + + pub(super) async fn get_doc_length_mut( + &mut self, + tx: &mut Transaction, + doc_id: DocId, + ) -> Result, Error> { + self.btree.search_mut(tx, &mut self.store, &doc_id.to_be_bytes().to_vec()).await } pub(super) async fn set_doc_length( @@ -53,8 +66,8 @@ impl DocLengths { doc_id: DocId, doc_length: DocLength, ) -> Result<(), Error> { - let mut store = self.store.lock().await; - self.btree.insert(tx, &mut store, doc_id.to_be_bytes().to_vec(), doc_length).await + self.btree.insert(tx, &mut self.store, doc_id.to_be_bytes().to_vec(), doc_length).await?; + Ok(()) } pub(super) async fn remove_doc_length( @@ -62,18 +75,18 @@ impl DocLengths { tx: &mut Transaction, doc_id: DocId, ) -> Result, Error> { - let mut store = self.store.lock().await; - self.btree.delete(tx, &mut store, doc_id.to_be_bytes().to_vec()).await + self.btree.delete(tx, &mut self.store, doc_id.to_be_bytes().to_vec()).await } pub(super) async fn statistics(&self, tx: &mut Transaction) -> Result { - let mut store = self.store.lock().await; - self.btree.statistics(tx, &mut store).await + self.btree.statistics(tx, &self.store).await } - pub(super) async fn finish(&self, tx: &mut Transaction) -> Result<(), Error> { - self.store.lock().await.finish(tx).await?; - self.btree.get_state().finish(tx, &self.state_key).await?; + pub(super) async fn finish(&mut self, tx: &mut Transaction) -> Result<(), Error> { + if self.store.finish(tx).await? 
{ + let state = self.btree.inc_generation(); + tx.set(self.state_key.clone(), state.try_to_val()?).await?; + } Ok(()) } } @@ -81,9 +94,26 @@ impl DocLengths { #[cfg(test)] mod tests { use crate::idx::ft::doclength::DocLengths; - use crate::idx::trees::store::TreeStoreType; use crate::idx::IndexKeyBase; - use crate::kvs::{Datastore, LockType::*, TransactionType::*}; + use crate::kvs::{Datastore, LockType::*, Transaction, TransactionType}; + + async fn doc_length( + ds: &Datastore, + order: u32, + tt: TransactionType, + ) -> (Transaction, DocLengths) { + let mut tx = ds.transaction(TransactionType::Write, Optimistic).await.unwrap(); + let dl = + DocLengths::new(ds.index_store(), &mut tx, IndexKeyBase::default(), order, tt, 100) + .await + .unwrap(); + (tx, dl) + } + + async fn finish(mut l: DocLengths, mut tx: Transaction) { + l.finish(&mut tx).await.unwrap(); + tx.commit().await.unwrap() + } #[tokio::test] async fn test_doc_lengths() { @@ -91,49 +121,58 @@ mod tests { let ds = Datastore::new("memory").await.unwrap(); - // Check empty state - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let l = DocLengths::new( - &mut tx, - IndexKeyBase::default(), - BTREE_ORDER, - TreeStoreType::Traversal, - ) - .await - .unwrap(); - assert_eq!(l.statistics(&mut tx).await.unwrap().keys_count, 0); - let dl = l.get_doc_length(&mut tx, 99).await.unwrap(); - assert_eq!(dl, None); + { + // Check empty state + let (mut tx, l) = doc_length(&ds, BTREE_ORDER, TransactionType::Read).await; + assert_eq!(l.statistics(&mut tx).await.unwrap().keys_count, 0); + let dl = l.get_doc_length(&mut tx, 99).await.unwrap(); + assert_eq!(dl, None); + tx.cancel().await.unwrap(); + } - // Set a doc length - let mut l = - DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER, TreeStoreType::Write) - .await - .unwrap(); - l.set_doc_length(&mut tx, 99, 199).await.unwrap(); - assert_eq!(l.statistics(&mut tx).await.unwrap().keys_count, 1); - let dl = l.get_doc_length(&mut tx, 99).await.unwrap(); - l.finish(&mut tx).await.unwrap(); - assert_eq!(dl, Some(199)); + { + // Set a doc length + let (mut tx, mut l) = doc_length(&ds, BTREE_ORDER, TransactionType::Write).await; + l.set_doc_length(&mut tx, 99, 199).await.unwrap(); + finish(l, tx).await; + } - // Update doc length - let mut l = - DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER, TreeStoreType::Write) - .await - .unwrap(); - l.set_doc_length(&mut tx, 99, 299).await.unwrap(); - assert_eq!(l.statistics(&mut tx).await.unwrap().keys_count, 1); - let dl = l.get_doc_length(&mut tx, 99).await.unwrap(); - l.finish(&mut tx).await.unwrap(); - assert_eq!(dl, Some(299)); + { + let (mut tx, l) = doc_length(&ds, BTREE_ORDER, TransactionType::Read).await; + assert_eq!(l.statistics(&mut tx).await.unwrap().keys_count, 1); + let dl = l.get_doc_length(&mut tx, 99).await.unwrap(); + assert_eq!(dl, Some(199)); + tx.cancel().await.unwrap(); + } - // Remove doc lengths - let mut l = - DocLengths::new(&mut tx, IndexKeyBase::default(), BTREE_ORDER, TreeStoreType::Write) - .await - .unwrap(); - assert_eq!(l.remove_doc_length(&mut tx, 99).await.unwrap(), Some(299)); - assert_eq!(l.remove_doc_length(&mut tx, 99).await.unwrap(), None); - tx.commit().await.unwrap() + { + // Update doc length + let (mut tx, mut l) = doc_length(&ds, BTREE_ORDER, TransactionType::Write).await; + l.set_doc_length(&mut tx, 99, 299).await.unwrap(); + finish(l, tx).await; + } + + { + let (mut tx, l) = doc_length(&ds, BTREE_ORDER, TransactionType::Read).await; + assert_eq!(l.statistics(&mut 
tx).await.unwrap().keys_count, 1); + let dl = l.get_doc_length(&mut tx, 99).await.unwrap(); + assert_eq!(dl, Some(299)); + tx.cancel().await.unwrap(); + } + + { + // Remove doc lengths + let (mut tx, mut l) = doc_length(&ds, BTREE_ORDER, TransactionType::Write).await; + assert_eq!(l.remove_doc_length(&mut tx, 99).await.unwrap(), Some(299)); + assert_eq!(l.remove_doc_length(&mut tx, 99).await.unwrap(), None); + finish(l, tx).await; + } + + { + let (mut tx, l) = doc_length(&ds, BTREE_ORDER, TransactionType::Read).await; + let dl = l.get_doc_length(&mut tx, 99).await.unwrap(); + assert_eq!(dl, None); + tx.cancel().await.unwrap(); + } } } diff --git a/lib/src/idx/ft/mod.rs b/lib/src/idx/ft/mod.rs index 21ad069c..0e04161b 100644 --- a/lib/src/idx/ft/mod.rs +++ b/lib/src/idx/ft/mod.rs @@ -20,10 +20,10 @@ use crate::idx::ft::scorer::BM25Scorer; use crate::idx::ft::termdocs::{TermDocs, TermsDocs}; use crate::idx::ft::terms::{TermId, Terms}; use crate::idx::trees::btree::BStatistics; -use crate::idx::trees::store::TreeStoreType; +use crate::idx::trees::store::IndexStores; use crate::idx::{IndexKeyBase, VersionedSerdeState}; use crate::kvs; -use crate::kvs::Key; +use crate::kvs::{Key, TransactionType}; use crate::sql::index::SearchParams; use crate::sql::scoring::Scoring; use crate::sql::statements::DefineAnalyzerStatement; @@ -97,24 +97,25 @@ impl VersionedSerdeState for State {} impl FtIndex { pub(crate) async fn new( + ixs: &IndexStores, opt: &Options, txn: &Transaction, az: &str, index_key_base: IndexKeyBase, p: &SearchParams, - store_type: TreeStoreType, + tt: TransactionType, ) -> Result { let mut tx = txn.lock().await; let az = tx.get_db_analyzer(opt.ns(), opt.db(), az).await?; - Self::with_analyzer(&mut tx, az, index_key_base, p, store_type).await + Self::with_analyzer(ixs, &mut tx, az, index_key_base, p, tt).await } - async fn with_analyzer( + ixs: &IndexStores, run: &mut kvs::Transaction, az: DefineAnalyzerStatement, index_key_base: IndexKeyBase, p: &SearchParams, - store_type: TreeStoreType, + tt: TransactionType, ) -> Result { let state_key: Key = index_key_base.new_bs_key(); let state: State = if let Some(val) = run.get(state_key.clone()).await? { @@ -123,16 +124,26 @@ impl FtIndex { State::default() }; let doc_ids = Arc::new(RwLock::new( - DocIds::new(run, index_key_base.clone(), p.doc_ids_order, store_type).await?, + DocIds::new(ixs, run, tt, index_key_base.clone(), p.doc_ids_order, p.doc_ids_cache) + .await?, )); let doc_lengths = Arc::new(RwLock::new( - DocLengths::new(run, index_key_base.clone(), p.doc_lengths_order, store_type).await?, + DocLengths::new( + ixs, + run, + index_key_base.clone(), + p.doc_lengths_order, + tt, + p.doc_lengths_cache, + ) + .await?, )); let postings = Arc::new(RwLock::new( - Postings::new(run, index_key_base.clone(), p.postings_order, store_type).await?, + Postings::new(ixs, run, index_key_base.clone(), p.postings_order, tt, p.postings_cache) + .await?, )); let terms = Arc::new(RwLock::new( - Terms::new(run, index_key_base.clone(), p.terms_order, store_type).await?, + Terms::new(ixs, run, index_key_base.clone(), p.terms_order, tt, p.terms_cache).await?, )); let termdocs = TermDocs::new(index_key_base.clone()); let offsets = Offsets::new(index_key_base.clone()); @@ -244,7 +255,7 @@ impl FtIndex { let mut tx = txn.lock().await; let mut dl = self.doc_lengths.write().await; if resolved.was_existing() { - if let Some(old_doc_length) = dl.get_doc_length(&mut tx, doc_id).await? { + if let Some(old_doc_length) = dl.get_doc_length_mut(&mut tx, doc_id).await? 
{ self.state.total_docs_lengths -= old_doc_length as u128; } } @@ -442,7 +453,7 @@ impl FtIndex { }) } - pub(crate) async fn finish(self, tx: &Transaction) -> Result<(), Error> { + pub(crate) async fn finish(&self, tx: &Transaction) -> Result<(), Error> { let mut run = tx.lock().await; self.doc_ids.write().await.finish(&mut run).await?; self.doc_lengths.write().await.finish(&mut run).await?; @@ -484,13 +495,12 @@ mod tests { use crate::dbs::{Options, Transaction}; use crate::idx::ft::scorer::{BM25Scorer, Score}; use crate::idx::ft::{FtIndex, HitsIterator}; - use crate::idx::trees::store::TreeStoreType; use crate::idx::IndexKeyBase; - use crate::kvs::{Datastore, LockType::*}; + use crate::kvs::{Datastore, LockType::*, TransactionType}; use crate::sql::index::SearchParams; use crate::sql::scoring::Scoring; use crate::sql::statements::{DefineAnalyzerStatement, DefineStatement}; - use crate::sql::{Statement, Thing, Value}; + use crate::sql::{Array, Statement, Thing, Value}; use crate::syn; use futures::lock::Mutex; use std::collections::HashMap; @@ -537,16 +547,17 @@ mod tests { pub(super) async fn tx_fti<'a>( ds: &Datastore, - store_type: TreeStoreType, + tt: TransactionType, az: &DefineAnalyzerStatement, order: u32, hl: bool, ) -> (Context<'a>, Options, Transaction, FtIndex) { - let write = matches!(store_type, TreeStoreType::Write); - let tx = ds.transaction(write.into(), Optimistic).await.unwrap(); + let ctx = Context::default(); + let tx = ds.transaction(tt, Optimistic).await.unwrap(); let txn = Arc::new(Mutex::new(tx)); let mut tx = txn.lock().await; let fti = FtIndex::with_analyzer( + ctx.get_index_stores(), &mut tx, az.clone(), IndexKeyBase::default(), @@ -558,13 +569,17 @@ mod tests { terms_order: order, sc: Scoring::bm25(), hl, + doc_ids_cache: 100, + doc_lengths_cache: 100, + postings_cache: 100, + terms_cache: 100, }, - TreeStoreType::Write, + tt, ) .await .unwrap(); drop(tx); - (Context::default(), Options::default(), txn, fti) + (ctx, Options::default(), txn, fti) } pub(super) async fn finish(txn: &Transaction, fti: FtIndex) { @@ -589,7 +604,7 @@ mod tests { { // Add one document let (ctx, opt, txn, mut fti) = - tx_fti(&ds, TreeStoreType::Write, &az, btree_order, false).await; + tx_fti(&ds, TransactionType::Write, &az, btree_order, false).await; fti.index_document(&ctx, &opt, &txn, &doc1, vec![Value::from("hello the world")]) .await .unwrap(); @@ -599,7 +614,7 @@ mod tests { { // Add two documents let (ctx, opt, txn, mut fti) = - tx_fti(&ds, TreeStoreType::Write, &az, btree_order, false).await; + tx_fti(&ds, TransactionType::Write, &az, btree_order, false).await; fti.index_document(&ctx, &opt, &txn, &doc2, vec![Value::from("a yellow hello")]) .await .unwrap(); @@ -611,7 +626,7 @@ mod tests { { let (ctx, opt, txn, fti) = - tx_fti(&ds, TreeStoreType::Read, &az, btree_order, false).await; + tx_fti(&ds, TransactionType::Read, &az, btree_order, false).await; // Check the statistics let statistics = fti.statistics(&txn).await.unwrap(); assert_eq!(statistics.terms.keys_count, 7); @@ -643,14 +658,14 @@ mod tests { { // Reindex one document let (ctx, opt, txn, mut fti) = - tx_fti(&ds, TreeStoreType::Write, &az, btree_order, false).await; + tx_fti(&ds, TransactionType::Write, &az, btree_order, false).await; fti.index_document(&ctx, &opt, &txn, &doc3, vec![Value::from("nobar foo")]) .await .unwrap(); finish(&txn, fti).await; let (ctx, opt, txn, fti) = - tx_fti(&ds, TreeStoreType::Read, &az, btree_order, false).await; + tx_fti(&ds, TransactionType::Read, &az, btree_order, false).await; // We 
can still find 'foo' let (hits, scr) = search(&ctx, &opt, &txn, &fti, "foo").await; @@ -668,7 +683,7 @@ mod tests { { // Remove documents let (_, _, txn, mut fti) = - tx_fti(&ds, TreeStoreType::Write, &az, btree_order, false).await; + tx_fti(&ds, TransactionType::Write, &az, btree_order, false).await; fti.remove_document(&txn, &doc1).await.unwrap(); fti.remove_document(&txn, &doc2).await.unwrap(); fti.remove_document(&txn, &doc3).await.unwrap(); @@ -677,7 +692,7 @@ mod tests { { let (ctx, opt, txn, fti) = - tx_fti(&ds, TreeStoreType::Read, &az, btree_order, false).await; + tx_fti(&ds, TransactionType::Read, &az, btree_order, false).await; let (hits, _) = search(&ctx, &opt, &txn, &fti, "hello").await; assert!(hits.is_none()); let (hits, _) = search(&ctx, &opt, &txn, &fti, "foo").await; @@ -705,7 +720,7 @@ mod tests { let btree_order = 5; { let (ctx, opt, txn, mut fti) = - tx_fti(&ds, TreeStoreType::Write, &az, btree_order, hl).await; + tx_fti(&ds, TransactionType::Write, &az, btree_order, hl).await; fti.index_document( &ctx, &opt, @@ -747,7 +762,7 @@ mod tests { { let (ctx, opt, txn, fti) = - tx_fti(&ds, TreeStoreType::Read, &az, btree_order, hl).await; + tx_fti(&ds, TransactionType::Read, &az, btree_order, hl).await; let statistics = fti.statistics(&txn).await.unwrap(); assert_eq!(statistics.terms.keys_count, 17); @@ -815,4 +830,80 @@ mod tests { async fn test_ft_index_bm_25_with_highlighting() { test_ft_index_bm_25(true).await; } + + async fn concurrent_task(ds: Arc, az: DefineAnalyzerStatement) { + let btree_order = 5; + let doc1: Thing = ("t", "doc1").into(); + let content1 = Value::from(Array::from(vec!["Enter a search term", "Welcome", "Docusaurus blogging features are powered by the blog plugin.", "Simply add Markdown files (or folders) to the blog directory.", "blog", "Regular blog authors can be added to authors.yml.", "authors.yml", "The blog post date can be extracted from filenames, such as:", "2019-05-30-welcome.md", "2019-05-30-welcome/index.md", "A blog post folder can be convenient to co-locate blog post images:", "The blog supports tags as well!", "And if you don't want a blog: just delete this directory, and use blog: false in your Docusaurus config.", "blog: false", "MDX Blog Post", "Blog posts support Docusaurus Markdown features, such as MDX.", "Use the power of React to create interactive blog posts.", "Long Blog Post", "This is the summary of a very long blog post,", "Use a comment to limit blog post size in the list view.", "", "First Blog Post", "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Pellentesque elementum dignissim ultricies. Fusce rhoncus ipsum tempor eros aliquam consequat. 
Lorem ipsum dolor sit amet"])); + + let start = std::time::Instant::now(); + while start.elapsed().as_secs() < 3 { + remove_insert_task(ds.as_ref(), &az, btree_order, &doc1, &content1).await; + } + } + #[test(tokio::test)] + async fn concurrent_test() { + let ds = Arc::new(Datastore::new("memory").await.unwrap()); + let mut q = syn::parse("DEFINE ANALYZER test TOKENIZERS blank;").unwrap(); + let Statement::Define(DefineStatement::Analyzer(az)) = q.0 .0.pop().unwrap() else { + panic!() + }; + concurrent_task(ds.clone(), az.clone()).await; + let task1 = tokio::spawn(concurrent_task(ds.clone(), az.clone())); + let task2 = tokio::spawn(concurrent_task(ds.clone(), az.clone())); + let _ = tokio::try_join!(task1, task2).expect("Tasks failed"); + } + + async fn remove_insert_task( + ds: &Datastore, + az: &DefineAnalyzerStatement, + btree_order: u32, + rid: &Thing, + content: &Value, + ) { + let (ctx, opt, txn, mut fti) = + tx_fti(ds, TransactionType::Write, &az, btree_order, false).await; + fti.remove_document(&txn, &rid).await.unwrap(); + fti.index_document(&ctx, &opt, &txn, &rid, vec![content.clone()]).await.unwrap(); + finish(&txn, fti).await; + } + + #[test(tokio::test)] + async fn remove_insert_sequence() { + let ds = Datastore::new("memory").await.unwrap(); + let mut q = syn::parse("DEFINE ANALYZER test TOKENIZERS blank;").unwrap(); + let Statement::Define(DefineStatement::Analyzer(az)) = q.0 .0.pop().unwrap() else { + panic!() + }; + let doc: Thing = ("t", "doc1").into(); + let content = Value::from(Array::from(vec!["Enter a search term","Welcome","Docusaurus blogging features are powered by the blog plugin.","Simply add Markdown files (or folders) to the blog directory.","blog","Regular blog authors can be added to authors.yml.","authors.yml","The blog post date can be extracted from filenames, such as:","2019-05-30-welcome.md","2019-05-30-welcome/index.md","A blog post folder can be convenient to co-locate blog post images:","The blog supports tags as well!","And if you don't want a blog: just delete this directory, and use blog: false in your Docusaurus config.","blog: false","MDX Blog Post","Blog posts support Docusaurus Markdown features, such as MDX.","Use the power of React to create interactive blog posts.","Long Blog Post","This is the summary of a very long blog post,","Use a comment to limit blog post size in the list view.","","First Blog Post","Lorem ipsum dolor sit amet, consectetur adipiscing elit. Pellentesque elementum dignissim ultricies. Fusce rhoncus ipsum tempor eros aliquam consequat. 
Lorem ipsum dolor sit amet"])); + + for i in 0..5 { + debug!("Attempt {i}"); + { + let (ctx, opt, txn, mut fti) = + tx_fti(&ds, TransactionType::Write, &az, 5, false).await; + fti.index_document(&ctx, &opt, &txn, &doc, vec![content.clone()]).await.unwrap(); + finish(&txn, fti).await; + } + + { + let (_, _, txn, fti) = tx_fti(&ds, TransactionType::Read, &az, 5, false).await; + let s = fti.statistics(&txn).await.unwrap(); + assert_eq!(s.terms.keys_count, 113); + } + + { + let (_, _, txn, mut fti) = tx_fti(&ds, TransactionType::Write, &az, 5, false).await; + fti.remove_document(&txn, &doc).await.unwrap(); + finish(&txn, fti).await; + } + + { + let (_, _, txn, fti) = tx_fti(&ds, TransactionType::Read, &az, 5, false).await; + let s = fti.statistics(&txn).await.unwrap(); + assert_eq!(s.terms.keys_count, 0); + } + } + } } diff --git a/lib/src/idx/ft/offsets.rs b/lib/src/idx/ft/offsets.rs index 9c7732b6..7ebef3e4 100644 --- a/lib/src/idx/ft/offsets.rs +++ b/lib/src/idx/ft/offsets.rs @@ -109,16 +109,16 @@ impl TryFrom for OffsetRecords { } let decompressed: Vec = bincode::deserialize(&val)?; let mut iter = decompressed.iter(); - let s = *iter.next().ok_or(Error::CorruptedIndex)?; + let s = *iter.next().ok_or(Error::CorruptedIndex("OffsetRecords::try_from(1)"))?; let mut indexes = Vec::with_capacity(s as usize); for _ in 0..s { - let index = *iter.next().ok_or(Error::CorruptedIndex)?; + let index = *iter.next().ok_or(Error::CorruptedIndex("OffsetRecords::try_from(2)"))?; indexes.push(index); } let mut res = Vec::with_capacity(s as usize); for index in indexes { - let start = *iter.next().ok_or(Error::CorruptedIndex)?; - let end = *iter.next().ok_or(Error::CorruptedIndex)?; + let start = *iter.next().ok_or(Error::CorruptedIndex("OffsetRecords::try_from(3)"))?; + let end = *iter.next().ok_or(Error::CorruptedIndex("OffsetRecords::try_from(4)"))?; res.push(Offset::new(index, start, end)); } Ok(OffsetRecords(res)) diff --git a/lib/src/idx/ft/postings.rs b/lib/src/idx/ft/postings.rs index ce139047..692701e2 100644 --- a/lib/src/idx/ft/postings.rs +++ b/lib/src/idx/ft/postings.rs @@ -2,12 +2,10 @@ use crate::err::Error; use crate::idx::docids::DocId; use crate::idx::ft::terms::TermId; use crate::idx::trees::bkeys::TrieKeys; -use crate::idx::trees::btree::{BState, BStatistics, BTree, BTreeNodeStore}; -use crate::idx::trees::store::{TreeNodeProvider, TreeNodeStore, TreeStoreType}; +use crate::idx::trees::btree::{BState, BStatistics, BTree, BTreeStore}; +use crate::idx::trees::store::{IndexStores, TreeNodeProvider}; use crate::idx::{IndexKeyBase, VersionedSerdeState}; -use crate::kvs::{Key, Transaction}; -use std::sync::Arc; -use tokio::sync::Mutex; +use crate::kvs::{Key, Transaction, TransactionType}; pub(super) type TermFrequency = u64; @@ -15,15 +13,17 @@ pub(super) struct Postings { state_key: Key, index_key_base: IndexKeyBase, btree: BTree, - store: Arc>>, + store: BTreeStore, } impl Postings { pub(super) async fn new( + ixs: &IndexStores, tx: &mut Transaction, index_key_base: IndexKeyBase, order: u32, - store_type: TreeStoreType, + tt: TransactionType, + cache_size: u32, ) -> Result { let state_key: Key = index_key_base.new_bp_key(None); let state: BState = if let Some(val) = tx.get(state_key.clone()).await? 
{ @@ -31,8 +31,14 @@ impl Postings { } else { BState::new(order) }; - let store = - TreeNodeStore::new(TreeNodeProvider::Postings(index_key_base.clone()), store_type, 20); + let store = ixs + .get_store_btree_trie( + TreeNodeProvider::Postings(index_key_base.clone()), + state.generation(), + tt, + cache_size as usize, + ) + .await; Ok(Self { state_key, index_key_base, @@ -49,8 +55,7 @@ impl Postings { term_freq: TermFrequency, ) -> Result<(), Error> { let key = self.index_key_base.new_bf_key(term_id, doc_id); - let mut store = self.store.lock().await; - self.btree.insert(tx, &mut store, key, term_freq).await + self.btree.insert(tx, &mut self.store, key, term_freq).await } pub(super) async fn get_term_frequency( @@ -60,8 +65,7 @@ impl Postings { doc_id: DocId, ) -> Result, Error> { let key = self.index_key_base.new_bf_key(term_id, doc_id); - let mut store = self.store.lock().await; - self.btree.search(tx, &mut store, &key).await + self.btree.search(tx, &self.store, &key).await } pub(super) async fn remove_posting( @@ -71,18 +75,18 @@ impl Postings { doc_id: DocId, ) -> Result, Error> { let key = self.index_key_base.new_bf_key(term_id, doc_id); - let mut store = self.store.lock().await; - self.btree.delete(tx, &mut store, key).await + self.btree.delete(tx, &mut self.store, key).await } pub(super) async fn statistics(&self, tx: &mut Transaction) -> Result { - let mut store = self.store.lock().await; - self.btree.statistics(tx, &mut store).await + self.btree.statistics(tx, &self.store).await } - pub(super) async fn finish(&self, tx: &mut Transaction) -> Result<(), Error> { - self.store.lock().await.finish(tx).await?; - self.btree.get_state().finish(tx, &self.state_key).await?; + pub(super) async fn finish(&mut self, tx: &mut Transaction) -> Result<(), Error> { + if self.store.finish(tx).await? 
{ + let state = self.btree.inc_generation(); + tx.set(self.state_key.clone(), state.try_to_val()?).await?; + } Ok(()) } } @@ -90,58 +94,65 @@ impl Postings { #[cfg(test)] mod tests { use crate::idx::ft::postings::Postings; - use crate::idx::trees::store::TreeStoreType; use crate::idx::IndexKeyBase; - use crate::kvs::{Datastore, LockType::*, TransactionType::*}; + use crate::kvs::{Datastore, LockType::*, Transaction, TransactionType, TransactionType::*}; use test_log::test; + async fn new_operation( + ds: &Datastore, + order: u32, + tt: TransactionType, + ) -> (Transaction, Postings) { + let mut tx = ds.transaction(tt, Optimistic).await.unwrap(); + let p = Postings::new(ds.index_store(), &mut tx, IndexKeyBase::default(), order, tt, 100) + .await + .unwrap(); + (tx, p) + } + + async fn finish(mut tx: Transaction, mut p: Postings) { + p.finish(&mut tx).await.unwrap(); + tx.commit().await.unwrap(); + } + #[test(tokio::test)] async fn test_postings() { const DEFAULT_BTREE_ORDER: u32 = 5; let ds = Datastore::new("memory").await.unwrap(); - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - // Check empty state - let mut p = Postings::new( - &mut tx, - IndexKeyBase::default(), - DEFAULT_BTREE_ORDER, - TreeStoreType::Write, - ) - .await - .unwrap(); - assert_eq!(p.statistics(&mut tx).await.unwrap().keys_count, 0); + { + // Check empty state + let (tx, p) = new_operation(&ds, DEFAULT_BTREE_ORDER, Write).await; + finish(tx, p).await; - p.update_posting(&mut tx, 1, 2, 3).await.unwrap(); - p.update_posting(&mut tx, 1, 4, 5).await.unwrap(); + let (mut tx, p) = new_operation(&ds, DEFAULT_BTREE_ORDER, Read).await; + assert_eq!(p.statistics(&mut tx).await.unwrap().keys_count, 0); - p.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); + // Add postings + let (mut tx, mut p) = new_operation(&ds, DEFAULT_BTREE_ORDER, Write).await; + p.update_posting(&mut tx, 1, 2, 3).await.unwrap(); + p.update_posting(&mut tx, 1, 4, 5).await.unwrap(); + finish(tx, p).await; - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let mut p = Postings::new( - &mut tx, - IndexKeyBase::default(), - DEFAULT_BTREE_ORDER, - TreeStoreType::Write, - ) - .await - .unwrap(); - assert_eq!(p.statistics(&mut tx).await.unwrap().keys_count, 2); + let (mut tx, p) = new_operation(&ds, DEFAULT_BTREE_ORDER, Read).await; + assert_eq!(p.statistics(&mut tx).await.unwrap().keys_count, 2); - assert_eq!(p.get_term_frequency(&mut tx, 1, 2).await.unwrap(), Some(3)); - assert_eq!(p.get_term_frequency(&mut tx, 1, 4).await.unwrap(), Some(5)); + assert_eq!(p.get_term_frequency(&mut tx, 1, 2).await.unwrap(), Some(3)); + assert_eq!(p.get_term_frequency(&mut tx, 1, 4).await.unwrap(), Some(5)); - // Check removal of doc 2 - assert_eq!(p.remove_posting(&mut tx, 1, 2).await.unwrap(), Some(3)); - // Again the same - assert_eq!(p.remove_posting(&mut tx, 1, 2).await.unwrap(), None); - // Remove doc 4 - assert_eq!(p.remove_posting(&mut tx, 1, 4).await.unwrap(), Some(5)); + let (mut tx, mut p) = new_operation(&ds, DEFAULT_BTREE_ORDER, Write).await; + // Check removal of doc 2 + assert_eq!(p.remove_posting(&mut tx, 1, 2).await.unwrap(), Some(3)); + // Again the same + assert_eq!(p.remove_posting(&mut tx, 1, 2).await.unwrap(), None); + // Remove doc 4 + assert_eq!(p.remove_posting(&mut tx, 1, 4).await.unwrap(), Some(5)); + finish(tx, p).await; - // The underlying b-tree should be empty now - assert_eq!(p.statistics(&mut tx).await.unwrap().keys_count, 0); - tx.commit().await.unwrap(); + // The underlying b-tree should be empty now + 
let (mut tx, p) = new_operation(&ds, DEFAULT_BTREE_ORDER, Read).await; + assert_eq!(p.statistics(&mut tx).await.unwrap().keys_count, 0); + } } } diff --git a/lib/src/idx/ft/terms.rs b/lib/src/idx/ft/terms.rs index 0002ff97..aa7c243e 100644 --- a/lib/src/idx/ft/terms.rs +++ b/lib/src/idx/ft/terms.rs @@ -1,32 +1,32 @@ use crate::err::Error; use crate::idx::trees::bkeys::FstKeys; -use crate::idx::trees::btree::{BState, BStatistics, BTree, BTreeNodeStore}; -use crate::idx::trees::store::{TreeNodeProvider, TreeNodeStore, TreeStoreType}; +use crate::idx::trees::btree::{BState, BStatistics, BTree, BTreeStore}; +use crate::idx::trees::store::{IndexStores, TreeNodeProvider}; use crate::idx::{IndexKeyBase, VersionedSerdeState}; -use crate::kvs::{Key, Transaction}; +use crate::kvs::{Key, Transaction, TransactionType}; use revision::revisioned; use roaring::RoaringTreemap; use serde::{Deserialize, Serialize}; -use std::sync::Arc; -use tokio::sync::Mutex; + pub(crate) type TermId = u64; pub(super) struct Terms { state_key: Key, index_key_base: IndexKeyBase, btree: BTree, - store: Arc>>, + store: BTreeStore, available_ids: Option, next_term_id: TermId, - updated: bool, } impl Terms { pub(super) async fn new( + ixs: &IndexStores, tx: &mut Transaction, index_key_base: IndexKeyBase, default_btree_order: u32, - store_type: TreeStoreType, + tt: TransactionType, + cache_size: u32, ) -> Result { let state_key: Key = index_key_base.new_bt_key(None); let state: State = if let Some(val) = tx.get(state_key.clone()).await? { @@ -34,8 +34,14 @@ impl Terms { } else { State::new(default_btree_order) }; - let store = - TreeNodeStore::new(TreeNodeProvider::Terms(index_key_base.clone()), store_type, 20); + let store = ixs + .get_store_btree_fst( + TreeNodeProvider::Terms(index_key_base.clone()), + state.btree.generation(), + tt, + cache_size as usize, + ) + .await; Ok(Self { state_key, index_key_base, @@ -43,7 +49,6 @@ impl Terms { store, available_ids: state.available_ids, next_term_id: state.next_term_id, - updated: false, }) } @@ -71,16 +76,13 @@ impl Terms { ) -> Result { let term_key = term.into(); { - let mut store = self.store.lock().await; - if let Some(term_id) = self.btree.search(tx, &mut store, &term_key).await? { + if let Some(term_id) = self.btree.search_mut(tx, &mut self.store, &term_key).await? { return Ok(term_id); } } let term_id = self.get_next_term_id(); tx.set(self.index_key_base.new_bu_key(term_id), term_key.clone()).await?; - let mut store = self.store.lock().await; - self.btree.insert(tx, &mut store, term_key, term_id).await?; - self.updated = true; + self.btree.insert(tx, &mut self.store, term_key, term_id).await?; Ok(term_id) } @@ -89,8 +91,7 @@ impl Terms { tx: &mut Transaction, term: &str, ) -> Result, Error> { - let mut store = self.store.lock().await; - self.btree.search(tx, &mut store, &term.into()).await + self.btree.search(tx, &self.store, &term.into()).await } pub(super) async fn remove_term_id( @@ -100,8 +101,7 @@ impl Terms { ) -> Result<(), Error> { let term_id_key = self.index_key_base.new_bu_key(term_id); if let Some(term_key) = tx.get(term_id_key.clone()).await? 
{ - let mut store = self.store.lock().await; - self.btree.delete(tx, &mut store, term_key.clone()).await?; + self.btree.delete(tx, &mut self.store, term_key.clone()).await?; tx.del(term_id_key).await?; if let Some(available_ids) = &mut self.available_ids { available_ids.insert(term_id); @@ -110,21 +110,19 @@ impl Terms { available_ids.insert(term_id); self.available_ids = Some(available_ids); } - self.updated = true; } Ok(()) } pub(super) async fn statistics(&self, tx: &mut Transaction) -> Result { - let mut store = self.store.lock().await; - self.btree.statistics(tx, &mut store).await + self.btree.statistics(tx, &self.store).await } pub(super) async fn finish(&mut self, tx: &mut Transaction) -> Result<(), Error> { - let updated = self.store.lock().await.finish(tx).await?; - if self.updated || updated { + if self.store.finish(tx).await? { + let btree = self.btree.inc_generation().clone(); let state = State { - btree: self.btree.get_state().clone(), + btree, available_ids: self.available_ids.take(), next_term_id: self.next_term_id, }; @@ -158,11 +156,12 @@ impl State { mod tests { use crate::idx::ft::postings::TermFrequency; use crate::idx::ft::terms::Terms; - use crate::idx::trees::store::TreeStoreType; use crate::idx::IndexKeyBase; - use crate::kvs::{Datastore, LockType::*, TransactionType::*}; + use crate::kvs::TransactionType::{Read, Write}; + use crate::kvs::{Datastore, LockType::*, Transaction, TransactionType}; use rand::{thread_rng, Rng}; use std::collections::HashSet; + use test_log::test; fn random_term(key_length: usize) -> String { thread_rng() @@ -180,97 +179,106 @@ mod tests { set } - #[tokio::test] + async fn new_operation( + ds: &Datastore, + order: u32, + tt: TransactionType, + ) -> (Transaction, Terms) { + let mut tx = ds.transaction(tt, Optimistic).await.unwrap(); + let t = Terms::new(ds.index_store(), &mut tx, IndexKeyBase::default(), order, tt, 100) + .await + .unwrap(); + (tx, t) + } + + async fn finish(mut tx: Transaction, mut t: Terms) { + t.finish(&mut tx).await.unwrap(); + tx.commit().await.unwrap(); + } + + #[test(tokio::test)] async fn test_resolve_terms() { const BTREE_ORDER: u32 = 7; - let idx = IndexKeyBase::default(); - let ds = Datastore::new("memory").await.unwrap(); { - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let mut t = - Terms::new(&mut tx, idx.clone(), BTREE_ORDER, TreeStoreType::Write).await.unwrap(); - t.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); + // Empty operation + let (tx, t) = new_operation(&ds, BTREE_ORDER, Write).await; + finish(tx, t).await; } // Resolve a first term { - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let mut t = - Terms::new(&mut tx, idx.clone(), BTREE_ORDER, TreeStoreType::Write).await.unwrap(); + let (mut tx, mut t) = new_operation(&ds, BTREE_ORDER, Write).await; assert_eq!(t.resolve_term_id(&mut tx, "C").await.unwrap(), 0); + finish(tx, t).await; + let (mut tx, t) = new_operation(&ds, BTREE_ORDER, Read).await; assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 1); - t.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); } // Resolve a second term { - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let mut t = - Terms::new(&mut tx, idx.clone(), BTREE_ORDER, TreeStoreType::Write).await.unwrap(); + let (mut tx, mut t) = new_operation(&ds, BTREE_ORDER, Write).await; assert_eq!(t.resolve_term_id(&mut tx, "D").await.unwrap(), 1); + finish(tx, t).await; + let (mut tx, t) = new_operation(&ds, BTREE_ORDER, Read).await; 
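+ // A fresh read operation sees the state committed by finish() above.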
assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 2); - t.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); } // Resolve two existing terms with new frequencies { - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let mut t = - Terms::new(&mut tx, idx.clone(), BTREE_ORDER, TreeStoreType::Write).await.unwrap(); + let (mut tx, mut t) = new_operation(&ds, BTREE_ORDER, Write).await; assert_eq!(t.resolve_term_id(&mut tx, "C").await.unwrap(), 0); assert_eq!(t.resolve_term_id(&mut tx, "D").await.unwrap(), 1); + finish(tx, t).await; + let (mut tx, t) = new_operation(&ds, BTREE_ORDER, Read).await; assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 2); - t.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); } // Resolve one existing terms and two new terms { - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let mut t = - Terms::new(&mut tx, idx.clone(), BTREE_ORDER, TreeStoreType::Write).await.unwrap(); - + let (mut tx, mut t) = new_operation(&ds, BTREE_ORDER, Write).await; assert_eq!(t.resolve_term_id(&mut tx, "A").await.unwrap(), 2); assert_eq!(t.resolve_term_id(&mut tx, "C").await.unwrap(), 0); assert_eq!(t.resolve_term_id(&mut tx, "E").await.unwrap(), 3); + finish(tx, t).await; + let (mut tx, t) = new_operation(&ds, BTREE_ORDER, Read).await; assert_eq!(t.statistics(&mut tx).await.unwrap().keys_count, 4); - t.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); } } - #[tokio::test] + #[test(tokio::test)] async fn test_deletion() { const BTREE_ORDER: u32 = 7; - let idx = IndexKeyBase::default(); - let ds = Datastore::new("memory").await.unwrap(); - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let mut t = - Terms::new(&mut tx, idx.clone(), BTREE_ORDER, TreeStoreType::Write).await.unwrap(); + { + let (mut tx, mut t) = new_operation(&ds, BTREE_ORDER, Write).await; - // Check removing an non-existing term id returns None - assert!(t.remove_term_id(&mut tx, 0).await.is_ok()); + // Check removing a non-existing term id returns None + assert!(t.remove_term_id(&mut tx, 0).await.is_ok()); - // Create few terms - t.resolve_term_id(&mut tx, "A").await.unwrap(); - t.resolve_term_id(&mut tx, "C").await.unwrap(); - t.resolve_term_id(&mut tx, "E").await.unwrap(); + // Create a few terms + t.resolve_term_id(&mut tx, "A").await.unwrap(); + t.resolve_term_id(&mut tx, "C").await.unwrap(); + t.resolve_term_id(&mut tx, "E").await.unwrap(); + finish(tx, t).await; + } for term in ["A", "C", "E"] { + let (mut tx, t) = new_operation(&ds, BTREE_ORDER, Read).await; let term_id = t.get_term_id(&mut tx, term).await.unwrap(); + if let Some(term_id) = term_id { + let (mut tx, mut t) = new_operation(&ds, BTREE_ORDER, Write).await; t.remove_term_id(&mut tx, term_id).await.unwrap(); + finish(tx, t).await; + + let (mut tx, t) = new_operation(&ds, BTREE_ORDER, Read).await; assert_eq!(t.get_term_id(&mut tx, term).await.unwrap(), None); } else { panic!("Term ID not found: {}", term); @@ -278,11 +286,10 @@ mod tests { } // Check id recycling + let (mut tx, mut t) = new_operation(&ds, BTREE_ORDER, Write).await; assert_eq!(t.resolve_term_id(&mut tx, "B").await.unwrap(), 0); assert_eq!(t.resolve_term_id(&mut tx, "D").await.unwrap(), 1); - - t.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); + finish(tx, t).await; } fn random_term_freq_vec(term_count: usize) -> Vec<(String, TermFrequency)> { @@ -295,39 +302,31 @@ mod tests { vec } - #[tokio::test] + #[test(tokio::test)] async fn test_resolve_100_docs_with_50_words_one_by_one()
{ let ds = Datastore::new("memory").await.unwrap(); for _ in 0..100 { - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let mut t = Terms::new(&mut tx, IndexKeyBase::default(), 100, TreeStoreType::Write) - .await - .unwrap(); + let (mut tx, mut t) = new_operation(&ds, 100, Write).await; let terms_string = random_term_freq_vec(50); for (term, _) in terms_string { t.resolve_term_id(&mut tx, &term).await.unwrap(); } - t.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); + finish(tx, t).await; } } - #[tokio::test] + #[test(tokio::test)] async fn test_resolve_100_docs_with_50_words_batch_of_10() { let ds = Datastore::new("memory").await.unwrap(); for _ in 0..10 { - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let mut t = Terms::new(&mut tx, IndexKeyBase::default(), 100, TreeStoreType::Write) - .await - .unwrap(); + let (mut tx, mut t) = new_operation(&ds, 100, Write).await; for _ in 0..10 { let terms_string = random_term_freq_vec(50); for (term, _) in terms_string { t.resolve_term_id(&mut tx, &term).await.unwrap(); } } - t.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); + finish(tx, t).await; } } } diff --git a/lib/src/idx/planner/executor.rs b/lib/src/idx/planner/executor.rs index 85fa95a2..deea8f23 100644 --- a/lib/src/idx/planner/executor.rs +++ b/lib/src/idx/planner/executor.rs @@ -14,10 +14,9 @@ use crate::idx::planner::plan::IndexOperator::Matches; use crate::idx::planner::plan::{IndexOperator, IndexOption, RangeValue}; use crate::idx::planner::tree::{IndexRef, IndexesMap}; use crate::idx::trees::mtree::MTreeIndex; -use crate::idx::trees::store::TreeStoreType; use crate::idx::IndexKeyBase; use crate::kvs; -use crate::kvs::Key; +use crate::kvs::{Key, TransactionType}; use crate::sql::index::Index; use crate::sql::statements::DefineIndexStatement; use crate::sql::{Array, Expression, Object, Table, Thing, Value}; @@ -85,9 +84,16 @@ impl QueryExecutor { } } else { let ikb = IndexKeyBase::new(opt, idx_def); - let ft = - FtIndex::new(opt, txn, p.az.as_str(), ikb, p, TreeStoreType::Read) - .await?; + let ft = FtIndex::new( + ctx.get_index_stores(), + opt, + txn, + p.az.as_str(), + ikb, + p, + TransactionType::Read, + ) + .await?; if ft_entry.is_none() { ft_entry = FtEntry::new(ctx, opt, txn, &ft, io).await?; } @@ -111,8 +117,14 @@ impl QueryExecutor { MtEntry::new(&mut tx, mt, a.clone(), *k).await? 
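// (an MTree index already opened for this executor is reused here)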
} else { let ikb = IndexKeyBase::new(opt, idx_def); - let mt = - MTreeIndex::new(&mut tx, ikb, p, TreeStoreType::Read).await?; + let mt = MTreeIndex::new( + ctx.get_index_stores(), + &mut tx, + ikb, + p, + TransactionType::Read, + ) + .await?; let entry = MtEntry::new(&mut tx, &mt, a.clone(), *k).await?; mt_map.insert(ix_ref, mt); entry diff --git a/lib/src/idx/trees/bkeys.rs b/lib/src/idx/trees/bkeys.rs index 458a0ae2..99ece032 100644 --- a/lib/src/idx/trees/bkeys.rs +++ b/lib/src/idx/trees/bkeys.rs @@ -5,11 +5,11 @@ use fst::{IntoStreamer, Map, MapBuilder, Streamer}; use radix_trie::{SubTrie, Trie, TrieCommon}; use serde::ser; use std::collections::VecDeque; -use std::fmt::{Display, Formatter}; +use std::fmt::{Debug, Display, Formatter}; use std::io; use std::io::Cursor; -pub trait BKeys: Default + Display + Sized { +pub trait BKeys: Default + Debug + Display + Sized { fn with_key_val(key: Key, payload: Payload) -> Result; fn len(&self) -> u32; fn is_empty(&self) -> bool; @@ -19,7 +19,7 @@ pub trait BKeys: Default + Display + Sized { // The size of the Node should be small, therefore one instance of // BKeys would never be store a large volume of keys. fn collect_with_prefix(&self, prefix_key: &Key) -> Result, Error>; - fn insert(&mut self, key: Key, payload: Payload); + fn insert(&mut self, key: Key, payload: Payload) -> Option; fn append(&mut self, keys: Self); fn remove(&mut self, key: &Key) -> Option; fn split_keys(self) -> Result, Error>; @@ -30,9 +30,6 @@ pub trait BKeys: Default + Display + Sized { fn read_from(c: &mut Cursor>) -> Result; fn write_to(&self, c: &mut Cursor>) -> Result<(), Error>; fn compile(&mut self) {} - fn debug(&self, to_string: F) -> Result<(), Error> - where - F: Fn(Key) -> Result; } pub struct SplitKeys @@ -46,12 +43,12 @@ where pub(in crate::idx) median_payload: Payload, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FstKeys { i: Inner, } -#[derive(Debug)] +#[derive(Debug, Clone)] enum Inner { Map(Map>), Trie(TrieKeys), @@ -104,14 +101,15 @@ impl BKeys for FstKeys { } fn collect_with_prefix(&self, _prefix_key: &Key) -> Result, Error> { - Err(Error::Unreachable) + Err(Error::Unreachable("BKeys/FSTKeys::collect_with_prefix")) } - fn insert(&mut self, key: Key, payload: Payload) { + fn insert(&mut self, key: Key, payload: Payload) -> Option { self.edit(); if let Inner::Trie(t) = &mut self.i { - t.insert(key, payload); + return t.insert(key, payload); } + unreachable!() } fn append(&mut self, keys: Self) { @@ -159,7 +157,7 @@ impl BKeys for FstKeys { median_payload: s.median_payload, }) } else { - Err(Error::Unreachable) + Err(Error::Unreachable("BKeys/FSTKeys::split_keys")) } } @@ -245,30 +243,6 @@ impl BKeys for FstKeys { ))) } } - - fn debug(&self, to_string: F) -> Result<(), Error> - where - F: Fn(Key) -> Result, - { - match &self.i { - Inner::Map(m) => { - let mut s = String::new(); - let mut iter = m.stream(); - let mut start = true; - while let Some((k, p)) = iter.next() { - if !start { - s.push(','); - } else { - start = false; - } - s.push_str(&format!("{}={}", to_string(k.to_vec())?.as_str(), p)); - } - debug!("FSTKeys[{}]", s); - Ok(()) - } - Inner::Trie(t) => t.debug(to_string), - } - } } impl TryFrom>> for FstKeys { @@ -305,12 +279,12 @@ impl Display for FstKeys { } Ok(()) } - Inner::Trie(t) => t.fmt(f), + Inner::Trie(t) => write!(f, "{}", t), } } } -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone)] pub struct TrieKeys { keys: Trie, } @@ -372,8 +346,8 @@ impl BKeys for TrieKeys { Ok(r) } - fn insert(&mut self, key: Key, payload: 
Payload) { - self.keys.insert(key, payload); + fn insert(&mut self, key: Key, payload: Payload) -> Option { + self.keys.insert(key, payload) } fn append(&mut self, keys: Self) { @@ -400,7 +374,7 @@ impl BKeys for TrieKeys { let (median_key, median_payload) = if let Some((k, v)) = s.next() { (k.clone(), *v) } else { - return Err(Error::Unreachable); + return Err(Error::Unreachable("BKeys/TrieKeys::split_keys")); }; let mut right = Trie::default(); for (key, val) in s { @@ -468,24 +442,6 @@ impl BKeys for TrieKeys { bincode::serialize_into(c, &compressed)?; Ok(()) } - - fn debug(&self, to_string: F) -> Result<(), Error> - where - F: Fn(Key) -> Result, - { - let mut s = String::new(); - let mut start = true; - for (k, p) in self.keys.iter() { - if !start { - s.push(','); - } else { - start = false; - } - s.push_str(&format!("{}={}", to_string(k.to_vec())?.as_str(), p)); - } - debug!("TrieKeys[{}]", s); - Ok(()) - } } impl From> for TrieKeys { diff --git a/lib/src/idx/trees/btree.rs b/lib/src/idx/trees/btree.rs index a9f02992..efa8a8dd 100644 --- a/lib/src/idx/trees/btree.rs +++ b/lib/src/idx/trees/btree.rs @@ -1,19 +1,22 @@ use crate::err::Error; use crate::idx::trees::bkeys::BKeys; -use crate::idx::trees::store::{NodeId, StoredNode, TreeNode, TreeNodeStore}; +use crate::idx::trees::store::{NodeId, StoredNode, TreeNode, TreeStore}; use crate::idx::VersionedSerdeState; use crate::kvs::{Key, Transaction, Val}; use crate::sql::{Object, Value}; use revision::revisioned; use serde::{Deserialize, Serialize}; +#[cfg(debug_assertions)] +use std::collections::HashSet; use std::collections::VecDeque; -use std::fmt::Debug; +use std::fmt::{Debug, Display, Formatter}; use std::io::Cursor; use std::marker::PhantomData; + pub type Payload = u64; type BStoredNode = StoredNode>; -pub(in crate::idx) type BTreeNodeStore = TreeNodeStore>; +pub(in crate::idx) type BTreeStore = TreeStore>; pub struct BTree where @@ -25,13 +28,13 @@ where } #[derive(Clone, Serialize, Deserialize)] -#[revisioned(revision = 1)] +#[revisioned(revision = 2)] pub struct BState { minimum_degree: u32, root: Option, next_node_id: NodeId, - #[serde(skip)] - updated: bool, + #[revision(start = 2)] + generation: u64, } impl VersionedSerdeState for BState {} @@ -43,33 +46,24 @@ impl BState { minimum_degree, root: None, next_node_id: 0, - updated: false, + generation: 0, } } fn set_root(&mut self, node_id: Option) { if node_id.ne(&self.root) { self.root = node_id; - self.updated = true; } } fn new_node_id(&mut self) -> NodeId { let new_node_id = self.next_node_id; self.next_node_id += 1; - self.updated = true; new_node_id } - pub(in crate::idx) async fn finish( - &self, - tx: &mut Transaction, - key: &Key, - ) -> Result<(), Error> { - if self.updated { - tx.set(key.clone(), self.try_to_val()?).await?; - } - Ok(()) + pub(in crate::idx) fn generation(&self) -> u64 { + self.generation } } @@ -92,10 +86,10 @@ impl From for Value { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum BTreeNode where - BK: BKeys, + BK: BKeys + Clone, { Internal(BK, Vec), Leaf(BK), @@ -103,7 +97,7 @@ where impl TreeNode for BTreeNode where - BK: BKeys, + BK: BKeys + Clone, { fn try_from_val(val: Val) -> Result { let mut c: Cursor> = Cursor::new(val); @@ -115,7 +109,7 @@ where Ok(BTreeNode::Internal(keys, child)) } 2u8 => Ok(BTreeNode::Leaf(keys)), - _ => Err(Error::CorruptedIndex), + _ => Err(Error::CorruptedIndex("BTreeNode::try_from_val")), } } @@ -139,7 +133,7 @@ where impl BTreeNode where - BK: BKeys, + BK: BKeys + Clone, { fn keys(&self) -> &BK { match self { 
@@ -155,29 +149,55 @@ where } } - fn append(&mut self, key: Key, payload: Payload, node: BTreeNode) -> Result<(), Error> { + fn append( + &mut self, + key: Key, + payload: Payload, + node: BTreeNode, + ) -> Result, Error> { match self { BTreeNode::Internal(keys, children) => { if let BTreeNode::Internal(append_keys, mut append_children) = node { - keys.insert(key, payload); keys.append(append_keys); children.append(&mut append_children); - Ok(()) + Ok(keys.insert(key, payload)) } else { - Err(Error::CorruptedIndex) + Err(Error::CorruptedIndex("BTree::append(1)")) } } BTreeNode::Leaf(keys) => { if let BTreeNode::Leaf(append_keys) = node { - keys.insert(key, payload); keys.append(append_keys); - Ok(()) + Ok(keys.insert(key, payload)) } else { - Err(Error::CorruptedIndex) + Err(Error::CorruptedIndex("BTree::append(2)")) } } } } + #[cfg(debug_assertions)] + fn check(&self) { + match self { + BTreeNode::Internal(k, c) => { + if (k.len() + 1) as usize != c.len() { + panic!("k: {} - c: {} - {}", k.len(), c.len(), self); + } + } + BTreeNode::Leaf(_) => {} + } + } +} + +impl Display for BTreeNode +where + BK: BKeys + Clone, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + BTreeNode::Internal(k, c) => write!(f, "(I) - k: {} - c: {:?}", k, c), + BTreeNode::Leaf(k) => write!(f, "(L) - k: {}", k), + } + } } struct SplitResult { @@ -188,7 +208,7 @@ struct SplitResult { impl BTree where - BK: BKeys + Debug, + BK: BKeys + Debug + Clone, { pub fn new(state: BState) -> Self { Self { @@ -198,24 +218,49 @@ where } } + pub(in crate::idx) fn inc_generation(&mut self) -> &BState { + self.state.generation += 1; + &self.state + } + pub async fn search( &self, tx: &mut Transaction, - store: &mut BTreeNodeStore, + store: &BTreeStore, searched_key: &Key, ) -> Result, Error> { let mut next_node = self.state.root; while let Some(node_id) = next_node.take() { let current = store.get_node(tx, node_id).await?; if let Some(payload) = current.n.keys().get(searched_key) { - store.set_node(current, false)?; return Ok(Some(payload)); } if let BTreeNode::Internal(keys, children) = ¤t.n { let child_idx = keys.get_child_idx(searched_key); next_node.replace(children[child_idx]); } - store.set_node(current, false)?; + } + Ok(None) + } + + pub async fn search_mut( + &self, + tx: &mut Transaction, + store: &mut BTreeStore, + searched_key: &Key, + ) -> Result, Error> { + let mut next_node = self.state.root; + while let Some(node_id) = next_node.take() { + let current = store.get_node_mut(tx, node_id).await?; + if let Some(payload) = current.n.keys().get(searched_key) { + store.set_node(current, false).await?; + return Ok(Some(payload)); + } + if let BTreeNode::Internal(keys, children) = ¤t.n { + let child_idx = keys.get_child_idx(searched_key); + next_node.replace(children[child_idx]); + } + store.set_node(current, false).await?; } Ok(None) } @@ -223,13 +268,13 @@ where pub async fn insert( &mut self, tx: &mut Transaction, - store: &mut BTreeNodeStore, + store: &mut BTreeStore, key: Key, payload: Payload, ) -> Result<(), Error> { if let Some(root_id) = self.state.root { // We already have a root node - let root = store.get_node(tx, root_id).await?; + let root = store.get_node_mut(tx, root_id).await?; if root.n.keys().len() == self.full_size { // The root node is full, let's split it let new_root_id = self.state.new_node_id(); @@ -241,7 +286,7 @@ where } else { // The root node has place, let's insert the value let root_id = root.id; - store.set_node(root, false)?; + store.set_node(root, false).await?; 
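+ // Hand the unchanged root back to the store before descending into it.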
self.insert_non_full(tx, store, root_id, key, payload).await?; } } else { @@ -249,7 +294,9 @@ where let new_root_id = self.state.new_node_id(); let new_root_node = store.new_node(new_root_id, BTreeNode::Leaf(BK::with_key_val(key, payload)?))?; - store.set_node(new_root_node, true)?; + #[cfg(debug_assertions)] + new_root_node.n.check(); + store.set_node(new_root_node, true).await?; self.state.set_root(Some(new_root_id)); } Ok(()) @@ -258,28 +305,30 @@ where async fn insert_non_full( &mut self, tx: &mut Transaction, - store: &mut BTreeNodeStore, + store: &mut BTreeStore, node_id: NodeId, key: Key, payload: Payload, ) -> Result<(), Error> { let mut next_node_id = Some(node_id); while let Some(node_id) = next_node_id.take() { - let mut node = store.get_node(tx, node_id).await?; + let mut node = store.get_node_mut(tx, node_id).await?; let key: Key = key.clone(); match &mut node.n { BTreeNode::Leaf(keys) => { keys.insert(key, payload); - store.set_node(node, true)?; + store.set_node(node, true).await?; } BTreeNode::Internal(keys, children) => { if keys.get(&key).is_some() { keys.insert(key, payload); - store.set_node(node, true)?; + #[cfg(debug_assertions)] + node.n.check(); + store.set_node(node, true).await?; return Ok(()); } let child_idx = keys.get_child_idx(&key); - let child = store.get_node(tx, children[child_idx]).await?; + let child = store.get_node_mut(tx, children[child_idx]).await?; let next_id = if child.n.keys().len() == self.full_size { let split_result = self.split_child(store, node, child_idx, child).await?; if key.gt(&split_result.median_key) { @@ -289,8 +338,8 @@ where } } else { let child_id = child.id; - store.set_node(node, false)?; - store.set_node(child, false)?; + store.set_node(node, false).await?; + store.set_node(child, false).await?; child_id }; next_node_id.replace(next_id); @@ -302,8 +351,8 @@ where async fn split_child( &mut self, - store: &mut BTreeNodeStore, - mut parent_node: BStoredNode, + store: &mut BTreeStore, + mut parent_node: StoredNode>, idx: usize, child_node: BStoredNode, ) -> Result { @@ -314,7 +363,10 @@ where let right_node_id = self.state.new_node_id(); match parent_node.n { BTreeNode::Internal(ref mut keys, ref mut children) => { - keys.insert(median_key.clone(), median_payload); + if keys.insert(median_key.clone(), median_payload).is_some() { + #[cfg(debug_assertions)] + panic!("Existing key: {} - {}", String::from_utf8(median_key)?, parent_node.n) + } children.insert(idx + 1, right_node_id); } BTreeNode::Leaf(ref mut keys) => { @@ -324,12 +376,18 @@ where // Save the mutated split child with half the (lower) keys let left_node_id = child_node.id; let left_node = store.new_node(left_node_id, left_node)?; - store.set_node(left_node, true)?; + #[cfg(debug_assertions)] + left_node.n.check(); + store.set_node(left_node, true).await?; // Save the new child with half the (upper) keys let right_node = store.new_node(right_node_id, right_node)?; - store.set_node(right_node, true)?; + #[cfg(debug_assertions)] + right_node.n.check(); + store.set_node(right_node, true).await?; // Save the parent node - store.set_node(parent_node, true)?; + #[cfg(debug_assertions)] + parent_node.n.check(); + store.set_node(parent_node, true).await?; Ok(SplitResult { left_node_id, right_node_id, @@ -362,7 +420,7 @@ where pub(in crate::idx) async fn delete( &mut self, tx: &mut Transaction, - store: &mut BTreeNodeStore, + store: &mut BTreeStore, key_to_delete: Key, ) -> Result, Error> { let mut deleted_payload = None; @@ -371,10 +429,20 @@ where let mut next_node = Some((true, 
key_to_delete, root_id)); while let Some((is_main_key, key_to_delete, node_id)) = next_node.take() { - let mut node = store.get_node(tx, node_id).await?; + let mut node = store.get_node_mut(tx, node_id).await?; + #[cfg(debug_assertions)] + debug!( + "Delete loop - key_to_delete: {} - {node}", + String::from_utf8_lossy(&key_to_delete) + ); match &mut node.n { BTreeNode::Leaf(keys) => { // CLRS: 1 + #[cfg(debug_assertions)] + debug!( + "CLRS: 1 - node: {node_id} - key_to_delete: {} - keys: {keys}", + String::from_utf8_lossy(&key_to_delete) + ); if let Some(payload) = keys.get(&key_to_delete) { if is_main_key { deleted_payload = Some(payload); @@ -382,21 +450,28 @@ keys.remove(&key_to_delete); if keys.len() == 0 { // The node is empty, we can delete it - store.remove_node(node.id, node.key)?; + store.remove_node(node.id, node.key).await?; // Check if this was the root node if Some(node_id) == self.state.root { self.state.set_root(None); } } else { - store.set_node(node, true)?; + #[cfg(debug_assertions)] + node.n.check(); + store.set_node(node, true).await?; } } else { - store.set_node(node, false)?; + store.set_node(node, false).await?; } } BTreeNode::Internal(keys, children) => { - // CLRS: 2 if let Some(payload) = keys.get(&key_to_delete) { + // CLRS: 2 + #[cfg(debug_assertions)] + debug!( + "CLRS: 2 - node: {node_id} - key_to_delete: {} - k: {keys} - c: {children:?}", + String::from_utf8_lossy(&key_to_delete) + ); if is_main_key { deleted_payload = Some(payload); } @@ -410,9 +485,16 @@ ) .await?, ); - store.set_node(node, true)?; + #[cfg(debug_assertions)] + node.n.check(); + store.set_node(node, true).await?; } else { // CLRS: 3 + #[cfg(debug_assertions)] + debug!( + "CLRS: 3 - node: {node_id} - key_to_delete: {} - keys: {keys}", + String::from_utf8_lossy(&key_to_delete) + ); let (node_update, is_main_key, key_to_delete, next_stored_node) = self .deleted_traversal( tx, @@ -427,15 +509,15 @@ if let Some(root_id) = self.state.root { // Delete the old root node if root_id != node.id { - return Err(Error::Unreachable); + return Err(Error::Unreachable("BTree::delete")); } } - store.remove_node(node_id, node.key)?; + store.remove_node(node.id, node.key).await?; self.state.set_root(Some(next_stored_node)); - } else if node_update { - store.set_node(node, true)?; } else { - store.set_node(node, false)?; + #[cfg(debug_assertions)] + node.n.check(); + store.set_node(node, node_update).await?; } next_node.replace((is_main_key, key_to_delete, next_stored_node)); } @@ -449,67 +531,164 @@ where async fn deleted_from_internal( &mut self, tx: &mut Transaction, - store: &mut BTreeNodeStore<BK>, + store: &mut BTreeStore<BK>, keys: &mut BK, children: &mut Vec<NodeId>, key_to_delete: Key, ) -> Result<(bool, Key, NodeId), Error> { + #[cfg(debug_assertions)] + debug!( + "Delete from internal - key_to_delete: {} - keys: {keys}", + String::from_utf8_lossy(&key_to_delete) + ); let left_idx = keys.get_child_idx(&key_to_delete); let left_id = children[left_idx]; - let mut left_node = store.get_node(tx, left_id).await?; + let mut left_node = store.get_node_mut(tx, left_id).await?; + // if the child y that precedes k in node x has at least t keys if left_node.n.keys().len() >= self.state.minimum_degree { // CLRS: 2a -> left_node is named `y` in the book - if let Some((key_prim, payload_prim)) = left_node.n.keys().get_last_key() { - keys.remove(&key_to_delete); - keys.insert(key_prim.clone(), payload_prim); - store.set_node(left_node, true)?; - return Ok((false, key_prim, left_id)); + #[cfg(debug_assertions)] +
debug!( + "CLRS: 2a - key_to_delete: {} - left: {left_node} - keys: {keys}", + String::from_utf8_lossy(&key_to_delete) + ); + let (key_prim, payload_prim) = self.find_highest(tx, store, left_node).await?; + if keys.remove(&key_to_delete).is_none() { + #[cfg(debug_assertions)] + panic!("Remove key {} {} ", String::from_utf8(key_to_delete)?, keys); } + if keys.insert(key_prim.clone(), payload_prim).is_some() { + #[cfg(debug_assertions)] + panic!("Insert key {} {} ", String::from_utf8(key_prim)?, keys); + } + return Ok((false, key_prim, left_id)); } let right_idx = left_idx + 1; let right_id = children[right_idx]; - let right_node = store.get_node(tx, right_id).await?; + let right_node = store.get_node_mut(tx, right_id).await?; if right_node.n.keys().len() >= self.state.minimum_degree { + // Cleanup 2a evaluation + store.set_node(left_node, false).await?; // CLRS: 2b -> right_node is name `z` in the book - if let Some((key_prim, payload_prim)) = right_node.n.keys().get_first_key() { - keys.remove(&key_to_delete); - keys.insert(key_prim.clone(), payload_prim); - store.set_node(left_node, false)?; - store.set_node(right_node, true)?; - return Ok((false, key_prim, right_id)); + #[cfg(debug_assertions)] + debug!( + "CLRS: 2b - key_to_delete: {} - right: {right_node} - keys: {keys}", + String::from_utf8_lossy(&key_to_delete) + ); + let (key_prim, payload_prim) = self.find_lowest(tx, store, right_node).await?; + if keys.remove(&key_to_delete).is_none() { + #[cfg(debug_assertions)] + panic!("Remove key {} {} ", String::from_utf8(key_to_delete)?, keys); } + if keys.insert(key_prim.clone(), payload_prim).is_some() { + panic!("Insert key {} {} ", String::from_utf8(key_prim)?, keys); + } + return Ok((false, key_prim, right_id)); } // CLRS: 2c // Merge children - // The payload is set to 0. The value does not matter, as the key will be deleted after anyway. + // The payload is set to 0. The payload does not matter, as the key will be deleted after anyway. 
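+ // (the key is pulled down into the merged node, removed from this node's keys below, and deletion continues in the merged child)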
+ #[cfg(debug_assertions)] + { + left_node.n.check(); + debug!("CLRS: 2c"); + } left_node.n.append(key_to_delete.clone(), 0, right_node.n)?; - store.set_node(left_node, true)?; - store.remove_node(right_id, right_node.key)?; + #[cfg(debug_assertions)] + left_node.n.check(); + store.set_node(left_node, true).await?; + store.remove_node(right_id, right_node.key).await?; keys.remove(&key_to_delete); children.remove(right_idx); Ok((false, key_to_delete, left_id)) } + async fn find_highest( + &mut self, + tx: &mut Transaction, + store: &mut BTreeStore, + node: StoredNode>, + ) -> Result<(Key, Payload), Error> { + let mut next_node = Some(node); + while let Some(node) = next_node.take() { + match &node.n { + BTreeNode::Internal(_, c) => { + let id = c[c.len() - 1]; + store.set_node(node, false).await?; + let node = store.get_node_mut(tx, id).await?; + next_node.replace(node); + } + BTreeNode::Leaf(k) => { + let (key, payload) = + k.get_last_key().ok_or(Error::Unreachable("BTree::find_highest(1)"))?; + #[cfg(debug_assertions)] + debug!("Find highest: {} - node: {}", String::from_utf8_lossy(&key), node); + store.set_node(node, false).await?; + return Ok((key, payload)); + } + } + } + Err(Error::Unreachable("BTree::find_highest(2)")) + } + + async fn find_lowest( + &mut self, + tx: &mut Transaction, + store: &mut BTreeStore, + node: StoredNode>, + ) -> Result<(Key, Payload), Error> { + let mut next_node = Some(node); + while let Some(node) = next_node.take() { + match &node.n { + BTreeNode::Internal(_, c) => { + let id = c[0]; + store.set_node(node, false).await?; + let node = store.get_node_mut(tx, id).await?; + next_node.replace(node); + } + BTreeNode::Leaf(k) => { + let (key, payload) = + k.get_first_key().ok_or(Error::Unreachable("BTree::find_lowest(1)"))?; + #[cfg(debug_assertions)] + debug!("Find lowest: {} - node: {}", String::from_utf8_lossy(&key), node.id); + store.set_node(node, false).await?; + return Ok((key, payload)); + } + } + } + Err(Error::Unreachable("BTree::find_lowest(2)")) + } + async fn deleted_traversal( &mut self, tx: &mut Transaction, - store: &mut BTreeNodeStore, + store: &mut BTreeStore, keys: &mut BK, children: &mut Vec, key_to_delete: Key, is_main_key: bool, ) -> Result<(bool, bool, Key, NodeId), Error> { - // CLRS 3a + // CLRS 3 Determine the root x.ci that must contain k let child_idx = keys.get_child_idx(&key_to_delete); let child_id = children[child_idx]; - let child_stored_node = store.get_node(tx, child_id).await?; + #[cfg(debug_assertions)] + debug!( + "CLRS: 3 - key_to_delete: {} - child_id: {child_id}", + String::from_utf8_lossy(&key_to_delete) + ); + let child_stored_node = store.get_node_mut(tx, child_id).await?; + // If x.ci has only t-1 keys, execute 3a or 3b if child_stored_node.n.keys().len() < self.state.minimum_degree { - // right child (successor) if child_idx < children.len() - 1 { - let right_child_stored_node = store.get_node(tx, children[child_idx + 1]).await?; + let right_child_stored_node = + store.get_node_mut(tx, children[child_idx + 1]).await?; return if right_child_stored_node.n.keys().len() >= self.state.minimum_degree { + #[cfg(debug_assertions)] + debug!( + "CLRS: 3a - xci_child: {child_stored_node} - right_sibling_child: {right_child_stored_node}" + ); Self::delete_adjust_successor( store, keys, @@ -522,6 +701,8 @@ where .await } else { // CLRS 3b successor + #[cfg(debug_assertions)] + debug!("CLRS: 3b merge - keys: {keys} - xci_child: {child_stored_node} - right_sibling_child: {right_child_stored_node}"); Self::merge_nodes( store, keys, 
@@ -539,20 +720,24 @@ where // left child (predecessor) if child_idx > 0 { let child_idx = child_idx - 1; - let left_child_stored_node = store.get_node(tx, children[child_idx]).await?; + let left_child_stored_node = store.get_node_mut(tx, children[child_idx]).await?; return if left_child_stored_node.n.keys().len() >= self.state.minimum_degree { + #[cfg(debug_assertions)] + debug!("CLRS: 3a - left_sibling_child: {left_child_stored_node} - xci_child: {child_stored_node}",); Self::delete_adjust_predecessor( store, keys, child_idx, key_to_delete, is_main_key, - child_stored_node, left_child_stored_node, + child_stored_node, ) .await } else { // CLRS 3b predecessor + #[cfg(debug_assertions)] + debug!("CLRS: 3b merge - keys: {keys} - left_sibling_child: {left_child_stored_node} - xci_child: {child_stored_node}"); Self::merge_nodes( store, keys, @@ -568,12 +753,12 @@ where } } - store.set_node(child_stored_node, false)?; + store.set_node(child_stored_node, false).await?; Ok((false, true, key_to_delete, child_id)) } async fn delete_adjust_successor( - store: &mut BTreeNodeStore, + store: &mut BTreeStore, keys: &mut BK, child_idx: usize, key_to_delete: Key, @@ -581,56 +766,96 @@ where mut child_stored_node: BStoredNode, mut right_child_stored_node: BStoredNode, ) -> Result<(bool, bool, Key, NodeId), Error> { - if let Some((ascending_key, ascending_payload)) = - right_child_stored_node.n.keys().get_first_key() - { - right_child_stored_node.n.keys_mut().remove(&ascending_key); - if let Some(descending_key) = keys.get_key(child_idx) { - if let Some(descending_payload) = keys.remove(&descending_key) { - child_stored_node.n.keys_mut().insert(descending_key, descending_payload); - keys.insert(ascending_key, ascending_payload); - let child_id = child_stored_node.id; - store.set_node(child_stored_node, true)?; - store.set_node(right_child_stored_node, true)?; - return Ok((true, is_main_key, key_to_delete, child_id)); - } + let (ascending_key, ascending_payload) = + right_child_stored_node + .n + .keys() + .get_first_key() + .ok_or(Error::CorruptedIndex("BTree::delete_adjust_successor(1)"))?; + right_child_stored_node.n.keys_mut().remove(&ascending_key); + let descending_key = keys + .get_key(child_idx) + .ok_or(Error::CorruptedIndex("BTree::delete_adjust_successor(2)"))?; + let descending_payload = keys + .remove(&descending_key) + .ok_or(Error::CorruptedIndex("BTree::delete_adjust_successor(3)"))?; + if child_stored_node.n.keys_mut().insert(descending_key, descending_payload).is_some() { + #[cfg(debug_assertions)] + panic!("Duplicate insert key {} ", child_stored_node.n); + } + if let BTreeNode::Internal(_, rc) = &mut right_child_stored_node.n { + if let BTreeNode::Internal(_, lc) = &mut child_stored_node.n { + lc.push(rc.remove(0)) } } - // If we reach this point, something was wrong in the BTree - Err(Error::CorruptedIndex) + if keys.insert(ascending_key, ascending_payload).is_some() { + #[cfg(debug_assertions)] + panic!("Duplicate insert key {} ", keys); + } + let child_id = child_stored_node.id; + #[cfg(debug_assertions)] + { + child_stored_node.n.check(); + right_child_stored_node.n.check(); + } + store.set_node(child_stored_node, true).await?; + store.set_node(right_child_stored_node, true).await?; + Ok((true, is_main_key, key_to_delete, child_id)) } async fn delete_adjust_predecessor( - store: &mut BTreeNodeStore, + store: &mut BTreeStore, keys: &mut BK, child_idx: usize, key_to_delete: Key, is_main_key: bool, - mut child_stored_node: BStoredNode, mut left_child_stored_node: BStoredNode, + mut 
child_stored_node: BStoredNode, ) -> Result<(bool, bool, Key, NodeId), Error> { - if let Some((ascending_key, ascending_payload)) = - left_child_stored_node.n.keys().get_last_key() - { - left_child_stored_node.n.keys_mut().remove(&ascending_key); - if let Some(descending_key) = keys.get_key(child_idx) { - if let Some(descending_payload) = keys.remove(&descending_key) { - child_stored_node.n.keys_mut().insert(descending_key, descending_payload); - keys.insert(ascending_key, ascending_payload); - let child_id = child_stored_node.id; - store.set_node(child_stored_node, true)?; - store.set_node(left_child_stored_node, true)?; - return Ok((true, is_main_key, key_to_delete, child_id)); - } + let (ascending_key, ascending_payload) = left_child_stored_node + .n + .keys() + .get_last_key() + .ok_or(Error::CorruptedIndex("BTree::delete_adjust_predecessor(1)"))?; + if left_child_stored_node.n.keys_mut().remove(&ascending_key).is_none() { + #[cfg(debug_assertions)] + panic!("Remove key {} {}", String::from_utf8(ascending_key)?, left_child_stored_node.n); + } + let descending_key = keys + .get_key(child_idx) + .ok_or(Error::CorruptedIndex("BTree::delete_adjust_predecessor(2)"))?; + let descending_payload = keys + .remove(&descending_key) + .ok_or(Error::CorruptedIndex("BTree::delete_adjust_predecessor(3)"))?; + if child_stored_node.n.keys_mut().insert(descending_key, descending_payload).is_some() { + #[cfg(debug_assertions)] + panic!("Insert key {}", child_stored_node.n); + } + if let BTreeNode::Internal(_, lc) = &mut left_child_stored_node.n { + if let BTreeNode::Internal(_, rc) = &mut child_stored_node.n { + rc.insert(0, lc.remove(lc.len() - 1)); } } - // If we reach this point, something was wrong in the BTree - Err(Error::CorruptedIndex) + if keys.insert(ascending_key, ascending_payload).is_some() { + #[cfg(debug_assertions)] + panic!("Insert key {}", keys); + } + let child_id = child_stored_node.id; + #[cfg(debug_assertions)] + { + child_stored_node.n.check(); + left_child_stored_node.n.check(); + debug!("{}", left_child_stored_node); + debug!("{}", child_stored_node); + } + store.set_node(child_stored_node, true).await?; + store.set_node(left_child_stored_node, true).await?; + Ok((true, is_main_key, key_to_delete, child_id)) } #[allow(clippy::too_many_arguments)] async fn merge_nodes( - store: &mut BTreeNodeStore, + store: &mut BTreeStore, keys: &mut BK, children: &mut Vec, child_idx: usize, @@ -639,26 +864,35 @@ where mut left_child: BStoredNode, right_child: BStoredNode, ) -> Result<(bool, bool, Key, NodeId), Error> { - if let Some(descending_key) = keys.get_key(child_idx) { - if let Some(descending_payload) = keys.remove(&descending_key) { - children.remove(child_idx + 1); - let left_id = left_child.id; - left_child.n.append(descending_key, descending_payload, right_child.n)?; - store.set_node(left_child, true)?; - store.remove_node(right_child.id, right_child.key)?; - return Ok((true, is_main_key, key_to_delete, left_id)); - } + #[cfg(debug_assertions)] + debug!("Keys: {keys}"); + let descending_key = + keys.get_key(child_idx).ok_or(Error::CorruptedIndex("BTree::merge_nodes(1)"))?; + let descending_payload = + keys.remove(&descending_key).ok_or(Error::CorruptedIndex("BTree::merge_nodes(2)"))?; + #[cfg(debug_assertions)] + debug!("descending_key: {}", String::from_utf8_lossy(&descending_key)); + children.remove(child_idx + 1); + let left_id = left_child.id; + if left_child.n.append(descending_key, descending_payload, right_child.n)?.is_some() { + #[cfg(debug_assertions)] + panic!("Key already 
present"); } - // If we reach this point, something was wrong in the BTree - Err(Error::CorruptedIndex) + #[cfg(debug_assertions)] + left_child.n.check(); + store.set_node(left_child, true).await?; + store.remove_node(right_child.id, right_child.key).await?; + Ok((true, is_main_key, key_to_delete, left_id)) } pub(in crate::idx) async fn statistics( &self, tx: &mut Transaction, - store: &mut BTreeNodeStore, + store: &BTreeStore, ) -> Result { let mut stats = BStatistics::default(); + #[cfg(debug_assertions)] + let mut keys = HashSet::new(); let mut node_queue = VecDeque::new(); if let Some(node_id) = self.state.root { node_queue.push_front((node_id, 1)); @@ -669,6 +903,17 @@ where if depth > stats.max_depth { stats.max_depth = depth; } + #[cfg(debug_assertions)] + { + let k = stored.n.keys(); + for i in 0..k.len() { + if let Some(k) = k.get_key(i as usize) { + if !keys.insert(k.clone()) { + panic!("Duplicate key: {}", String::from_utf8(k)?); + } + } + } + } stats.nodes_count += 1; stats.total_size += stored.size as u64; if let BTreeNode::Internal(_, children) = &stored.n { @@ -677,14 +922,9 @@ where node_queue.push_front((*child_id, depth)); } }; - store.set_node(stored, false)?; } Ok(stats) } - - pub(in crate::idx) fn get_state(&self) -> &BState { - &self.state - } } #[cfg(test)] @@ -692,18 +932,17 @@ mod tests { use crate::err::Error; use crate::idx::trees::bkeys::{BKeys, FstKeys, TrieKeys}; use crate::idx::trees::btree::{ - BState, BStatistics, BStoredNode, BTree, BTreeNode, BTreeNodeStore, Payload, - }; - use crate::idx::trees::store::{ - NodeId, TreeNode, TreeNodeProvider, TreeNodeStore, TreeStoreType, + BState, BStatistics, BStoredNode, BTree, BTreeNode, BTreeStore, Payload, }; + use crate::idx::trees::store::{NodeId, TreeNode, TreeNodeProvider}; use crate::idx::VersionedSerdeState; - use crate::kvs::TransactionType::*; - use crate::kvs::{Datastore, Key, LockType::*, Transaction}; + use crate::kvs::{Datastore, Key, LockType::*, Transaction, TransactionType}; use rand::prelude::SliceRandom; use rand::thread_rng; - use std::collections::{HashMap, VecDeque}; + use std::cmp::Ordering; + use std::collections::{BTreeMap, VecDeque}; use std::fmt::Debug; + use std::sync::Arc; use test_log::test; #[test] @@ -732,177 +971,262 @@ mod tests { } async fn insertions_test( - tx: &mut Transaction, - store: &mut BTreeNodeStore, + mut tx: Transaction, + mut st: BTreeStore, t: &mut BTree, samples_size: usize, sample_provider: F, ) where F: Fn(usize) -> (Key, Payload), - BK: BKeys + Debug, + BK: BKeys + Debug + Clone, { for i in 0..samples_size { let (key, payload) = sample_provider(i); // Insert the sample - t.insert(tx, store, key.clone(), payload).await.unwrap(); - // Check we can find it - assert_eq!(t.search(tx, store, &key).await.unwrap(), Some(payload)); + t.insert(&mut tx, &mut st, key, payload).await.unwrap(); } + st.finish(&mut tx).await.unwrap(); + tx.commit().await.unwrap(); + } + + async fn check_insertions( + mut tx: Transaction, + mut st: BTreeStore, + t: &mut BTree, + samples_size: usize, + sample_provider: F, + ) where + F: Fn(usize) -> (Key, Payload), + BK: BKeys + Debug + Clone, + { + for i in 0..samples_size { + let (key, payload) = sample_provider(i); + assert_eq!(t.search(&mut tx, &mut st, &key).await.unwrap(), Some(payload)); + } + tx.cancel().await.unwrap(); } fn get_key_value(idx: usize) -> (Key, Payload) { (format!("{}", idx).into(), (idx * 10) as Payload) } + async fn new_operation_fst( + ds: &Datastore, + t: &BTree, + tt: TransactionType, + cache_size: usize, + ) -> 
(Transaction, BTreeStore) + where + BK: BKeys + Debug + Clone, + { + let st = ds + .index_store() + .get_store_btree_fst(TreeNodeProvider::Debug, t.state.generation, tt, cache_size) + .await; + let tx = ds.transaction(tt, Optimistic).await.unwrap(); + (tx, st) + } + + async fn new_operation_trie( + ds: &Datastore, + t: &BTree, + tt: TransactionType, + cache_size: usize, + ) -> (Transaction, BTreeStore) + where + BK: BKeys + Debug + Clone, + { + let st = ds + .index_store() + .get_store_btree_trie(TreeNodeProvider::Debug, t.state.generation, tt, cache_size) + .await; + let tx = ds.transaction(tt, Optimistic).await.unwrap(); + (tx, st) + } + #[test(tokio::test)] async fn test_btree_fst_small_order_sequential_insertions() { - let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Write, 20); - let mut s = s.lock().await; - let mut t = BTree::new(BState::new(5)); let ds = Datastore::new("memory").await.unwrap(); - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - insertions_test::<_, FstKeys>(&mut tx, &mut s, &mut t, 100, get_key_value).await; - s.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); - let mut tx = ds.transaction(Read, Optimistic).await.unwrap(); - assert_eq!( - t.statistics(&mut tx, &mut TreeNodeStore::Traversal(TreeNodeProvider::Debug)) - .await - .unwrap(), - BStatistics { - keys_count: 100, - max_depth: 3, - nodes_count: 22, - total_size: 1691, - } - ); - tx.cancel().await.unwrap(); + + let mut t = BTree::new(BState::new(5)); + + { + let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Write, 20).await; + insertions_test::<_, FstKeys>(tx, st, &mut t, 100, get_key_value).await; + } + + { + let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await; + check_insertions(tx, st, &mut t, 100, get_key_value).await; + } + + { + let (mut tx, mut st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await; + assert_eq!( + t.statistics(&mut tx, &mut st).await.unwrap(), + BStatistics { + keys_count: 100, + max_depth: 3, + nodes_count: 22, + total_size: 1691, + } + ); + tx.cancel().await.unwrap(); + } } #[test(tokio::test)] async fn test_btree_trie_small_order_sequential_insertions() { - let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Write, 20); - let mut s = s.lock().await; - let mut t = BTree::new(BState::new(6)); let ds = Datastore::new("memory").await.unwrap(); - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - insertions_test::<_, TrieKeys>(&mut tx, &mut s, &mut t, 100, get_key_value).await; - s.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); + let mut t = BTree::new(BState::new(6)); - let mut tx = ds.transaction(Read, Optimistic).await.unwrap(); - assert_eq!( - t.statistics(&mut tx, &mut TreeNodeStore::Traversal(TreeNodeProvider::Debug)) - .await - .unwrap(), - BStatistics { - keys_count: 100, - max_depth: 3, - nodes_count: 18, - total_size: 1656, - } - ); - tx.cancel().await.unwrap(); + { + let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Write, 20).await; + insertions_test::<_, TrieKeys>(tx, st, &mut t, 100, get_key_value).await; + } + + { + let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await; + check_insertions(tx, st, &mut t, 100, get_key_value).await; + } + + { + let (mut tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await; + assert_eq!( + t.statistics(&mut tx, &mut st).await.unwrap(), + BStatistics { + keys_count: 100, + max_depth: 3, + nodes_count: 18, + total_size: 1656, + } + ); + 
tx.cancel().await.unwrap(); + } } #[test(tokio::test)] async fn test_btree_fst_small_order_random_insertions() { - let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Write, 20); - let mut s = s.lock().await; let ds = Datastore::new("memory").await.unwrap(); - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); let mut t = BTree::new(BState::new(8)); + let mut samples: Vec = (0..100).collect(); let mut rng = thread_rng(); samples.shuffle(&mut rng); - insertions_test::<_, FstKeys>(&mut tx, &mut s, &mut t, 100, |i| get_key_value(samples[i])) - .await; - s.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); - let mut tx = ds.transaction(Read, Optimistic).await.unwrap(); - let s = t - .statistics(&mut tx, &mut TreeNodeStore::Traversal(TreeNodeProvider::Debug)) - .await - .unwrap(); - assert_eq!(s.keys_count, 100); - tx.cancel().await.unwrap(); + { + let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Write, 20).await; + insertions_test(tx, st, &mut t, 100, |i| get_key_value(samples[i])).await; + } + + { + let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await; + check_insertions(tx, st, &mut t, 100, |i| get_key_value(samples[i])).await; + } + + { + let (mut tx, mut st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await; + let s = t.statistics(&mut tx, &mut st).await.unwrap(); + assert_eq!(s.keys_count, 100); + tx.cancel().await.unwrap(); + } } #[test(tokio::test)] async fn test_btree_trie_small_order_random_insertions() { - let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Write, 20); - let mut s = s.lock().await; let ds = Datastore::new("memory").await.unwrap(); - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); let mut t = BTree::new(BState::new(75)); + let mut samples: Vec = (0..100).collect(); let mut rng = thread_rng(); samples.shuffle(&mut rng); - insertions_test::<_, TrieKeys>(&mut tx, &mut s, &mut t, 100, |i| get_key_value(samples[i])) - .await; - s.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); - let mut tx = ds.transaction(Read, Optimistic).await.unwrap(); - let s = t - .statistics(&mut tx, &mut TreeNodeStore::Traversal(TreeNodeProvider::Debug)) - .await - .unwrap(); - assert_eq!(s.keys_count, 100); - tx.cancel().await.unwrap(); + { + let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Write, 20).await; + insertions_test(tx, st, &mut t, 100, |i| get_key_value(samples[i])).await; + } + + { + let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await; + check_insertions(tx, st, &mut t, 100, |i| get_key_value(samples[i])).await; + } + + { + let (mut tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await; + let s = t.statistics(&mut tx, &mut st).await.unwrap(); + assert_eq!(s.keys_count, 100); + tx.cancel().await.unwrap(); + } } #[test(tokio::test)] async fn test_btree_fst_keys_large_order_sequential_insertions() { - let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Write, 20); - let mut s = s.lock().await; let ds = Datastore::new("memory").await.unwrap(); - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); let mut t = BTree::new(BState::new(60)); - insertions_test::<_, FstKeys>(&mut tx, &mut s, &mut t, 10000, get_key_value).await; - s.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); - let mut tx = ds.transaction(Read, Optimistic).await.unwrap(); - assert_eq!( - t.statistics(&mut tx, &mut TreeNodeStore::Traversal(TreeNodeProvider::Debug)) - .await - .unwrap(), - BStatistics { - 
keys_count: 10000, - max_depth: 3, - nodes_count: 158, - total_size: 57486, - } - ); - tx.cancel().await.unwrap(); + { + let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Write, 20).await; + insertions_test(tx, st, &mut t, 10000, get_key_value).await; + } + + { + let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await; + check_insertions(tx, st, &mut t, 10000, get_key_value).await; + } + + { + let (mut tx, mut st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await; + assert_eq!( + t.statistics(&mut tx, &mut st).await.unwrap(), + BStatistics { + keys_count: 10000, + max_depth: 3, + nodes_count: 158, + total_size: 57486, + } + ); + tx.cancel().await.unwrap(); + } + } + + async fn test_btree_trie_keys_large_order_sequential_insertions(cache_size: usize) { + let ds = Datastore::new("memory").await.unwrap(); + let mut t = BTree::new(BState::new(60)); + + { + let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Write, cache_size).await; + insertions_test(tx, st, &mut t, 10000, get_key_value).await; + } + + { + let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Read, cache_size).await; + check_insertions(tx, st, &mut t, 10000, get_key_value).await; + } + + { + let (mut tx, mut st) = + new_operation_trie(&ds, &t, TransactionType::Read, cache_size).await; + assert_eq!( + t.statistics(&mut tx, &mut st).await.unwrap(), + BStatistics { + keys_count: 10000, + max_depth: 3, + nodes_count: 158, + total_size: 75206, + } + ); + tx.cancel().await.unwrap(); + } } #[test(tokio::test)] - async fn test_btree_trie_keys_large_order_sequential_insertions() { - let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Write, 20); - let mut s = s.lock().await; - let ds = Datastore::new("memory").await.unwrap(); - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let mut t = BTree::new(BState::new(60)); - insertions_test::<_, TrieKeys>(&mut tx, &mut s, &mut t, 10000, get_key_value).await; - s.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); + async fn test_btree_trie_keys_large_order_sequential_insertions_lru_cache() { + test_btree_trie_keys_large_order_sequential_insertions(20).await + } - let mut tx = ds.transaction(Read, Optimistic).await.unwrap(); - assert_eq!( - t.statistics(&mut tx, &mut TreeNodeStore::Traversal(TreeNodeProvider::Debug)) - .await - .unwrap(), - BStatistics { - keys_count: 10000, - max_depth: 3, - nodes_count: 158, - total_size: 75206, - } - ); - tx.cancel().await.unwrap(); + #[test(tokio::test)] + async fn test_btree_trie_keys_large_order_sequential_insertions_full_cache() { + test_btree_trie_keys_large_order_sequential_insertions(0).await } const REAL_WORLD_TERMS: [&str; 30] = [ @@ -911,85 +1235,89 @@ mod tests { "nothing", "the", "other", "animals", "sat", "there", "watching", ]; - async fn test_btree_read_world_insertions(default_minimum_degree: u32) -> BStatistics - where - BK: BKeys + Debug, - { - let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Write, 20); - let mut s = s.lock().await; + async fn test_btree_fst_real_world_insertions(default_minimum_degree: u32) -> BStatistics { let ds = Datastore::new("memory").await.unwrap(); - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); let mut t = BTree::new(BState::new(default_minimum_degree)); - insertions_test::<_, BK>(&mut tx, &mut s, &mut t, REAL_WORLD_TERMS.len(), |i| { - (REAL_WORLD_TERMS[i].as_bytes().to_vec(), i as Payload) - }) - .await; - s.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); - let mut tx 
= ds.transaction(Read, Optimistic).await.unwrap(); - let statistics = t - .statistics(&mut tx, &mut TreeNodeStore::Traversal(TreeNodeProvider::Debug)) - .await - .unwrap(); + { + let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Write, 20).await; + insertions_test(tx, st, &mut t, REAL_WORLD_TERMS.len(), |i| { + (REAL_WORLD_TERMS[i].as_bytes().to_vec(), i as Payload) + }) + .await; + } + + let (mut tx, mut st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await; + let statistics = t.statistics(&mut tx, &mut st).await.unwrap(); tx.cancel().await.unwrap(); statistics } - #[test(tokio::test)] - async fn test_btree_fst_keys_read_world_insertions_small_order() { - let s = test_btree_read_world_insertions::(4).await; - assert_eq!( - s, - BStatistics { - keys_count: 17, - max_depth: 2, - nodes_count: 5, - total_size: 421, - } - ); + async fn test_btree_trie_real_world_insertions(default_minimum_degree: u32) -> BStatistics { + let ds = Datastore::new("memory").await.unwrap(); + let mut t = BTree::new(BState::new(default_minimum_degree)); + + { + let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Write, 20).await; + insertions_test(tx, st, &mut t, REAL_WORLD_TERMS.len(), |i| { + (REAL_WORLD_TERMS[i].as_bytes().to_vec(), i as Payload) + }) + .await; + } + + let (mut tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await; + let statistics = t.statistics(&mut tx, &mut st).await.unwrap(); + tx.cancel().await.unwrap(); + + statistics } #[test(tokio::test)] - async fn test_btree_fst_keys_read_world_insertions_large_order() { - let s = test_btree_read_world_insertions::(100).await; - assert_eq!( - s, - BStatistics { - keys_count: 17, - max_depth: 1, - nodes_count: 1, - total_size: 189, - } - ); + async fn test_btree_fst_keys_real_world_insertions_small_order() { + let expected = BStatistics { + keys_count: 17, + max_depth: 2, + nodes_count: 5, + total_size: 421, + }; + let s = test_btree_fst_real_world_insertions(4).await; + assert_eq!(s, expected); } #[test(tokio::test)] - async fn test_btree_trie_keys_read_world_insertions_small_order() { - let s = test_btree_read_world_insertions::(6).await; - assert_eq!( - s, - BStatistics { - keys_count: 17, - max_depth: 2, - nodes_count: 3, - total_size: 339, - } - ); + async fn test_btree_fst_keys_real_world_insertions_large_order() { + let expected = BStatistics { + keys_count: 17, + max_depth: 1, + nodes_count: 1, + total_size: 189, + }; + let s = test_btree_fst_real_world_insertions(100).await; + assert_eq!(s, expected); } #[test(tokio::test)] - async fn test_btree_trie_keys_read_world_insertions_large_order() { - let s = test_btree_read_world_insertions::(100).await; - assert_eq!( - s, - BStatistics { - keys_count: 17, - max_depth: 1, - nodes_count: 1, - total_size: 229, - } - ); + async fn test_btree_trie_keys_real_world_insertions_small() { + let expected = BStatistics { + keys_count: 17, + max_depth: 2, + nodes_count: 3, + total_size: 339, + }; + let s = test_btree_trie_real_world_insertions(6).await; + assert_eq!(s, expected); + } + + #[test(tokio::test)] + async fn test_btree_trie_keys_real_world_insertions_large_order() { + let expected = BStatistics { + keys_count: 17, + max_depth: 1, + nodes_count: 1, + total_size: 229, + }; + let s = test_btree_trie_real_world_insertions(100).await; + assert_eq!(s, expected); } // This is the examples from the chapter B-Trees in CLRS: @@ -1023,22 +1351,21 @@ mod tests { #[test(tokio::test)] // This check node splitting. CLRS: Figure 18.7, page 498. 
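// (the inspect_nodes pass below asserts the exact node layout of that figure)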
 	async fn clrs_insertion_test() {
-		let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Write, 20);
-		let mut s = s.lock().await;
 		let ds = Datastore::new("memory").await.unwrap();
 		let mut t = BTree::<TrieKeys>::new(BState::new(3));
-		let mut tx = ds.transaction(Write, Optimistic).await.unwrap();
-		for (key, payload) in CLRS_EXAMPLE {
-			t.insert(&mut tx, &mut s, key.into(), payload).await.unwrap();
-		}
-		s.finish(&mut tx).await.unwrap();
-		tx.commit().await.unwrap();
-		let mut tx = ds.transaction(Read, Optimistic).await.unwrap();
-		let s = t
-			.statistics(&mut tx, &mut TreeNodeStore::Traversal(TreeNodeProvider::Debug))
-			.await
-			.unwrap();
+		{
+			let (mut tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Write, 20).await;
+			for (key, payload) in CLRS_EXAMPLE {
+				t.insert(&mut tx, &mut st, key.into(), payload).await.unwrap();
+			}
+			st.finish(&mut tx).await.unwrap();
+			tx.commit().await.unwrap();
+		}
+
+		let (mut tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await;
+
+		let s = t.statistics(&mut tx, &mut st).await.unwrap();
 		assert_eq!(s.keys_count, 23);
 		assert_eq!(s.max_depth, 3);
 		assert_eq!(s.nodes_count, 10);
@@ -1046,17 +1373,17 @@
 		assert_eq!(10, tx.scan(vec![]..vec![0xf], 100).await.unwrap().len());
 
 		let nodes_count = t
-			.inspect_nodes(&mut tx, |count, depth, node_id, node| match count {
+			.inspect_nodes(&mut tx, &mut st, |count, depth, node_id, node| match count {
 				0 => {
 					assert_eq!(depth, 1);
 					assert_eq!(node_id, 7);
-					check_is_internal_node(node.n, vec![("p", 16)], vec![1, 8]);
+					check_is_internal_node(&node.n, vec![("p", 16)], vec![1, 8]);
 				}
 				1 => {
 					assert_eq!(depth, 2);
 					assert_eq!(node_id, 1);
 					check_is_internal_node(
-						node.n,
+						&node.n,
 						vec![("c", 3), ("g", 7), ("m", 13)],
 						vec![0, 9, 2, 3],
 					);
 				}
@@ -1064,42 +1391,42 @@
 				2 => {
 					assert_eq!(depth, 2);
 					assert_eq!(node_id, 8);
-					check_is_internal_node(node.n, vec![("t", 20), ("x", 24)], vec![4, 6, 5]);
+					check_is_internal_node(&node.n, vec![("t", 20), ("x", 24)], vec![4, 6, 5]);
 				}
 				3 => {
 					assert_eq!(depth, 3);
 					assert_eq!(node_id, 0);
-					check_is_leaf_node(node.n, vec![("a", 1), ("b", 2)]);
+					check_is_leaf_node(&node.n, vec![("a", 1), ("b", 2)]);
 				}
 				4 => {
 					assert_eq!(depth, 3);
 					assert_eq!(node_id, 9);
-					check_is_leaf_node(node.n, vec![("d", 4), ("e", 5), ("f", 6)]);
+					check_is_leaf_node(&node.n, vec![("d", 4), ("e", 5), ("f", 6)]);
 				}
 				5 => {
 					assert_eq!(depth, 3);
 					assert_eq!(node_id, 2);
-					check_is_leaf_node(node.n, vec![("j", 10), ("k", 11), ("l", 12)]);
+					check_is_leaf_node(&node.n, vec![("j", 10), ("k", 11), ("l", 12)]);
 				}
 				6 => {
 					assert_eq!(depth, 3);
 					assert_eq!(node_id, 3);
-					check_is_leaf_node(node.n, vec![("n", 14), ("o", 15)]);
+					check_is_leaf_node(&node.n, vec![("n", 14), ("o", 15)]);
 				}
 				7 => {
 					assert_eq!(depth, 3);
 					assert_eq!(node_id, 4);
-					check_is_leaf_node(node.n, vec![("q", 17), ("r", 18), ("s", 19)]);
+					check_is_leaf_node(&node.n, vec![("q", 17), ("r", 18), ("s", 19)]);
 				}
 				8 => {
 					assert_eq!(depth, 3);
 					assert_eq!(node_id, 6);
-					check_is_leaf_node(node.n, vec![("u", 21), ("v", 22)]);
+					check_is_leaf_node(&node.n, vec![("u", 21), ("v", 22)]);
 				}
 				9 => {
 					assert_eq!(depth, 3);
 					assert_eq!(node_id, 5);
-					check_is_leaf_node(node.n, vec![("y", 25), ("z", 26)]);
+					check_is_leaf_node(&node.n, vec![("y", 25), ("z", 26)]);
 				}
 				_ => panic!("This node should not exist {}", count),
 			})
 			.await
 			.unwrap();
 		assert_eq!(nodes_count, 10);
@@ -1109,41 +1436,76 @@
 		tx.cancel().await.unwrap();
 	}
 
-	// This check the possible deletion cases. CRLS, Figure 18.8, pages 500-501
-	async fn test_btree_clrs_deletion_test<BK>(mut t: BTree<BK>)
+	async fn check_finish_commit<BK>(
+		t: &mut BTree<BK>,
+		mut st: BTreeStore<BK>,
+		mut tx: Transaction,
+		mut gen: u64,
+		info: String,
+	) -> Result<u64, Error>
 	where
-		BK: BKeys + Debug,
+		BK: BKeys + Clone + Debug,
 	{
-		let ds = Datastore::new("memory").await.unwrap();
+		if st.finish(&mut tx).await? {
+			t.state.generation += 1;
+		}
+		gen += 1;
+		assert_eq!(t.state.generation, gen, "{}", info);
+		tx.commit().await?;
+		Ok(gen)
+	}
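	// [Editor's note] `st.finish(&mut tx)` returns whether buffered node updates
	// were actually written to the transaction; when it did, the tree bumps its
	// in-memory `state.generation`, presumably so the shared index store can tell
	// up-to-date cache entries from stale ones. `check_finish_commit` then asserts
	// that every committed write operation advances the generation by exactly one.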
("k", 11)]); - } - 3 => { - assert_eq!(depth, 2); - assert_eq!(node_id, 3); - check_is_leaf_node(node.n, vec![("n", 14), ("o", 15)]); - } - 4 => { - assert_eq!(depth, 2); - assert_eq!(node_id, 4); - check_is_leaf_node(node.n, vec![("q", 17), ("r", 18), ("s", 19)]); - } - 5 => { - assert_eq!(depth, 2); - assert_eq!(node_id, 6); - check_is_leaf_node(node.n, vec![("u", 21), ("v", 22)]); - } - 6 => { - assert_eq!(depth, 2); - assert_eq!(node_id, 5); - check_is_leaf_node(node.n, vec![("y", 25), ("z", 26)]); - } - _ => panic!("This node should not exist {}", count), + .inspect_nodes(&mut tx, &mut st, |count, depth, node_id, node| match count { + 0 => { + assert_eq!(depth, 1); + assert_eq!(node_id, 1); + check_is_internal_node( + &node.n, + vec![("e", 5), ("l", 12), ("p", 16), ("t", 20), ("x", 24)], + vec![0, 9, 3, 4, 6, 5], + ); } + 1 => { + assert_eq!(depth, 2); + assert_eq!(node_id, 0); + check_is_leaf_node(&node.n, vec![("a", 1), ("c", 3)]); + } + 2 => { + assert_eq!(depth, 2); + assert_eq!(node_id, 9); + check_is_leaf_node(&node.n, vec![("j", 10), ("k", 11)]); + } + 3 => { + assert_eq!(depth, 2); + assert_eq!(node_id, 3); + check_is_leaf_node(&node.n, vec![("n", 14), ("o", 15)]); + } + 4 => { + assert_eq!(depth, 2); + assert_eq!(node_id, 4); + check_is_leaf_node(&node.n, vec![("q", 17), ("r", 18), ("s", 19)]); + } + 5 => { + assert_eq!(depth, 2); + assert_eq!(node_id, 6); + check_is_leaf_node(&node.n, vec![("u", 21), ("v", 22)]); + } + 6 => { + assert_eq!(depth, 2); + assert_eq!(node_id, 5); + check_is_leaf_node(&node.n, vec![("y", 25), ("z", 26)]); + } + _ => panic!("This node should not exist {}", count), }) .await .unwrap(); assert_eq!(nodes_count, 7); - tx.cancel().await.unwrap(); - } - - #[test(tokio::test)] - async fn test_btree_trie_keys_clrs_deletion_test() { - let t = BTree::::new(BState::new(3)); - test_btree_clrs_deletion_test(t).await - } - - #[test(tokio::test)] - async fn test_btree_fst_keys_clrs_deletion_test() { - let t = BTree::::new(BState::new(3)); - test_btree_clrs_deletion_test(t).await + tx.cancel().await?; + Ok(()) } // This check the possible deletion cases. 
CRLS, Figure 18.8, pages 500-501 - async fn test_btree_fill_and_empty(mut t: BTree) - where - BK: BKeys + Debug, - { - let ds = Datastore::new("memory").await.unwrap(); + #[test(tokio::test)] + async fn test_btree_fill_and_empty() -> Result<(), Error> { + let ds = Datastore::new("memory").await?; + let mut t = BTree::::new(BState::new(3)); - let mut expected_keys = HashMap::new(); + let mut expected_keys = BTreeMap::new(); + let mut check_generation = 0; { - let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Write, 20); - let mut s = s.lock().await; - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); + let (mut tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Write, 20).await; for (key, payload) in CLRS_EXAMPLE { expected_keys.insert(key.to_string(), payload); - t.insert(&mut tx, &mut s, key.into(), payload).await.unwrap(); + t.insert(&mut tx, &mut st, key.into(), payload).await?; + let (_, tree_keys) = check_btree_properties(&t, &mut tx, &mut st).await?; + assert_eq!(expected_keys, tree_keys); } - s.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); + check_generation = check_finish_commit( + &mut t, + st, + tx, + check_generation, + format!("Insert CLRS example"), + ) + .await?; } { - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - print_tree(&mut tx, &t).await; - tx.commit().await.unwrap(); + let (mut tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await; + print_tree(&mut tx, &mut st, &t).await; + tx.cancel().await?; } for (key, _) in CLRS_EXAMPLE { debug!("------------------------"); debug!("Delete {}", key); { - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Write, 20); - let mut s = s.lock().await; - t.delete(&mut tx, &mut s, key.into()).await.unwrap(); - print_tree::(&mut tx, &t).await; - s.finish(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); + let (mut tx, mut st) = + new_operation_trie(&ds, &t, TransactionType::Write, 20).await; + assert!(t.delete(&mut tx, &mut &mut st, key.into()).await?.is_some()); + expected_keys.remove(key); + let (_, tree_keys) = check_btree_properties(&t, &mut tx, &mut st).await?; + assert_eq!(expected_keys, tree_keys); + check_generation = check_finish_commit( + &mut t, + st, + tx, + check_generation, + format!("Delete CLRS example {key}"), + ) + .await?; } // Check that every expected keys are still found in the tree - expected_keys.remove(key); - { - let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); - let s = TreeNodeStore::new(TreeNodeProvider::Debug, TreeStoreType::Read, 20); - let mut s = s.lock().await; + let (mut tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await; for (key, payload) in &expected_keys { assert_eq!( - t.search(&mut tx, &mut s, &key.as_str().into()).await.unwrap(), - Some(*payload) + t.search(&mut tx, &mut st, &key.as_str().into()).await?, + Some(*payload), + "Can't find: {key}", ) } - tx.commit().await.unwrap(); + tx.cancel().await?; } } - let mut tx = ds.transaction(Read, Optimistic).await.unwrap(); - let s = t - .statistics(&mut tx, &mut TreeNodeStore::Traversal(TreeNodeProvider::Debug)) - .await - .unwrap(); + let (mut tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await; + let s = t.statistics(&mut tx, &mut st).await?; assert_eq!(s.keys_count, 0); assert_eq!(s.max_depth, 0); assert_eq!(s.nodes_count, 0); // There should not be any record in the database - assert_eq!(0, 
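	// [Editor's note] `check_btree_properties` is assumed to walk the entire tree
	// and collect its (key, payload) pairs so they can be compared with the shadow
	// `expected_keys` map after every single insert and delete; switching the shadow
	// map from `HashMap` to `BTreeMap` gives the comparison a deterministic order.
	// A sketch of the assumed signature:
	//
	//	async fn check_btree_properties<BK: BKeys + Clone + Debug>(
	//		t: &BTree<BK>,
	//		tx: &mut Transaction,
	//		st: &mut BTreeStore<BK>,
	//	) -> Result<(usize, BTreeMap<String, Payload>), Error>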
 
 	#[test(tokio::test)]
-	async fn test_btree_trie_keys_fill_and_empty() {
-		let t = BTree::<TrieKeys>::new(BState::new(3));
-		test_btree_fill_and_empty(t).await
+	async fn test_delete_adjust() -> Result<(), Error> {
+		let ds = Datastore::new("memory").await?;
+		let mut t = BTree::<TrieKeys>::new(BState::new(3));
+
+		let terms = [
+			"aliquam",
+			"delete",
+			"if",
+			"from",
+			"Docusaurus",
+			"amet,",
+			"don't",
+			"And",
+			"interactive",
+			"well!",
+			"supports",
+			"ultricies.",
+			"Fusce",
+			"consequat.",
+			"just",
+			"use",
+			"elementum",
+			"term",
+			"blogging",
+			"to",
+			"want",
+			"added",
+			"Lorem",
+			"ipsum",
+			"blog:",
+			"MDX.",
+			"posts.",
+			"features",
+			"posts",
+			"features,",
+			"truncate",
+			"images:",
+			"Long",
+			"Pellentesque",
+			"authors.yml.",
+			"filenames,",
+			"such",
+			"co-locate",
+			"you",
+			"can",
+			"the",
+			"-->",
+			"comment",
+			"tags",
+			"A",
+			"React",
+			"The",
+			"adipiscing",
+			"consectetur",
+			"very",
+			"this",
+			"and",
+			"sit",
+			"directory,",
+			"Regular",
+			"Markdown",
+			"Simply",
+			"blog",
+			"MDX",
+			"list",
+			"extracted",
+			"summary",
+			"amet",
+			"plugin.",
+			"your",
+			"long",
+			"First",
+			"power",
+			"post,",
+			"convenient",
+			"folders)",
+			"of",
+			"date",
+			"powered",
+			"2019-05-30-welcome.md",
+			"view.",
+			"are",
+			"be",
+			"