diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs
index 6717d269..ba53197a 100644
--- a/lib/src/kvs/ds.rs
+++ b/lib/src/kvs/ds.rs
@@ -71,6 +71,10 @@ pub struct Datastore {
 	notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>,
 }
 
+/// We always want to be circulating the live query information,
+/// and we will sometimes have an error attached but still not want to lose the LQ.
+pub(crate) type BootstrapOperationResult = (LqValue, Option<Error>);
+
 #[allow(clippy::large_enum_variant)]
 pub(super) enum Inner {
 	#[cfg(feature = "kv-mem")]
@@ -352,6 +356,10 @@
 	}
 
 	// Initialise bootstrap with implicit values intended for runtime
+	// An error indicates that a failure happened, but that does not mean that the bootstrap
+	// completely failed. It may have partially completed. It certainly has side-effects
+	// that weren't reversed, as it tries to bootstrap and garbage collect to the best of its
+	// ability.
 	pub async fn bootstrap(&self) -> Result<(), Error> {
 		trace!("Bootstrapping {}", self.id);
 		let mut tx = self.transaction(true, false).await?;
@@ -367,9 +375,26 @@
 				return Err(e);
 			}
 		};
+		// Filtered includes all lqs that should be used in the subsequent step.
+		// Currently that is all of them, no matter the error encountered.
+		let mut filtered: Vec<LqValue> = vec![];
+		// err is used to aggregate all errors across all stages
+		let mut err = vec![];
+		for res in archived {
+			match res {
+				(lq, Some(e)) => {
+					filtered.push(lq);
+					err.push(e);
+				}
+				(lq, None) => {
+					filtered.push(lq);
+				}
+			}
+		}
 
 		let mut tx = self.transaction(true, false).await?;
-		match self.remove_archived(&mut tx, archived).await {
+		let val = self.remove_archived(&mut tx, filtered).await;
+		let resolve_err = match val {
 			Ok(_) => tx.commit().await,
 			Err(e) => {
 				error!("Error bootstrapping sweep phase: {:?}", e);
@@ -381,7 +406,15 @@
 					}
 				}
 			}
+		};
+		if resolve_err.is_err() {
+			err.push(resolve_err.unwrap_err());
 		}
+		if !err.is_empty() {
+			error!("Error bootstrapping sweep phase: {:?}", err);
+			return Err(Error::Tx(format!("Error bootstrapping sweep phase: {:?}", err)));
+		}
+		Ok(())
 	}
 
 	// Node registration + "mark" stage of mark-and-sweep gc
@@ -390,7 +423,7 @@
 		tx: &mut Transaction,
 		node_id: &Uuid,
 		timestamp: Timestamp,
-	) -> Result<Vec<LqValue>, Error> {
+	) -> Result<Vec<BootstrapOperationResult>, Error> {
 		trace!("Registering node {}", node_id);
 		self.register_membership(tx, node_id, &timestamp).await?;
 		// Determine the timeout for when a cluster node is expired
@@ -440,7 +473,7 @@
 		tx: &mut Transaction,
 		nodes: &[Uuid],
 		this_node_id: &Uuid,
-	) -> Result<Vec<LqValue>, Error> {
+	) -> Result<Vec<BootstrapOperationResult>, Error> {
 		let mut archived = vec![];
 		for nd in nodes.iter() {
 			trace!("Archiving node {}", &nd);
@@ -450,7 +483,14 @@
 			for lq in node_lqs {
 				trace!("Archiving query {:?}", &lq);
 				let node_archived_lqs =
-					self.archive_lv_for_node(tx, &lq.nd, this_node_id.clone()).await?;
+					match self.archive_lv_for_node(tx, &lq.nd, this_node_id.clone()).await {
+						Ok(lq) => lq,
+						Err(e) => {
+							error!("Error archiving lqs during bootstrap phase: {:?}", e);
+							vec![]
+						}
+					};
+				// We need to add lv nodes not found so that they can be deleted in the second stage
 				for lq_value in node_archived_lqs {
 					archived.push(lq_value);
 				}
@@ -464,6 +504,7 @@
 		tx: &mut Transaction,
 		archived: Vec<LqValue>,
 	) -> Result<(), Error> {
+		trace!("Removing archived live queries");
 		for lq in archived {
 			// Delete the cluster key, used for finding LQ associated with a node
 			let key = crate::key::node::lq::new(lq.nd.0, lq.lq.0, &lq.ns, &lq.db);
@@ -475,30 +516,6 @@
 		Ok(())
 	}
 
-	pub async fn _garbage_collect(
-		// TODO not invoked
-		// But this is garbage collection outside of bootstrap
-		&self,
-		tx: &mut Transaction,
-		watermark: &Timestamp,
-		this_node_id: &Uuid,
-	) -> Result<(), Error> {
-		let dead_heartbeats = self.delete_dead_heartbeats(tx, watermark).await?;
-		trace!("Found dead hbs: {:?}", dead_heartbeats);
-		let mut archived: Vec<LqValue> = vec![];
-		for hb in dead_heartbeats {
-			let new_archived = self
-				.archive_lv_for_node(tx, &crate::sql::uuid::Uuid::from(hb.nd), this_node_id.clone())
-				.await?;
-			tx.del_nd(hb.nd).await?;
-			trace!("Deleted node {}", hb.nd);
-			for lq_value in new_archived {
-				archived.push(lq_value);
-			}
-		}
-		Ok(())
-	}
-
 	// Garbage collection task to run when a client disconnects from a surrealdb node
 	// i.e. we know the node, we are not performing a full wipe on the node
 	// and the wipe must be fully performed by this node
@@ -542,16 +559,22 @@
 		tx: &mut Transaction,
 		nd: &Uuid,
 		this_node_id: Uuid,
-	) -> Result<Vec<LqValue>, Error> {
+	) -> Result<Vec<BootstrapOperationResult>, Error> {
 		let lqs = tx.all_lq(nd).await?;
 		trace!("Archiving lqs and found {} LQ entries for {}", lqs.len(), nd);
-		let mut ret = vec![];
+		let mut ret: Vec<BootstrapOperationResult> = vec![];
 		for lq in lqs {
-			let lvs =
-				tx.get_tb_live(lq.ns.as_str(), lq.db.as_str(), lq.tb.as_str(), &lq.lq).await?;
-			let archived_lvs = lvs.clone().archive(this_node_id.clone());
-			tx.putc_tblq(&lq.ns, &lq.db, &lq.tb, archived_lvs, Some(lvs)).await?;
-			ret.push(lq);
+			let lv_res =
+				tx.get_tb_live(lq.ns.as_str(), lq.db.as_str(), lq.tb.as_str(), &lq.lq).await;
+			if let Err(e) = lv_res {
+				error!("Error getting live query for node {}: {:?}", nd, e);
+				ret.push((lq, Some(e)));
+				continue;
+			}
+			let lv = lv_res.unwrap();
+			let archived_lvs = lv.clone().archive(this_node_id.clone());
+			tx.putc_tblq(&lq.ns, &lq.db, &lq.tb, archived_lvs, Some(lv)).await?;
+			ret.push((lq, None));
 		}
 		Ok(ret)
 	}
diff --git a/lib/src/kvs/tests/cluster_init.rs b/lib/src/kvs/tests/cluster_init.rs
index d5904e13..5d11ef94 100644
--- a/lib/src/kvs/tests/cluster_init.rs
+++ b/lib/src/kvs/tests/cluster_init.rs
@@ -190,3 +190,102 @@ async fn single_live_queries_are_garbage_collected() {
 	assert_eq!(&scanned[0].lq, &sql::Uuid::from(live_query_to_keep));
 	tx.commit().await.unwrap();
 }
+
+#[test(tokio::test)]
+#[serial]
+async fn bootstrap_does_not_error_on_missing_live_queries() {
+	// Test parameters
+	let ctx = context::Context::background();
+	let old_node_id = Uuid::parse_str("5f644f02-7c1a-4f8b-babd-bd9e92c1836a").unwrap();
+	let test = init(old_node_id).await.unwrap();
+	let time = Timestamp {
+		value: 123,
+	};
+	let namespace = "test_namespace_0A8BD08BE4F2457BB9F145557EF19605";
+	let database_owned = format!("test_db_{:?}", test.kvs);
+	let database = database_owned.as_str();
+	let table = "test_table";
+	let options = Options::default()
+		.with_required(
+			old_node_id.clone(),
+			Some(Arc::from(namespace)),
+			Some(Arc::from(database)),
+			Arc::new(Auth::for_root(Role::Owner)),
+		)
+		.with_live(true);
+
+	// We do standard cluster init
+	trace!("Bootstrapping node {}", old_node_id);
+	test.bootstrap_at_time(crate::sql::uuid::Uuid::from(old_node_id), time).await.unwrap();
+
+	// We set up a single live query, which we then corrupt so that bootstrap has to garbage collect it
+	trace!("Setting up live queries");
+	let tx = Arc::new(Mutex::new(test.db.transaction(true, false).await.unwrap()));
+	let live_query_to_corrupt = Uuid::parse_str("d4cee7ce-5c78-4a30-9fa9-2444d58029f6").unwrap();
+	let live_st = LiveStatement {
+		id: sql::Uuid(live_query_to_corrupt),
+		node: sql::uuid::Uuid::from(old_node_id),
+		expr: Fields(vec![sql::Field::All], false),
+		what: Table(sql::Table::from(table)),
+		cond: None,
+		fetch: None,
+		archived: None,
+		auth: Some(Auth::for_root(Role::Owner)),
+	};
+	live_st
+		.compute(&ctx, &options, &tx, None)
+		.await
+		.map_err(|e| format!("Error computing live statement: {:?} {:?}", live_st, e))
+		.unwrap();
+
+	// Now we corrupt the live query entry by leaving the node entry in but removing the table entry
+	let key = crate::key::table::lq::new(namespace, database, table, live_query_to_corrupt);
+	tx.lock().await.del(key).await.unwrap();
+	tx.lock().await.commit().await.unwrap();
+
+	// Subject: Perform the action we are testing
+	trace!("Bootstrapping");
+	let new_node_id = Uuid::parse_str("53f7355d-5be1-4a94-9803-5192b59c5244").unwrap();
+	// The missing live query should be reported as a Tx error rather than aborting the whole cleanup
+	let second_node = test.db.with_node_id(crate::sql::uuid::Uuid::from(new_node_id));
+	match second_node.bootstrap().await {
+		Ok(_) => {
+			panic!("Expected an error because of missing live query")
+		}
+		Err(Error::Tx(e)) => match e {
+			_ if e.contains("LvNotFound") => {
+				// This is what we want: an LvNotFound error, but it gets wrapped into a string so that Error::Tx doesn't carry vecs
+			}
+			_ => {
+				panic!("Expected an LvNotFound error but got: {:?}", e);
+			}
+		},
+		Err(e) => {
+			panic!("Expected a Tx error but got: {:?}", e)
+		}
+	}
+
+	// Verify node live query was deleted
+	let mut tx = second_node.transaction(true, false).await.unwrap();
+	let found = tx
+		.scan_ndlq(&old_node_id, 100)
+		.await
+		.map_err(|e| format!("Error scanning ndlq: {:?}", e))
+		.unwrap();
+	assert_eq!(0, found.len(), "Found: {:?}", found);
+	let found = tx
+		.scan_ndlq(&new_node_id, 100)
+		.await
+		.map_err(|e| format!("Error scanning ndlq: {:?}", e))
+		.unwrap();
+	assert_eq!(0, found.len(), "Found: {:?}", found);
+
+	// Verify table live query does not exist
+	let found = tx
+		.scan_tblq(namespace, database, table, 100)
+		.await
+		.map_err(|e| format!("Error scanning tblq: {:?}", e))
+		.unwrap();
+	assert_eq!(0, found.len(), "Found: {:?}", found);
+	tx.cancel().await.unwrap();
+}
diff --git a/lib/src/kvs/tests/helper.rs b/lib/src/kvs/tests/helper.rs
index 9c842a95..28d06ea1 100644
--- a/lib/src/kvs/tests/helper.rs
+++ b/lib/src/kvs/tests/helper.rs
@@ -3,6 +3,7 @@ use crate::err::Error;
 
 pub struct TestContext {
 	pub(crate) db: Datastore,
+	pub(crate) kvs: Kvs,
 	// A string identifier for this context.
 	// It will usually be a uuid or combination of uuid and fixed string identifier.
 	// It is useful for separating test setups when environments are shared.
@@ -17,11 +18,31 @@
 		node_id: crate::sql::uuid::Uuid,
 		time: Timestamp,
 	) -> Result<(), Error> {
+		// TODO we shouldn't test bootstrapping manually
 		let mut tx = self.db.transaction(true, false).await?;
 		let archived = self.db.register_remove_and_archive(&mut tx, &node_id, time).await?;
 		tx.commit().await?;
+
+		let mut errors = vec![];
+		let mut values = vec![];
+		for res in archived {
+			match res {
+				(v, Some(e)) => {
+					values.push(v);
+					errors.push(e);
+				}
+				(v, None) => {
+					values.push(v);
+				}
+			}
+		}
+		if !errors.is_empty() {
+			panic!("Encountered errors: {:?}", errors);
+		}
+
 		let mut tx = self.db.transaction(true, false).await?;
-		self.db.remove_archived(&mut tx, archived).await?;
+		self.db.remove_archived(&mut tx, values).await?;
 		Ok(tx.commit().await?)
 	}
@@ -34,9 +55,10 @@
 /// Initialise logging and prepare a useable datastore
 /// In the future it would be nice to handle multiple datastores
 pub(crate) async fn init(node_id: Uuid) -> Result<TestContext, Error> {
-	let db = new_ds(node_id).await;
+	let (db, kvs) = new_ds(node_id).await;
 	return Ok(TestContext {
 		db,
+		kvs,
 		context_id: node_id.to_string(), // The context does not always have to be a uuid
 	});
 }
diff --git a/lib/src/kvs/tests/mod.rs b/lib/src/kvs/tests/mod.rs
index e8d3eeae..a2659a75 100644
--- a/lib/src/kvs/tests/mod.rs
+++ b/lib/src/kvs/tests/mod.rs
@@ -1,19 +1,34 @@
+#[derive(Clone, Debug)]
+pub(crate) enum Kvs {
+	#[allow(dead_code)]
+	Mem,
+	#[allow(dead_code)]
+	Rocksdb,
+	#[allow(dead_code)]
+	Speedb,
+	#[allow(dead_code)]
+	Tikv,
+	#[allow(dead_code)]
+	Fdb,
+}
+
 #[cfg(feature = "kv-mem")]
 mod mem {
 
+	use crate::kvs::tests::Kvs;
 	use crate::kvs::Datastore;
 	use crate::kvs::Transaction;
 	use serial_test::serial;
 
-	async fn new_ds(id: Uuid) -> Datastore {
-		Datastore::new("memory").await.unwrap().with_node_id(sql::Uuid::from(id))
+	async fn new_ds(id: Uuid) -> (Datastore, Kvs) {
+		(Datastore::new("memory").await.unwrap().with_node_id(sql::Uuid::from(id)), Kvs::Mem)
 	}
 
 	async fn new_tx(write: bool, lock: bool) -> Transaction {
 		// Shared node id for one-off transactions
 		// We should delete this, node IDs should be known.
 		let new_tx_uuid = Uuid::parse_str("361893b5-a041-40c0-996c-c3a8828ef06b").unwrap();
-		new_ds(new_tx_uuid).await.transaction(write, lock).await.unwrap()
+		new_ds(new_tx_uuid).await.0.transaction(write, lock).await.unwrap()
 	}
 
 	include!("cluster_init.rs");
@@ -30,24 +45,28 @@
 #[cfg(feature = "kv-rocksdb")]
 mod rocksdb {
 
+	use crate::kvs::tests::Kvs;
 	use crate::kvs::Datastore;
 	use crate::kvs::Transaction;
 	use serial_test::serial;
 	use temp_dir::TempDir;
 
-	async fn new_ds(node_id: Uuid) -> Datastore {
+	async fn new_ds(node_id: Uuid) -> (Datastore, Kvs) {
 		let path = TempDir::new().unwrap().path().to_string_lossy().to_string();
-		Datastore::new(format!("rocksdb:{path}").as_str())
-			.await
-			.unwrap()
-			.with_node_id(sql::Uuid::from(node_id))
+		(
+			Datastore::new(format!("rocksdb:{path}").as_str())
+				.await
+				.unwrap()
+				.with_node_id(sql::Uuid::from(node_id)),
+			Kvs::Rocksdb,
+		)
 	}
 
 	async fn new_tx(write: bool, lock: bool) -> Transaction {
 		// Shared node id for one-off transactions
 		// We should delete this, node IDs should be known.
 		let new_tx_uuid = Uuid::parse_str("22358e5e-87bd-4040-8c63-01db896191ab").unwrap();
-		new_ds(new_tx_uuid).await.transaction(write, lock).await.unwrap()
+		new_ds(new_tx_uuid).await.0.transaction(write, lock).await.unwrap()
 	}
 
 	include!("cluster_init.rs");
@@ -66,24 +85,28 @@
 #[cfg(feature = "kv-speedb")]
 mod speedb {
 
+	use crate::kvs::tests::Kvs;
 	use crate::kvs::Datastore;
 	use crate::kvs::Transaction;
 	use serial_test::serial;
 	use temp_dir::TempDir;
 
-	async fn new_ds(node_id: Uuid) -> Datastore {
+	async fn new_ds(node_id: Uuid) -> (Datastore, Kvs) {
 		let path = TempDir::new().unwrap().path().to_string_lossy().to_string();
-		Datastore::new(format!("speedb:{path}").as_str())
-			.await
-			.unwrap()
-			.with_node_id(sql::Uuid::from(node_id))
+		(
+			Datastore::new(format!("speedb:{path}").as_str())
+				.await
+				.unwrap()
+				.with_node_id(sql::Uuid::from(node_id)),
+			Kvs::Speedb,
+		)
 	}
 
 	async fn new_tx(write: bool, lock: bool) -> Transaction {
 		// Shared node id for one-off transactions
 		// We should delete this, node IDs should be known.
let new_tx_uuid = Uuid::parse_str("5877e580-12ac-49e4-95e1-3c407c4887f3").unwrap(); - new_ds(new_tx_uuid).await.transaction(write, lock).await.unwrap() + new_ds(new_tx_uuid).await.0.transaction(write, lock).await.unwrap() } include!("cluster_init.rs"); @@ -102,11 +125,12 @@ mod speedb { #[cfg(feature = "kv-tikv")] mod tikv { + use crate::kvs::tests::Kvs; use crate::kvs::Datastore; use crate::kvs::Transaction; use serial_test::serial; - async fn new_ds(node_id: Uuid) -> Datastore { + async fn new_ds(node_id: Uuid) -> (Datastore, Kvs) { let ds = Datastore::new("tikv:127.0.0.1:2379") .await .unwrap() @@ -116,14 +140,14 @@ mod tikv { tx.delp(vec![], u32::MAX).await.unwrap(); tx.commit().await.unwrap(); // Return the datastore - ds + (ds, Kvs::Tikv) } async fn new_tx(write: bool, lock: bool) -> Transaction { // Shared node id for one-off transactions // We should delete this, node IDs should be known. let new_tx_uuid = Uuid::parse_str("18717a0f-0ab0-421e-b20c-e69fb03e90a3").unwrap(); - new_ds(new_tx_uuid).await.transaction(write, lock).await.unwrap() + new_ds(new_tx_uuid).await.0.transaction(write, lock).await.unwrap() } include!("cluster_init.rs"); @@ -142,11 +166,12 @@ mod tikv { #[cfg(feature = "kv-fdb")] mod fdb { + use crate::kvs::tests::Kvs; use crate::kvs::Datastore; use crate::kvs::Transaction; use serial_test::serial; - async fn new_ds(node_id: Uuid) -> Datastore { + async fn new_ds(node_id: Uuid) -> (Datastore, Kvs) { let ds = Datastore::new("fdb:/etc/foundationdb/fdb.cluster") .await .unwrap() @@ -156,14 +181,14 @@ mod fdb { tx.delp(vec![], u32::MAX).await.unwrap(); tx.commit().await.unwrap(); // Return the datastore - ds + (ds, Kvs::Fdb) } async fn new_tx(write: bool, lock: bool) -> Transaction { // Shared node id for one-off transactions // We should delete this, node IDs should be known. 
let new_tx_uuid = Uuid::parse_str("50f5bdf5-8abe-406b-8002-a79c942f510f").unwrap(); - new_ds(new_tx_uuid).await.transaction(write, lock).await.unwrap() + new_ds(new_tx_uuid).await.0.transaction(write, lock).await.unwrap() } include!("cluster_init.rs"); diff --git a/lib/src/kvs/tests/multireader.rs b/lib/src/kvs/tests/multireader.rs index 4385757a..e67cc3b3 100644 --- a/lib/src/kvs/tests/multireader.rs +++ b/lib/src/kvs/tests/multireader.rs @@ -3,7 +3,7 @@ async fn multireader() { // Create a new datastore let node_id = Uuid::parse_str("b7afc077-2123-476f-bee0-43d7504f1e0a").unwrap(); - let ds = new_ds(node_id).await; + let (ds, _) = new_ds(node_id).await; // Insert an initial key let mut tx = ds.transaction(true, false).await.unwrap(); tx.set("test", "some text").await.unwrap(); diff --git a/lib/src/kvs/tests/multiwriter_different_keys.rs b/lib/src/kvs/tests/multiwriter_different_keys.rs index cadbab62..25283c42 100644 --- a/lib/src/kvs/tests/multiwriter_different_keys.rs +++ b/lib/src/kvs/tests/multiwriter_different_keys.rs @@ -3,7 +3,7 @@ async fn multiwriter_different_keys() { // Create a new datastore let node_id = Uuid::parse_str("7f0153b0-79cf-4922-85ef-61e390970514").unwrap(); - let ds = new_ds(node_id).await; + let (ds, _) = new_ds(node_id).await; // Insert an initial key let mut tx = ds.transaction(true, false).await.unwrap(); tx.set("test", "some text").await.unwrap(); diff --git a/lib/src/kvs/tests/multiwriter_same_keys_allow.rs b/lib/src/kvs/tests/multiwriter_same_keys_allow.rs index 93a6bdbf..1d8a4ab7 100644 --- a/lib/src/kvs/tests/multiwriter_same_keys_allow.rs +++ b/lib/src/kvs/tests/multiwriter_same_keys_allow.rs @@ -3,7 +3,7 @@ async fn multiwriter_same_keys_allow() { // Create a new datastore let node_id = Uuid::parse_str("a19cf00d-f95b-42c6-95e5-7b310162d570").unwrap(); - let ds = new_ds(node_id).await; + let (ds, _) = new_ds(node_id).await; // Insert an initial key let mut tx = ds.transaction(true, false).await.unwrap(); tx.set("test", "some text").await.unwrap(); diff --git a/lib/src/kvs/tests/multiwriter_same_keys_conflict.rs b/lib/src/kvs/tests/multiwriter_same_keys_conflict.rs index f3274487..434d741d 100644 --- a/lib/src/kvs/tests/multiwriter_same_keys_conflict.rs +++ b/lib/src/kvs/tests/multiwriter_same_keys_conflict.rs @@ -3,7 +3,7 @@ async fn multiwriter_same_keys_conflict() { // Create a new datastore let node_id = Uuid::parse_str("96ebbb5c-8040-497a-9459-838e4931aca7").unwrap(); - let ds = new_ds(node_id).await; + let (ds, _) = new_ds(node_id).await; // Insert an initial key let mut tx = ds.transaction(true, false).await.unwrap(); tx.set("test", "some text").await.unwrap(); diff --git a/lib/src/kvs/tests/nq.rs b/lib/src/kvs/tests/nq.rs index 944d62f3..a0647d34 100644 --- a/lib/src/kvs/tests/nq.rs +++ b/lib/src/kvs/tests/nq.rs @@ -38,12 +38,21 @@ async fn archive_lv_for_node_archives() { .await .unwrap(); assert_eq!(results.len(), 1); - assert_eq!(results[0].nd, sql::uuid::Uuid(node_id.clone())); - assert_eq!(results[0].ns, namespace); - assert_eq!(results[0].db, database); - assert_eq!(results[0].tb, table); - assert_eq!(results[0].lq, lv_id); tx.commit().await.unwrap(); + let (lq, opt_err) = &results[0]; + match opt_err { + None => { + //expected + } + Some(err) => { + panic!("Unexpected error: {:?}", err); + } + } + assert_eq!(lq.nd, sql::uuid::Uuid(node_id.clone())); + assert_eq!(lq.ns, namespace); + assert_eq!(lq.db, database); + assert_eq!(lq.tb, table); + assert_eq!(lq.lq, lv_id); let mut tx = test.db.transaction(true, false).await.unwrap(); let lv = 
 	let lv = tx.all_tb_lives(namespace, database, table).await.unwrap();
diff --git a/lib/src/kvs/tests/raw.rs b/lib/src/kvs/tests/raw.rs
index e59a5b94..594ce8e2 100644
--- a/lib/src/kvs/tests/raw.rs
+++ b/lib/src/kvs/tests/raw.rs
@@ -11,7 +11,7 @@ async fn initialise() {
 async fn exi() {
 	// Create a new datastore
 	let node_id = Uuid::parse_str("463a5008-ee1d-43db-9662-5e752b6ea3f9").unwrap();
-	let ds = new_ds(node_id).await;
+	let (ds, _) = new_ds(node_id).await;
 	// Create a writeable transaction
 	let mut tx = ds.transaction(true, false).await.unwrap();
 	assert!(tx.put("test", "ok").await.is_ok());
@@ -30,7 +30,7 @@
 async fn get() {
 	// Create a new datastore
 	let node_id = Uuid::parse_str("477e2895-8c98-4606-a827-0add82eb466b").unwrap();
-	let ds = new_ds(node_id).await;
+	let (ds, _) = new_ds(node_id).await;
 	// Create a writeable transaction
 	let mut tx = ds.transaction(true, false).await.unwrap();
 	assert!(tx.put("test", "ok").await.is_ok());
@@ -49,7 +49,7 @@
 async fn set() {
 	// Create a new datastore
 	let node_id = Uuid::parse_str("32b80d8b-dd16-4f6f-a687-1192f6cfc6f1").unwrap();
-	let ds = new_ds(node_id).await;
+	let (ds, _) = new_ds(node_id).await;
 	// Create a writeable transaction
 	let mut tx = ds.transaction(true, false).await.unwrap();
 	assert!(tx.set("test", "one").await.is_ok());
@@ -75,7 +75,7 @@
 async fn put() {
 	// Create a new datastore
 	let node_id = Uuid::parse_str("80149655-db34-451c-8711-6fa662a44b70").unwrap();
-	let ds = new_ds(node_id).await;
+	let (ds, _) = new_ds(node_id).await;
 	// Create a writeable transaction
 	let mut tx = ds.transaction(true, false).await.unwrap();
 	assert!(tx.put("test", "one").await.is_ok());
@@ -101,7 +101,7 @@
 async fn del() {
 	// Create a new datastore
 	let node_id = Uuid::parse_str("e0acb360-9187-401f-8192-f870b09e2c9e").unwrap();
-	let ds = new_ds(node_id).await;
+	let (ds, _) = new_ds(node_id).await;
 	// Create a writeable transaction
 	let mut tx = ds.transaction(true, false).await.unwrap();
 	assert!(tx.put("test", "one").await.is_ok());
@@ -122,7 +122,7 @@
 async fn putc() {
 	// Create a new datastore
 	let node_id = Uuid::parse_str("705bb520-bc2b-4d52-8e64-d1214397e408").unwrap();
-	let ds = new_ds(node_id).await;
+	let (ds, _) = new_ds(node_id).await;
 	// Create a writeable transaction
 	let mut tx = ds.transaction(true, false).await.unwrap();
 	assert!(tx.put("test", "one").await.is_ok());
@@ -157,7 +157,7 @@
 async fn delc() {
 	// Create a new datastore
 	let node_id = Uuid::parse_str("0985488e-cf2f-417a-bd10-7f4aa9c99c15").unwrap();
-	let ds = new_ds(node_id).await;
+	let (ds, _) = new_ds(node_id).await;
 	// Create a writeable transaction
 	let mut tx = ds.transaction(true, false).await.unwrap();
 	assert!(tx.put("test", "one").await.is_ok());
@@ -187,7 +187,7 @@
 async fn scan() {
 	// Create a new datastore
 	let node_id = Uuid::parse_str("83b81cc2-9609-4533-bede-c170ab9f7bbe").unwrap();
-	let ds = new_ds(node_id).await;
+	let (ds, _) = new_ds(node_id).await;
 	// Create a writeable transaction
 	let mut tx = ds.transaction(true, false).await.unwrap();
 	assert!(tx.put("test1", "1").await.is_ok());
diff --git a/lib/src/kvs/tests/snapshot.rs b/lib/src/kvs/tests/snapshot.rs
index 0e6560b4..9b4dfa45 100644
--- a/lib/src/kvs/tests/snapshot.rs
+++ b/lib/src/kvs/tests/snapshot.rs
@@ -3,7 +3,7 @@
 async fn snapshot() {
 	// Create a new datastore
 	let node_id = Uuid::parse_str("056804f2-b379-4397-9ceb-af8ebd527beb").unwrap();
-	let ds = new_ds(node_id).await;
+	let (ds, _) = new_ds(node_id).await;
 	// Insert an initial key
 	let mut tx = ds.transaction(true, false).await.unwrap();
 	tx.set("test", "some text").await.unwrap();
diff --git a/lib/src/kvs/tests/timestamp_to_versionstamp.rs b/lib/src/kvs/tests/timestamp_to_versionstamp.rs
index 63a9a645..55820670 100644
--- a/lib/src/kvs/tests/timestamp_to_versionstamp.rs
+++ b/lib/src/kvs/tests/timestamp_to_versionstamp.rs
@@ -13,7 +13,7 @@
 #[serial]
 async fn timestamp_to_versionstamp() {
 	// Create a new datastore
-	let ds = new_ds(Uuid::parse_str("A905CA25-56ED-49FB-B759-696AEA87C342").unwrap()).await;
+	let (ds, _) = new_ds(Uuid::parse_str("A905CA25-56ED-49FB-B759-696AEA87C342").unwrap()).await;
 	// Give the current versionstamp a timestamp of 0
 	let mut tx = ds.transaction(true, false).await.unwrap();
 	tx.set_timestamp_for_versionstamp(0, "myns", "mydb", true).await.unwrap();
diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs
index 20e6735d..c9fe0f72 100644
--- a/lib/src/kvs/tx.rs
+++ b/lib/src/kvs/tx.rs
@@ -7,6 +7,7 @@ use crate::dbs::node::ClusterMembership;
 use crate::dbs::node::Timestamp;
 use crate::err::Error;
 use crate::idg::u32::U32;
+use crate::key::debug;
 use crate::kvs::cache::Cache;
 use crate::kvs::cache::Entry;
 use crate::kvs::Check;
@@ -621,7 +622,11 @@
 		K: Into<Key> + Debug + Clone,
 	{
 		#[cfg(debug_assertions)]
-		trace!("Scan {:?} - {:?}", rng.start, rng.end);
+		trace!(
+			"Scan {:?} - {:?}",
+			debug::sprint_key(&rng.start.clone().into()),
+			debug::sprint_key(&rng.end.clone().into())
+		);
 		match self {
 			#[cfg(feature = "kv-mem")]
 			Transaction {