Add initial RocksDB storage implementation

This commit is contained in:
Tobie Morgan Hitchcock 2022-08-28 14:35:25 +01:00
parent c0217078f5
commit e6d3c1e977
10 changed files with 464 additions and 310 deletions

59
Cargo.lock generated
View file

@ -522,6 +522,17 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db"
[[package]]
name = "bzip2-sys"
version = "0.1.11+1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc"
dependencies = [
"cc",
"libc",
"pkg-config",
]
[[package]] [[package]]
name = "cache-padded" name = "cache-padded"
version = "1.2.0" version = "1.2.0"
@ -533,6 +544,9 @@ name = "cc"
version = "1.0.73" version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
dependencies = [
"jobserver",
]
[[package]] [[package]]
name = "cexpr" name = "cexpr"
@ -1889,6 +1903,15 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754"
[[package]]
name = "jobserver"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "js-sys" name = "js-sys"
version = "0.3.59" version = "0.3.59"
@ -1974,6 +1997,21 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "librocksdb-sys"
version = "0.8.0+7.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "611804e4666a25136fcc5f8cf425ab4d26c7f74ea245ffe92ea23b85b6420b5d"
dependencies = [
"bindgen 0.60.1",
"bzip2-sys",
"cc",
"glob",
"libc",
"libz-sys",
"zstd-sys",
]
[[package]] [[package]]
name = "libz-sys" name = "libz-sys"
version = "1.1.8" version = "1.1.8"
@ -2963,6 +3001,16 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5864e7ef1a6b7bcf1d6ca3f655e65e724ed3b52546a0d0a663c991522f552ea" checksum = "e5864e7ef1a6b7bcf1d6ca3f655e65e724ed3b52546a0d0a663c991522f552ea"
[[package]]
name = "rocksdb"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e9562ea1d70c0cc63a34a22d977753b50cca91cc6b6527750463bd5dd8697bc"
dependencies = [
"libc",
"librocksdb-sys",
]
[[package]] [[package]]
name = "rquickjs" name = "rquickjs"
version = "0.1.7" version = "0.1.7"
@ -3618,6 +3666,7 @@ dependencies = [
"rand 0.8.5", "rand 0.8.5",
"regex", "regex",
"rmp-serde", "rmp-serde",
"rocksdb",
"rquickjs", "rquickjs",
"scrypt", "scrypt",
"serde", "serde",
@ -4508,3 +4557,13 @@ name = "xml-rs"
version = "0.8.4" version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3" checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3"
[[package]]
name = "zstd-sys"
version = "2.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b"
dependencies = [
"cc",
"libc",
]

View file

@ -14,13 +14,13 @@ categories = ["database-implementations", "data-structures", "embedded"]
license = "Apache-2.0" license = "Apache-2.0"
[features] [features]
default = ["parallel", "kv-echodb", "kv-rocksdb", "scripting", "http"] default = ["parallel", "kv-mem", "kv-rocksdb", "scripting", "http"]
parallel = ["dep:executor"] parallel = ["dep:executor"]
kv-tikv = ["dep:tikv"] kv-tikv = ["dep:tikv"]
kv-fdb = ["dep:foundationdb"] kv-fdb = ["dep:foundationdb"]
kv-echodb = ["dep:echodb"] kv-mem = ["dep:echodb"]
kv-indxdb = ["dep:indxdb"] kv-indxdb = ["dep:indxdb"]
kv-rocksdb = [] kv-rocksdb = ["dep:rocksdb"]
scripting = ["dep:js", "dep:executor"] scripting = ["dep:js", "dep:executor"]
http = ["dep:surf"] http = ["dep:surf"]
@ -51,6 +51,7 @@ once_cell = "1.13.1"
pbkdf2 = "0.11.0" pbkdf2 = "0.11.0"
rand = "0.8.5" rand = "0.8.5"
regex = "1.6.0" regex = "1.6.0"
rocksdb = { version = "0.19.0", optional = true }
scrypt = "0.10.0" scrypt = "0.10.0"
serde = { version = "1.0.144", features = ["derive"] } serde = { version = "1.0.144", features = ["derive"] }
sha-1 = "0.10.0" sha-1 = "0.10.0"

View file

@ -260,7 +260,7 @@ impl From<Error> for String {
} }
} }
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
impl From<echodb::err::Error> for Error { impl From<echodb::err::Error> for Error {
fn from(e: echodb::err::Error) -> Error { fn from(e: echodb::err::Error) -> Error {
match e { match e {
@ -290,6 +290,13 @@ impl From<tikv::Error> for Error {
} }
} }
#[cfg(feature = "kv-rocksdb")]
impl From<rocksdb::Error> for Error {
fn from(e: rocksdb::Error) -> Error {
Error::Tx(e.to_string())
}
}
impl From<channel::RecvError> for Error { impl From<channel::RecvError> for Error {
fn from(e: channel::RecvError) -> Error { fn from(e: channel::RecvError) -> Error {
Error::Channel(e.to_string()) Error::Channel(e.to_string())

View file

@ -22,12 +22,12 @@ pub struct Datastore {
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
pub(super) enum Inner { pub(super) enum Inner {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Mem(super::mem::Datastore), Mem(super::mem::Datastore),
#[cfg(feature = "kv-rocksdb")]
RocksDB(super::rocksdb::Datastore),
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
IxDB(super::ixdb::Datastore), IndxDB(super::indxdb::Datastore),
#[cfg(feature = "kv-yokudb")]
File(super::file::Datastore),
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
TiKV(super::tikv::Datastore), TiKV(super::tikv::Datastore),
#[cfg(feature = "kv-fdb")] #[cfg(feature = "kv-fdb")]
@ -74,7 +74,7 @@ impl Datastore {
/// ``` /// ```
pub async fn new(path: &str) -> Result<Datastore, Error> { pub async fn new(path: &str) -> Result<Datastore, Error> {
match path { match path {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
"memory" => { "memory" => {
info!(target: LOG, "Starting kvs store in {}", path); info!(target: LOG, "Starting kvs store in {}", path);
let v = super::mem::Datastore::new().await.map(|v| Datastore { let v = super::mem::Datastore::new().await.map(|v| Datastore {
@ -83,44 +83,60 @@ impl Datastore {
info!(target: LOG, "Started kvs store in {}", path); info!(target: LOG, "Started kvs store in {}", path);
v v
} }
// Parse and initiate an IxDB database
#[cfg(feature = "kv-indxdb")]
s if s.starts_with("ixdb:") => {
info!(target: LOG, "Starting kvs store at {}", path);
let s = s.trim_start_matches("ixdb://");
let v = super::ixdb::Datastore::new(s).await.map(|v| Datastore {
inner: Inner::IxDB(v),
});
info!(target: LOG, "Started kvs store at {}", path);
v
}
// Parse and initiate an File database // Parse and initiate an File database
#[cfg(feature = "kv-yokudb")] #[cfg(feature = "kv-rocksdb")]
s if s.starts_with("file:") => { s if s.starts_with("file:") => {
info!(target: LOG, "Starting kvs store at {}", path); info!(target: LOG, "Starting kvs store at {}", path);
let s = s.trim_start_matches("file://"); let s = s.trim_start_matches("file://");
let v = super::file::Datastore::new(s).await.map(|v| Datastore { let s = s.trim_start_matches("file:");
inner: Inner::File(v), let v = super::rocksdb::Datastore::new(s).await.map(|v| Datastore {
inner: Inner::RocksDB(v),
}); });
info!(target: LOG, "Started kvs store at {}", path); info!(target: LOG, "Started kvs store at {}", path);
v v
} }
// Parse and initiate an TiKV database // Parse and initiate an RocksDB database
#[cfg(feature = "kv-rocksdb")]
s if s.starts_with("rocksdb:") => {
info!(target: LOG, "Starting kvs store at {}", path);
let s = s.trim_start_matches("rocksdb://");
let s = s.trim_start_matches("rocksdb:");
let v = super::rocksdb::Datastore::new(s).await.map(|v| Datastore {
inner: Inner::RocksDB(v),
});
info!(target: LOG, "Started kvs store at {}", path);
v
}
// Parse and initiate an IndxDB database
#[cfg(feature = "kv-indxdb")]
s if s.starts_with("indxdb:") => {
info!(target: LOG, "Starting kvs store at {}", path);
let s = s.trim_start_matches("indxdb://");
let s = s.trim_start_matches("indxdb:");
let v = super::indxdb::Datastore::new(s).await.map(|v| Datastore {
inner: Inner::IndxDB(v),
});
info!(target: LOG, "Started kvs store at {}", path);
v
}
// Parse and initiate a TiKV database
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
s if s.starts_with("tikv:") => { s if s.starts_with("tikv:") => {
info!(target: LOG, "Connecting to kvs store at {}", path); info!(target: LOG, "Connecting to kvs store at {}", path);
let s = s.trim_start_matches("tikv://"); let s = s.trim_start_matches("tikv://");
let s = s.trim_start_matches("tikv:");
let v = super::tikv::Datastore::new(s).await.map(|v| Datastore { let v = super::tikv::Datastore::new(s).await.map(|v| Datastore {
inner: Inner::TiKV(v), inner: Inner::TiKV(v),
}); });
info!(target: LOG, "Connected to kvs store at {}", path); info!(target: LOG, "Connected to kvs store at {}", path);
v v
} }
// Parse and initiate an TiKV database // Parse and initiate a FoundationDB database
#[cfg(feature = "kv-fdb")] #[cfg(feature = "kv-fdb")]
s if s.starts_with("fdb:") => { s if s.starts_with("fdb:") => {
info!(target: LOG, "Connecting to kvs store at {}", path); info!(target: LOG, "Connecting to kvs store at {}", path);
let s = s.trim_start_matches("fdb://"); let s = s.trim_start_matches("fdb://");
let s = s.trim_start_matches("fdb:");
let v = super::fdb::Datastore::new(s).await.map(|v| Datastore { let v = super::fdb::Datastore::new(s).await.map(|v| Datastore {
inner: Inner::FDB(v), inner: Inner::FDB(v),
}); });
@ -150,7 +166,7 @@ impl Datastore {
/// ``` /// ```
pub async fn transaction(&self, write: bool, lock: bool) -> Result<Transaction, Error> { pub async fn transaction(&self, write: bool, lock: bool) -> Result<Transaction, Error> {
match &self.inner { match &self.inner {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Inner::Mem(v) => { Inner::Mem(v) => {
let tx = v.transaction(write, lock).await?; let tx = v.transaction(write, lock).await?;
Ok(Transaction { Ok(Transaction {
@ -158,19 +174,19 @@ impl Datastore {
cache: super::cache::Cache::default(), cache: super::cache::Cache::default(),
}) })
} }
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-rocksdb")]
Inner::IxDB(v) => { Inner::RocksDB(v) => {
let tx = v.transaction(write, lock).await?; let tx = v.transaction(write, lock).await?;
Ok(Transaction { Ok(Transaction {
inner: super::tx::Inner::IxDB(tx), inner: super::tx::Inner::RocksDB(tx),
cache: super::cache::Cache::default(), cache: super::cache::Cache::default(),
}) })
} }
#[cfg(feature = "kv-yokudb")] #[cfg(feature = "kv-indxdb")]
Inner::File(v) => { Inner::IndxDB(v) => {
let tx = v.transaction(write, lock).await?; let tx = v.transaction(write, lock).await?;
Ok(Transaction { Ok(Transaction {
inner: super::tx::Inner::File(tx), inner: super::tx::Inner::IndxDB(tx),
cache: super::cache::Cache::default(), cache: super::cache::Cache::default(),
}) })
} }

View file

@ -1,217 +0,0 @@
#![cfg(feature = "kv-yokudb")]
use crate::err::Error;
use crate::kvs::Key;
use crate::kvs::Val;
use std::ops::Range;
pub struct Datastore {
db: echodb::Db<Key, Val>,
}
pub struct Transaction {
// Is the transaction complete?
ok: bool,
// Is the transaction read+write?
rw: bool,
// The distributed datastore transaction
tx: echodb::Tx<Key, Val>,
}
impl Datastore {
// Open a new database
pub async fn new(_path: &str) -> Result<Datastore, Error> {
Ok(Datastore {
db: echodb::db::new(),
})
}
// Start a new transaction
pub async fn transaction(&self, write: bool, _: bool) -> Result<Transaction, Error> {
match self.db.begin(write).await {
Ok(tx) => Ok(Transaction {
ok: false,
rw: write,
tx,
}),
Err(e) => Err(Error::Tx(e.to_string())),
}
}
}
impl Transaction {
// Check if closed
pub fn closed(&self) -> bool {
self.ok
}
// Cancel a transaction
pub fn cancel(&mut self) -> Result<(), Error> {
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Mark this transaction as done
self.ok = true;
// Cancel this transaction
self.tx.cancel()?;
// Continue
Ok(())
}
// Commit a transaction
pub fn commit(&mut self) -> Result<(), Error> {
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Mark this transaction as done
self.ok = true;
// Cancel this transaction
self.tx.commit()?;
// Continue
Ok(())
}
// Check if a key exists
pub fn exi<K>(&mut self, key: K) -> Result<bool, Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check the key
let res = self.tx.exi(key.into())?;
// Return result
Ok(res)
}
// Fetch a key from the database
pub fn get<K>(&mut self, key: K) -> Result<Option<Val>, Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Get the key
let res = self.tx.get(key.into())?;
// Return result
Ok(res)
}
// Insert or update a key in the database
pub fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Set the key
self.tx.set(key.into(), val.into())?;
// Return result
Ok(())
}
// Insert a key if it doesn't exist in the database
pub fn put<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Set the key
self.tx.put(key.into(), val.into())?;
// Return result
Ok(())
}
// Insert a key if it doesn't exist in the database
pub fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Set the key
self.tx.putc(key.into(), val.into(), chk.map(Into::into))?;
// Return result
Ok(())
}
// Delete a key
pub fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Remove the key
self.tx.del(key.into())?;
// Return result
Ok(())
}
// Delete a key
pub fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Remove the key
self.tx.delc(key.into(), chk.map(Into::into))?;
// Return result
Ok(())
}
// Retrieve a range of keys from the databases
pub fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Convert the range to bytes
let rng: Range<Key> = Range {
start: rng.start.into(),
end: rng.end.into(),
};
// Scan the keys
let res = self.tx.scan(rng, limit)?;
// Return result
Ok(res)
}
}

View file

@ -1,4 +1,4 @@
#![cfg(feature = "kv-echodb")] #![cfg(feature = "kv-mem")]
use crate::err::Error; use crate::err::Error;
use crate::kvs::Key; use crate::kvs::Key;

View file

@ -1,10 +1,10 @@
mod cache; mod cache;
mod ds; mod ds;
mod fdb; mod fdb;
mod file; mod indxdb;
mod ixdb;
mod kv; mod kv;
mod mem; mod mem;
mod rocksdb;
mod tikv; mod tikv;
mod tx; mod tx;

288
lib/src/kvs/rocksdb/mod.rs Normal file
View file

@ -0,0 +1,288 @@
#![cfg(feature = "kv-rocksdb")]
use crate::err::Error;
use crate::kvs::Key;
use crate::kvs::Val;
use futures::lock::Mutex;
use rocksdb::Direction;
use rocksdb::IteratorMode;
use rocksdb::OptimisticTransactionDB;
use rocksdb::ReadOptions;
use std::ops::Range;
use std::sync::Arc;
pub struct Datastore {
db: rocksdb::OptimisticTransactionDB,
}
pub struct Transaction {
// Is the transaction complete?
ok: bool,
// Is the transaction read+write?
rw: bool,
// The distributed datastore transaction
tx: Arc<Mutex<Option<rocksdb::Transaction<'static, OptimisticTransactionDB>>>>,
}
impl Datastore {
// Open a new database
pub async fn new(path: &str) -> Result<Datastore, Error> {
Ok(Datastore {
db: OptimisticTransactionDB::open_default(path)?,
})
}
// Start a new transaction
pub async fn transaction(&self, write: bool, _: bool) -> Result<Transaction, Error> {
// Create a new transaction
let tx = self.db.transaction();
// The database reference must always outlive
// the transaction. If it doesn't then this
// is undefined behaviour. This unsafe block
// ensures that the transaction reference is
// static, but will cause a crash if the
// datastore is dropped prematurely.
let tx = unsafe {
std::mem::transmute::<
rocksdb::Transaction<'_, OptimisticTransactionDB>,
rocksdb::Transaction<'static, OptimisticTransactionDB>,
>(tx)
};
// Return the transaction
Ok(Transaction {
ok: false,
rw: write,
tx: Arc::new(Mutex::new(Some(tx))),
})
}
}
impl Transaction {
// Check if closed
pub fn closed(&self) -> bool {
self.ok
}
// Cancel a transaction
pub async fn cancel(&mut self) -> Result<(), Error> {
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Mark this transaction as done
self.ok = true;
// Cancel this transaction
match self.tx.lock().await.take() {
Some(tx) => tx.rollback()?,
None => unreachable!(),
};
// Continue
Ok(())
}
// Commit a transaction
pub async fn commit(&mut self) -> Result<(), Error> {
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Mark this transaction as done
self.ok = true;
// Cancel this transaction
match self.tx.lock().await.take() {
Some(tx) => tx.commit()?,
None => unreachable!(),
};
// Continue
Ok(())
}
// Check if a key exists
pub async fn exi<K>(&mut self, key: K) -> Result<bool, Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check the key
let res = self.tx.lock().await.as_ref().unwrap().get(key.into())?.is_some();
// Return result
Ok(res)
}
// Fetch a key from the database
pub async fn get<K>(&mut self, key: K) -> Result<Option<Val>, Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Get the key
let res = self.tx.lock().await.as_ref().unwrap().get(key.into())?;
// Return result
Ok(res)
}
// Insert or update a key in the database
pub async fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Set the key
self.tx.lock().await.as_ref().unwrap().put(key.into(), val.into())?;
// Return result
Ok(())
}
// Insert a key if it doesn't exist in the database
pub async fn put<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Get the transaction
let tx = self.tx.lock().await;
let tx = tx.as_ref().unwrap();
// Get the arguments
let key = key.into();
let val = val.into();
// Set the key if empty
match tx.get(&key)? {
None => tx.put(key, val)?,
_ => return Err(Error::TxKeyAlreadyExists),
};
// Return result
Ok(())
}
// Insert a key if it doesn't exist in the database
pub async fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Get the transaction
let tx = self.tx.lock().await;
let tx = tx.as_ref().unwrap();
// Get the arguments
let key = key.into();
let val = val.into();
let chk = chk.map(Into::into);
// Set the key if valid
match (tx.get(&key)?, chk) {
(Some(v), Some(w)) if v == w => tx.put(key, val)?,
(None, None) => tx.put(key, val)?,
_ => return Err(Error::TxConditionNotMet),
};
// Return result
Ok(())
}
// Delete a key
pub async fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Remove the key
self.tx.lock().await.as_ref().unwrap().delete(key.into())?;
// Return result
Ok(())
}
// Delete a key
pub async fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Get the transaction
let tx = self.tx.lock().await;
let tx = tx.as_ref().unwrap();
// Get the arguments
let key = key.into();
let chk = chk.map(Into::into);
// Delete the key if valid
match (tx.get(&key)?, chk) {
(Some(v), Some(w)) if v == w => tx.delete(key)?,
(None, None) => tx.delete(key)?,
_ => return Err(Error::TxConditionNotMet),
};
// Return result
Ok(())
}
// Retrieve a range of keys from the databases
pub async fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Get the transaction
let tx = self.tx.lock().await;
let tx = tx.as_ref().unwrap();
// Convert the range to bytes
let rng: Range<Key> = Range {
start: rng.start.into(),
end: rng.end.into(),
};
// Create result set
let mut res = vec![];
// Iterate forwards
let dir = Direction::Forward;
// Set the start key
let cnf = IteratorMode::From(&rng.start, dir);
// Set the maximum key
let mut opt = ReadOptions::default();
opt.set_iterate_range(..rng.end);
// Create the iterator
let ite = tx.iterator_opt(cnf, opt);
// Scan the keys in the iterator
for item in ite.take(limit as usize) {
let (k, v) = item?;
res.push((k.into_vec(), v.into_vec()));
}
// Return result
Ok(res)
}
}

View file

@ -31,12 +31,12 @@ pub struct Transaction {
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
pub(super) enum Inner { pub(super) enum Inner {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Mem(super::mem::Transaction), Mem(super::mem::Transaction),
#[cfg(feature = "kv-rocksdb")]
RocksDB(super::rocksdb::Transaction),
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
IxDB(super::ixdb::Transaction), IndxDB(super::indxdb::Transaction),
#[cfg(feature = "kv-yokudb")]
File(super::file::Transaction),
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
TiKV(super::tikv::Transaction), TiKV(super::tikv::Transaction),
#[cfg(feature = "kv-fdb")] #[cfg(feature = "kv-fdb")]
@ -52,19 +52,19 @@ impl Transaction {
/// in a [`Error::TxFinished`] error. /// in a [`Error::TxFinished`] error.
pub async fn closed(&self) -> bool { pub async fn closed(&self) -> bool {
match self { match self {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Transaction { Transaction {
inner: Inner::Mem(v), inner: Inner::Mem(v),
.. ..
} => v.closed(), } => v.closed(),
#[cfg(feature = "kv-yokudb")] #[cfg(feature = "kv-rocksdb")]
Transaction { Transaction {
inner: Inner::File(v), inner: Inner::RocksDB(v),
.. ..
} => v.closed(), } => v.closed(),
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
Transaction { Transaction {
inner: Inner::IxDB(v), inner: Inner::IndxDB(v),
.. ..
} => v.closed(), } => v.closed(),
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
@ -84,19 +84,19 @@ impl Transaction {
/// This reverses all changes made within the transaction. /// This reverses all changes made within the transaction.
pub async fn cancel(&mut self) -> Result<(), Error> { pub async fn cancel(&mut self) -> Result<(), Error> {
match self { match self {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Transaction { Transaction {
inner: Inner::Mem(v), inner: Inner::Mem(v),
.. ..
} => v.cancel(), } => v.cancel(),
#[cfg(feature = "kv-yokudb")] #[cfg(feature = "kv-rocksdb")]
Transaction { Transaction {
inner: Inner::File(v), inner: Inner::RocksDB(v),
.. ..
} => v.cancel(), } => v.cancel().await,
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
Transaction { Transaction {
inner: Inner::IxDB(v), inner: Inner::IndxDB(v),
.. ..
} => v.cancel().await, } => v.cancel().await,
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
@ -116,19 +116,19 @@ impl Transaction {
/// This attempts to commit all changes made within the transaction. /// This attempts to commit all changes made within the transaction.
pub async fn commit(&mut self) -> Result<(), Error> { pub async fn commit(&mut self) -> Result<(), Error> {
match self { match self {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Transaction { Transaction {
inner: Inner::Mem(v), inner: Inner::Mem(v),
.. ..
} => v.commit(), } => v.commit(),
#[cfg(feature = "kv-yokudb")] #[cfg(feature = "kv-rocksdb")]
Transaction { Transaction {
inner: Inner::File(v), inner: Inner::RocksDB(v),
.. ..
} => v.commit(), } => v.commit().await,
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
Transaction { Transaction {
inner: Inner::IxDB(v), inner: Inner::IndxDB(v),
.. ..
} => v.commit().await, } => v.commit().await,
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
@ -149,19 +149,19 @@ impl Transaction {
K: Into<Key>, K: Into<Key>,
{ {
match self { match self {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Transaction { Transaction {
inner: Inner::Mem(v), inner: Inner::Mem(v),
.. ..
} => v.del(key), } => v.del(key),
#[cfg(feature = "kv-yokudb")] #[cfg(feature = "kv-rocksdb")]
Transaction { Transaction {
inner: Inner::File(v), inner: Inner::RocksDB(v),
.. ..
} => v.del(key), } => v.del(key).await,
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
Transaction { Transaction {
inner: Inner::IxDB(v), inner: Inner::IndxDB(v),
.. ..
} => v.del(key).await, } => v.del(key).await,
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
@ -182,19 +182,19 @@ impl Transaction {
K: Into<Key>, K: Into<Key>,
{ {
match self { match self {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Transaction { Transaction {
inner: Inner::Mem(v), inner: Inner::Mem(v),
.. ..
} => v.exi(key), } => v.exi(key),
#[cfg(feature = "kv-yokudb")] #[cfg(feature = "kv-rocksdb")]
Transaction { Transaction {
inner: Inner::File(v), inner: Inner::RocksDB(v),
.. ..
} => v.exi(key), } => v.exi(key).await,
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
Transaction { Transaction {
inner: Inner::IxDB(v), inner: Inner::IndxDB(v),
.. ..
} => v.exi(key).await, } => v.exi(key).await,
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
@ -215,19 +215,19 @@ impl Transaction {
K: Into<Key>, K: Into<Key>,
{ {
match self { match self {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Transaction { Transaction {
inner: Inner::Mem(v), inner: Inner::Mem(v),
.. ..
} => v.get(key), } => v.get(key),
#[cfg(feature = "kv-yokudb")] #[cfg(feature = "kv-rocksdb")]
Transaction { Transaction {
inner: Inner::File(v), inner: Inner::RocksDB(v),
.. ..
} => v.get(key), } => v.get(key).await,
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
Transaction { Transaction {
inner: Inner::IxDB(v), inner: Inner::IndxDB(v),
.. ..
} => v.get(key).await, } => v.get(key).await,
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
@ -249,19 +249,19 @@ impl Transaction {
V: Into<Val>, V: Into<Val>,
{ {
match self { match self {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Transaction { Transaction {
inner: Inner::Mem(v), inner: Inner::Mem(v),
.. ..
} => v.set(key, val), } => v.set(key, val),
#[cfg(feature = "kv-yokudb")] #[cfg(feature = "kv-rocksdb")]
Transaction { Transaction {
inner: Inner::File(v), inner: Inner::RocksDB(v),
.. ..
} => v.set(key, val), } => v.set(key, val).await,
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
Transaction { Transaction {
inner: Inner::IxDB(v), inner: Inner::IndxDB(v),
.. ..
} => v.set(key, val).await, } => v.set(key, val).await,
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
@ -283,19 +283,19 @@ impl Transaction {
V: Into<Val>, V: Into<Val>,
{ {
match self { match self {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Transaction { Transaction {
inner: Inner::Mem(v), inner: Inner::Mem(v),
.. ..
} => v.put(key, val), } => v.put(key, val),
#[cfg(feature = "kv-yokudb")] #[cfg(feature = "kv-rocksdb")]
Transaction { Transaction {
inner: Inner::File(v), inner: Inner::RocksDB(v),
.. ..
} => v.put(key, val), } => v.put(key, val).await,
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
Transaction { Transaction {
inner: Inner::IxDB(v), inner: Inner::IndxDB(v),
.. ..
} => v.put(key, val).await, } => v.put(key, val).await,
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
@ -318,19 +318,19 @@ impl Transaction {
K: Into<Key>, K: Into<Key>,
{ {
match self { match self {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Transaction { Transaction {
inner: Inner::Mem(v), inner: Inner::Mem(v),
.. ..
} => v.scan(rng, limit), } => v.scan(rng, limit),
#[cfg(feature = "kv-yokudb")] #[cfg(feature = "kv-rocksdb")]
Transaction { Transaction {
inner: Inner::File(v), inner: Inner::RocksDB(v),
.. ..
} => v.scan(rng, limit), } => v.scan(rng, limit).await,
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
Transaction { Transaction {
inner: Inner::IxDB(v), inner: Inner::IndxDB(v),
.. ..
} => v.scan(rng, limit).await, } => v.scan(rng, limit).await,
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
@ -352,19 +352,19 @@ impl Transaction {
V: Into<Val>, V: Into<Val>,
{ {
match self { match self {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Transaction { Transaction {
inner: Inner::Mem(v), inner: Inner::Mem(v),
.. ..
} => v.putc(key, val, chk), } => v.putc(key, val, chk),
#[cfg(feature = "kv-yokudb")] #[cfg(feature = "kv-rocksdb")]
Transaction { Transaction {
inner: Inner::File(v), inner: Inner::RocksDB(v),
.. ..
} => v.putc(key, val, chk), } => v.putc(key, val, chk).await,
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
Transaction { Transaction {
inner: Inner::IxDB(v), inner: Inner::IndxDB(v),
.. ..
} => v.putc(key, val, chk).await, } => v.putc(key, val, chk).await,
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
@ -386,19 +386,19 @@ impl Transaction {
V: Into<Val>, V: Into<Val>,
{ {
match self { match self {
#[cfg(feature = "kv-echodb")] #[cfg(feature = "kv-mem")]
Transaction { Transaction {
inner: Inner::Mem(v), inner: Inner::Mem(v),
.. ..
} => v.delc(key, chk), } => v.delc(key, chk),
#[cfg(feature = "kv-yokudb")] #[cfg(feature = "kv-rocksdb")]
Transaction { Transaction {
inner: Inner::File(v), inner: Inner::RocksDB(v),
.. ..
} => v.delc(key, chk), } => v.delc(key, chk).await,
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
Transaction { Transaction {
inner: Inner::IxDB(v), inner: Inner::IndxDB(v),
.. ..
} => v.delc(key, chk).await, } => v.delc(key, chk).await,
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]