diff --git a/Cargo.lock b/Cargo.lock index 4dadd42e..ac1c0ac7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -522,6 +522,17 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "cache-padded" version = "1.2.0" @@ -533,6 +544,9 @@ name = "cc" version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" +dependencies = [ + "jobserver", +] [[package]] name = "cexpr" @@ -1889,6 +1903,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" +[[package]] +name = "jobserver" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.59" @@ -1974,6 +1997,21 @@ dependencies = [ "libc", ] +[[package]] +name = "librocksdb-sys" +version = "0.8.0+7.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "611804e4666a25136fcc5f8cf425ab4d26c7f74ea245ffe92ea23b85b6420b5d" +dependencies = [ + "bindgen 0.60.1", + "bzip2-sys", + "cc", + "glob", + "libc", + "libz-sys", + "zstd-sys", +] + [[package]] name = "libz-sys" version = "1.1.8" @@ -2963,6 +3001,16 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5864e7ef1a6b7bcf1d6ca3f655e65e724ed3b52546a0d0a663c991522f552ea" +[[package]] +name = "rocksdb" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9562ea1d70c0cc63a34a22d977753b50cca91cc6b6527750463bd5dd8697bc" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "rquickjs" version = "0.1.7" @@ -3618,6 +3666,7 @@ dependencies = [ "rand 0.8.5", "regex", "rmp-serde", + "rocksdb", "rquickjs", "scrypt", "serde", @@ -4508,3 +4557,13 @@ name = "xml-rs" version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3" + +[[package]] +name = "zstd-sys" +version = "2.0.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b" +dependencies = [ + "cc", + "libc", +] diff --git a/lib/Cargo.toml b/lib/Cargo.toml index fd4c1f8a..9fb007d5 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -14,13 +14,13 @@ categories = ["database-implementations", "data-structures", "embedded"] license = "Apache-2.0" [features] -default = ["parallel", "kv-echodb", "kv-rocksdb", "scripting", "http"] +default = ["parallel", "kv-mem", "kv-rocksdb", "scripting", "http"] parallel = ["dep:executor"] kv-tikv = ["dep:tikv"] kv-fdb = ["dep:foundationdb"] -kv-echodb = ["dep:echodb"] +kv-mem = ["dep:echodb"] kv-indxdb = ["dep:indxdb"] -kv-rocksdb = [] +kv-rocksdb = ["dep:rocksdb"] scripting = ["dep:js", "dep:executor"] http = ["dep:surf"] @@ -51,6 +51,7 @@ once_cell = "1.13.1" pbkdf2 = "0.11.0" rand = "0.8.5" regex = "1.6.0" +rocksdb = { version = "0.19.0", optional = true } scrypt = "0.10.0" serde = { version = "1.0.144", features = ["derive"] } sha-1 = "0.10.0" diff --git a/lib/src/err/mod.rs b/lib/src/err/mod.rs index 5915e7c6..75553a03 100644 --- a/lib/src/err/mod.rs +++ b/lib/src/err/mod.rs @@ -260,7 +260,7 @@ impl From for String { } } -#[cfg(feature = "kv-echodb")] +#[cfg(feature = "kv-mem")] impl From for Error { fn from(e: echodb::err::Error) -> Error { match e { @@ -290,6 +290,13 @@ impl From for Error { } } +#[cfg(feature = "kv-rocksdb")] +impl From for Error { + fn from(e: rocksdb::Error) -> Error { + Error::Tx(e.to_string()) + } +} + impl From for Error { fn from(e: channel::RecvError) -> Error { Error::Channel(e.to_string()) diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index 138b15a3..762ecea9 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -22,12 +22,12 @@ pub struct Datastore { #[allow(clippy::large_enum_variant)] pub(super) enum Inner { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Mem(super::mem::Datastore), + #[cfg(feature = "kv-rocksdb")] + RocksDB(super::rocksdb::Datastore), #[cfg(feature = "kv-indxdb")] - IxDB(super::ixdb::Datastore), - #[cfg(feature = "kv-yokudb")] - File(super::file::Datastore), + IndxDB(super::indxdb::Datastore), #[cfg(feature = "kv-tikv")] TiKV(super::tikv::Datastore), #[cfg(feature = "kv-fdb")] @@ -74,7 +74,7 @@ impl Datastore { /// ``` pub async fn new(path: &str) -> Result { match path { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] "memory" => { info!(target: LOG, "Starting kvs store in {}", path); let v = super::mem::Datastore::new().await.map(|v| Datastore { @@ -83,44 +83,60 @@ impl Datastore { info!(target: LOG, "Started kvs store in {}", path); v } - // Parse and initiate an IxDB database - #[cfg(feature = "kv-indxdb")] - s if s.starts_with("ixdb:") => { - info!(target: LOG, "Starting kvs store at {}", path); - let s = s.trim_start_matches("ixdb://"); - let v = super::ixdb::Datastore::new(s).await.map(|v| Datastore { - inner: Inner::IxDB(v), - }); - info!(target: LOG, "Started kvs store at {}", path); - v - } // Parse and initiate an File database - #[cfg(feature = "kv-yokudb")] + #[cfg(feature = "kv-rocksdb")] s if s.starts_with("file:") => { info!(target: LOG, "Starting kvs store at {}", path); let s = s.trim_start_matches("file://"); - let v = super::file::Datastore::new(s).await.map(|v| Datastore { - inner: Inner::File(v), + let s = s.trim_start_matches("file:"); + let v = super::rocksdb::Datastore::new(s).await.map(|v| Datastore { + inner: Inner::RocksDB(v), }); info!(target: LOG, "Started kvs store at {}", path); v } - // Parse and initiate an TiKV database + // Parse and initiate an RocksDB database + #[cfg(feature = "kv-rocksdb")] + s if s.starts_with("rocksdb:") => { + info!(target: LOG, "Starting kvs store at {}", path); + let s = s.trim_start_matches("rocksdb://"); + let s = s.trim_start_matches("rocksdb:"); + let v = super::rocksdb::Datastore::new(s).await.map(|v| Datastore { + inner: Inner::RocksDB(v), + }); + info!(target: LOG, "Started kvs store at {}", path); + v + } + // Parse and initiate an IndxDB database + #[cfg(feature = "kv-indxdb")] + s if s.starts_with("indxdb:") => { + info!(target: LOG, "Starting kvs store at {}", path); + let s = s.trim_start_matches("indxdb://"); + let s = s.trim_start_matches("indxdb:"); + let v = super::indxdb::Datastore::new(s).await.map(|v| Datastore { + inner: Inner::IndxDB(v), + }); + info!(target: LOG, "Started kvs store at {}", path); + v + } + // Parse and initiate a TiKV database #[cfg(feature = "kv-tikv")] s if s.starts_with("tikv:") => { info!(target: LOG, "Connecting to kvs store at {}", path); let s = s.trim_start_matches("tikv://"); + let s = s.trim_start_matches("tikv:"); let v = super::tikv::Datastore::new(s).await.map(|v| Datastore { inner: Inner::TiKV(v), }); info!(target: LOG, "Connected to kvs store at {}", path); v } - // Parse and initiate an TiKV database + // Parse and initiate a FoundationDB database #[cfg(feature = "kv-fdb")] s if s.starts_with("fdb:") => { info!(target: LOG, "Connecting to kvs store at {}", path); let s = s.trim_start_matches("fdb://"); + let s = s.trim_start_matches("fdb:"); let v = super::fdb::Datastore::new(s).await.map(|v| Datastore { inner: Inner::FDB(v), }); @@ -150,7 +166,7 @@ impl Datastore { /// ``` pub async fn transaction(&self, write: bool, lock: bool) -> Result { match &self.inner { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Inner::Mem(v) => { let tx = v.transaction(write, lock).await?; Ok(Transaction { @@ -158,19 +174,19 @@ impl Datastore { cache: super::cache::Cache::default(), }) } - #[cfg(feature = "kv-indxdb")] - Inner::IxDB(v) => { + #[cfg(feature = "kv-rocksdb")] + Inner::RocksDB(v) => { let tx = v.transaction(write, lock).await?; Ok(Transaction { - inner: super::tx::Inner::IxDB(tx), + inner: super::tx::Inner::RocksDB(tx), cache: super::cache::Cache::default(), }) } - #[cfg(feature = "kv-yokudb")] - Inner::File(v) => { + #[cfg(feature = "kv-indxdb")] + Inner::IndxDB(v) => { let tx = v.transaction(write, lock).await?; Ok(Transaction { - inner: super::tx::Inner::File(tx), + inner: super::tx::Inner::IndxDB(tx), cache: super::cache::Cache::default(), }) } diff --git a/lib/src/kvs/file/mod.rs b/lib/src/kvs/file/mod.rs deleted file mode 100644 index 16447836..00000000 --- a/lib/src/kvs/file/mod.rs +++ /dev/null @@ -1,217 +0,0 @@ -#![cfg(feature = "kv-yokudb")] - -use crate::err::Error; -use crate::kvs::Key; -use crate::kvs::Val; -use std::ops::Range; - -pub struct Datastore { - db: echodb::Db, -} - -pub struct Transaction { - // Is the transaction complete? - ok: bool, - // Is the transaction read+write? - rw: bool, - // The distributed datastore transaction - tx: echodb::Tx, -} - -impl Datastore { - // Open a new database - pub async fn new(_path: &str) -> Result { - Ok(Datastore { - db: echodb::db::new(), - }) - } - // Start a new transaction - pub async fn transaction(&self, write: bool, _: bool) -> Result { - match self.db.begin(write).await { - Ok(tx) => Ok(Transaction { - ok: false, - rw: write, - tx, - }), - Err(e) => Err(Error::Tx(e.to_string())), - } - } -} - -impl Transaction { - // Check if closed - pub fn closed(&self) -> bool { - self.ok - } - // Cancel a transaction - pub fn cancel(&mut self) -> Result<(), Error> { - // Check to see if transaction is closed - if self.ok { - return Err(Error::TxFinished); - } - // Mark this transaction as done - self.ok = true; - // Cancel this transaction - self.tx.cancel()?; - // Continue - Ok(()) - } - // Commit a transaction - pub fn commit(&mut self) -> Result<(), Error> { - // Check to see if transaction is closed - if self.ok { - return Err(Error::TxFinished); - } - // Check to see if transaction is writable - if !self.rw { - return Err(Error::TxReadonly); - } - // Mark this transaction as done - self.ok = true; - // Cancel this transaction - self.tx.commit()?; - // Continue - Ok(()) - } - // Check if a key exists - pub fn exi(&mut self, key: K) -> Result - where - K: Into, - { - // Check to see if transaction is closed - if self.ok { - return Err(Error::TxFinished); - } - // Check the key - let res = self.tx.exi(key.into())?; - // Return result - Ok(res) - } - // Fetch a key from the database - pub fn get(&mut self, key: K) -> Result, Error> - where - K: Into, - { - // Check to see if transaction is closed - if self.ok { - return Err(Error::TxFinished); - } - // Get the key - let res = self.tx.get(key.into())?; - // Return result - Ok(res) - } - // Insert or update a key in the database - pub fn set(&mut self, key: K, val: V) -> Result<(), Error> - where - K: Into, - V: Into, - { - // Check to see if transaction is closed - if self.ok { - return Err(Error::TxFinished); - } - // Check to see if transaction is writable - if !self.rw { - return Err(Error::TxReadonly); - } - // Set the key - self.tx.set(key.into(), val.into())?; - // Return result - Ok(()) - } - // Insert a key if it doesn't exist in the database - pub fn put(&mut self, key: K, val: V) -> Result<(), Error> - where - K: Into, - V: Into, - { - // Check to see if transaction is closed - if self.ok { - return Err(Error::TxFinished); - } - // Check to see if transaction is writable - if !self.rw { - return Err(Error::TxReadonly); - } - // Set the key - self.tx.put(key.into(), val.into())?; - // Return result - Ok(()) - } - // Insert a key if it doesn't exist in the database - pub fn putc(&mut self, key: K, val: V, chk: Option) -> Result<(), Error> - where - K: Into, - V: Into, - { - // Check to see if transaction is closed - if self.ok { - return Err(Error::TxFinished); - } - // Check to see if transaction is writable - if !self.rw { - return Err(Error::TxReadonly); - } - // Set the key - self.tx.putc(key.into(), val.into(), chk.map(Into::into))?; - // Return result - Ok(()) - } - // Delete a key - pub fn del(&mut self, key: K) -> Result<(), Error> - where - K: Into, - { - // Check to see if transaction is closed - if self.ok { - return Err(Error::TxFinished); - } - // Check to see if transaction is writable - if !self.rw { - return Err(Error::TxReadonly); - } - // Remove the key - self.tx.del(key.into())?; - // Return result - Ok(()) - } - // Delete a key - pub fn delc(&mut self, key: K, chk: Option) -> Result<(), Error> - where - K: Into, - V: Into, - { - // Check to see if transaction is closed - if self.ok { - return Err(Error::TxFinished); - } - // Check to see if transaction is writable - if !self.rw { - return Err(Error::TxReadonly); - } - // Remove the key - self.tx.delc(key.into(), chk.map(Into::into))?; - // Return result - Ok(()) - } - // Retrieve a range of keys from the databases - pub fn scan(&mut self, rng: Range, limit: u32) -> Result, Error> - where - K: Into, - { - // Check to see if transaction is closed - if self.ok { - return Err(Error::TxFinished); - } - // Convert the range to bytes - let rng: Range = Range { - start: rng.start.into(), - end: rng.end.into(), - }; - // Scan the keys - let res = self.tx.scan(rng, limit)?; - // Return result - Ok(res) - } -} diff --git a/lib/src/kvs/ixdb/mod.rs b/lib/src/kvs/indxdb/mod.rs similarity index 100% rename from lib/src/kvs/ixdb/mod.rs rename to lib/src/kvs/indxdb/mod.rs diff --git a/lib/src/kvs/mem/mod.rs b/lib/src/kvs/mem/mod.rs index 5ce6b230..39eea661 100644 --- a/lib/src/kvs/mem/mod.rs +++ b/lib/src/kvs/mem/mod.rs @@ -1,4 +1,4 @@ -#![cfg(feature = "kv-echodb")] +#![cfg(feature = "kv-mem")] use crate::err::Error; use crate::kvs::Key; diff --git a/lib/src/kvs/mod.rs b/lib/src/kvs/mod.rs index d02b45fc..25bf805d 100644 --- a/lib/src/kvs/mod.rs +++ b/lib/src/kvs/mod.rs @@ -1,10 +1,10 @@ mod cache; mod ds; mod fdb; -mod file; -mod ixdb; +mod indxdb; mod kv; mod mem; +mod rocksdb; mod tikv; mod tx; diff --git a/lib/src/kvs/rocksdb/mod.rs b/lib/src/kvs/rocksdb/mod.rs new file mode 100644 index 00000000..21a9d455 --- /dev/null +++ b/lib/src/kvs/rocksdb/mod.rs @@ -0,0 +1,288 @@ +#![cfg(feature = "kv-rocksdb")] + +use crate::err::Error; +use crate::kvs::Key; +use crate::kvs::Val; +use futures::lock::Mutex; +use rocksdb::Direction; +use rocksdb::IteratorMode; +use rocksdb::OptimisticTransactionDB; +use rocksdb::ReadOptions; +use std::ops::Range; +use std::sync::Arc; + +pub struct Datastore { + db: rocksdb::OptimisticTransactionDB, +} + +pub struct Transaction { + // Is the transaction complete? + ok: bool, + // Is the transaction read+write? + rw: bool, + // The distributed datastore transaction + tx: Arc>>>, +} + +impl Datastore { + // Open a new database + pub async fn new(path: &str) -> Result { + Ok(Datastore { + db: OptimisticTransactionDB::open_default(path)?, + }) + } + // Start a new transaction + pub async fn transaction(&self, write: bool, _: bool) -> Result { + // Create a new transaction + let tx = self.db.transaction(); + // The database reference must always outlive + // the transaction. If it doesn't then this + // is undefined behaviour. This unsafe block + // ensures that the transaction reference is + // static, but will cause a crash if the + // datastore is dropped prematurely. + let tx = unsafe { + std::mem::transmute::< + rocksdb::Transaction<'_, OptimisticTransactionDB>, + rocksdb::Transaction<'static, OptimisticTransactionDB>, + >(tx) + }; + // Return the transaction + Ok(Transaction { + ok: false, + rw: write, + tx: Arc::new(Mutex::new(Some(tx))), + }) + } +} + +impl Transaction { + // Check if closed + pub fn closed(&self) -> bool { + self.ok + } + // Cancel a transaction + pub async fn cancel(&mut self) -> Result<(), Error> { + // Check to see if transaction is closed + if self.ok { + return Err(Error::TxFinished); + } + // Mark this transaction as done + self.ok = true; + // Cancel this transaction + match self.tx.lock().await.take() { + Some(tx) => tx.rollback()?, + None => unreachable!(), + }; + // Continue + Ok(()) + } + // Commit a transaction + pub async fn commit(&mut self) -> Result<(), Error> { + // Check to see if transaction is closed + if self.ok { + return Err(Error::TxFinished); + } + // Check to see if transaction is writable + if !self.rw { + return Err(Error::TxReadonly); + } + // Mark this transaction as done + self.ok = true; + // Cancel this transaction + match self.tx.lock().await.take() { + Some(tx) => tx.commit()?, + None => unreachable!(), + }; + // Continue + Ok(()) + } + // Check if a key exists + pub async fn exi(&mut self, key: K) -> Result + where + K: Into, + { + // Check to see if transaction is closed + if self.ok { + return Err(Error::TxFinished); + } + // Check the key + let res = self.tx.lock().await.as_ref().unwrap().get(key.into())?.is_some(); + // Return result + Ok(res) + } + // Fetch a key from the database + pub async fn get(&mut self, key: K) -> Result, Error> + where + K: Into, + { + // Check to see if transaction is closed + if self.ok { + return Err(Error::TxFinished); + } + // Get the key + let res = self.tx.lock().await.as_ref().unwrap().get(key.into())?; + // Return result + Ok(res) + } + // Insert or update a key in the database + pub async fn set(&mut self, key: K, val: V) -> Result<(), Error> + where + K: Into, + V: Into, + { + // Check to see if transaction is closed + if self.ok { + return Err(Error::TxFinished); + } + // Check to see if transaction is writable + if !self.rw { + return Err(Error::TxReadonly); + } + // Set the key + self.tx.lock().await.as_ref().unwrap().put(key.into(), val.into())?; + // Return result + Ok(()) + } + // Insert a key if it doesn't exist in the database + pub async fn put(&mut self, key: K, val: V) -> Result<(), Error> + where + K: Into, + V: Into, + { + // Check to see if transaction is closed + if self.ok { + return Err(Error::TxFinished); + } + // Check to see if transaction is writable + if !self.rw { + return Err(Error::TxReadonly); + } + // Get the transaction + let tx = self.tx.lock().await; + let tx = tx.as_ref().unwrap(); + // Get the arguments + let key = key.into(); + let val = val.into(); + // Set the key if empty + match tx.get(&key)? { + None => tx.put(key, val)?, + _ => return Err(Error::TxKeyAlreadyExists), + }; + // Return result + Ok(()) + } + // Insert a key if it doesn't exist in the database + pub async fn putc(&mut self, key: K, val: V, chk: Option) -> Result<(), Error> + where + K: Into, + V: Into, + { + // Check to see if transaction is closed + if self.ok { + return Err(Error::TxFinished); + } + // Check to see if transaction is writable + if !self.rw { + return Err(Error::TxReadonly); + } + // Get the transaction + let tx = self.tx.lock().await; + let tx = tx.as_ref().unwrap(); + // Get the arguments + let key = key.into(); + let val = val.into(); + let chk = chk.map(Into::into); + // Set the key if valid + match (tx.get(&key)?, chk) { + (Some(v), Some(w)) if v == w => tx.put(key, val)?, + (None, None) => tx.put(key, val)?, + _ => return Err(Error::TxConditionNotMet), + }; + // Return result + Ok(()) + } + // Delete a key + pub async fn del(&mut self, key: K) -> Result<(), Error> + where + K: Into, + { + // Check to see if transaction is closed + if self.ok { + return Err(Error::TxFinished); + } + // Check to see if transaction is writable + if !self.rw { + return Err(Error::TxReadonly); + } + // Remove the key + self.tx.lock().await.as_ref().unwrap().delete(key.into())?; + // Return result + Ok(()) + } + // Delete a key + pub async fn delc(&mut self, key: K, chk: Option) -> Result<(), Error> + where + K: Into, + V: Into, + { + // Check to see if transaction is closed + if self.ok { + return Err(Error::TxFinished); + } + // Check to see if transaction is writable + if !self.rw { + return Err(Error::TxReadonly); + } + // Get the transaction + let tx = self.tx.lock().await; + let tx = tx.as_ref().unwrap(); + // Get the arguments + let key = key.into(); + let chk = chk.map(Into::into); + // Delete the key if valid + match (tx.get(&key)?, chk) { + (Some(v), Some(w)) if v == w => tx.delete(key)?, + (None, None) => tx.delete(key)?, + _ => return Err(Error::TxConditionNotMet), + }; + // Return result + Ok(()) + } + // Retrieve a range of keys from the databases + pub async fn scan(&mut self, rng: Range, limit: u32) -> Result, Error> + where + K: Into, + { + // Check to see if transaction is closed + if self.ok { + return Err(Error::TxFinished); + } + // Get the transaction + let tx = self.tx.lock().await; + let tx = tx.as_ref().unwrap(); + // Convert the range to bytes + let rng: Range = Range { + start: rng.start.into(), + end: rng.end.into(), + }; + // Create result set + let mut res = vec![]; + // Iterate forwards + let dir = Direction::Forward; + // Set the start key + let cnf = IteratorMode::From(&rng.start, dir); + // Set the maximum key + let mut opt = ReadOptions::default(); + opt.set_iterate_range(..rng.end); + // Create the iterator + let ite = tx.iterator_opt(cnf, opt); + // Scan the keys in the iterator + for item in ite.take(limit as usize) { + let (k, v) = item?; + res.push((k.into_vec(), v.into_vec())); + } + // Return result + Ok(res) + } +} diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index e4b790a5..612dd9bf 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -31,12 +31,12 @@ pub struct Transaction { #[allow(clippy::large_enum_variant)] pub(super) enum Inner { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Mem(super::mem::Transaction), + #[cfg(feature = "kv-rocksdb")] + RocksDB(super::rocksdb::Transaction), #[cfg(feature = "kv-indxdb")] - IxDB(super::ixdb::Transaction), - #[cfg(feature = "kv-yokudb")] - File(super::file::Transaction), + IndxDB(super::indxdb::Transaction), #[cfg(feature = "kv-tikv")] TiKV(super::tikv::Transaction), #[cfg(feature = "kv-fdb")] @@ -52,19 +52,19 @@ impl Transaction { /// in a [`Error::TxFinished`] error. pub async fn closed(&self) -> bool { match self { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Transaction { inner: Inner::Mem(v), .. } => v.closed(), - #[cfg(feature = "kv-yokudb")] + #[cfg(feature = "kv-rocksdb")] Transaction { - inner: Inner::File(v), + inner: Inner::RocksDB(v), .. } => v.closed(), #[cfg(feature = "kv-indxdb")] Transaction { - inner: Inner::IxDB(v), + inner: Inner::IndxDB(v), .. } => v.closed(), #[cfg(feature = "kv-tikv")] @@ -84,19 +84,19 @@ impl Transaction { /// This reverses all changes made within the transaction. pub async fn cancel(&mut self) -> Result<(), Error> { match self { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Transaction { inner: Inner::Mem(v), .. } => v.cancel(), - #[cfg(feature = "kv-yokudb")] + #[cfg(feature = "kv-rocksdb")] Transaction { - inner: Inner::File(v), + inner: Inner::RocksDB(v), .. - } => v.cancel(), + } => v.cancel().await, #[cfg(feature = "kv-indxdb")] Transaction { - inner: Inner::IxDB(v), + inner: Inner::IndxDB(v), .. } => v.cancel().await, #[cfg(feature = "kv-tikv")] @@ -116,19 +116,19 @@ impl Transaction { /// This attempts to commit all changes made within the transaction. pub async fn commit(&mut self) -> Result<(), Error> { match self { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Transaction { inner: Inner::Mem(v), .. } => v.commit(), - #[cfg(feature = "kv-yokudb")] + #[cfg(feature = "kv-rocksdb")] Transaction { - inner: Inner::File(v), + inner: Inner::RocksDB(v), .. - } => v.commit(), + } => v.commit().await, #[cfg(feature = "kv-indxdb")] Transaction { - inner: Inner::IxDB(v), + inner: Inner::IndxDB(v), .. } => v.commit().await, #[cfg(feature = "kv-tikv")] @@ -149,19 +149,19 @@ impl Transaction { K: Into, { match self { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Transaction { inner: Inner::Mem(v), .. } => v.del(key), - #[cfg(feature = "kv-yokudb")] + #[cfg(feature = "kv-rocksdb")] Transaction { - inner: Inner::File(v), + inner: Inner::RocksDB(v), .. - } => v.del(key), + } => v.del(key).await, #[cfg(feature = "kv-indxdb")] Transaction { - inner: Inner::IxDB(v), + inner: Inner::IndxDB(v), .. } => v.del(key).await, #[cfg(feature = "kv-tikv")] @@ -182,19 +182,19 @@ impl Transaction { K: Into, { match self { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Transaction { inner: Inner::Mem(v), .. } => v.exi(key), - #[cfg(feature = "kv-yokudb")] + #[cfg(feature = "kv-rocksdb")] Transaction { - inner: Inner::File(v), + inner: Inner::RocksDB(v), .. - } => v.exi(key), + } => v.exi(key).await, #[cfg(feature = "kv-indxdb")] Transaction { - inner: Inner::IxDB(v), + inner: Inner::IndxDB(v), .. } => v.exi(key).await, #[cfg(feature = "kv-tikv")] @@ -215,19 +215,19 @@ impl Transaction { K: Into, { match self { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Transaction { inner: Inner::Mem(v), .. } => v.get(key), - #[cfg(feature = "kv-yokudb")] + #[cfg(feature = "kv-rocksdb")] Transaction { - inner: Inner::File(v), + inner: Inner::RocksDB(v), .. - } => v.get(key), + } => v.get(key).await, #[cfg(feature = "kv-indxdb")] Transaction { - inner: Inner::IxDB(v), + inner: Inner::IndxDB(v), .. } => v.get(key).await, #[cfg(feature = "kv-tikv")] @@ -249,19 +249,19 @@ impl Transaction { V: Into, { match self { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Transaction { inner: Inner::Mem(v), .. } => v.set(key, val), - #[cfg(feature = "kv-yokudb")] + #[cfg(feature = "kv-rocksdb")] Transaction { - inner: Inner::File(v), + inner: Inner::RocksDB(v), .. - } => v.set(key, val), + } => v.set(key, val).await, #[cfg(feature = "kv-indxdb")] Transaction { - inner: Inner::IxDB(v), + inner: Inner::IndxDB(v), .. } => v.set(key, val).await, #[cfg(feature = "kv-tikv")] @@ -283,19 +283,19 @@ impl Transaction { V: Into, { match self { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Transaction { inner: Inner::Mem(v), .. } => v.put(key, val), - #[cfg(feature = "kv-yokudb")] + #[cfg(feature = "kv-rocksdb")] Transaction { - inner: Inner::File(v), + inner: Inner::RocksDB(v), .. - } => v.put(key, val), + } => v.put(key, val).await, #[cfg(feature = "kv-indxdb")] Transaction { - inner: Inner::IxDB(v), + inner: Inner::IndxDB(v), .. } => v.put(key, val).await, #[cfg(feature = "kv-tikv")] @@ -318,19 +318,19 @@ impl Transaction { K: Into, { match self { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Transaction { inner: Inner::Mem(v), .. } => v.scan(rng, limit), - #[cfg(feature = "kv-yokudb")] + #[cfg(feature = "kv-rocksdb")] Transaction { - inner: Inner::File(v), + inner: Inner::RocksDB(v), .. - } => v.scan(rng, limit), + } => v.scan(rng, limit).await, #[cfg(feature = "kv-indxdb")] Transaction { - inner: Inner::IxDB(v), + inner: Inner::IndxDB(v), .. } => v.scan(rng, limit).await, #[cfg(feature = "kv-tikv")] @@ -352,19 +352,19 @@ impl Transaction { V: Into, { match self { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Transaction { inner: Inner::Mem(v), .. } => v.putc(key, val, chk), - #[cfg(feature = "kv-yokudb")] + #[cfg(feature = "kv-rocksdb")] Transaction { - inner: Inner::File(v), + inner: Inner::RocksDB(v), .. - } => v.putc(key, val, chk), + } => v.putc(key, val, chk).await, #[cfg(feature = "kv-indxdb")] Transaction { - inner: Inner::IxDB(v), + inner: Inner::IndxDB(v), .. } => v.putc(key, val, chk).await, #[cfg(feature = "kv-tikv")] @@ -386,19 +386,19 @@ impl Transaction { V: Into, { match self { - #[cfg(feature = "kv-echodb")] + #[cfg(feature = "kv-mem")] Transaction { inner: Inner::Mem(v), .. } => v.delc(key, chk), - #[cfg(feature = "kv-yokudb")] + #[cfg(feature = "kv-rocksdb")] Transaction { - inner: Inner::File(v), + inner: Inner::RocksDB(v), .. - } => v.delc(key, chk), + } => v.delc(key, chk).await, #[cfg(feature = "kv-indxdb")] Transaction { - inner: Inner::IxDB(v), + inner: Inner::IndxDB(v), .. } => v.delc(key, chk).await, #[cfg(feature = "kv-tikv")]