Implements SavePoint(s) (#4707)

This commit is contained in:
Emmanuel Keller 2024-09-10 11:10:46 +01:00 committed by GitHub
parent 6735ea96d8
commit 30118f445c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 762 additions and 36 deletions

View file

@ -34,6 +34,13 @@ impl Document {
};
// Setup a new document
let mut doc = Document::new(pro.rid, pro.ir, ins.0, ins.1);
// Optionally create a save point so we can roll back any upcoming changes
let is_save_point = if !stm.is_select() {
ctx.tx().lock().await.new_save_point();
true
} else {
false
};
// Process the statement
let res = match stm {
Statement::Select(_) => doc.select(stk, ctx, opt, stm).await,
@ -51,6 +58,10 @@ impl Document {
// retry this request using a new ID, so
// we load the new record, and reprocess
Err(Error::RetryWithId(v)) => {
// We roll back any change following the save point
if is_save_point {
ctx.tx().lock().await.rollback_to_save_point().await?;
}
// Fetch the data from the store
let key = crate::key::thing::new(opt.ns()?, opt.db()?, &v.tb, &v.id);
let val = ctx.tx().get(key, None).await?;
@ -71,11 +82,24 @@ impl Document {
// Go to top of loop
continue;
}
Err(Error::Ignore) => Err(Error::Ignore),
// If any other error was received, then let's
// pass that error through and return an error
Err(e) => Err(e),
Err(e) => {
// We roll back any change following the save point
if is_save_point {
ctx.tx().lock().await.rollback_to_save_point().await?;
}
Err(e)
}
// Otherwise the record creation succeeded
Ok(v) => Ok(v),
Ok(v) => {
// The statement is successful, we can release the savepoint
if is_save_point {
ctx.tx().lock().await.release_last_save_point()?;
}
Ok(v)
}
};
// Send back the result
let _ = chn.send(res).await;
@ -85,7 +109,7 @@ impl Document {
// We shouldn't really reach this part, but if we
// did it was probably due to the fact that we
// encountered two Err::RetryWithId errors due to
// two separtate UNIQUE index definitions, and it
// two separate UNIQUE index definitions, and it
// wasn't possible to detect which record was the
// correct one to be updated
let _ = chn.send(Err(fail!("Internal error"))).await;

View file

@ -28,6 +28,13 @@ impl Document {
};
// Setup a new document
let mut doc = Document::new(pro.rid, pro.ir, ins.0, ins.1);
// Optionally create a save point so we can roll back any upcoming changes
let is_save_point = if !stm.is_select() {
ctx.tx().lock().await.new_save_point();
true
} else {
false
};
// Process the statement
let res = match stm {
Statement::Select(_) => doc.select(stk, ctx, opt, stm).await,
@ -45,6 +52,10 @@ impl Document {
// retry this request using a new ID, so
// we load the new record, and reprocess
Err(Error::RetryWithId(v)) => {
// We roll back any change following the save point
if is_save_point {
ctx.tx().lock().await.rollback_to_save_point().await?;
}
// Fetch the data from the store
let key = crate::key::thing::new(opt.ns()?, opt.db()?, &v.tb, &v.id);
let val = ctx.tx().get(key, None).await?;
@ -65,11 +76,24 @@ impl Document {
// Go to top of loop
continue;
}
Err(Error::Ignore) => Err(Error::Ignore),
// If any other error was received, then let's
// pass that error through and return an error
Err(e) => Err(e),
Err(e) => {
// We roll back any change following the save point
if is_save_point {
ctx.tx().lock().await.rollback_to_save_point().await?;
}
Err(e)
}
// Otherwise the record creation succeeded
Ok(v) => Ok(v),
Ok(v) => {
// The statement is successful, we can release the savepoint
if is_save_point {
ctx.tx().lock().await.release_last_save_point()?;
}
Ok(v)
}
};
// Send back the result
return res;
@ -77,7 +101,7 @@ impl Document {
// We shouldn't really reach this part, but if we
// did it was probably due to the fact that we
// encountered two Err::RetryWithId errors due to
// two separtate UNIQUE index definitions, and it
// two separate UNIQUE index definitions, and it
// wasn't possible to detect which record was the
// correct one to be updated
Err(fail!("Internal error"))

View file

@ -4,6 +4,7 @@ mod cnf;
use crate::err::Error;
use crate::key::debug::Sprintable;
use crate::kvs::savepoint::{SaveOperation, SavePointImpl, SavePoints};
use crate::kvs::Check;
use crate::kvs::Key;
use crate::kvs::Val;
@ -48,6 +49,8 @@ pub struct Transaction {
check: Check,
/// The underlying datastore transaction
inner: Option<Tx>,
/// The save point implementation
save_points: SavePoints,
}
impl Drop for Transaction {
@ -138,6 +141,7 @@ impl Datastore {
check,
write,
inner: Some(inner),
save_points: Default::default(),
}),
Err(e) => Err(Error::Tx(e.to_string())),
}
@ -238,7 +242,7 @@ impl super::api::Transaction for Transaction {
where
K: Into<Key> + Sprintable + Debug,
{
// FDB does not support verisoned queries.
// FDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -266,7 +270,7 @@ impl super::api::Transaction for Transaction {
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// FDB does not support verisoned queries.
// FDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -279,8 +283,20 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Prepare the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, version, SaveOperation::Set).await?
} else {
None
};
// Set the key
self.inner.as_ref().unwrap().set(&key.into(), &val.into());
self.inner.as_ref().unwrap().set(&key, &val.into());
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -305,16 +321,26 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Get the transaction
let inner = self.inner.as_ref().unwrap();
// Get the arguments
let key = key.into();
let val = val.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, version, SaveOperation::Put).await?
} else {
None
};
// Get the transaction
let inner = self.inner.as_ref().unwrap();
// Set the key if empty
match inner.get(&key, self.snapshot()).await? {
None => inner.set(&key, &val),
_ => return Err(Error::TxKeyAlreadyExists),
};
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -334,18 +360,28 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Get the transaction
let inner = self.inner.as_ref().unwrap();
// Get the arguments
let key = key.into();
let val = val.into();
let chk = chk.map(Into::into);
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Put).await?
} else {
None
};
// Get the transaction
let inner = self.inner.as_ref().unwrap();
// Set the key if valid
match (inner.get(&key, self.snapshot()).await?, chk) {
(Some(v), Some(w)) if *v.as_ref() == w => inner.set(&key, &val),
(None, None) => inner.set(&key, &val),
_ => return Err(Error::TxConditionNotMet),
};
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -364,8 +400,20 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Del).await?
} else {
None
};
// Remove the key
self.inner.as_ref().unwrap().clear(&key.into());
self.inner.as_ref().unwrap().clear(&key);
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -385,17 +433,27 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Get the transaction
let inner = self.inner.as_ref().unwrap();
// Get the arguments
let key = key.into();
let chk = chk.map(Into::into);
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Del).await?
} else {
None
};
// Get the transaction
let inner = self.inner.as_ref().unwrap();
// Delete the key if valid
match (inner.get(&key, self.snapshot()).await?, chk) {
(Some(v), Some(w)) if *v.as_ref() == w => inner.clear(&key),
(None, None) => inner.clear(&key),
_ => return Err(Error::TxConditionNotMet),
};
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -414,6 +472,8 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// TODO: Check if we need savepoint with ranges
// Delete the key range
self.inner.as_ref().unwrap().clear_range(&rng.start.into(), &rng.end.into());
// Return result
@ -559,3 +619,9 @@ impl super::api::Transaction for Transaction {
Ok(())
}
}
impl SavePointImpl for Transaction {
fn get_save_points(&mut self) -> &mut SavePoints {
&mut self.save_points
}
}

View file

@ -2,6 +2,7 @@
use crate::err::Error;
use crate::key::debug::Sprintable;
use crate::kvs::savepoint::{SavePointImpl, SavePoints};
use crate::kvs::Check;
use crate::kvs::Key;
use crate::kvs::Val;
@ -23,6 +24,8 @@ pub struct Transaction {
check: Check,
/// The underlying datastore transaction
inner: indxdb::Tx,
/// The save point implementation
save_points: SavePoints,
}
impl Drop for Transaction {
@ -79,6 +82,7 @@ impl Datastore {
check,
write,
inner,
save_points: Default::default(),
}),
Err(e) => Err(Error::Tx(e.to_string())),
}
@ -338,3 +342,9 @@ impl super::api::Transaction for Transaction {
Ok(res)
}
}
impl SavePointImpl for Transaction {
fn get_save_points(&mut self) -> &mut SavePoints {
&mut self.save_points
}
}

View file

@ -2,6 +2,7 @@
use crate::err::Error;
use crate::key::debug::Sprintable;
use crate::kvs::savepoint::{SaveOperation, SavePointImpl, SavePoints};
use crate::kvs::Check;
use crate::kvs::Key;
use crate::kvs::Val;
@ -23,6 +24,8 @@ pub struct Transaction {
check: Check,
/// The underlying datastore transaction
inner: echodb::Tx<Key, Val>,
/// The save point implementation
save_points: SavePoints,
}
impl Drop for Transaction {
@ -76,6 +79,7 @@ impl Datastore {
check,
write,
inner,
save_points: Default::default(),
}),
Err(e) => Err(Error::Tx(e.to_string())),
}
@ -176,7 +180,7 @@ impl super::api::Transaction for Transaction {
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// MemDB does not support verisoned queries.
// MemDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -189,8 +193,20 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Prepare the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, version, SaveOperation::Set).await?
} else {
None
};
// Set the key
self.inner.set(key.into(), val.into())?;
self.inner.set(key, val.into())?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -215,8 +231,20 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, version, SaveOperation::Put).await?
} else {
None
};
// Set the key
self.inner.put(key.into(), val.into())?;
self.inner.put(key, val.into())?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -236,8 +264,20 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Put).await?
} else {
None
};
// Set the key
self.inner.putc(key.into(), val.into(), chk.map(Into::into))?;
self.inner.putc(key, val.into(), chk.map(Into::into))?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -256,8 +296,20 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Del).await?
} else {
None
};
// Remove the key
self.inner.del(key.into())?;
self.inner.del(key)?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -277,8 +329,20 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Del).await?
} else {
None
};
// Remove the key
self.inner.delc(key.into(), chk.map(Into::into))?;
self.inner.delc(key, chk.map(Into::into))?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -315,7 +379,7 @@ impl super::api::Transaction for Transaction {
where
K: Into<Key> + Sprintable + Debug,
{
// MemDB does not support verisoned queries.
// MemDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -335,3 +399,9 @@ impl super::api::Transaction for Transaction {
Ok(res)
}
}
impl SavePointImpl for Transaction {
fn get_save_points(&mut self) -> &mut SavePoints {
&mut self.save_points
}
}

View file

@ -37,6 +37,15 @@ mod tikv;
#[cfg(not(target_arch = "wasm32"))]
mod index;
#[cfg(any(
feature = "kv-mem",
feature = "kv-tikv",
feature = "kv-fdb",
feature = "kv-indxdb",
feature = "kv-surrealkv",
feature = "kv-surrealcs",
))]
mod savepoint;
#[cfg(test)]
mod tests;

View file

@ -260,7 +260,7 @@ impl super::api::Transaction for Transaction {
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// RocksDB does not support verisoned queries.
// RocksDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -502,3 +502,25 @@ impl super::api::Transaction for Transaction {
Ok(res)
}
}
impl Transaction {
pub(crate) fn new_save_point(&mut self) {
// Get the transaction
let inner = self.inner.as_ref().unwrap();
// Set the save point
inner.set_savepoint();
}
pub(crate) async fn rollback_to_save_point(&mut self) -> Result<(), Error> {
// Get the transaction
let inner = self.inner.as_ref().unwrap();
// Rollback
inner.rollback_to_savepoint()?;
//
Ok(())
}
pub(crate) fn release_last_save_point(&mut self) -> Result<(), Error> {
Ok(())
}
}

147
core/src/kvs/savepoint.rs Normal file
View file

@ -0,0 +1,147 @@
use crate::err::Error;
use crate::kvs::api::Transaction;
use crate::kvs::{Key, Val};
use std::collections::{HashMap, VecDeque};
type SavePoint = HashMap<Key, SavedValue>;
#[derive(Debug)]
pub(super) enum SaveOperation {
Set,
Put,
Del,
}
pub(super) struct SavedValue {
saved_val: Option<Val>,
saved_version: Option<u64>,
last_operation: SaveOperation,
}
impl SavedValue {
pub(super) fn new(val: Option<Val>, version: Option<u64>, op: SaveOperation) -> Self {
Self {
saved_val: val,
saved_version: version,
last_operation: op,
}
}
}
pub(super) enum SavePrepare {
AlreadyPresent(Key, SaveOperation),
NewKey(Key, SavedValue),
}
#[derive(Default)]
pub(crate) struct SavePoints {
stack: VecDeque<SavePoint>,
current: Option<SavePoint>,
}
impl SavePoints {
pub(super) fn new_save_point(&mut self) {
if let Some(c) = self.current.take() {
self.stack.push_back(c);
}
self.current = Some(SavePoint::default());
}
pub(super) fn is_some(&self) -> bool {
self.current.is_some()
}
pub(super) fn pop(&mut self) -> Result<SavePoint, Error> {
if let Some(c) = self.current.take() {
self.current = self.stack.pop_back();
Ok(c)
} else {
Err(fail!("No current SavePoint"))
}
}
pub(super) fn is_saved_key(&self, key: &Key) -> Option<bool> {
self.current.as_ref().map(|current| current.contains_key(key))
}
pub(super) fn save(&mut self, prep: SavePrepare) {
if let Some(current) = &mut self.current {
match prep {
SavePrepare::AlreadyPresent(key, op) => {
if let Some(sv) = current.get_mut(&key) {
// We keep the last operation executed in the transaction so we can do the appropriate rollback action (SET or PUT)
sv.last_operation = op;
}
}
SavePrepare::NewKey(key, sv) => {
current.insert(key, sv);
}
}
}
}
pub(super) async fn rollback<T>(sp: SavePoint, tx: &mut T) -> Result<(), Error>
where
T: Transaction,
{
for (key, saved_value) in sp {
match saved_value.last_operation {
SaveOperation::Set | SaveOperation::Put => {
if let Some(initial_value) = saved_value.saved_val {
// If the last operation was a SET or PUT
// then we just have set back the key to its initial value
tx.set(key, initial_value, saved_value.saved_version).await?;
} else {
// If the last operation on this key was not a DEL operation,
// then we have to delete the key
tx.del(key).await?;
}
}
SaveOperation::Del => {
if let Some(initial_value) = saved_value.saved_val {
// If the last operation was a DEL,
// then we have to put back the initial value
tx.put(key, initial_value, saved_value.saved_version).await?;
}
}
}
}
Ok(())
}
}
pub(super) trait SavePointImpl: Transaction + Sized {
fn get_save_points(&mut self) -> &mut SavePoints;
fn new_save_point(&mut self) {
self.get_save_points().new_save_point()
}
async fn rollback_to_save_point(&mut self) -> Result<(), Error> {
let sp = self.get_save_points().pop()?;
SavePoints::rollback(sp, self).await
}
fn release_last_save_point(&mut self) -> Result<(), Error> {
self.get_save_points().pop()?;
Ok(())
}
async fn save_point_prepare(
&mut self,
key: &Key,
version: Option<u64>,
op: SaveOperation,
) -> Result<Option<SavePrepare>, Error> {
let is_saved_key = self.get_save_points().is_saved_key(key);
let r = match is_saved_key {
None => None,
Some(true) => Some(SavePrepare::AlreadyPresent(key.clone(), op)),
Some(false) => {
let val = self.get(key.clone(), version).await?;
Some(SavePrepare::NewKey(key.clone(), SavedValue::new(val, version, op)))
}
};
Ok(r)
}
}

View file

@ -2,6 +2,7 @@
use crate::err::Error;
use crate::key::debug::Sprintable;
use crate::kvs::savepoint::{SaveOperation, SavePointImpl, SavePoints};
use crate::kvs::Check;
use crate::kvs::Key;
use crate::kvs::Val;
@ -34,6 +35,8 @@ pub struct Transaction {
started: bool,
/// The underlying datastore transaction
inner: Arc<Mutex<SurrealCSTransaction<AnyState>>>,
/// The save point implementation
save_points: SavePoints,
}
impl Drop for Transaction {
@ -90,6 +93,7 @@ impl Datastore {
write,
started: false,
inner: Arc::new(Mutex::new(transaction)),
save_points: Default::default(),
})
}
}
@ -234,12 +238,24 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Prepare the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, version, SaveOperation::Set).await?
} else {
None
};
// Set the key
let message = ServerTransactionMessage::Set(MessageSet {
key: key.into(),
key,
value: val.into(),
});
self.send_message(message).await?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -259,12 +275,24 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, version, SaveOperation::Put).await?
} else {
None
};
// Put the key
let message = ServerTransactionMessage::Put(MessagePut {
key: key.into(),
key,
value: val.into(),
});
self.send_message(message).await?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -286,13 +314,25 @@ impl super::api::Transaction for Transaction {
}
// Get the arguments
let chk = chk.map(Into::into);
// Extract the key
let key = key.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Put).await?
} else {
None
};
// Set the key if valid
let message = ServerTransactionMessage::Putc(MessagePutc {
key: key.into(),
key,
value: val.into(),
expected_value: chk,
});
self.send_message(message).await?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -311,11 +351,23 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Del).await?
} else {
None
};
// Remove the key
let message = ServerTransactionMessage::Del(MessageDel {
key: key.into(),
key,
});
self.send_message(message).await?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -337,12 +389,24 @@ impl super::api::Transaction for Transaction {
}
// Get the arguments
let chk = chk.map(Into::into);
// Extract the key
let key = key.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Del).await?
} else {
None
};
// Delete the key if valid
let message = ServerTransactionMessage::Delc(MessageDelc {
key: key.into(),
key,
expected_value: chk,
});
self.send_message(message).await?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -387,6 +451,7 @@ impl super::api::Transaction for Transaction {
finish: rng.end.into(),
limit,
});
// TODO: Check if save point needs to be implemented here
let response = match self.send_message(message).await? {
ServerTransactionMessage::ResponseKeys(v) => v,
_ => return Err(Error::Tx("Received an invalid response".to_string())),
@ -424,3 +489,9 @@ impl super::api::Transaction for Transaction {
Ok(response.values)
}
}
impl SavePointImpl for Transaction {
fn get_save_points(&mut self) -> &mut SavePoints {
&mut self.save_points
}
}

View file

@ -2,6 +2,7 @@
use crate::err::Error;
use crate::key::debug::Sprintable;
use crate::kvs::savepoint::{SaveOperation, SavePointImpl, SavePoints};
use crate::kvs::Check;
use crate::kvs::Key;
use crate::kvs::Val;
@ -26,6 +27,8 @@ pub struct Transaction {
check: Check,
/// The underlying datastore transaction
inner: Tx,
/// The save point implementation
save_points: SavePoints,
}
impl Drop for Transaction {
@ -85,6 +88,7 @@ impl Datastore {
check,
write,
inner,
save_points: Default::default(),
}),
Err(e) => Err(Error::Tx(e.to_string())),
}
@ -193,10 +197,22 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Prepare the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, version, SaveOperation::Set).await?
} else {
None
};
// Set the key
match version {
Some(ts) => self.inner.set_at_ts(&key.into(), &val.into(), ts)?,
None => self.inner.set(&key.into(), &val.into())?,
Some(ts) => self.inner.set_at_ts(&key, &val.into(), ts)?,
None => self.inner.set(&key, &val.into())?,
}
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
@ -220,6 +236,12 @@ impl super::api::Transaction for Transaction {
// Get the arguments
let key = key.into();
let val = val.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, version, SaveOperation::Put).await?
} else {
None
};
// Set the key if empty
if let Some(ts) = version {
self.inner.set_at_ts(&key, &val, ts)?;
@ -229,6 +251,10 @@ impl super::api::Transaction for Transaction {
_ => return Err(Error::TxKeyAlreadyExists),
};
}
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -252,12 +278,22 @@ impl super::api::Transaction for Transaction {
let key = key.into();
let val = val.into();
let chk = chk.map(Into::into);
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Put).await?
} else {
None
};
// Set the key if valid
match (self.inner.get(&key)?, chk) {
(Some(v), Some(w)) if v == w => self.inner.set(&key, &val)?,
(None, None) => self.inner.set(&key, &val)?,
_ => return Err(Error::TxConditionNotMet),
};
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -276,8 +312,20 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Del).await?
} else {
None
};
// Remove the key
self.inner.delete(&key.into())?;
self.inner.delete(&key)?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -300,12 +348,22 @@ impl super::api::Transaction for Transaction {
// Get the arguments
let key = key.into();
let chk = chk.map(Into::into);
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Del).await?
} else {
None
};
// Delete the key if valid
match (self.inner.get(&key)?, chk) {
(Some(v), Some(w)) if v == w => self.inner.delete(&key)?,
(None, None) => self.inner.delete(&key)?,
_ => return Err(Error::TxConditionNotMet),
};
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -365,3 +423,9 @@ impl super::api::Transaction for Transaction {
Ok(res)
}
}
impl SavePointImpl for Transaction {
fn get_save_points(&mut self) -> &mut SavePoints {
&mut self.save_points
}
}

View file

@ -2,6 +2,7 @@
use crate::err::Error;
use crate::key::debug::Sprintable;
use crate::kvs::savepoint::{SaveOperation, SavePointImpl, SavePoints};
use crate::kvs::Check;
use crate::kvs::Key;
use crate::kvs::Val;
@ -34,6 +35,8 @@ pub struct Transaction {
// the memory is kept alive. This pointer must
// be declared last, so that it is dropped last.
db: Pin<Arc<tikv::TransactionClient>>,
/// The save point implementation
save_points: SavePoints,
}
impl Drop for Transaction {
@ -107,6 +110,7 @@ impl Datastore {
write,
inner,
db: self.db.clone(),
save_points: Default::default(),
}),
Err(e) => Err(Error::Tx(e.to_string())),
}
@ -192,7 +196,7 @@ impl super::api::Transaction for Transaction {
where
K: Into<Key> + Sprintable + Debug,
{
// TiKV does not support verisoned queries.
// TiKV does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -214,7 +218,7 @@ impl super::api::Transaction for Transaction {
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// TiKV does not support verisoned queries.
// TiKV does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -227,8 +231,20 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Prepare the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, version, SaveOperation::Set).await?
} else {
None
};
// Set the key
self.inner.put(key.into(), val.into()).await?;
self.inner.put(key, val.into()).await?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -257,11 +273,21 @@ impl super::api::Transaction for Transaction {
let key = key.into();
// Get the val
let val = val.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, version, SaveOperation::Put).await?
} else {
None
};
// Set the key if empty
match self.inner.key_exists(key.clone()).await? {
false => self.inner.put(key, val).await?,
_ => return Err(Error::TxKeyAlreadyExists),
};
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -287,12 +313,22 @@ impl super::api::Transaction for Transaction {
let val = val.into();
// Get the check
let chk = chk.map(Into::into);
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Put).await?
} else {
None
};
// Delete the key
match (self.inner.get(key.clone()).await?, chk) {
(Some(v), Some(w)) if v == w => self.inner.put(key, val).await?,
(None, None) => self.inner.put(key, val).await?,
_ => return Err(Error::TxConditionNotMet),
};
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -311,8 +347,20 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// Extract the key
let key = key.into();
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Del).await?
} else {
None
};
// Delete the key
self.inner.delete(key.into()).await?;
self.inner.delete(key).await?;
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -336,12 +384,22 @@ impl super::api::Transaction for Transaction {
let key = key.into();
// Get the check
let chk = chk.map(Into::into);
// Hydrate the savepoint if any
let prep = if self.save_points.is_some() {
self.save_point_prepare(&key, None, SaveOperation::Del).await?
} else {
None
};
// Delete the key
match (self.inner.get(key.clone()).await?, chk) {
(Some(v), Some(w)) if v == w => self.inner.delete(key).await?,
(None, None) => self.inner.delete(key).await?,
_ => return Err(Error::TxConditionNotMet),
};
// Confirm the save point
if let Some(prep) = prep {
self.save_points.save(prep);
}
// Return result
Ok(())
}
@ -360,6 +418,8 @@ impl super::api::Transaction for Transaction {
if !self.write {
return Err(Error::TxReadonly);
}
// TODO: Check if we need savepoint with ranges
// Delete the key range
self.db.unsafe_destroy_range(rng.start.into()..rng.end.into()).await?;
// Return result
@ -451,3 +511,9 @@ impl super::api::Transaction for Transaction {
Ok(verbytes)
}
}
impl SavePointImpl for Transaction {
fn get_save_points(&mut self) -> &mut SavePoints {
&mut self.save_points
}
}

View file

@ -11,6 +11,15 @@ use crate::idg::u32::U32;
use crate::key::debug::Sprintable;
use crate::kvs::batch::Batch;
use crate::kvs::clock::SizedClock;
#[cfg(any(
feature = "kv-mem",
feature = "kv-tikv",
feature = "kv-fdb",
feature = "kv-indxdb",
feature = "kv-surrealkv",
feature = "kv-surrealcs",
))]
use crate::kvs::savepoint::SavePointImpl;
use crate::kvs::stash::Stash;
use crate::sql;
use crate::sql::thing::Thing;
@ -633,4 +642,16 @@ impl Transactor {
}
Ok(None)
}
pub(crate) fn new_save_point(&mut self) {
expand_inner!(&mut self.inner, v => { v.new_save_point() })
}
pub(crate) async fn rollback_to_save_point(&mut self) -> Result<(), Error> {
expand_inner!(&mut self.inner, v => { v.rollback_to_save_point().await })
}
pub(crate) fn release_last_save_point(&mut self) -> Result<(), Error> {
expand_inner!(&mut self.inner, v => { v.release_last_save_point() })
}
}

View file

@ -593,6 +593,97 @@ async fn insert_relation_table() {
let _: Vec<ApiRecordId> = db.insert("likes").relation(vals).await.unwrap();
}
#[tokio::test]
async fn insert_with_savepoint() -> Result<(), surrealdb_core::err::Error> {
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let sqls = vec![
("DEFINE INDEX a ON pokemon FIELDS a UNIQUE", "None"),
("DEFINE INDEX b ON pokemon FIELDS b UNIQUE", "None"),
(
"INSERT INTO pokemon (id, b) VALUES (1, 'b')",
"[
{
b: 'b',
id: pokemon:1
}
]"
),
(
"INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b')",
"[
{
b: 'b',
id: pokemon:1
}
]"
),
(
"INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b')",
"[
{
b: 'b',
id: pokemon:1
}
]"
),
(
"INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b') PARALLEL",
"[
{
b: 'b',
id: pokemon:1
}
]"
),
(
"INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b') PARALLEL",
"[
{
b: 'b',
id: pokemon:1
}
]"
),
(
"INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b') ON DUPLICATE KEY UPDATE something = 'else'",
"[
{
b: 'b',
id: pokemon:1,
something: 'else'
}
]"
),
(
"SELECT * FROM pokemon;",
"[
{
b: 'b',
id: pokemon:1,
something: 'else'
}
]"
)
];
let check_fetch = |mut response: Response, expected: &str| {
let val: Value = response.take(0).unwrap();
let exp: Value = expected.parse().unwrap();
assert_eq!(format!("{val:#}"), format!("{exp:#}"));
};
for (sql, expected) in sqls {
let res = db.query(sql).await.unwrap().check().unwrap();
check_fetch(res, expected);
}
Ok(())
}
#[test_log::test(tokio::test)]
async fn select_table() {
let (permit, db) = new_db().await;

View file

@ -1,6 +1,7 @@
mod parse;
use parse::Parse;
mod helpers;
use crate::helpers::Test;
use helpers::new_ds;
use surrealdb::dbs::Session;
use surrealdb::err::Error;
@ -145,6 +146,46 @@ async fn insert_statement_on_duplicate_key() -> Result<(), Error> {
Ok(())
}
#[tokio::test]
async fn insert_with_savepoint() -> Result<(), Error> {
let sql = "
DEFINE INDEX a ON pokemon FIELDS a UNIQUE;
DEFINE INDEX b ON pokemon FIELDS b UNIQUE;
INSERT INTO pokemon (id, b) VALUES (1, 'b');
INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b');
INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b');
INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b') PARALLEL;
INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b') PARALLEL;
INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b') ON DUPLICATE KEY UPDATE something = 'else';
SELECT * FROM pokemon;
";
let mut t = Test::new(sql).await?;
t.expect_size(9)?;
t.skip_ok(2)?;
for _ in 0..5 {
t.expect_val(
"[
{
b: 'b',
id: pokemon:1,
}
]",
)?;
}
for _ in 0..2 {
t.expect_val(
"[
{
b: 'b',
id: pokemon:1,
something: 'else'
}
]",
)?;
}
Ok(())
}
#[tokio::test]
async fn insert_statement_output() -> Result<(), Error> {
let sql = "