Add putc + delc functions for conditional put / del in kv store

This commit is contained in:
Tobie Morgan Hitchcock 2022-04-04 16:50:46 +01:00
parent 0df347732d
commit fd4752aff4
8 changed files with 307 additions and 98 deletions

8
Cargo.lock generated
View file

@ -466,9 +466,9 @@ dependencies = [
[[package]]
name = "echodb"
version = "0.2.1"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7717db5a8a5af149e6652cce851908dbfe05bdd4183c6d2f382e312ef3dc523"
checksum = "9b4d29774dada6bb3dc284fbe41c79efd5425b90933defcb42b109005fe56d41"
dependencies = [
"arc-swap",
"imbl",
@ -1043,9 +1043,9 @@ dependencies = [
[[package]]
name = "indxdb"
version = "0.1.3"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c89b87ab58cbdad0201008ce91890b38e09e18bcc546ee9cb3bf78c1c8dcd3bf"
checksum = "21ea82794a5c0be29734baebe23cf505148313bc35b247b735be9b540b330595"
dependencies = [
"js-sys",
"rexie",

View file

@ -21,11 +21,11 @@ byteorder = "1.4.3"
chrono = { version = "0.4.19", features = ["serde"] }
derive = { version = "0.1.2", package = "surrealdb-derive" }
dmp = "0.1.1"
echodb = { version = "0.2.1", optional = true }
echodb = { version = "0.3.0", optional = true }
futures = "0.3.21"
fuzzy-matcher = "0.3.7"
geo = { version = "0.19.0", features = ["use-serde"] }
indxdb = { version = "0.1.3", optional = true }
indxdb = { version = "0.2.0", optional = true }
log = "0.4.16"
md-5 = "0.10.1"
nanoid = "0.4.0"

View file

@ -35,6 +35,9 @@ pub enum Error {
#[error("Couldn't write to a read only transaction")]
TxReadonly,
#[error("Value being checked was not correct")]
TxConditionNotMet,
#[error("Specify a namespace to use")]
NsEmpty,

View file

@ -73,24 +73,6 @@ impl Transaction {
// Continue
Ok(())
}
// Delete a key
pub fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Remove the key
let res = self.tx.del(key.into())?;
// Return result
Ok(res)
}
// Check if a key exists
pub fn exi<K>(&mut self, key: K) -> Result<bool, Error>
where
@ -157,6 +139,62 @@ impl Transaction {
// Return result
Ok(())
}
// Insert a key if it doesn't exist in the database
pub fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Set the key
self.tx.putc(key.into(), val.into(), chk.map(|v| v.into()))?;
// Return result
Ok(())
}
// Delete a key
pub fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Remove the key
let res = self.tx.del(key.into())?;
// Return result
Ok(res)
}
// Delete a key
pub fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Remove the key
let res = self.tx.delc(key.into(), chk.map(|v| v.into()))?;
// Return result
Ok(res)
}
// Retrieve a range of keys from the databases
pub fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
where

View file

@ -73,24 +73,6 @@ impl Transaction {
// Continue
Ok(())
}
// Delete a key
pub async fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Remove the key
let res = self.tx.del(key.into()).await?;
// Return result
Ok(res)
}
// Check if a key exists
pub async fn exi<K>(&mut self, key: K) -> Result<bool, Error>
where
@ -157,6 +139,62 @@ impl Transaction {
// Return result
Ok(())
}
// Insert a key if it doesn't exist in the database
pub async fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Set the key
self.tx.putc(key.into(), val.into(), chk.map(|v| v.into())).await?;
// Return result
Ok(())
}
// Delete a key
pub async fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Remove the key
let res = self.tx.del(key.into()).await?;
// Return result
Ok(res)
}
// Delete a key
pub async fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Remove the key
let res = self.tx.del(key.into(), chk.map(|v| v.into())).await?;
// Return result
Ok(res)
}
// Retrieve a range of keys from the databases
pub async fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
where

View file

@ -73,24 +73,6 @@ impl Transaction {
// Continue
Ok(())
}
// Delete a key
pub fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Remove the key
let res = self.tx.del(key.into())?;
// Return result
Ok(res)
}
// Check if a key exists
pub fn exi<K>(&mut self, key: K) -> Result<bool, Error>
where
@ -157,6 +139,62 @@ impl Transaction {
// Return result
Ok(())
}
// Insert a key if it doesn't exist in the database
pub fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Set the key
self.tx.putc(key.into(), val.into(), chk.map(|v| v.into()))?;
// Return result
Ok(())
}
// Delete a key
pub fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Remove the key
let res = self.tx.del(key.into())?;
// Return result
Ok(res)
}
// Delete a key
pub fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Remove the key
let res = self.tx.delc(key.into(), chk.map(|v| v.into()))?;
// Return result
Ok(res)
}
// Retrieve a range of keys from the databases
pub fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
where

View file

@ -50,7 +50,7 @@ impl Datastore {
impl Transaction {
// Check if closed
pub async fn closed(&self) -> bool {
pub fn closed(&self) -> bool {
self.ok
}
// Cancel a transaction
@ -83,24 +83,6 @@ impl Transaction {
// Continue
Ok(())
}
// Delete a key
pub async fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Delete the key
self.tx.delete(key.into()).await?;
// Return result
Ok(())
}
// Check if a key exists
pub async fn exi<K>(&mut self, key: K) -> Result<bool, Error>
where
@ -167,6 +149,80 @@ impl Transaction {
// Return result
Ok(())
}
// Insert a key if it doesn't exist in the database
pub async fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Get the key
let key = key.into();
// Get the val
let val = val.into();
// Get the check
let chk = chk.map(|v| v.into());
// Delete the key
match (self.tx.get(key.clone()).await?, chk) {
(Some(v), Some(w)) if v == w => self.tx.put(key, val).await?,
(None, None) => self.tx.put(key, val).await?,
_ => return Err(Error::TxConditionNotMet),
};
// Return result
Ok(())
}
// Delete a key
pub async fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: Into<Key>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Delete the key
self.tx.delete(key.into()).await?;
// Return result
Ok(())
}
// Delete a key
pub async fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Check to see if transaction is closed
if self.ok {
return Err(Error::TxFinished);
}
// Check to see if transaction is writable
if !self.rw {
return Err(Error::TxReadonly);
}
// Get the key
let key = key.into();
// Get the check
let chk = chk.map(|v| v.into());
// Delete the key
match (self.tx.get(key.clone()).await?, chk) {
(Some(v), Some(w)) if v == w => self.tx.delete(key).await?,
(None, None) => self.tx.delete(key).await?,
_ => return Err(Error::TxConditionNotMet),
};
// Return result
Ok(())
}
// Retrieve a range of keys from the databases
pub async fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
where

View file

@ -22,12 +22,12 @@ impl Transaction {
Transaction::Mock => unreachable!(),
#[cfg(feature = "kv-echodb")]
Transaction::Mem(v) => v.closed(),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.closed(),
#[cfg(feature = "kv-yokudb")]
Transaction::File(v) => v.closed(),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.closed(),
#[cfg(feature = "kv-tikv")]
Transaction::TiKV(v) => v.closed().await,
Transaction::TiKV(v) => v.closed(),
}
}
// Cancel a transaction
@ -36,10 +36,10 @@ impl Transaction {
Transaction::Mock => unreachable!(),
#[cfg(feature = "kv-echodb")]
Transaction::Mem(v) => v.cancel(),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.cancel().await,
#[cfg(feature = "kv-yokudb")]
Transaction::File(v) => v.cancel(),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.cancel().await,
#[cfg(feature = "kv-tikv")]
Transaction::TiKV(v) => v.cancel().await,
}
@ -50,10 +50,10 @@ impl Transaction {
Transaction::Mock => unreachable!(),
#[cfg(feature = "kv-echodb")]
Transaction::Mem(v) => v.commit(),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.commit().await,
#[cfg(feature = "kv-yokudb")]
Transaction::File(v) => v.commit(),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.commit().await,
#[cfg(feature = "kv-tikv")]
Transaction::TiKV(v) => v.commit().await,
}
@ -67,10 +67,10 @@ impl Transaction {
Transaction::Mock => unreachable!(),
#[cfg(feature = "kv-echodb")]
Transaction::Mem(v) => v.del(key),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.del(key).await,
#[cfg(feature = "kv-yokudb")]
Transaction::File(v) => v.del(key),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.del(key).await,
#[cfg(feature = "kv-tikv")]
Transaction::TiKV(v) => v.del(key).await,
}
@ -84,10 +84,10 @@ impl Transaction {
Transaction::Mock => unreachable!(),
#[cfg(feature = "kv-echodb")]
Transaction::Mem(v) => v.exi(key),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.exi(key).await,
#[cfg(feature = "kv-yokudb")]
Transaction::File(v) => v.exi(key),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.exi(key).await,
#[cfg(feature = "kv-tikv")]
Transaction::TiKV(v) => v.exi(key).await,
}
@ -101,10 +101,10 @@ impl Transaction {
Transaction::Mock => unreachable!(),
#[cfg(feature = "kv-echodb")]
Transaction::Mem(v) => v.get(key),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.get(key).await,
#[cfg(feature = "kv-yokudb")]
Transaction::File(v) => v.get(key),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.get(key).await,
#[cfg(feature = "kv-tikv")]
Transaction::TiKV(v) => v.get(key).await,
}
@ -119,10 +119,10 @@ impl Transaction {
Transaction::Mock => unreachable!(),
#[cfg(feature = "kv-echodb")]
Transaction::Mem(v) => v.set(key, val),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.set(key, val).await,
#[cfg(feature = "kv-yokudb")]
Transaction::File(v) => v.set(key, val),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.set(key, val).await,
#[cfg(feature = "kv-tikv")]
Transaction::TiKV(v) => v.set(key, val).await,
}
@ -137,10 +137,10 @@ impl Transaction {
Transaction::Mock => unreachable!(),
#[cfg(feature = "kv-echodb")]
Transaction::Mem(v) => v.put(key, val),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.put(key, val).await,
#[cfg(feature = "kv-yokudb")]
Transaction::File(v) => v.put(key, val),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.put(key, val).await,
#[cfg(feature = "kv-tikv")]
Transaction::TiKV(v) => v.put(key, val).await,
}
@ -154,14 +154,50 @@ impl Transaction {
Transaction::Mock => unreachable!(),
#[cfg(feature = "kv-echodb")]
Transaction::Mem(v) => v.scan(rng, limit),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.scan(rng, limit).await,
#[cfg(feature = "kv-yokudb")]
Transaction::File(v) => v.scan(rng, limit),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.scan(rng, limit).await,
#[cfg(feature = "kv-tikv")]
Transaction::TiKV(v) => v.scan(rng, limit).await,
}
}
// Delete a range of keys from the databases
pub async fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
match self {
Transaction::Mock => unreachable!(),
#[cfg(feature = "kv-echodb")]
Transaction::Mem(v) => v.putc(key, val, chk),
#[cfg(feature = "kv-yokudb")]
Transaction::File(v) => v.putc(key, val, chk),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.putc(key, val, chk).await,
#[cfg(feature = "kv-tikv")]
Transaction::TiKV(v) => v.putc(key, val, chk).await,
}
}
// Delete a range of keys from the databases
pub async fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
match self {
Transaction::Mock => unreachable!(),
#[cfg(feature = "kv-echodb")]
Transaction::Mem(v) => v.delc(key, chk),
#[cfg(feature = "kv-yokudb")]
Transaction::File(v) => v.delc(key, chk),
#[cfg(feature = "kv-indxdb")]
Transaction::IxDB(v) => v.delc(key, chk).await,
#[cfg(feature = "kv-tikv")]
Transaction::TiKV(v) => v.delc(key, chk).await,
}
}
// Retrieve a range of keys from the databases
pub async fn getr<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
where