Ensure failures are caught and transactions rolledback (#4621)
Co-authored-by: Emmanuel Keller <emmanuel.keller@surrealdb.com>
This commit is contained in:
parent
e5bf40ae01
commit
6fbf84782f
3 changed files with 27 additions and 27 deletions
|
@ -527,7 +527,7 @@ impl Datastore {
|
|||
// Create the key where the version is stored
|
||||
let key = crate::key::version::new();
|
||||
// Check if a version is already set in storage
|
||||
let val = match catch!(txn, txn.get(key.clone(), None)) {
|
||||
let val = match catch!(txn, txn.get(key.clone(), None).await) {
|
||||
// There is a version set in the storage
|
||||
Some(v) => {
|
||||
// Attempt to decode the current stored version
|
||||
|
@ -554,7 +554,7 @@ impl Datastore {
|
|||
None => {
|
||||
// Fetch any keys immediately following the version key
|
||||
let rng = crate::key::version::proceeding();
|
||||
let keys = catch!(txn, txn.keys(rng, 1));
|
||||
let keys = catch!(txn, txn.keys(rng, 1).await);
|
||||
// Check the storage if there are any other keys set
|
||||
let val = if keys.is_empty() {
|
||||
// There are no keys set in storage, so this is a new database
|
||||
|
@ -566,9 +566,9 @@ impl Datastore {
|
|||
// Convert the version to binary
|
||||
let bytes: Vec<u8> = val.into();
|
||||
// Attempt to set the current version in storage
|
||||
catch!(txn, txn.set(key, bytes, None));
|
||||
catch!(txn, txn.set(key, bytes, None).await);
|
||||
// We set the version, so commit the transaction
|
||||
catch!(txn, txn.commit());
|
||||
catch!(txn, txn.commit().await);
|
||||
// Return the current version
|
||||
val
|
||||
}
|
||||
|
@ -583,7 +583,7 @@ impl Datastore {
|
|||
// Start a new writeable transaction
|
||||
let txn = self.transaction(Write, Optimistic).await?.enclose();
|
||||
// Fetch the root users from the storage
|
||||
let users = catch!(txn, txn.all_root_users());
|
||||
let users = catch!(txn, txn.all_root_users().await);
|
||||
// Process credentials, depending on existing users
|
||||
if users.is_empty() {
|
||||
// Display information in the logs
|
||||
|
@ -594,7 +594,7 @@ impl Datastore {
|
|||
let mut ctx = MutableContext::default();
|
||||
ctx.set_transaction(txn.clone());
|
||||
let ctx = ctx.freeze();
|
||||
catch!(txn, stm.compute(&ctx, &opt, None));
|
||||
catch!(txn, stm.compute(&ctx, &opt, None).await);
|
||||
// We added a user, so commit the transaction
|
||||
txn.commit().await
|
||||
} else {
|
||||
|
|
|
@ -87,7 +87,7 @@ impl Datastore {
|
|||
// Open transaction and fetch nodes
|
||||
let txn = self.transaction(Write, Optimistic).await?;
|
||||
let now = self.clock_now().await;
|
||||
let nds = catch!(txn, txn.all_nodes());
|
||||
let nds = catch!(txn, txn.all_nodes().await);
|
||||
for nd in nds.iter() {
|
||||
// Check that the node is active
|
||||
if nd.is_active() {
|
||||
|
@ -100,7 +100,7 @@ impl Datastore {
|
|||
// Get the key for the node entry
|
||||
let key = crate::key::root::nd::new(nd.id);
|
||||
// Update the node entry
|
||||
catch!(txn, txn.set(key, val, None));
|
||||
catch!(txn, txn.set(key, val, None).await);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -122,7 +122,7 @@ impl Datastore {
|
|||
// Fetch all of the expired nodes
|
||||
let expired = {
|
||||
let txn = self.transaction(Read, Optimistic).await?;
|
||||
let nds = catch!(txn, txn.all_nodes());
|
||||
let nds = catch!(txn, txn.all_nodes().await);
|
||||
// Filter the archived nodes
|
||||
nds.iter().filter_map(Node::archived).collect::<Vec<_>>()
|
||||
};
|
||||
|
@ -137,21 +137,21 @@ impl Datastore {
|
|||
let end = crate::key::node::lq::suffix(*id);
|
||||
let mut next = Some(beg..end);
|
||||
while let Some(rng) = next {
|
||||
let res = catch!(txn, txn.batch(rng, *NORMAL_FETCH_SIZE, true));
|
||||
let res = catch!(txn, txn.batch(rng, *NORMAL_FETCH_SIZE, true).await);
|
||||
next = res.next;
|
||||
for (k, v) in res.values.iter() {
|
||||
// Decode the data for this live query
|
||||
let val: Live = v.into();
|
||||
// Get the key for this node live query
|
||||
let nlq = crate::key::node::lq::Lq::decode(k)?;
|
||||
let nlq = catch!(txn, crate::key::node::lq::Lq::decode(k));
|
||||
// Check that the node for this query is archived
|
||||
if expired.contains(&nlq.nd) {
|
||||
// Get the key for this table live query
|
||||
let tlq = crate::key::table::lq::new(&val.ns, &val.db, &val.tb, nlq.lq);
|
||||
// Delete the table live query
|
||||
catch!(txn, txn.del(tlq));
|
||||
catch!(txn, txn.del(tlq).await);
|
||||
// Delete the node live query
|
||||
catch!(txn, txn.del(nlq));
|
||||
catch!(txn, txn.del(nlq).await);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -169,7 +169,7 @@ impl Datastore {
|
|||
// Get the key for the node entry
|
||||
let key = crate::key::root::nd::new(*id);
|
||||
// Delete the cluster node entry
|
||||
catch!(txn, txn.del(key));
|
||||
catch!(txn, txn.del(key).await);
|
||||
}
|
||||
// Commit the changes
|
||||
txn.commit().await?;
|
||||
|
@ -195,14 +195,14 @@ impl Datastore {
|
|||
// Fetch expired nodes
|
||||
let expired = {
|
||||
let txn = self.transaction(Read, Optimistic).await?;
|
||||
let nds = catch!(txn, txn.all_nodes());
|
||||
let nds = catch!(txn, txn.all_nodes().await);
|
||||
// Filter the archived nodes
|
||||
nds.iter().filter_map(Node::archived).collect::<Vec<_>>()
|
||||
};
|
||||
// Fetch all namespaces
|
||||
let nss = {
|
||||
let txn = self.transaction(Read, Optimistic).await?;
|
||||
catch!(txn, txn.all_ns())
|
||||
catch!(txn, txn.all_ns().await)
|
||||
};
|
||||
// Loop over all namespaces
|
||||
for ns in nss.iter() {
|
||||
|
@ -211,7 +211,7 @@ impl Datastore {
|
|||
// Fetch all databases
|
||||
let dbs = {
|
||||
let txn = self.transaction(Read, Optimistic).await?;
|
||||
catch!(txn, txn.all_db(&ns.name))
|
||||
catch!(txn, txn.all_db(&ns.name).await)
|
||||
};
|
||||
// Loop over all databases
|
||||
for db in dbs.iter() {
|
||||
|
@ -220,7 +220,7 @@ impl Datastore {
|
|||
// Fetch all tables
|
||||
let tbs = {
|
||||
let txn = self.transaction(Read, Optimistic).await?;
|
||||
catch!(txn, txn.all_tb(&ns.name, &db.name))
|
||||
catch!(txn, txn.all_tb(&ns.name, &db.name).await)
|
||||
};
|
||||
// Loop over all tables
|
||||
for tb in tbs.iter() {
|
||||
|
@ -232,23 +232,23 @@ impl Datastore {
|
|||
let end = crate::key::table::lq::suffix(&ns.name, &db.name, &tb.name);
|
||||
let mut next = Some(beg..end);
|
||||
while let Some(rng) = next {
|
||||
let res = catch!(txn, txn.batch(rng, *NORMAL_FETCH_SIZE, true));
|
||||
let res = catch!(txn, txn.batch(rng, *NORMAL_FETCH_SIZE, true).await);
|
||||
next = res.next;
|
||||
for (k, v) in res.values.iter() {
|
||||
// Decode the LIVE query statement
|
||||
let stm: LiveStatement = v.into();
|
||||
// Get the key for this node live query
|
||||
let tlq = crate::key::table::lq::Lq::decode(k)?;
|
||||
// Get the node id and the live query id
|
||||
let (nid, lid) = (stm.node.0, stm.id.0);
|
||||
// Check that the node for this query is archived
|
||||
if expired.contains(&stm.node) {
|
||||
// Get the key for this node live query
|
||||
let tlq = catch!(txn, crate::key::table::lq::Lq::decode(k));
|
||||
// Get the key for this table live query
|
||||
let nlq = crate::key::node::lq::new(nid, lid);
|
||||
// Delete the node live query
|
||||
catch!(txn, txn.del(nlq));
|
||||
catch!(txn, txn.del(nlq).await);
|
||||
// Delete the table live query
|
||||
catch!(txn, txn.del(tlq));
|
||||
catch!(txn, txn.del(tlq).await);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -280,7 +280,7 @@ impl Datastore {
|
|||
// Get the key for this node live query
|
||||
let nlq = crate::key::node::lq::new(self.id(), id);
|
||||
// Fetch the LIVE meta data node entry
|
||||
if let Some(val) = catch!(txn, txn.get(nlq, None)) {
|
||||
if let Some(val) = catch!(txn, txn.get(nlq, None).await) {
|
||||
// Decode the data for this live query
|
||||
let lq: Live = val.into();
|
||||
// Get the key for this node live query
|
||||
|
@ -288,9 +288,9 @@ impl Datastore {
|
|||
// Get the key for this table live query
|
||||
let tlq = crate::key::table::lq::new(&lq.ns, &lq.db, &lq.tb, id);
|
||||
// Delete the table live query
|
||||
catch!(txn, txn.del(tlq));
|
||||
catch!(txn, txn.del(tlq).await);
|
||||
// Delete the node live query
|
||||
catch!(txn, txn.del(nlq));
|
||||
catch!(txn, txn.del(nlq).await);
|
||||
}
|
||||
}
|
||||
// Commit the changes
|
||||
|
|
|
@ -37,7 +37,7 @@ macro_rules! get_cfg {
|
|||
/// a transaction in an uncommitted state without rolling back.
|
||||
macro_rules! catch {
|
||||
($txn:ident, $default:expr) => {
|
||||
match $default.await {
|
||||
match $default {
|
||||
Err(e) => {
|
||||
let _ = $txn.cancel().await;
|
||||
return Err(e);
|
||||
|
|
Loading…
Reference in a new issue