Use native SurrealKV savepoints (#4749)

This commit is contained in:
Sergii Glushchenko 2024-09-17 11:05:15 +01:00 committed by GitHub
parent c1443fe107
commit 682523db6d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 28 additions and 96 deletions

5
Cargo.lock generated
View file

@ -6225,9 +6225,9 @@ dependencies = [
[[package]] [[package]]
name = "surrealkv" name = "surrealkv"
version = "0.3.4" version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d5fe193207896e6eeb445b88b810830097a9940058174bbbe8ec5597b1f52e1" checksum = "40ac63abeb621f728a556d63004756b70ff6dd9b9805c4d8e2e26c16bbed6097"
dependencies = [ dependencies = [
"ahash 0.8.11", "ahash 0.8.11",
"async-channel 2.2.0", "async-channel 2.2.0",
@ -6239,7 +6239,6 @@ dependencies = [
"parking_lot", "parking_lot",
"quick_cache 0.6.2", "quick_cache 0.6.2",
"revision 0.7.1", "revision 0.7.1",
"sha2",
"tokio", "tokio",
"vart", "vart",
] ]

View file

@ -58,7 +58,9 @@ ammonia = "4.0.0"
arbitrary = { version = "1.3.2", features = ["derive"], optional = true } arbitrary = { version = "1.3.2", features = ["derive"], optional = true }
argon2 = "0.5.2" argon2 = "0.5.2"
ascii = { version = "0.3.2", package = "any_ascii" } ascii = { version = "0.3.2", package = "any_ascii" }
async-graphql = { version = "7.0.9", default-features = false, features = ["dynamic-schema"] } async-graphql = { version = "7.0.9", default-features = false, features = [
"dynamic-schema",
] }
base64 = "0.21.5" base64 = "0.21.5"
bcrypt = "0.15.0" bcrypt = "0.15.0"
bincode = "1.3.3" bincode = "1.3.3"
@ -146,7 +148,7 @@ snap = "1.1.0"
storekey = "0.5.0" storekey = "0.5.0"
subtle = "2.6" subtle = "2.6"
surrealcs = { version = "0.3.1", optional = true } surrealcs = { version = "0.3.1", optional = true }
surrealkv = { version = "0.3.4", optional = true } surrealkv = { version = "0.3.6", optional = true }
surrealml = { version = "0.1.1", optional = true, package = "surrealml-core" } surrealml = { version = "0.1.1", optional = true, package = "surrealml-core" }
tempfile = { version = "3.10.1", optional = true } tempfile = { version = "3.10.1", optional = true }
thiserror = "1.0.63" thiserror = "1.0.63"

View file

@ -43,7 +43,6 @@ mod index;
feature = "kv-tikv", feature = "kv-tikv",
feature = "kv-fdb", feature = "kv-fdb",
feature = "kv-indxdb", feature = "kv-indxdb",
feature = "kv-surrealkv",
feature = "kv-surrealcs", feature = "kv-surrealcs",
))] ))]
mod savepoint; mod savepoint;

View file

@ -2,7 +2,6 @@
use crate::err::Error; use crate::err::Error;
use crate::key::debug::Sprintable; use crate::key::debug::Sprintable;
use crate::kvs::savepoint::{SaveOperation, SavePointImpl, SavePoints, SavePrepare};
use crate::kvs::Check; use crate::kvs::Check;
use crate::kvs::Key; use crate::kvs::Key;
use crate::kvs::Val; use crate::kvs::Val;
@ -27,8 +26,6 @@ pub struct Transaction {
check: Check, check: Check,
/// The underlying datastore transaction /// The underlying datastore transaction
inner: Tx, inner: Tx,
/// The save point implementation
save_points: SavePoints,
} }
impl Drop for Transaction { impl Drop for Transaction {
@ -88,7 +85,6 @@ impl Datastore {
check, check,
write, write,
inner, inner,
save_points: Default::default(),
}), }),
Err(e) => Err(Error::Tx(e.to_string())), Err(e) => Err(Error::Tx(e.to_string())),
} }
@ -197,22 +193,10 @@ impl super::api::Transaction for Transaction {
if !self.write { if !self.write {
return Err(Error::TxReadonly); return Err(Error::TxReadonly);
} }
// Extract the key
let key = key.into();
// Prepare the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, version, SaveOperation::Set).await?
} else {
None
};
// Set the key // Set the key
match version { match version {
Some(ts) => self.inner.set_at_ts(&key, &val.into(), ts)?, Some(ts) => self.inner.set_at_ts(&key.into(), &val.into(), ts)?,
None => self.inner.set(&key, &val.into())?, None => self.inner.set(&key.into(), &val.into())?,
}
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
} }
// Return result // Return result
Ok(()) Ok(())
@ -236,32 +220,14 @@ impl super::api::Transaction for Transaction {
// Get the arguments // Get the arguments
let key = key.into(); let key = key.into();
let val = val.into(); let val = val.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, version, SaveOperation::Put).await?
} else {
None
};
// Set the key if empty // Set the key if empty
if let Some(ts) = version { if let Some(ts) = version {
self.inner.set_at_ts(&key, &val, ts)?; self.inner.set_at_ts(&key, &val, ts)?;
} else { } else {
// Does the key exists? match self.inner.get(&key)? {
let key_exists = if let Some(SavePrepare::NewKey(_, sv)) = &prep { None => self.inner.set(&key, &val)?,
sv.get_val().is_some() _ => return Err(Error::TxKeyAlreadyExists),
} else {
self.inner.get(&key)?.is_some()
};
// If the key exist we return an error
if key_exists {
return Err(Error::TxKeyAlreadyExists);
} }
// Set the key/value
self.inner.set(&key, &val)?;
}
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
} }
// Return result // Return result
Ok(()) Ok(())
@ -286,28 +252,12 @@ impl super::api::Transaction for Transaction {
let key = key.into(); let key = key.into();
let val = val.into(); let val = val.into();
let chk = chk.map(Into::into); let chk = chk.map(Into::into);
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Put).await?
} else {
None
};
// Does the key exists?
let current_val = if let Some(SavePrepare::NewKey(_, sv)) = &prep {
sv.get_val().cloned()
} else {
self.inner.get(&key)?
};
// Set the key if valid // Set the key if valid
match (current_val, chk) { match (self.inner.get(&key)?, chk) {
(Some(v), Some(w)) if v == w => self.inner.set(&key, &val)?, (Some(v), Some(w)) if v == w => self.inner.set(&key, &val)?,
(None, None) => self.inner.set(&key, &val)?, (None, None) => self.inner.set(&key, &val)?,
_ => return Err(Error::TxConditionNotMet), _ => return Err(Error::TxConditionNotMet),
}; };
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result // Return result
Ok(()) Ok(())
} }
@ -326,20 +276,8 @@ impl super::api::Transaction for Transaction {
if !self.write { if !self.write {
return Err(Error::TxReadonly); return Err(Error::TxReadonly);
} }
// Extract the key
let key = key.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Del).await?
} else {
None
};
// Remove the key // Remove the key
self.inner.delete(&key)?; self.inner.delete(&key.into())?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result // Return result
Ok(()) Ok(())
} }
@ -362,28 +300,12 @@ impl super::api::Transaction for Transaction {
// Get the arguments // Get the arguments
let key = key.into(); let key = key.into();
let chk = chk.map(Into::into); let chk = chk.map(Into::into);
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Del).await?
} else {
None
};
// Does the key exists?
let current_val = if let Some(SavePrepare::NewKey(_, sv)) = &prep {
sv.get_val().cloned()
} else {
self.inner.get(&key)?
};
// Delete the key if valid // Delete the key if valid
match (current_val, chk) { match (self.inner.get(&key)?, chk) {
(Some(v), Some(w)) if v == w => self.inner.delete(&key)?, (Some(v), Some(w)) if v == w => self.inner.delete(&key)?,
(None, None) => self.inner.delete(&key)?, (None, None) => self.inner.delete(&key)?,
_ => return Err(Error::TxConditionNotMet), _ => return Err(Error::TxConditionNotMet),
}; };
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result // Return result
Ok(()) Ok(())
} }
@ -444,8 +366,19 @@ impl super::api::Transaction for Transaction {
} }
} }
impl SavePointImpl for Transaction { impl Transaction {
fn get_save_points(&mut self) -> &mut SavePoints { pub(crate) fn new_save_point(&mut self) {
&mut self.save_points // Set the save point, the errors are ignored.
let _ = self.inner.set_savepoint();
}
pub(crate) async fn rollback_to_save_point(&mut self) -> Result<(), Error> {
// Rollback
self.inner.rollback_to_savepoint()?;
Ok(())
}
pub(crate) fn release_last_save_point(&mut self) -> Result<(), Error> {
Ok(())
} }
} }

View file

@ -15,7 +15,6 @@ use crate::kvs::clock::SizedClock;
feature = "kv-tikv", feature = "kv-tikv",
feature = "kv-fdb", feature = "kv-fdb",
feature = "kv-indxdb", feature = "kv-indxdb",
feature = "kv-surrealkv",
feature = "kv-surrealcs", feature = "kv-surrealcs",
))] ))]
use crate::kvs::savepoint::SavePointImpl; use crate::kvs::savepoint::SavePointImpl;