From 1a85f4967a75de38e78a5c82a3f875a4e9b00f35 Mon Sep 17 00:00:00 2001 From: Przemyslaw Hugh Kaznowski Date: Tue, 12 Sep 2023 17:45:46 +0100 Subject: [PATCH] Add kv unit tests verify write+scan (#2682) --- lib/src/kvs/ds.rs | 58 ++++++++++++++++++++++++++++++++-- lib/src/kvs/tests/mod.rs | 15 +++++++++ lib/src/kvs/tests/ndlq.rs | 33 +++++++++++++++++++ lib/src/kvs/tests/nq.rs | 2 +- lib/src/kvs/tests/tblq.rs | 42 ++++++++++++++++++++++++ lib/src/kvs/tests/tbnt.rs | 1 + lib/src/kvs/tx.rs | 27 +++++++++++++--- lib/src/sql/statements/live.rs | 6 ++-- tests/common/mod.rs | 2 +- tests/ws_integration.rs | 1 - 10 files changed, 173 insertions(+), 14 deletions(-) create mode 100644 lib/src/kvs/tests/ndlq.rs create mode 100644 lib/src/kvs/tests/tblq.rs create mode 100644 lib/src/kvs/tests/tbnt.rs diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index 5f1b9870..991e4472 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -360,6 +360,17 @@ impl Datastore { // that weren't reversed, as it tries to bootstrap and garbage collect to the best of its // ability. pub async fn bootstrap(&self) -> Result<(), Error> { + trace!("Clearing cluster"); + let mut tx = self.transaction(true, false).await?; + match self.nuke_whole_cluster(&mut tx).await { + Ok(_) => tx.commit().await, + Err(e) => { + error!("Error nuking cluster at bootstrap: {:?}", e); + tx.cancel().await?; + Err(Error::Tx(format!("Error nuking cluster at bootstrap: {:?}", e).to_owned())) + } + }?; + trace!("Bootstrapping {}", self.id); let mut tx = self.transaction(true, false).await?; let now = tx.clock(); @@ -438,7 +449,7 @@ impl Datastore { node_id: &Uuid, timestamp: &Timestamp, ) -> Result<(), Error> { - tx.set_nd(node_id.0).await?; + tx.set_cl(node_id.0).await?; tx.set_hb(timestamp.clone(), node_id.0).await?; Ok(()) } @@ -455,7 +466,7 @@ impl Datastore { for hb in hbs { trace!("Deleting node {}", &hb.nd); // TODO should be delr in case of nested entries - tx.del_nd(hb.nd).await?; + tx.del_cl(hb.nd).await?; nodes.push(crate::sql::uuid::Uuid::from(hb.nd)); } Ok(nodes) @@ -515,6 +526,47 @@ impl Datastore { Ok(()) } + pub async fn nuke_whole_cluster(&self, tx: &mut Transaction) -> Result<(), Error> { + // Scan nodes + let cls = tx.scan_cl(1000).await?; + trace!("Found {} nodes", cls.len()); + for cl in cls { + tx.del_cl( + uuid::Uuid::parse_str(&cl.name).map_err(|e| { + Error::Unimplemented(format!("cluster id was not uuid: {:?}", e)) + })?, + ) + .await?; + } + // Scan heartbeats + let hbs = tx + .scan_hb( + &Timestamp { + value: 0, + }, + 1000, + ) + .await?; + trace!("Found {} heartbeats", hbs.len()); + for hb in hbs { + tx.del_hb(hb.hb, hb.nd).await?; + } + // Scan node live queries + let ndlqs = tx.scan_ndlq(&self.id, 1000).await?; + trace!("Found {} node live queries", ndlqs.len()); + for ndlq in ndlqs { + tx.del_ndlq(&ndlq.nd).await?; + // Scan table live queries + let tblqs = tx.scan_tblq(&ndlq.ns, &ndlq.db, &ndlq.tb, 1000).await?; + trace!("Found {} table live queries", tblqs.len()); + for tblq in tblqs { + tx.del_tblq(&ndlq.ns, &ndlq.db, &ndlq.tb, tblq.lq.0).await?; + } + } + trace!("Successfully completed nuke"); + Ok(()) + } + // Garbage collection task to run when a client disconnects from a surrealdb node // i.e. we know the node, we are not performing a full wipe on the node // and the wipe must be fully performed by this node @@ -590,7 +642,7 @@ impl Datastore { // Delete the heartbeat and everything nested tx.delr_hb(dead.clone(), 1000).await?; for dead_node in dead.clone() { - tx.del_nd(dead_node.nd).await?; + tx.del_cl(dead_node.nd).await?; } Ok::, Error>(dead) } diff --git a/lib/src/kvs/tests/mod.rs b/lib/src/kvs/tests/mod.rs index a2659a75..71e34499 100644 --- a/lib/src/kvs/tests/mod.rs +++ b/lib/src/kvs/tests/mod.rs @@ -40,6 +40,9 @@ mod mem { include!("tb.rs"); include!("multireader.rs"); include!("timestamp_to_versionstamp.rs"); + include!("ndlq.rs"); + include!("tblq.rs"); + include!("tbnt.rs"); } #[cfg(feature = "kv-rocksdb")] @@ -80,6 +83,9 @@ mod rocksdb { include!("multiwriter_different_keys.rs"); include!("multiwriter_same_keys_conflict.rs"); include!("timestamp_to_versionstamp.rs"); + include!("ndlq.rs"); + include!("tblq.rs"); + include!("tbnt.rs"); } #[cfg(feature = "kv-speedb")] @@ -120,6 +126,9 @@ mod speedb { include!("multiwriter_different_keys.rs"); include!("multiwriter_same_keys_conflict.rs"); include!("timestamp_to_versionstamp.rs"); + include!("ndlq.rs"); + include!("tblq.rs"); + include!("tbnt.rs"); } #[cfg(feature = "kv-tikv")] @@ -161,6 +170,9 @@ mod tikv { include!("multiwriter_different_keys.rs"); include!("multiwriter_same_keys_conflict.rs"); include!("timestamp_to_versionstamp.rs"); + include!("ndlq.rs"); + include!("tblq.rs"); + include!("tbnt.rs"); } #[cfg(feature = "kv-fdb")] @@ -202,4 +214,7 @@ mod fdb { include!("multiwriter_different_keys.rs"); include!("multiwriter_same_keys_allow.rs"); include!("timestamp_to_versionstamp.rs"); + include!("ndlq.rs"); + include!("tblq.rs"); + include!("tbnt.rs"); } diff --git a/lib/src/kvs/tests/ndlq.rs b/lib/src/kvs/tests/ndlq.rs new file mode 100644 index 00000000..a3c98bf0 --- /dev/null +++ b/lib/src/kvs/tests/ndlq.rs @@ -0,0 +1,33 @@ +use crate::kvs::LqValue; + +#[tokio::test] +#[serial] +async fn write_scan_ndlq() { + let nd = uuid::Uuid::parse_str("7a17446f-721f-4855-8fc7-81086752ca44").unwrap(); + let test = init(nd).await.unwrap(); + + // Write some data + let mut tx = test.db.transaction(true, false).await.unwrap(); + let ns = "namespace"; + let db = "database"; + let tb = "table"; + let lq = + sql::Uuid::from(uuid::Uuid::parse_str("4c3dca4b-ec08-4e3e-b23a-6b03b5cdc3fc").unwrap()); + tx.putc_ndlq(nd, lq.clone().0, ns, db, tb, None).await.unwrap(); + tx.commit().await.unwrap(); + + // Verify scan + let mut tx = test.db.transaction(true, false).await.unwrap(); + let res = tx.scan_ndlq(&nd, 100).await.unwrap(); + assert_eq!( + res, + vec![LqValue { + nd: sql::Uuid::from(nd), + ns: ns.to_string(), + db: db.to_string(), + tb: tb.to_string(), + lq + }] + ); + tx.commit().await.unwrap(); +} diff --git a/lib/src/kvs/tests/nq.rs b/lib/src/kvs/tests/nq.rs index a0647d34..5b06fb49 100644 --- a/lib/src/kvs/tests/nq.rs +++ b/lib/src/kvs/tests/nq.rs @@ -9,7 +9,7 @@ async fn archive_lv_for_node_archives() { let namespace = "test_namespace"; let database = "test_database"; let table = "test_table"; - tx.set_nd(node_id).await.unwrap(); + tx.set_cl(node_id).await.unwrap(); let lv_id = crate::sql::uuid::Uuid::from(Uuid::from_bytes([ 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, diff --git a/lib/src/kvs/tests/tblq.rs b/lib/src/kvs/tests/tblq.rs new file mode 100644 index 00000000..e806e1d9 --- /dev/null +++ b/lib/src/kvs/tests/tblq.rs @@ -0,0 +1,42 @@ +#[tokio::test] +#[serial] +async fn write_scan_tblq() { + let node_id = uuid::Uuid::parse_str("0bee25e0-34d7-448c-abc0-48cdf3db3a53").unwrap(); + let test = init(node_id).await.unwrap(); + + // Write some data + let mut tx = test.db.transaction(true, false).await.unwrap(); + let ns = "namespace"; + let db = "database"; + let tb = "table"; + let live_id = + sql::Uuid::from(uuid::Uuid::parse_str("b5aab54e-d1ef-4a14-b537-9206dcde2209").unwrap()); + let live_stm = LiveStatement { + id: live_id.clone(), + node: sql::Uuid::from(node_id), + expr: Default::default(), + what: Default::default(), + cond: None, + fetch: None, + archived: None, + session: Some(Value::None), + auth: None, + }; + tx.putc_tblq(ns, db, tb, live_stm, None).await.unwrap(); + tx.commit().await.unwrap(); + + // Verify scan + let mut tx = test.db.transaction(true, false).await.unwrap(); + let res = tx.scan_tblq(ns, db, tb, 100).await.unwrap(); + assert_eq!( + res, + vec![LqValue { + nd: sql::Uuid::from(node_id), + ns: ns.to_string(), + db: db.to_string(), + tb: tb.to_string(), + lq: live_id + }] + ); + tx.commit().await.unwrap(); +} diff --git a/lib/src/kvs/tests/tbnt.rs b/lib/src/kvs/tests/tbnt.rs new file mode 100644 index 00000000..6bac121f --- /dev/null +++ b/lib/src/kvs/tests/tbnt.rs @@ -0,0 +1 @@ +// When we add notifications we can add put+scan tests here diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index 7e09e5a7..c8951634 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -979,7 +979,7 @@ impl Transaction { // Register cluster membership // NOTE: Setting cluster membership sets the heartbeat // Remember to set the heartbeat as well - pub async fn set_nd(&mut self, id: Uuid) -> Result<(), Error> { + pub async fn set_cl(&mut self, id: Uuid) -> Result<(), Error> { let key = crate::key::root::nd::Nd::new(id); match self.get_nd(id).await? { Some(_) => Err(Error::ClAlreadyExists { @@ -1032,15 +1032,21 @@ impl Transaction { Ok(()) } + pub async fn del_hb(&mut self, timestamp: Timestamp, id: Uuid) -> Result<(), Error> { + let key = crate::key::root::hb::Hb::new(timestamp.clone(), id); + self.del(key).await?; + Ok(()) + } + // Delete a cluster registration entry - pub async fn del_nd(&mut self, node: Uuid) -> Result<(), Error> { + pub async fn del_cl(&mut self, node: Uuid) -> Result<(), Error> { let key = crate::key::root::nd::Nd::new(node); self.del(key).await } // Delete the live query notification registry on the table // Return the Table ID - pub async fn del_ndlv(&mut self, nd: &Uuid) -> Result { + pub async fn del_ndlq(&mut self, nd: &Uuid) -> Result { // This isn't implemented because it is covered by del_nd // Will add later for remote node kill Err(Error::NdNotFound { @@ -1159,7 +1165,7 @@ impl Transaction { Ok(()) } - pub async fn del_lv(&mut self, ns: &str, db: &str, tb: &str, lv: Uuid) -> Result<(), Error> { + pub async fn del_tblq(&mut self, ns: &str, db: &str, tb: &str, lv: Uuid) -> Result<(), Error> { trace!("del_lv: ns={:?} db={:?} tb={:?} lv={:?}", ns, db, tb, lv); let key = crate::key::table::lq::new(ns, db, tb, lv); self.cache.del(&key.clone().into()); @@ -1239,6 +1245,19 @@ impl Transaction { self.putc(key_enc, live_stm, expected).await } + pub async fn putc_ndlq( + &mut self, + nd: Uuid, + lq: Uuid, + ns: &str, + db: &str, + tb: &str, + chk: Option<&str>, + ) -> Result<(), Error> { + let key = crate::key::node::lq::new(nd, lq, ns, db); + self.putc(key, tb, chk).await + } + /// Retrieve all ROOT users. pub async fn all_root_users(&mut self) -> Result, Error> { let beg = crate::key::root::us::prefix(); diff --git a/lib/src/sql/statements/live.rs b/lib/src/sql/statements/live.rs index d7590546..361fa1d7 100644 --- a/lib/src/sql/statements/live.rs +++ b/lib/src/sql/statements/live.rs @@ -91,11 +91,9 @@ impl LiveStatement { // Store the current Node ID stm.node = nid.into(); // Insert the node live query - let key = crate::key::node::lq::new(opt.id()?, id, opt.ns(), opt.db()); - run.putc(key, tb.as_str(), None).await?; + run.putc_ndlq(nid, id, opt.ns(), opt.db(), tb.as_str(), None).await?; // Insert the table live query - let key = crate::key::table::lq::new(opt.ns(), opt.db(), &tb, id); - run.putc(key, stm, None).await?; + run.putc_tblq(opt.ns(), opt.db(), &tb, stm, None).await?; } v => { return Err(Error::LiveStatement { diff --git a/tests/common/mod.rs b/tests/common/mod.rs index dd2c1ddb..d48edd68 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -279,7 +279,7 @@ pub async fn ws_recv_all_msgs( _ = time::sleep(timeout) => { debug!("Waited for {:?} and received {} messages", timeout, res.len()); if res.len() != expected { - return Err(format!("Expected {} messages but got {} after {:?}", expected, res.len(), timeout).into()); + return Err(format!("Expected {} messages but got {} after {:?}: {:?}", expected, res.len(), timeout, res).into()); } } msg = ws_recv_msg(socket) => { diff --git a/tests/ws_integration.rs b/tests/ws_integration.rs index b0dae4df..f781e9c9 100644 --- a/tests/ws_integration.rs +++ b/tests/ws_integration.rs @@ -610,7 +610,6 @@ mod ws_integration { "method": "query", "params": [query], }); - common::ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?; // Wait some time for all messages to arrive, and then search for the notification message