Cluster bootstrapping LQ GC now warns instead of panics (#2611)

This commit is contained in:
Przemyslaw Hugh Kaznowski 2023-09-08 19:18:51 +01:00 committed by GitHub
parent b1ae2b4094
commit 93d82146fb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 261 additions and 78 deletions

View file

@ -71,6 +71,10 @@ pub struct Datastore {
notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>, notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>,
} }
/// We always want to be circulating the live query information
/// And we will sometimes have an error attached but still not want to lose the LQ.
pub(crate) type BootstrapOperationResult = (LqValue, Option<Error>);
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
pub(super) enum Inner { pub(super) enum Inner {
#[cfg(feature = "kv-mem")] #[cfg(feature = "kv-mem")]
@ -352,6 +356,10 @@ impl Datastore {
} }
// Initialise bootstrap with implicit values intended for runtime // Initialise bootstrap with implicit values intended for runtime
// An error indicates that a failure happened, but that does not mean that the bootstrap
// completely failed. It may have partially completed. It certainly has side-effects
// that weren't reversed, as it tries to bootstrap and garbage collect to the best of its
// ability.
pub async fn bootstrap(&self) -> Result<(), Error> { pub async fn bootstrap(&self) -> Result<(), Error> {
trace!("Bootstrapping {}", self.id); trace!("Bootstrapping {}", self.id);
let mut tx = self.transaction(true, false).await?; let mut tx = self.transaction(true, false).await?;
@ -367,9 +375,26 @@ impl Datastore {
return Err(e); return Err(e);
} }
}; };
// Filtered includes all lqs that should be used in subsequent step
// Currently that is all of them, no matter the error encountered
let mut filtered: Vec<LqValue> = vec![];
// err is used to aggregate all errors across all stages
let mut err = vec![];
for res in archived {
match res {
(lq, Some(e)) => {
filtered.push(lq);
err.push(e);
}
(lq, None) => {
filtered.push(lq);
}
}
}
let mut tx = self.transaction(true, false).await?; let mut tx = self.transaction(true, false).await?;
match self.remove_archived(&mut tx, archived).await { let val = self.remove_archived(&mut tx, filtered).await;
let resolve_err = match val {
Ok(_) => tx.commit().await, Ok(_) => tx.commit().await,
Err(e) => { Err(e) => {
error!("Error bootstrapping sweep phase: {:?}", e); error!("Error bootstrapping sweep phase: {:?}", e);
@ -381,7 +406,15 @@ impl Datastore {
} }
} }
} }
};
if resolve_err.is_err() {
err.push(resolve_err.unwrap_err());
} }
if !err.is_empty() {
error!("Error bootstrapping sweep phase: {:?}", err);
return Err(Error::Tx(format!("Error bootstrapping sweep phase: {:?}", err)));
}
Ok(())
} }
// Node registration + "mark" stage of mark-and-sweep gc // Node registration + "mark" stage of mark-and-sweep gc
@ -390,7 +423,7 @@ impl Datastore {
tx: &mut Transaction, tx: &mut Transaction,
node_id: &Uuid, node_id: &Uuid,
timestamp: Timestamp, timestamp: Timestamp,
) -> Result<Vec<LqValue>, Error> { ) -> Result<Vec<BootstrapOperationResult>, Error> {
trace!("Registering node {}", node_id); trace!("Registering node {}", node_id);
self.register_membership(tx, node_id, &timestamp).await?; self.register_membership(tx, node_id, &timestamp).await?;
// Determine the timeout for when a cluster node is expired // Determine the timeout for when a cluster node is expired
@ -440,7 +473,7 @@ impl Datastore {
tx: &mut Transaction, tx: &mut Transaction,
nodes: &[Uuid], nodes: &[Uuid],
this_node_id: &Uuid, this_node_id: &Uuid,
) -> Result<Vec<LqValue>, Error> { ) -> Result<Vec<BootstrapOperationResult>, Error> {
let mut archived = vec![]; let mut archived = vec![];
for nd in nodes.iter() { for nd in nodes.iter() {
trace!("Archiving node {}", &nd); trace!("Archiving node {}", &nd);
@ -450,7 +483,14 @@ impl Datastore {
for lq in node_lqs { for lq in node_lqs {
trace!("Archiving query {:?}", &lq); trace!("Archiving query {:?}", &lq);
let node_archived_lqs = let node_archived_lqs =
self.archive_lv_for_node(tx, &lq.nd, this_node_id.clone()).await?; match self.archive_lv_for_node(tx, &lq.nd, this_node_id.clone()).await {
Ok(lq) => lq,
Err(e) => {
error!("Error archiving lqs during bootstrap phase: {:?}", e);
vec![]
}
};
// We need to add lv nodes not found so that they can be deleted in second stage
for lq_value in node_archived_lqs { for lq_value in node_archived_lqs {
archived.push(lq_value); archived.push(lq_value);
} }
@ -464,6 +504,7 @@ impl Datastore {
tx: &mut Transaction, tx: &mut Transaction,
archived: Vec<LqValue>, archived: Vec<LqValue>,
) -> Result<(), Error> { ) -> Result<(), Error> {
trace!("Gone into removing archived");
for lq in archived { for lq in archived {
// Delete the cluster key, used for finding LQ associated with a node // Delete the cluster key, used for finding LQ associated with a node
let key = crate::key::node::lq::new(lq.nd.0, lq.lq.0, &lq.ns, &lq.db); let key = crate::key::node::lq::new(lq.nd.0, lq.lq.0, &lq.ns, &lq.db);
@ -475,30 +516,6 @@ impl Datastore {
Ok(()) Ok(())
} }
pub async fn _garbage_collect(
// TODO not invoked
// But this is garbage collection outside of bootstrap
&self,
tx: &mut Transaction,
watermark: &Timestamp,
this_node_id: &Uuid,
) -> Result<(), Error> {
let dead_heartbeats = self.delete_dead_heartbeats(tx, watermark).await?;
trace!("Found dead hbs: {:?}", dead_heartbeats);
let mut archived: Vec<LqValue> = vec![];
for hb in dead_heartbeats {
let new_archived = self
.archive_lv_for_node(tx, &crate::sql::uuid::Uuid::from(hb.nd), this_node_id.clone())
.await?;
tx.del_nd(hb.nd).await?;
trace!("Deleted node {}", hb.nd);
for lq_value in new_archived {
archived.push(lq_value);
}
}
Ok(())
}
// Garbage collection task to run when a client disconnects from a surrealdb node // Garbage collection task to run when a client disconnects from a surrealdb node
// i.e. we know the node, we are not performing a full wipe on the node // i.e. we know the node, we are not performing a full wipe on the node
// and the wipe must be fully performed by this node // and the wipe must be fully performed by this node
@ -542,16 +559,22 @@ impl Datastore {
tx: &mut Transaction, tx: &mut Transaction,
nd: &Uuid, nd: &Uuid,
this_node_id: Uuid, this_node_id: Uuid,
) -> Result<Vec<LqValue>, Error> { ) -> Result<Vec<BootstrapOperationResult>, Error> {
let lqs = tx.all_lq(nd).await?; let lqs = tx.all_lq(nd).await?;
trace!("Archiving lqs and found {} LQ entries for {}", lqs.len(), nd); trace!("Archiving lqs and found {} LQ entries for {}", lqs.len(), nd);
let mut ret = vec![]; let mut ret: Vec<BootstrapOperationResult> = vec![];
for lq in lqs { for lq in lqs {
let lvs = let lv_res =
tx.get_tb_live(lq.ns.as_str(), lq.db.as_str(), lq.tb.as_str(), &lq.lq).await?; tx.get_tb_live(lq.ns.as_str(), lq.db.as_str(), lq.tb.as_str(), &lq.lq).await;
let archived_lvs = lvs.clone().archive(this_node_id.clone()); if let Err(e) = lv_res {
tx.putc_tblq(&lq.ns, &lq.db, &lq.tb, archived_lvs, Some(lvs)).await?; error!("Error getting live query for node {}: {:?}", nd, e);
ret.push(lq); ret.push((lq, Some(e)));
continue;
}
let lv = lv_res.unwrap();
let archived_lvs = lv.clone().archive(this_node_id.clone());
tx.putc_tblq(&lq.ns, &lq.db, &lq.tb, archived_lvs, Some(lv)).await?;
ret.push((lq, None));
} }
Ok(ret) Ok(ret)
} }

View file

@ -190,3 +190,102 @@ async fn single_live_queries_are_garbage_collected() {
assert_eq!(&scanned[0].lq, &sql::Uuid::from(live_query_to_keep)); assert_eq!(&scanned[0].lq, &sql::Uuid::from(live_query_to_keep));
tx.commit().await.unwrap(); tx.commit().await.unwrap();
} }
#[test(tokio::test)]
#[serial]
async fn bootstrap_does_not_error_on_missing_live_queries() {
// Test parameters
let ctx = context::Context::background();
let old_node_id = Uuid::parse_str("5f644f02-7c1a-4f8b-babd-bd9e92c1836a").unwrap();
let test = init(old_node_id).await.unwrap();
let time = Timestamp {
value: 123,
};
let namespace = "test_namespace_0A8BD08BE4F2457BB9F145557EF19605";
let database_owned = format!("test_db_{:?}", test.kvs);
let database = database_owned.as_str();
let table = "test_table";
let options = Options::default()
.with_required(
old_node_id.clone(),
Some(Arc::from(namespace)),
Some(Arc::from(database)),
Arc::new(Auth::for_root(Role::Owner)),
)
.with_live(true);
// We do standard cluster init
trace!("Bootstrapping node {}", old_node_id);
test.bootstrap_at_time(crate::sql::uuid::Uuid::from(old_node_id), time).await.unwrap();
// We set up 2 live queries, one of which we want to garbage collect
trace!("Setting up live queries");
let tx = Arc::new(Mutex::new(test.db.transaction(true, false).await.unwrap()));
let live_query_to_corrupt = Uuid::parse_str("d4cee7ce-5c78-4a30-9fa9-2444d58029f6").unwrap();
let live_st = LiveStatement {
id: sql::Uuid(live_query_to_corrupt),
node: sql::uuid::Uuid::from(old_node_id),
expr: Fields(vec![sql::Field::All], false),
what: Table(sql::Table::from(table)),
cond: None,
fetch: None,
archived: None,
auth: Some(Auth::for_root(Role::Owner)),
};
live_st
.compute(&ctx, &options, &tx, None)
.await
.map_err(|e| format!("Error computing live statement: {:?} {:?}", live_st, e))
.unwrap();
// Now we corrupt the live query entry by leaving the node entry in but removing the table entry
let key = crate::key::table::lq::new(namespace, database, table, live_query_to_corrupt);
tx.lock().await.del(key).await.unwrap();
tx.lock().await.commit().await.unwrap();
// Subject: Perform the action we are testing
trace!("Bootstrapping");
let new_node_id = Uuid::parse_str("53f7355d-5be1-4a94-9803-5192b59c5244").unwrap();
// There should not be an error
let second_node = test.db.with_node_id(crate::sql::uuid::Uuid::from(new_node_id));
match second_node.bootstrap().await {
Ok(_) => {
panic!("Expected an error because of missing live query")
}
Err(Error::Tx(e)) => match e {
_ if e.contains("LvNotFound") => {
// This is what we want... an LvNotFound error, but it gets wrapped into a string so that Tx doesnt carry vecs
}
_ => {
panic!("Expected an LvNotFound error but got: {:?}", e);
}
},
Err(e) => {
panic!("Missing live query error: {:?}", e)
}
}
// Verify node live query was deleted
let mut tx = second_node.transaction(true, false).await.unwrap();
let found = tx
.scan_ndlq(&old_node_id, 100)
.await
.map_err(|e| format!("Error scanning ndlq: {:?}", e))
.unwrap();
assert_eq!(0, found.len(), "Found: {:?}", found);
let found = tx
.scan_ndlq(&new_node_id, 100)
.await
.map_err(|e| format!("Error scanning ndlq: {:?}", e))
.unwrap();
assert_eq!(0, found.len(), "Found: {:?}", found);
// Verify table live query does not exist
let found = tx
.scan_tblq(namespace, database, table, 100)
.await
.map_err(|e| format!("Error scanning tblq: {:?}", e))
.unwrap();
assert_eq!(0, found.len(), "Found: {:?}", found);
tx.cancel().await.unwrap();
}

View file

@ -3,6 +3,7 @@ use crate::err::Error;
pub struct TestContext { pub struct TestContext {
pub(crate) db: Datastore, pub(crate) db: Datastore,
pub(crate) kvs: Kvs,
// A string identifier for this context. // A string identifier for this context.
// It will usually be a uuid or combination of uuid and fixed string identifier. // It will usually be a uuid or combination of uuid and fixed string identifier.
// It is useful for separating test setups when environments are shared. // It is useful for separating test setups when environments are shared.
@ -17,11 +18,31 @@ impl TestContext {
node_id: crate::sql::uuid::Uuid, node_id: crate::sql::uuid::Uuid,
time: Timestamp, time: Timestamp,
) -> Result<(), Error> { ) -> Result<(), Error> {
// TODO we shouldn't test bootstrapping manually
let mut tx = self.db.transaction(true, false).await?; let mut tx = self.db.transaction(true, false).await?;
let archived = self.db.register_remove_and_archive(&mut tx, &node_id, time).await?; let archived = self.db.register_remove_and_archive(&mut tx, &node_id, time).await?;
tx.commit().await?; tx.commit().await?;
let mut errors = vec![];
let mut values = vec![];
for res in archived {
match res {
(v, Some(e)) => {
values.push(v);
errors.push(e);
}
(v, None) => {
values.push(v);
}
}
}
if !errors.is_empty() {
// You can customize this panic message as per your needs
panic!("Encountered errors: {:?}", errors);
}
let mut tx = self.db.transaction(true, false).await?; let mut tx = self.db.transaction(true, false).await?;
self.db.remove_archived(&mut tx, archived).await?; self.db.remove_archived(&mut tx, values).await?;
Ok(tx.commit().await?) Ok(tx.commit().await?)
} }
@ -34,9 +55,10 @@ impl TestContext {
/// Initialise logging and prepare a useable datastore /// Initialise logging and prepare a useable datastore
/// In the future it would be nice to handle multiple datastores /// In the future it would be nice to handle multiple datastores
pub(crate) async fn init(node_id: Uuid) -> Result<TestContext, Error> { pub(crate) async fn init(node_id: Uuid) -> Result<TestContext, Error> {
let db = new_ds(node_id).await; let (db, kvs) = new_ds(node_id).await;
return Ok(TestContext { return Ok(TestContext {
db, db,
kvs,
context_id: node_id.to_string(), // The context does not always have to be a uuid context_id: node_id.to_string(), // The context does not always have to be a uuid
}); });
} }

View file

@ -1,19 +1,34 @@
#[derive(Clone, Debug)]
pub(crate) enum Kvs {
#[allow(dead_code)]
Mem,
#[allow(dead_code)]
Rocksdb,
#[allow(dead_code)]
Speedb,
#[allow(dead_code)]
Tikv,
#[allow(dead_code)]
Fdb,
}
#[cfg(feature = "kv-mem")] #[cfg(feature = "kv-mem")]
mod mem { mod mem {
use crate::kvs::tests::Kvs;
use crate::kvs::Datastore; use crate::kvs::Datastore;
use crate::kvs::Transaction; use crate::kvs::Transaction;
use serial_test::serial; use serial_test::serial;
async fn new_ds(id: Uuid) -> Datastore { async fn new_ds(id: Uuid) -> (Datastore, Kvs) {
Datastore::new("memory").await.unwrap().with_node_id(sql::Uuid::from(id)) (Datastore::new("memory").await.unwrap().with_node_id(sql::Uuid::from(id)), Kvs::Mem)
} }
async fn new_tx(write: bool, lock: bool) -> Transaction { async fn new_tx(write: bool, lock: bool) -> Transaction {
// Shared node id for one-off transactions // Shared node id for one-off transactions
// We should delete this, node IDs should be known. // We should delete this, node IDs should be known.
let new_tx_uuid = Uuid::parse_str("361893b5-a041-40c0-996c-c3a8828ef06b").unwrap(); let new_tx_uuid = Uuid::parse_str("361893b5-a041-40c0-996c-c3a8828ef06b").unwrap();
new_ds(new_tx_uuid).await.transaction(write, lock).await.unwrap() new_ds(new_tx_uuid).await.0.transaction(write, lock).await.unwrap()
} }
include!("cluster_init.rs"); include!("cluster_init.rs");
@ -30,24 +45,28 @@ mod mem {
#[cfg(feature = "kv-rocksdb")] #[cfg(feature = "kv-rocksdb")]
mod rocksdb { mod rocksdb {
use crate::kvs::tests::Kvs;
use crate::kvs::Datastore; use crate::kvs::Datastore;
use crate::kvs::Transaction; use crate::kvs::Transaction;
use serial_test::serial; use serial_test::serial;
use temp_dir::TempDir; use temp_dir::TempDir;
async fn new_ds(node_id: Uuid) -> Datastore { async fn new_ds(node_id: Uuid) -> (Datastore, Kvs) {
let path = TempDir::new().unwrap().path().to_string_lossy().to_string(); let path = TempDir::new().unwrap().path().to_string_lossy().to_string();
(
Datastore::new(format!("rocksdb:{path}").as_str()) Datastore::new(format!("rocksdb:{path}").as_str())
.await .await
.unwrap() .unwrap()
.with_node_id(sql::Uuid::from(node_id)) .with_node_id(sql::Uuid::from(node_id)),
Kvs::Rocksdb,
)
} }
async fn new_tx(write: bool, lock: bool) -> Transaction { async fn new_tx(write: bool, lock: bool) -> Transaction {
// Shared node id for one-off transactions // Shared node id for one-off transactions
// We should delete this, node IDs should be known. // We should delete this, node IDs should be known.
let new_tx_uuid = Uuid::parse_str("22358e5e-87bd-4040-8c63-01db896191ab").unwrap(); let new_tx_uuid = Uuid::parse_str("22358e5e-87bd-4040-8c63-01db896191ab").unwrap();
new_ds(new_tx_uuid).await.transaction(write, lock).await.unwrap() new_ds(new_tx_uuid).await.0.transaction(write, lock).await.unwrap()
} }
include!("cluster_init.rs"); include!("cluster_init.rs");
@ -66,24 +85,28 @@ mod rocksdb {
#[cfg(feature = "kv-speedb")] #[cfg(feature = "kv-speedb")]
mod speedb { mod speedb {
use crate::kvs::tests::Kvs;
use crate::kvs::Datastore; use crate::kvs::Datastore;
use crate::kvs::Transaction; use crate::kvs::Transaction;
use serial_test::serial; use serial_test::serial;
use temp_dir::TempDir; use temp_dir::TempDir;
async fn new_ds(node_id: Uuid) -> Datastore { async fn new_ds(node_id: Uuid) -> (Datastore, Kvs) {
let path = TempDir::new().unwrap().path().to_string_lossy().to_string(); let path = TempDir::new().unwrap().path().to_string_lossy().to_string();
(
Datastore::new(format!("speedb:{path}").as_str()) Datastore::new(format!("speedb:{path}").as_str())
.await .await
.unwrap() .unwrap()
.with_node_id(sql::Uuid::from(node_id)) .with_node_id(sql::Uuid::from(node_id)),
Kvs::Speedb,
)
} }
async fn new_tx(write: bool, lock: bool) -> Transaction { async fn new_tx(write: bool, lock: bool) -> Transaction {
// Shared node id for one-off transactions // Shared node id for one-off transactions
// We should delete this, node IDs should be known. // We should delete this, node IDs should be known.
let new_tx_uuid = Uuid::parse_str("5877e580-12ac-49e4-95e1-3c407c4887f3").unwrap(); let new_tx_uuid = Uuid::parse_str("5877e580-12ac-49e4-95e1-3c407c4887f3").unwrap();
new_ds(new_tx_uuid).await.transaction(write, lock).await.unwrap() new_ds(new_tx_uuid).await.0.transaction(write, lock).await.unwrap()
} }
include!("cluster_init.rs"); include!("cluster_init.rs");
@ -102,11 +125,12 @@ mod speedb {
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
mod tikv { mod tikv {
use crate::kvs::tests::Kvs;
use crate::kvs::Datastore; use crate::kvs::Datastore;
use crate::kvs::Transaction; use crate::kvs::Transaction;
use serial_test::serial; use serial_test::serial;
async fn new_ds(node_id: Uuid) -> Datastore { async fn new_ds(node_id: Uuid) -> (Datastore, Kvs) {
let ds = Datastore::new("tikv:127.0.0.1:2379") let ds = Datastore::new("tikv:127.0.0.1:2379")
.await .await
.unwrap() .unwrap()
@ -116,14 +140,14 @@ mod tikv {
tx.delp(vec![], u32::MAX).await.unwrap(); tx.delp(vec![], u32::MAX).await.unwrap();
tx.commit().await.unwrap(); tx.commit().await.unwrap();
// Return the datastore // Return the datastore
ds (ds, Kvs::Tikv)
} }
async fn new_tx(write: bool, lock: bool) -> Transaction { async fn new_tx(write: bool, lock: bool) -> Transaction {
// Shared node id for one-off transactions // Shared node id for one-off transactions
// We should delete this, node IDs should be known. // We should delete this, node IDs should be known.
let new_tx_uuid = Uuid::parse_str("18717a0f-0ab0-421e-b20c-e69fb03e90a3").unwrap(); let new_tx_uuid = Uuid::parse_str("18717a0f-0ab0-421e-b20c-e69fb03e90a3").unwrap();
new_ds(new_tx_uuid).await.transaction(write, lock).await.unwrap() new_ds(new_tx_uuid).await.0.transaction(write, lock).await.unwrap()
} }
include!("cluster_init.rs"); include!("cluster_init.rs");
@ -142,11 +166,12 @@ mod tikv {
#[cfg(feature = "kv-fdb")] #[cfg(feature = "kv-fdb")]
mod fdb { mod fdb {
use crate::kvs::tests::Kvs;
use crate::kvs::Datastore; use crate::kvs::Datastore;
use crate::kvs::Transaction; use crate::kvs::Transaction;
use serial_test::serial; use serial_test::serial;
async fn new_ds(node_id: Uuid) -> Datastore { async fn new_ds(node_id: Uuid) -> (Datastore, Kvs) {
let ds = Datastore::new("fdb:/etc/foundationdb/fdb.cluster") let ds = Datastore::new("fdb:/etc/foundationdb/fdb.cluster")
.await .await
.unwrap() .unwrap()
@ -156,14 +181,14 @@ mod fdb {
tx.delp(vec![], u32::MAX).await.unwrap(); tx.delp(vec![], u32::MAX).await.unwrap();
tx.commit().await.unwrap(); tx.commit().await.unwrap();
// Return the datastore // Return the datastore
ds (ds, Kvs::Fdb)
} }
async fn new_tx(write: bool, lock: bool) -> Transaction { async fn new_tx(write: bool, lock: bool) -> Transaction {
// Shared node id for one-off transactions // Shared node id for one-off transactions
// We should delete this, node IDs should be known. // We should delete this, node IDs should be known.
let new_tx_uuid = Uuid::parse_str("50f5bdf5-8abe-406b-8002-a79c942f510f").unwrap(); let new_tx_uuid = Uuid::parse_str("50f5bdf5-8abe-406b-8002-a79c942f510f").unwrap();
new_ds(new_tx_uuid).await.transaction(write, lock).await.unwrap() new_ds(new_tx_uuid).await.0.transaction(write, lock).await.unwrap()
} }
include!("cluster_init.rs"); include!("cluster_init.rs");

View file

@ -3,7 +3,7 @@
async fn multireader() { async fn multireader() {
// Create a new datastore // Create a new datastore
let node_id = Uuid::parse_str("b7afc077-2123-476f-bee0-43d7504f1e0a").unwrap(); let node_id = Uuid::parse_str("b7afc077-2123-476f-bee0-43d7504f1e0a").unwrap();
let ds = new_ds(node_id).await; let (ds, _) = new_ds(node_id).await;
// Insert an initial key // Insert an initial key
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
tx.set("test", "some text").await.unwrap(); tx.set("test", "some text").await.unwrap();

View file

@ -3,7 +3,7 @@
async fn multiwriter_different_keys() { async fn multiwriter_different_keys() {
// Create a new datastore // Create a new datastore
let node_id = Uuid::parse_str("7f0153b0-79cf-4922-85ef-61e390970514").unwrap(); let node_id = Uuid::parse_str("7f0153b0-79cf-4922-85ef-61e390970514").unwrap();
let ds = new_ds(node_id).await; let (ds, _) = new_ds(node_id).await;
// Insert an initial key // Insert an initial key
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
tx.set("test", "some text").await.unwrap(); tx.set("test", "some text").await.unwrap();

View file

@ -3,7 +3,7 @@
async fn multiwriter_same_keys_allow() { async fn multiwriter_same_keys_allow() {
// Create a new datastore // Create a new datastore
let node_id = Uuid::parse_str("a19cf00d-f95b-42c6-95e5-7b310162d570").unwrap(); let node_id = Uuid::parse_str("a19cf00d-f95b-42c6-95e5-7b310162d570").unwrap();
let ds = new_ds(node_id).await; let (ds, _) = new_ds(node_id).await;
// Insert an initial key // Insert an initial key
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
tx.set("test", "some text").await.unwrap(); tx.set("test", "some text").await.unwrap();

View file

@ -3,7 +3,7 @@
async fn multiwriter_same_keys_conflict() { async fn multiwriter_same_keys_conflict() {
// Create a new datastore // Create a new datastore
let node_id = Uuid::parse_str("96ebbb5c-8040-497a-9459-838e4931aca7").unwrap(); let node_id = Uuid::parse_str("96ebbb5c-8040-497a-9459-838e4931aca7").unwrap();
let ds = new_ds(node_id).await; let (ds, _) = new_ds(node_id).await;
// Insert an initial key // Insert an initial key
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
tx.set("test", "some text").await.unwrap(); tx.set("test", "some text").await.unwrap();

View file

@ -38,12 +38,21 @@ async fn archive_lv_for_node_archives() {
.await .await
.unwrap(); .unwrap();
assert_eq!(results.len(), 1); assert_eq!(results.len(), 1);
assert_eq!(results[0].nd, sql::uuid::Uuid(node_id.clone()));
assert_eq!(results[0].ns, namespace);
assert_eq!(results[0].db, database);
assert_eq!(results[0].tb, table);
assert_eq!(results[0].lq, lv_id);
tx.commit().await.unwrap(); tx.commit().await.unwrap();
let (lq, opt_err) = &results[0];
match opt_err {
None => {
//expected
}
Some(err) => {
panic!("Unexpected error: {:?}", err);
}
}
assert_eq!(lq.nd, sql::uuid::Uuid(node_id.clone()));
assert_eq!(lq.ns, namespace);
assert_eq!(lq.db, database);
assert_eq!(lq.tb, table);
assert_eq!(lq.lq, lv_id);
let mut tx = test.db.transaction(true, false).await.unwrap(); let mut tx = test.db.transaction(true, false).await.unwrap();
let lv = tx.all_tb_lives(namespace, database, table).await.unwrap(); let lv = tx.all_tb_lives(namespace, database, table).await.unwrap();

View file

@ -11,7 +11,7 @@ async fn initialise() {
async fn exi() { async fn exi() {
// Create a new datastore // Create a new datastore
let node_id = Uuid::parse_str("463a5008-ee1d-43db-9662-5e752b6ea3f9").unwrap(); let node_id = Uuid::parse_str("463a5008-ee1d-43db-9662-5e752b6ea3f9").unwrap();
let ds = new_ds(node_id).await; let (ds, _) = new_ds(node_id).await;
// Create a writeable transaction // Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "ok").await.is_ok()); assert!(tx.put("test", "ok").await.is_ok());
@ -30,7 +30,7 @@ async fn exi() {
async fn get() { async fn get() {
// Create a new datastore // Create a new datastore
let node_id = Uuid::parse_str("477e2895-8c98-4606-a827-0add82eb466b").unwrap(); let node_id = Uuid::parse_str("477e2895-8c98-4606-a827-0add82eb466b").unwrap();
let ds = new_ds(node_id).await; let (ds, _) = new_ds(node_id).await;
// Create a writeable transaction // Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "ok").await.is_ok()); assert!(tx.put("test", "ok").await.is_ok());
@ -49,7 +49,7 @@ async fn get() {
async fn set() { async fn set() {
// Create a new datastore // Create a new datastore
let node_id = Uuid::parse_str("32b80d8b-dd16-4f6f-a687-1192f6cfc6f1").unwrap(); let node_id = Uuid::parse_str("32b80d8b-dd16-4f6f-a687-1192f6cfc6f1").unwrap();
let ds = new_ds(node_id).await; let (ds, _) = new_ds(node_id).await;
// Create a writeable transaction // Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.set("test", "one").await.is_ok()); assert!(tx.set("test", "one").await.is_ok());
@ -75,7 +75,7 @@ async fn set() {
async fn put() { async fn put() {
// Create a new datastore // Create a new datastore
let node_id = Uuid::parse_str("80149655-db34-451c-8711-6fa662a44b70").unwrap(); let node_id = Uuid::parse_str("80149655-db34-451c-8711-6fa662a44b70").unwrap();
let ds = new_ds(node_id).await; let (ds, _) = new_ds(node_id).await;
// Create a writeable transaction // Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "one").await.is_ok()); assert!(tx.put("test", "one").await.is_ok());
@ -101,7 +101,7 @@ async fn put() {
async fn del() { async fn del() {
// Create a new datastore // Create a new datastore
let node_id = Uuid::parse_str("e0acb360-9187-401f-8192-f870b09e2c9e").unwrap(); let node_id = Uuid::parse_str("e0acb360-9187-401f-8192-f870b09e2c9e").unwrap();
let ds = new_ds(node_id).await; let (ds, _) = new_ds(node_id).await;
// Create a writeable transaction // Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "one").await.is_ok()); assert!(tx.put("test", "one").await.is_ok());
@ -122,7 +122,7 @@ async fn del() {
async fn putc() { async fn putc() {
// Create a new datastore // Create a new datastore
let node_id = Uuid::parse_str("705bb520-bc2b-4d52-8e64-d1214397e408").unwrap(); let node_id = Uuid::parse_str("705bb520-bc2b-4d52-8e64-d1214397e408").unwrap();
let ds = new_ds(node_id).await; let (ds, _) = new_ds(node_id).await;
// Create a writeable transaction // Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "one").await.is_ok()); assert!(tx.put("test", "one").await.is_ok());
@ -157,7 +157,7 @@ async fn putc() {
async fn delc() { async fn delc() {
// Create a new datastore // Create a new datastore
let node_id = Uuid::parse_str("0985488e-cf2f-417a-bd10-7f4aa9c99c15").unwrap(); let node_id = Uuid::parse_str("0985488e-cf2f-417a-bd10-7f4aa9c99c15").unwrap();
let ds = new_ds(node_id).await; let (ds, _) = new_ds(node_id).await;
// Create a writeable transaction // Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "one").await.is_ok()); assert!(tx.put("test", "one").await.is_ok());
@ -187,7 +187,7 @@ async fn delc() {
async fn scan() { async fn scan() {
// Create a new datastore // Create a new datastore
let node_id = Uuid::parse_str("83b81cc2-9609-4533-bede-c170ab9f7bbe").unwrap(); let node_id = Uuid::parse_str("83b81cc2-9609-4533-bede-c170ab9f7bbe").unwrap();
let ds = new_ds(node_id).await; let (ds, _) = new_ds(node_id).await;
// Create a writeable transaction // Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test1", "1").await.is_ok()); assert!(tx.put("test1", "1").await.is_ok());

View file

@ -3,7 +3,7 @@
async fn snapshot() { async fn snapshot() {
// Create a new datastore // Create a new datastore
let node_id = Uuid::parse_str("056804f2-b379-4397-9ceb-af8ebd527beb").unwrap(); let node_id = Uuid::parse_str("056804f2-b379-4397-9ceb-af8ebd527beb").unwrap();
let ds = new_ds(node_id).await; let (ds, _) = new_ds(node_id).await;
// Insert an initial key // Insert an initial key
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
tx.set("test", "some text").await.unwrap(); tx.set("test", "some text").await.unwrap();

View file

@ -13,7 +13,7 @@
#[serial] #[serial]
async fn timestamp_to_versionstamp() { async fn timestamp_to_versionstamp() {
// Create a new datastore // Create a new datastore
let ds = new_ds(Uuid::parse_str("A905CA25-56ED-49FB-B759-696AEA87C342").unwrap()).await; let (ds, _) = new_ds(Uuid::parse_str("A905CA25-56ED-49FB-B759-696AEA87C342").unwrap()).await;
// Give the current versionstamp a timestamp of 0 // Give the current versionstamp a timestamp of 0
let mut tx = ds.transaction(true, false).await.unwrap(); let mut tx = ds.transaction(true, false).await.unwrap();
tx.set_timestamp_for_versionstamp(0, "myns", "mydb", true).await.unwrap(); tx.set_timestamp_for_versionstamp(0, "myns", "mydb", true).await.unwrap();

View file

@ -7,6 +7,7 @@ use crate::dbs::node::ClusterMembership;
use crate::dbs::node::Timestamp; use crate::dbs::node::Timestamp;
use crate::err::Error; use crate::err::Error;
use crate::idg::u32::U32; use crate::idg::u32::U32;
use crate::key::debug;
use crate::kvs::cache::Cache; use crate::kvs::cache::Cache;
use crate::kvs::cache::Entry; use crate::kvs::cache::Entry;
use crate::kvs::Check; use crate::kvs::Check;
@ -621,7 +622,11 @@ impl Transaction {
K: Into<Key> + Debug + Clone, K: Into<Key> + Debug + Clone,
{ {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
trace!("Scan {:?} - {:?}", rng.start, rng.end); trace!(
"Scan {:?} - {:?}",
debug::sprint_key(&rng.start.clone().into()),
debug::sprint_key(&rng.end.clone().into())
);
match self { match self {
#[cfg(feature = "kv-mem")] #[cfg(feature = "kv-mem")]
Transaction { Transaction {