From c6f6ca8062ce9104729ca237282b8aca4a6b5813 Mon Sep 17 00:00:00 2001 From: Przemyslaw Hugh Kaznowski Date: Wed, 4 Oct 2023 14:06:58 +0100 Subject: [PATCH] Improve bootstrapping for finding invalid data from beta versions. (#2700) Co-authored-by: Raphael Darley --- lib/src/dbs/node.rs | 13 +- lib/src/idx/trees/mtree.rs | 8 -- lib/src/kvs/ds.rs | 164 ++++++++++++++------- lib/src/kvs/tests/cluster_init.rs | 89 +++++++++--- lib/src/kvs/tests/hb.rs | 34 +++++ lib/src/kvs/tests/helper.rs | 12 +- lib/src/kvs/tests/mod.rs | 10 ++ lib/src/kvs/tests/nd.rs | 27 ++++ lib/src/kvs/tests/ndlq.rs | 10 +- lib/src/kvs/tests/nq.rs | 2 +- lib/src/kvs/tests/tblq.rs | 4 +- lib/src/kvs/tx.rs | 212 ++++++++++++++++++--------- lib/tests/bootstrap.rs | 228 ++++++++++++++++++++++++++++++ lib/tests/running.md | 13 ++ 14 files changed, 675 insertions(+), 151 deletions(-) create mode 100644 lib/src/kvs/tests/hb.rs create mode 100644 lib/src/kvs/tests/nd.rs create mode 100644 lib/tests/bootstrap.rs create mode 100644 lib/tests/running.md diff --git a/lib/src/dbs/node.rs b/lib/src/dbs/node.rs index 4de41c08..64d94b4e 100644 --- a/lib/src/dbs/node.rs +++ b/lib/src/dbs/node.rs @@ -24,6 +24,15 @@ pub struct ClusterMembership { pub struct Timestamp { pub value: u64, } + +impl From for Timestamp { + fn from(ts: u64) -> Self { + Timestamp { + value: ts, + } + } +} + // This struct is to be used only when storing keys as the macro currently // conflicts when you have Store and Key derive macros. #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Hash, Key)] @@ -44,7 +53,7 @@ impl Add for Timestamp { type Output = Timestamp; fn add(self, rhs: Duration) -> Timestamp { Timestamp { - value: self.value + rhs.as_secs(), + value: self.value + rhs.as_millis() as u64, } } } @@ -52,7 +61,7 @@ impl Add for Timestamp { impl Sub for Timestamp { type Output = Result; fn sub(self, rhs: Duration) -> Self::Output { - let millis = rhs.as_secs(); + let millis = rhs.as_millis() as u64; if self.value <= millis { // Removing the duration from this timestamp will cause it to overflow return Err(TimestampOverflow(format!( diff --git a/lib/src/idx/trees/mtree.rs b/lib/src/idx/trees/mtree.rs index 526df7b1..4baa461d 100644 --- a/lib/src/idx/trees/mtree.rs +++ b/lib/src/idx/trees/mtree.rs @@ -1726,7 +1726,6 @@ mod tests { expected_min_objects: Option, expected_max_objects: Option, ) { - println!("CheckTreeProperties"); let mut node_count = 0; let mut max_depth = 0; let mut min_leaf_depth = None; @@ -1743,13 +1742,6 @@ mod tests { max_depth = depth; } let node = s.get_node(tx, node_id).await.unwrap(); - println!( - "Node id: {} - depth: {} - len: {} - {:?}", - node.id, - depth, - node.n.len(), - node.n - ); match node.n { MTreeNode::Internal(entries) => { let next_depth = depth + 1; diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index 51fa00f1..ce8cf42b 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -14,7 +14,7 @@ use crate::err::Error; use crate::iam::ResourceKind; use crate::iam::{Action, Auth, Error as IamError, Role}; use crate::key::root::hb::Hb; -use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*}; +use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*, NO_LIMIT}; use crate::opt::auth::Root; use crate::sql; use crate::sql::statements::DefineUserStatement; @@ -26,6 +26,8 @@ use channel::Receiver; use channel::Sender; use futures::lock::Mutex; use futures::Future; +use std::cmp::Ordering; +use std::collections::{BTreeMap, BTreeSet}; use std::fmt; use std::sync::Arc; use std::time::Duration; @@ -36,6 +38,10 @@ use tracing::trace; #[cfg(target_arch = "wasm32")] use wasmtimer::std::{SystemTime, UNIX_EPOCH}; +// If there are an infinite number of heartbeats, then we want to go batch-by-batch spread over several checks +const HEARTBEAT_BATCH_SIZE: u32 = 1000; +const LQ_CHANNEL_SIZE: usize = 100; + /// Used for cluster logic to move LQ data to LQ cleanup code /// Not a stored struct; Used only in this module #[derive(Debug, Clone, Eq, PartialEq)] @@ -47,6 +53,41 @@ pub struct LqValue { pub lq: Uuid, } +#[derive(Debug)] +pub(crate) enum LqType { + Nd(LqValue), + Tb(LqValue), +} + +impl LqType { + fn get_inner(&self) -> &LqValue { + match self { + LqType::Nd(lq) => lq, + LqType::Tb(lq) => lq, + } + } +} + +impl PartialEq for LqType { + fn eq(&self, other: &Self) -> bool { + self.get_inner().lq == other.get_inner().lq + } +} + +impl Eq for LqType {} + +impl PartialOrd for LqType { + fn partial_cmp(&self, other: &Self) -> Option { + Option::Some(self.get_inner().lq.cmp(&other.get_inner().lq)) + } +} + +impl Ord for LqType { + fn cmp(&self, other: &Self) -> Ordering { + self.get_inner().lq.cmp(&other.get_inner().lq) + } +} + /// The underlying datastore instance which stores the dataset. #[allow(dead_code)] pub struct Datastore { @@ -163,7 +204,7 @@ impl Datastore { v } #[cfg(not(feature = "kv-mem"))] - return Err(Error::Ds("Cannot connect to the `memory` storage engine as it is not enabled in this build of SurrealDB".to_owned())); + return Err(Error::Ds("Cannot connect to the `memory` storage engine as it is not enabled in this build of SurrealDB".to_owned())); } // Parse and initiate an File database s if s.starts_with("file:") => { @@ -177,7 +218,7 @@ impl Datastore { v } #[cfg(not(feature = "kv-rocksdb"))] - return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned())); + return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned())); } // Parse and initiate an RocksDB database s if s.starts_with("rocksdb:") => { @@ -191,7 +232,7 @@ impl Datastore { v } #[cfg(not(feature = "kv-rocksdb"))] - return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned())); + return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned())); } // Parse and initiate an SpeeDB database s if s.starts_with("speedb:") => { @@ -205,7 +246,7 @@ impl Datastore { v } #[cfg(not(feature = "kv-speedb"))] - return Err(Error::Ds("Cannot connect to the `speedb` storage engine as it is not enabled in this build of SurrealDB".to_owned())); + return Err(Error::Ds("Cannot connect to the `speedb` storage engine as it is not enabled in this build of SurrealDB".to_owned())); } // Parse and initiate an IndxDB database s if s.starts_with("indxdb:") => { @@ -219,7 +260,7 @@ impl Datastore { v } #[cfg(not(feature = "kv-indxdb"))] - return Err(Error::Ds("Cannot connect to the `indxdb` storage engine as it is not enabled in this build of SurrealDB".to_owned())); + return Err(Error::Ds("Cannot connect to the `indxdb` storage engine as it is not enabled in this build of SurrealDB".to_owned())); } // Parse and initiate a TiKV database s if s.starts_with("tikv:") => { @@ -233,7 +274,7 @@ impl Datastore { v } #[cfg(not(feature = "kv-tikv"))] - return Err(Error::Ds("Cannot connect to the `tikv` storage engine as it is not enabled in this build of SurrealDB".to_owned())); + return Err(Error::Ds("Cannot connect to the `tikv` storage engine as it is not enabled in this build of SurrealDB".to_owned())); } // Parse and initiate a FoundationDB database s if s.starts_with("fdb:") => { @@ -247,7 +288,7 @@ impl Datastore { v } #[cfg(not(feature = "kv-fdb"))] - return Err(Error::Ds("Cannot connect to the `foundationdb` storage engine as it is not enabled in this build of SurrealDB".to_owned())); + return Err(Error::Ds("Cannot connect to the `foundationdb` storage engine as it is not enabled in this build of SurrealDB".to_owned())); } // The datastore path is not valid _ => { @@ -283,7 +324,7 @@ impl Datastore { /// Specify whether this datastore should enable live query notifications pub fn with_notifications(mut self) -> Self { - self.notification_channel = Some(channel::bounded(100)); + self.notification_channel = Some(channel::bounded(LQ_CHANNEL_SIZE)); self } @@ -364,14 +405,17 @@ impl Datastore { // that weren't reversed, as it tries to bootstrap and garbage collect to the best of its // ability. pub async fn bootstrap(&self) -> Result<(), Error> { - trace!("Clearing cluster"); + // First we clear unreachable state that could exist by upgrading from + // previous beta versions + trace!("Clearing unreachable state"); let mut tx = self.transaction(Write, Optimistic).await?; - match self.nuke_whole_cluster(&mut tx).await { + match self.clear_unreachable_state(&mut tx).await { Ok(_) => tx.commit().await, Err(e) => { - error!("Error nuking cluster at bootstrap: {:?}", e); + let msg = format!("Error clearing unreachable cluster state at bootstrap: {:?}", e); + error!(msg); tx.cancel().await?; - Err(Error::Tx(format!("Error nuking cluster at bootstrap: {:?}", e).to_owned())) + Err(Error::Tx(msg)) } }?; @@ -443,6 +487,7 @@ impl Datastore { // Determine the timeout for when a cluster node is expired let ts_expired = (timestamp.clone() - std::time::Duration::from_secs(5))?; let dead = self.remove_dead_nodes(tx, &ts_expired).await?; + trace!("Archiving dead nodes: {:?}", dead); self.archive_dead_lqs(tx, &dead, node_id).await } @@ -453,7 +498,7 @@ impl Datastore { node_id: &Uuid, timestamp: &Timestamp, ) -> Result<(), Error> { - tx.set_cl(node_id.0).await?; + tx.set_nd(node_id.0).await?; tx.set_hb(timestamp.clone(), node_id.0).await?; Ok(()) } @@ -470,7 +515,7 @@ impl Datastore { for hb in hbs { trace!("Deleting node {}", &hb.nd); // TODO should be delr in case of nested entries - tx.del_cl(hb.nd).await?; + tx.del_nd(hb.nd).await?; nodes.push(crate::sql::uuid::Uuid::from(hb.nd)); } Ok(nodes) @@ -492,7 +537,7 @@ impl Datastore { for nd in nodes.iter() { trace!("Archiving node {}", &nd); // Scan on node prefix for LQ space - let node_lqs = tx.scan_ndlq(nd, 1000).await?; + let node_lqs = tx.scan_ndlq(nd, NO_LIMIT).await?; trace!("Found {} LQ entries for {:?}", node_lqs.len(), nd); for lq in node_lqs { trace!("Archiving query {:?}", &lq); @@ -530,44 +575,66 @@ impl Datastore { Ok(()) } - pub async fn nuke_whole_cluster(&self, tx: &mut Transaction) -> Result<(), Error> { + pub async fn clear_unreachable_state(&self, tx: &mut Transaction) -> Result<(), Error> { // Scan nodes - let cls = tx.scan_cl(1000).await?; - trace!("Found {} nodes", cls.len()); - for cl in cls { - tx.del_cl( + let cluster = tx.scan_nd(NO_LIMIT).await?; + trace!("Found {} nodes", cluster.len()); + let mut unreachable_nodes = BTreeMap::new(); + for cl in &cluster { + unreachable_nodes.insert(cl.name.clone(), cl.clone()); + } + // Scan all heartbeats + let end_of_time = Timestamp { + // We remove one, because the scan range adds one + value: u64::MAX - 1, + }; + let hbs = tx.scan_hb(&end_of_time, NO_LIMIT).await?; + trace!("Found {} heartbeats", hbs.len()); + for hb in hbs { + unreachable_nodes.remove(&hb.nd.to_string()).unwrap(); + } + // Remove unreachable nodes + for (_, cl) in unreachable_nodes { + trace!("Removing unreachable node {}", cl.name); + tx.del_nd( uuid::Uuid::parse_str(&cl.name).map_err(|e| { Error::Unimplemented(format!("cluster id was not uuid: {:?}", e)) })?, ) .await?; } - // Scan heartbeats - let hbs = tx - .scan_hb( - &Timestamp { - value: 0, - }, - 1000, - ) - .await?; - trace!("Found {} heartbeats", hbs.len()); - for hb in hbs { - tx.del_hb(hb.hb, hb.nd).await?; + // Scan node live queries for every node + let mut nd_lq_set: BTreeSet = BTreeSet::new(); + for cl in &cluster { + let nds = tx.scan_ndlq(&uuid::Uuid::parse_str(&cl.name).map_err(|e| { + Error::Unimplemented(format!("cluster id was not uuid when parsing to aggregate cluster live queries: {:?}", e)) + })?, NO_LIMIT).await?; + nd_lq_set.extend(nds.into_iter().map(LqType::Nd)); } - // Scan node live queries - let ndlqs = tx.scan_ndlq(&self.id, 1000).await?; - trace!("Found {} node live queries", ndlqs.len()); - for ndlq in ndlqs { - tx.del_ndlq(&ndlq.nd).await?; - // Scan table live queries - let tblqs = tx.scan_tblq(&ndlq.ns, &ndlq.db, &ndlq.tb, 1000).await?; - trace!("Found {} table live queries", tblqs.len()); - for tblq in tblqs { - tx.del_tblq(&ndlq.ns, &ndlq.db, &ndlq.tb, tblq.lq.0).await?; + trace!("Found {} node live queries", nd_lq_set.len()); + // Scan tables for all live queries + // let mut tb_lqs: Vec = vec![]; + let mut tb_lq_set: BTreeSet = BTreeSet::new(); + for ndlq in &nd_lq_set { + let lq = ndlq.get_inner(); + let tbs = tx.scan_tblq(&lq.ns, &lq.db, &lq.tb, NO_LIMIT).await?; + tb_lq_set.extend(tbs.into_iter().map(LqType::Tb)); + } + trace!("Found {} table live queries", tb_lq_set.len()); + // Find and delete missing + for missing in nd_lq_set.symmetric_difference(&tb_lq_set) { + match missing { + LqType::Nd(ndlq) => { + warn!("Deleting ndlq {:?}", &ndlq); + tx.del_ndlq(ndlq.nd.0, ndlq.lq.0, &ndlq.ns, &ndlq.db).await?; + } + LqType::Tb(tblq) => { + warn!("Deleting tblq {:?}", &tblq); + tx.del_tblq(&tblq.ns, &tblq.db, &tblq.tb, tblq.lq.0).await?; + } } } - trace!("Successfully completed nuke"); + trace!("Successfully cleared cluster of unreachable state"); Ok(()) } @@ -582,7 +649,7 @@ impl Datastore { // Find all the LQs we own, so that we can get the ns/ds from provided uuids // We may improve this in future by tracking in web layer - let lqs = tx.scan_ndlq(&self.id, 1000).await?; + let lqs = tx.scan_ndlq(&self.id, NO_LIMIT).await?; let mut hits = vec![]; for lq_value in lqs { if live_queries.contains(&lq_value.lq) { @@ -641,12 +708,11 @@ impl Datastore { tx: &mut Transaction, ts: &Timestamp, ) -> Result, Error> { - let limit = 1000; - let dead = tx.scan_hb(ts, limit).await?; + let dead = tx.scan_hb(ts, HEARTBEAT_BATCH_SIZE).await?; // Delete the heartbeat and everything nested - tx.delr_hb(dead.clone(), 1000).await?; + tx.delr_hb(dead.clone(), NO_LIMIT).await?; for dead_node in dead.clone() { - tx.del_cl(dead_node.nd).await?; + tx.del_nd(dead_node.nd).await?; } Ok::, Error>(dead) } diff --git a/lib/src/kvs/tests/cluster_init.rs b/lib/src/kvs/tests/cluster_init.rs index 34057a1f..da108720 100644 --- a/lib/src/kvs/tests/cluster_init.rs +++ b/lib/src/kvs/tests/cluster_init.rs @@ -1,11 +1,12 @@ use futures::lock::Mutex; +use std::collections::BTreeSet; use std::sync::Arc; use crate::ctx::context; use crate::dbs::{Options, Session}; use crate::iam::{Auth, Role}; -use crate::kvs::{LockType::*, TransactionType::*}; +use crate::kvs::{LockType::*, LqType, TransactionType::*}; use crate::sql; use crate::sql::statements::LiveStatement; use crate::sql::Value::Table; @@ -22,13 +23,13 @@ async fn expired_nodes_are_garbage_collected() { // Set up the first node at an early timestamp let old_time = Timestamp { - value: 123, + value: 123000, }; test.bootstrap_at_time(sql::Uuid::from(old_node), old_time.clone()).await.unwrap(); // Set up second node at a later timestamp let new_time = Timestamp { - value: 567, + value: 567000, }; test.bootstrap_at_time(sql::Uuid::from(new_node), new_time.clone()).await.unwrap(); @@ -41,7 +42,7 @@ async fn expired_nodes_are_garbage_collected() { } // And scan the nodes to verify its just the latest also - let scanned = tx.scan_cl(100).await.unwrap(); + let scanned = tx.scan_nd(100).await.unwrap(); assert_eq!(scanned.len(), 1); for cl in scanned.iter() { assert_eq!(&cl.name, &new_node.to_string()); @@ -58,7 +59,7 @@ async fn expired_nodes_get_live_queries_archived() { // Set up the first node at an early timestamp let old_time = Timestamp { - value: 123, + value: 123000, }; test.bootstrap_at_time(sql::Uuid::from(old_node), old_time.clone()).await.unwrap(); @@ -100,7 +101,7 @@ async fn expired_nodes_get_live_queries_archived() { // Set up second node at a later timestamp let new_node = Uuid::parse_str("04da7d4c-0086-4358-8318-49f0bb168fa7").unwrap(); let new_time = Timestamp { - value: 456, + value: 456000, }; // TODO These timestsamps are incorrect and should really be derived; Also check timestamp errors test.bootstrap_at_time(sql::Uuid::from(new_node), new_time.clone()).await.unwrap(); @@ -122,7 +123,7 @@ async fn single_live_queries_are_garbage_collected() { let node_id = Uuid::parse_str("b1a08614-a826-4581-938d-bea17f00e253").unwrap(); let test = init(node_id).await.unwrap(); let time = Timestamp { - value: 123, + value: 123000, }; let namespace = "test_namespace"; let database = "test_db"; @@ -203,7 +204,7 @@ async fn bootstrap_does_not_error_on_missing_live_queries() { let old_node_id = Uuid::parse_str("5f644f02-7c1a-4f8b-babd-bd9e92c1836a").unwrap(); let test = init(old_node_id).await.unwrap(); let time = Timestamp { - value: 123, + value: 123000, }; let namespace = "test_namespace_0A8BD08BE4F2457BB9F145557EF19605"; let database_owned = format!("test_db_{:?}", test.kvs); @@ -255,18 +256,10 @@ async fn bootstrap_does_not_error_on_missing_live_queries() { let second_node = test.db.with_node_id(crate::sql::uuid::Uuid::from(new_node_id)); match second_node.bootstrap().await { Ok(_) => { - panic!("Expected an error because of missing live query") + // The behaviour has now changed to remove all broken entries without raising errors } - Err(Error::Tx(e)) => match e { - _ if e.contains("LvNotFound") => { - // This is what we want... an LvNotFound error, but it gets wrapped into a string so that Tx doesnt carry vecs - } - _ => { - panic!("Expected an LvNotFound error but got: {:?}", e); - } - }, Err(e) => { - panic!("Missing live query error: {:?}", e) + panic!("Bootstrapping should not generate errors: {:?}", e) } } @@ -294,3 +287,63 @@ async fn bootstrap_does_not_error_on_missing_live_queries() { assert_eq!(0, found.len(), "Found: {:?}", found); tx.cancel().await.unwrap(); } + +#[test(tokio::test)] +async fn test_asymmetric_difference() { + let nd1 = Uuid::parse_str("7da0b3bb-1811-4c0e-8d8d-5fc08b8200a5").unwrap(); + let nd2 = Uuid::parse_str("8fd394df-7f96-4395-9c9a-3abf1e2386ea").unwrap(); + let nd3 = Uuid::parse_str("aa53cb74-1d6b-44df-b063-c495e240ae9e").unwrap(); + let ns1 = "namespace_one"; + let ns2 = "namespace_two"; + let ns3 = "namespace_three"; + let db1 = "database_one"; + let db2 = "database_two"; + let db3 = "database_three"; + let tb1 = "table_one"; + let tb2 = "table_two"; + let tb3 = "table_three"; + let lq1 = Uuid::parse_str("95f0e060-d301-4dfc-9d35-f150e802873b").unwrap(); + let lq2 = Uuid::parse_str("acf60c04-5819-4a23-9874-aeb0ae1be425").unwrap(); + let lq3 = Uuid::parse_str("5d591ae7-db79-4e4f-aa02-a83a4a25ce3f").unwrap(); + let left_set = BTreeSet::from_iter(vec![ + LqType::Nd(LqValue { + nd: nd1.into(), + ns: ns1.to_string(), + db: db1.to_string(), + tb: tb1.to_string(), + lq: lq1.into(), + }), + LqType::Nd(LqValue { + nd: nd2.into(), + ns: ns2.to_string(), + db: db2.to_string(), + tb: tb2.to_string(), + lq: lq2.into(), + }), + ]); + + let right_set = BTreeSet::from_iter(vec![ + LqType::Tb(LqValue { + nd: nd2.into(), + ns: ns2.to_string(), + db: db2.to_string(), + tb: tb2.to_string(), + lq: lq2.into(), + }), + LqType::Tb(LqValue { + nd: nd3.into(), + ns: ns3.to_string(), + db: db3.to_string(), + tb: tb3.to_string(), + lq: lq3.into(), + }), + ]); + + let diff = left_set.symmetric_difference(&right_set); + // TODO but also poorman's count + let mut count = 0; + for _ in diff { + count += 1; + } + assert_ne!(count, 0); +} diff --git a/lib/src/kvs/tests/hb.rs b/lib/src/kvs/tests/hb.rs new file mode 100644 index 00000000..382d3d84 --- /dev/null +++ b/lib/src/kvs/tests/hb.rs @@ -0,0 +1,34 @@ +#[tokio::test] +#[serial] +async fn write_scan_hb() { + let nd = uuid::Uuid::parse_str("e80540d4-2869-4bf3-ae27-790a538c53f3").unwrap(); + let test = init(nd).await.unwrap(); + + // Add 2 nodes + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + let t1 = tx.clock(); + let t2 = Timestamp { + value: t1.value + 1, + }; + let t3 = Timestamp { + value: t2.value + 1, + }; + tx.set_hb(t1, Uuid::parse_str("6d1210a0-9224-4813-8090-ded787d51894").unwrap()).await.unwrap(); + tx.set_hb(t2, Uuid::parse_str("b80ff454-c3e7-46a9-a0b0-7b40e9a62626").unwrap()).await.unwrap(); + tx.commit().await.unwrap(); + + // Scan limit 1000 + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + let vals_lim = tx.scan_hb(&t3, 1000).await.unwrap(); + tx.cancel().await.unwrap(); + + // Scan limit 0 + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + let vals_no_lim = tx.scan_hb(&t3, NO_LIMIT).await.unwrap(); + tx.cancel().await.unwrap(); + + // Assert equal + assert_eq!(vals_lim, vals_no_lim); + assert_eq!(vals_lim.len(), 2); + assert_eq!(vals_no_lim.len(), 2); +} diff --git a/lib/src/kvs/tests/helper.rs b/lib/src/kvs/tests/helper.rs index 50a48309..fd4031db 100644 --- a/lib/src/kvs/tests/helper.rs +++ b/lib/src/kvs/tests/helper.rs @@ -20,8 +20,16 @@ impl TestContext { ) -> Result<(), Error> { // TODO we shouldn't test bootstrapping manually let mut tx = self.db.transaction(Write, Optimistic).await?; - let archived = self.db.register_remove_and_archive(&mut tx, &node_id, time).await?; - tx.commit().await?; + let archived = match self.db.register_remove_and_archive(&mut tx, &node_id, time).await { + Ok(v) => { + tx.commit().await?; + v + } + Err(e) => { + tx.cancel().await?; + return Err(e); + } + }; let mut errors = vec![]; let mut values = vec![]; diff --git a/lib/src/kvs/tests/mod.rs b/lib/src/kvs/tests/mod.rs index f48d5acf..8b097a51 100644 --- a/lib/src/kvs/tests/mod.rs +++ b/lib/src/kvs/tests/mod.rs @@ -34,6 +34,7 @@ mod mem { } include!("cluster_init.rs"); + include!("hb.rs"); include!("helper.rs"); include!("lq.rs"); include!("nq.rs"); @@ -42,6 +43,7 @@ mod mem { include!("tb.rs"); include!("multireader.rs"); include!("timestamp_to_versionstamp.rs"); + include!("nd.rs"); include!("ndlq.rs"); include!("tblq.rs"); include!("tbnt.rs"); @@ -77,6 +79,7 @@ mod rocksdb { } include!("cluster_init.rs"); + include!("hb.rs"); include!("helper.rs"); include!("lq.rs"); include!("nq.rs"); @@ -87,6 +90,7 @@ mod rocksdb { include!("multiwriter_different_keys.rs"); include!("multiwriter_same_keys_conflict.rs"); include!("timestamp_to_versionstamp.rs"); + include!("nd.rs"); include!("ndlq.rs"); include!("tblq.rs"); include!("tbnt.rs"); @@ -120,6 +124,7 @@ mod speedb { } include!("cluster_init.rs"); + include!("hb.rs"); include!("helper.rs"); include!("lq.rs"); include!("nq.rs"); @@ -130,6 +135,7 @@ mod speedb { include!("multiwriter_different_keys.rs"); include!("multiwriter_same_keys_conflict.rs"); include!("timestamp_to_versionstamp.rs"); + include!("nd.rs"); include!("ndlq.rs"); include!("tblq.rs"); include!("tbnt.rs"); @@ -164,6 +170,7 @@ mod tikv { } include!("cluster_init.rs"); + include!("hb.rs"); include!("helper.rs"); include!("lq.rs"); include!("nq.rs"); @@ -174,6 +181,7 @@ mod tikv { include!("multiwriter_different_keys.rs"); include!("multiwriter_same_keys_conflict.rs"); include!("timestamp_to_versionstamp.rs"); + include!("nd.rs"); include!("ndlq.rs"); include!("tblq.rs"); include!("tbnt.rs"); @@ -208,6 +216,7 @@ mod fdb { } include!("cluster_init.rs"); + include!("hb.rs"); include!("helper.rs"); include!("lq.rs"); include!("nq.rs"); @@ -218,6 +227,7 @@ mod fdb { include!("multiwriter_different_keys.rs"); include!("multiwriter_same_keys_allow.rs"); include!("timestamp_to_versionstamp.rs"); + include!("nd.rs"); include!("ndlq.rs"); include!("tblq.rs"); include!("tbnt.rs"); diff --git a/lib/src/kvs/tests/nd.rs b/lib/src/kvs/tests/nd.rs new file mode 100644 index 00000000..dea528e9 --- /dev/null +++ b/lib/src/kvs/tests/nd.rs @@ -0,0 +1,27 @@ +#[tokio::test] +#[serial] +async fn write_scan_nd() { + let nd = uuid::Uuid::parse_str("6a6a4e59-3e86-431d-884f-8f433781e4e9").unwrap(); + let test = init(nd).await.unwrap(); + + // Add 2 nodes + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + tx.set_nd(Uuid::parse_str("83d9b3c0-f3c4-45be-9ef9-9d48502fecb1").unwrap()).await.unwrap(); + tx.set_nd(Uuid::parse_str("cbefc4fe-8ba0-4898-ab69-782e3ebc06f9").unwrap()).await.unwrap(); + tx.commit().await.unwrap(); + + // Scan limit 1000 + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + let vals_lim = tx.scan_nd(1000).await.unwrap(); + tx.cancel().await.unwrap(); + + // Scan limit 0 + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + let vals_no_lim = tx.scan_nd(NO_LIMIT).await.unwrap(); + tx.cancel().await.unwrap(); + + // Assert equal + assert_eq!(vals_lim, vals_no_lim); + assert_eq!(vals_lim.len(), 2); + assert_eq!(vals_no_lim.len(), 2); +} diff --git a/lib/src/kvs/tests/ndlq.rs b/lib/src/kvs/tests/ndlq.rs index 7c1657a2..f72af890 100644 --- a/lib/src/kvs/tests/ndlq.rs +++ b/lib/src/kvs/tests/ndlq.rs @@ -1,4 +1,4 @@ -use crate::kvs::LqValue; +use crate::kvs::{LqValue, NO_LIMIT}; #[tokio::test] #[serial] @@ -18,9 +18,11 @@ async fn write_scan_ndlq() { // Verify scan let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); - let res = tx.scan_ndlq(&nd, 100).await.unwrap(); + let res_lim = tx.scan_ndlq(&nd, 100).await.unwrap(); + let res_no_lim = tx.scan_ndlq(&nd, NO_LIMIT).await.unwrap(); + tx.commit().await.unwrap(); assert_eq!( - res, + res_lim, vec![LqValue { nd: sql::Uuid::from(nd), ns: ns.to_string(), @@ -29,5 +31,5 @@ async fn write_scan_ndlq() { lq }] ); - tx.commit().await.unwrap(); + assert_eq!(res_lim, res_no_lim); } diff --git a/lib/src/kvs/tests/nq.rs b/lib/src/kvs/tests/nq.rs index 08ffbaf4..cb8d329e 100644 --- a/lib/src/kvs/tests/nq.rs +++ b/lib/src/kvs/tests/nq.rs @@ -9,7 +9,7 @@ async fn archive_lv_for_node_archives() { let namespace = "test_namespace"; let database = "test_database"; let table = "test_table"; - tx.set_cl(node_id).await.unwrap(); + tx.set_nd(node_id).await.unwrap(); let lv_id = crate::sql::uuid::Uuid::from(Uuid::from_bytes([ 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, diff --git a/lib/src/kvs/tests/tblq.rs b/lib/src/kvs/tests/tblq.rs index fb1aee40..60dd27b5 100644 --- a/lib/src/kvs/tests/tblq.rs +++ b/lib/src/kvs/tests/tblq.rs @@ -28,6 +28,8 @@ async fn write_scan_tblq() { // Verify scan let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); let res = tx.scan_tblq(ns, db, tb, 100).await.unwrap(); + let no_limit = tx.scan_tblq(ns, db, tb, NO_LIMIT).await.unwrap(); + tx.commit().await.unwrap(); assert_eq!( res, vec![LqValue { @@ -38,5 +40,5 @@ async fn write_scan_tblq() { lq: live_id }] ); - tx.commit().await.unwrap(); + assert_eq!(res, no_limit); } diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index 423f00b0..87533810 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -51,6 +51,8 @@ use uuid::Uuid; #[cfg(target_arch = "wasm32")] use wasmtimer::std::{SystemTime, UNIX_EPOCH}; +pub(crate) const NO_LIMIT: u32 = 0; + /// A set of undoable updates and requests against a dataset. #[allow(dead_code)] pub struct Transaction { @@ -1011,7 +1013,7 @@ impl Transaction { // Register cluster membership // NOTE: Setting cluster membership sets the heartbeat // Remember to set the heartbeat as well - pub async fn set_cl(&mut self, id: Uuid) -> Result<(), Error> { + pub async fn set_nd(&mut self, id: Uuid) -> Result<(), Error> { let key = crate::key::root::nd::Nd::new(id); match self.get_nd(id).await? { Some(_) => Err(Error::ClAlreadyExists { @@ -1038,7 +1040,8 @@ impl Transaction { } } - pub(crate) fn clock(&self) -> Timestamp { + // Public for tests, but we might not want to expose this + pub fn clock(&self) -> Timestamp { // Use a timestamp oracle if available let now: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) { Ok(duration) => duration.as_millis(), @@ -1053,9 +1056,10 @@ impl Transaction { pub async fn set_hb(&mut self, timestamp: Timestamp, id: Uuid) -> Result<(), Error> { let key = crate::key::root::hb::Hb::new(timestamp.clone(), id); // We do not need to do a read, we always want to overwrite + let key_enc = key.encode()?; self.put( key.key_category(), - key, + key_enc, ClusterMembership { name: id.to_string(), heartbeat: timestamp, @@ -1072,19 +1076,17 @@ impl Transaction { } // Delete a cluster registration entry - pub async fn del_cl(&mut self, node: Uuid) -> Result<(), Error> { + pub async fn del_nd(&mut self, node: Uuid) -> Result<(), Error> { let key = crate::key::root::nd::Nd::new(node); - self.del(key).await + let key_enc = key.encode()?; + self.del(key_enc).await } // Delete the live query notification registry on the table - // Return the Table ID - pub async fn del_ndlq(&mut self, nd: &Uuid) -> Result { - // This isn't implemented because it is covered by del_nd - // Will add later for remote node kill - Err(Error::NdNotFound { - value: format!("Missing cluster node {:?}", nd), - }) + pub async fn del_ndlq(&mut self, nd: Uuid, lq: Uuid, ns: &str, db: &str) -> Result<(), Error> { + let key = crate::key::node::lq::Lq::new(nd, lq, ns, db); + let key_enc = key.encode()?; + self.del(key_enc).await } // Scans up until the heartbeat timestamp and returns the discovered nodes @@ -1101,21 +1103,23 @@ impl Transaction { let mut num = limit; let mut out: Vec = vec![]; // Start processing - while num > 0 { + while limit == NO_LIMIT || num > 0 { + let batch_size = match num { + 0 => 1000, + _ => std::cmp::min(1000, num), + }; // Get records batch let res = match nxt { None => { let min = beg.clone(); let max = end.clone(); - let num = std::cmp::min(1000, num); - self.scan(min..max, num).await? + self.scan(min..max, batch_size).await? } Some(ref mut beg) => { beg.push(0x00); let min = beg.clone(); let max = end.clone(); - let num = std::cmp::min(1000, num); - self.scan(min..max, num).await? + self.scan(min..max, batch_size).await? } }; // Get total results @@ -1132,14 +1136,18 @@ impl Transaction { } out.push(crate::key::root::hb::Hb::decode(k.as_slice())?); // Count - num -= 1; + if limit > 0 { + num -= 1; + } } } trace!("scan_hb: {:?}", out); Ok(out) } - pub async fn scan_cl(&mut self, limit: u32) -> Result, Error> { + /// scan_nd will scan all the cluster membership registers + /// setting limit to 0 will result in scanning all entries + pub async fn scan_nd(&mut self, limit: u32) -> Result, Error> { let beg = crate::key::root::nd::Nd::prefix(); let end = crate::key::root::nd::Nd::suffix(); trace!("Scan start: {} ({:?})", String::from_utf8_lossy(&beg).to_string(), &beg); @@ -1148,21 +1156,23 @@ impl Transaction { let mut num = limit; let mut out: Vec = vec![]; // Start processing - while num > 0 { + while (limit == NO_LIMIT) || (num > 0) { + let batch_size = match num { + 0 => 1000, + _ => std::cmp::min(1000, num), + }; // Get records batch let res = match nxt { None => { let min = beg.clone(); let max = end.clone(); - let num = std::cmp::min(1000, num); - self.scan(min..max, num).await? + self.scan(min..max, batch_size).await? } Some(ref mut beg) => { beg.push(0x00); let min = beg.clone(); let max = end.clone(); - let num = std::cmp::min(1000, num); - self.scan(min..max, num).await? + self.scan(min..max, batch_size).await? } }; // Get total results @@ -1179,10 +1189,12 @@ impl Transaction { } out.push((&v).into()); // Count - num -= 1; + if limit > 0 { + num -= 1; + } } } - trace!("scan_hb: {:?}", out); + trace!("scan_nd: {:?}", out); Ok(out) } @@ -1206,30 +1218,64 @@ impl Transaction { } pub async fn scan_ndlq<'a>(&mut self, node: &Uuid, limit: u32) -> Result, Error> { - let pref = crate::key::node::lq::prefix_nd(node); - let suff = crate::key::node::lq::suffix_nd(node); + let beg = crate::key::node::lq::prefix_nd(node); + let end = crate::key::node::lq::suffix_nd(node); trace!( "Scanning range from pref={}, suff={}", - crate::key::debug::sprint_key(&pref), - crate::key::debug::sprint_key(&suff), + crate::key::debug::sprint_key(&beg), + crate::key::debug::sprint_key(&end), ); - let rng = pref..suff; - let scanned = self.scan(rng, limit).await?; - let mut res: Vec = vec![]; - for (key, value) in scanned { - trace!("scan_lq: key={:?} value={:?}", &key, &value); - let lq = crate::key::node::lq::Lq::decode(key.as_slice())?; - let tb: String = String::from_utf8(value).unwrap(); - trace!("scan_lq Found tb: {:?}", tb); - res.push(LqValue { - nd: lq.nd.into(), - ns: lq.ns.to_string(), - db: lq.db.to_string(), - tb, - lq: lq.lq.into(), - }); + let mut nxt: Option = None; + let mut num = limit; + let mut out: Vec = vec![]; + while limit == NO_LIMIT || num > 0 { + let batch_size = match num { + 0 => 1000, + _ => std::cmp::min(1000, num), + }; + // Get records batch + let res = match nxt { + None => { + let min = beg.clone(); + let max = end.clone(); + self.scan(min..max, batch_size).await? + } + Some(ref mut beg) => { + beg.push(0x00); + let min = beg.clone(); + let max = end.clone(); + self.scan(min..max, batch_size).await? + } + }; + // Get total results + let n = res.len(); + // Exit when settled + if n == 0 { + break; + } + // Loop over results + for (i, (key, value)) in res.into_iter().enumerate() { + // Ready the next + if n == i + 1 { + nxt = Some(key.clone()); + } + let lq = crate::key::node::lq::Lq::decode(key.as_slice())?; + let tb: String = String::from_utf8(value).unwrap(); + trace!("scan_lq Found tb: {:?}", tb); + out.push(LqValue { + nd: lq.nd.into(), + ns: lq.ns.to_string(), + db: lq.db.to_string(), + tb, + lq: lq.lq.into(), + }); + // Count + if limit != NO_LIMIT { + num -= 1; + } + } } - Ok(res) + Ok(out) } pub async fn scan_tblq<'a>( @@ -1239,29 +1285,63 @@ impl Transaction { tb: &str, limit: u32, ) -> Result, Error> { - let pref = crate::key::table::lq::prefix(ns, db, tb); - let suff = crate::key::table::lq::suffix(ns, db, tb); + let beg = crate::key::table::lq::prefix(ns, db, tb); + let end = crate::key::table::lq::suffix(ns, db, tb); trace!( "Scanning range from pref={}, suff={}", - crate::key::debug::sprint_key(&pref), - crate::key::debug::sprint_key(&suff), + crate::key::debug::sprint_key(&beg), + crate::key::debug::sprint_key(&end), ); - let rng = pref..suff; - let scanned = self.scan(rng, limit).await?; - let mut res: Vec = vec![]; - for (key, value) in scanned { - trace!("scan_lv: key={:?} value={:?}", &key, &value); - let val: LiveStatement = value.into(); - let lv = crate::key::table::lq::Lq::decode(key.as_slice())?; - res.push(LqValue { - nd: val.node, - ns: lv.ns.to_string(), - db: lv.db.to_string(), - tb: lv.tb.to_string(), - lq: val.id.clone(), - }); + let mut nxt: Option = None; + let mut num = limit; + let mut out: Vec = vec![]; + while limit == NO_LIMIT || num > 0 { + let batch_size = match num { + 0 => 1000, + _ => std::cmp::min(1000, num), + }; + // Get records batch + let res = match nxt { + None => { + let min = beg.clone(); + let max = end.clone(); + self.scan(min..max, batch_size).await? + } + Some(ref mut beg) => { + beg.push(0x00); + let min = beg.clone(); + let max = end.clone(); + self.scan(min..max, batch_size).await? + } + }; + // Get total results + let n = res.len(); + // Exit when settled + if n == 0 { + break; + } + // Loop over results + for (i, (key, value)) in res.into_iter().enumerate() { + // Ready the next + if n == i + 1 { + nxt = Some(key.clone()); + } + let lv = crate::key::table::lq::Lq::decode(key.as_slice())?; + let val: LiveStatement = value.into(); + out.push(LqValue { + nd: val.node, + ns: lv.ns.to_string(), + db: lv.db.to_string(), + tb: lv.tb.to_string(), + lq: val.id.clone(), + }); + // Count + if limit != NO_LIMIT { + num -= 1; + } + } } - Ok(res) + Ok(out) } pub async fn putc_tblq( @@ -1274,7 +1354,7 @@ impl Transaction { ) -> Result<(), Error> { let key = crate::key::table::lq::new(ns, db, tb, live_stm.id.0); let key_enc = crate::key::table::lq::Lq::encode(&key)?; - trace!("putc_lv ({:?}): key={:?}", &live_stm.id, crate::key::debug::sprint_key(&key_enc)); + trace!("putc_tblq ({:?}): key={:?}", &live_stm.id, crate::key::debug::sprint_key(&key_enc)); self.putc(key_enc, live_stm, expected).await } diff --git a/lib/tests/bootstrap.rs b/lib/tests/bootstrap.rs new file mode 100644 index 00000000..0ec86ddd --- /dev/null +++ b/lib/tests/bootstrap.rs @@ -0,0 +1,228 @@ +/// The tests in this file are checking that bootstrapping of the database works correctly +/// They are testing edge cases that may accidentally occur with bugs - we wan't to make sure +/// the system can recover in light of these issues. +/// +/// We may want to move these tests to another suite, as they aren't testing the statements like +/// the other tests are. +mod helpers; +mod parse; + +use helpers::new_ds; +use serial_test::serial; +use surrealdb::err::Error; +use surrealdb::kvs::LockType::Optimistic; +use surrealdb::kvs::Transaction; +use surrealdb::kvs::TransactionType::Write; +use surrealdb::sql::statements::LiveStatement; +use surrealdb::sql::Uuid; +use tokio::time::sleep; + +#[tokio::test] +#[serial] +async fn bootstrap_removes_unreachable_nodes() -> Result<(), Error> { + // Create the datastore + let dbs = new_ds().await.unwrap(); + + let mut tx = dbs.transaction(Write, Optimistic).await.unwrap(); + // Introduce missing nodes (without heartbeats) + let bad_node = uuid::Uuid::parse_str("9d8e16e4-9f6a-4704-8cf1-7cd55b937c5b").unwrap(); + tx.set_nd(bad_node).await.unwrap(); + + // Introduce a valid chain of data to confirm it is not removed from a cleanup + a_valid_notification( + &mut tx, + ValidNotificationState { + timestamp: None, + node_id: None, + live_query_id: None, + notification_id: None, + namespace: "testns".to_string(), + database: "testdb".to_string(), + table: "testtb".to_string(), + }, + ) + .await + .unwrap(); + + tx.commit().await.unwrap(); + + // Bootstrap + dbs.bootstrap().await.unwrap(); + + // Verify the incorrect node is deleted, but self and valid still exist + let mut tx = dbs.transaction(Write, Optimistic).await.unwrap(); + let res = tx.scan_nd(1000).await.unwrap(); + tx.commit().await.unwrap(); + for node in &res { + assert_ne!(node.name, bad_node.to_string()); + } + // {Node generated by bootstrap} + {valid node who's uuid we don't know} + assert_eq!(res.len(), 2); + Ok(()) +} + +#[tokio::test] +#[serial] +async fn bootstrap_removes_unreachable_node_live_queries() -> Result<(), Error> { + // Create the datastore + let dbs = new_ds().await.unwrap(); + + // Introduce an invalid node live query + let mut tx = dbs.transaction(Write, Optimistic).await.unwrap(); + let valid_data = a_valid_notification( + &mut tx, + ValidNotificationState { + timestamp: None, + node_id: None, + live_query_id: None, + notification_id: None, + namespace: "testns".to_string(), + database: "testdb".to_string(), + table: "testtb".to_string(), + }, + ) + .await + .unwrap(); + let bad_nd_lq_id = uuid::Uuid::parse_str("67b0f588-2b95-4b6e-87f3-73d0a49034be").unwrap(); + tx.putc_ndlq( + valid_data.clone().node_id.unwrap().0, + bad_nd_lq_id, + &valid_data.namespace, + &valid_data.database, + &valid_data.table, + None, + ) + .await + .unwrap(); + tx.commit().await.unwrap(); + + // Bootstrap + dbs.bootstrap().await.unwrap(); + + // Verify node live query is deleted + let mut tx = dbs.transaction(Write, Optimistic).await.unwrap(); + let res = tx.scan_ndlq(valid_data.node_id.as_ref().unwrap(), 1000).await.unwrap(); + tx.commit().await.unwrap(); + assert_eq!(res.len(), 1, "We expect the node to be available"); + let tested_entry = res.get(0).unwrap(); + assert_eq!(tested_entry.lq, valid_data.live_query_id.unwrap()); + + Ok(()) +} + +#[tokio::test] +#[serial] +async fn bootstrap_removes_unreachable_table_live_queries() -> Result<(), Error> { + // Create the datastore + let dbs = new_ds().await.unwrap(); + + // Introduce an invalid table live query + let mut tx = dbs.transaction(Write, Optimistic).await.unwrap(); + let valid_data = a_valid_notification( + &mut tx, + ValidNotificationState { + timestamp: None, + node_id: None, + live_query_id: None, + notification_id: None, + namespace: "testns".to_string(), + database: "testdb".to_string(), + table: "testtb".to_string(), + }, + ) + .await + .unwrap(); + let bad_tb_lq_id = uuid::Uuid::parse_str("97b8fbe4-a147-4420-95dc-97db3a46c491").unwrap(); + let mut live_stm = LiveStatement::default(); + live_stm.id = bad_tb_lq_id.into(); + tx.putc_tblq(&valid_data.namespace, &valid_data.database, &valid_data.table, live_stm, None) + .await + .unwrap(); + tx.commit().await.unwrap(); + + // Bootstrap + dbs.bootstrap().await.unwrap(); + + // Verify invalid table live query is deleted + let mut tx = dbs.transaction(Write, Optimistic).await.unwrap(); + + let res = tx + .scan_tblq(&valid_data.namespace, &valid_data.database, &valid_data.table, 1000) + .await + .unwrap(); + tx.commit().await.unwrap(); + + assert_eq!(res.len(), 1, "Expected 1 table live query: {:?}", res); + let tested_entry = res.get(0).unwrap(); + assert_eq!(tested_entry.lq, valid_data.live_query_id.unwrap()); + Ok(()) +} + +#[tokio::test] +#[serial] +async fn bootstrap_removes_unreachable_live_query_notifications() -> Result<(), Error> { + Ok(()) +} + +/// ValidBootstrapState is a representation of a chain of information that bootstrap is concerned +/// with. It is used for two reasons +/// - sometimes we want to detect invalid data that has a valid path (notification without a live query). +/// - sometimes we want to detect existing valid data is not deleted +#[derive(Debug, Clone)] +struct ValidNotificationState { + pub timestamp: Option, + pub node_id: Option, + pub live_query_id: Option, + pub notification_id: Option, + pub namespace: String, + pub database: String, + pub table: String, +} + +/// Create a chain of valid state that bootstrapping should not remove. +/// As a general rule, there is no need to override the system defaults since this code is to place generic data. +/// If you see these IDs, it is because you captured this entry. +/// So its ok to share ID between tests +async fn a_valid_notification( + tx: &mut Transaction, + args: ValidNotificationState, +) -> Result { + let now = tx.clock(); + let default_node_id = + Uuid::from(uuid::Uuid::parse_str("123e9d92-c975-4daf-8080-3082e83cfa9b").unwrap()); + let default_lq_id = + Uuid::from(uuid::Uuid::parse_str("ca02c2d0-31dd-4bf0-ada4-ee02b1191e0a").unwrap()); + let default_not_id = + Uuid::from(uuid::Uuid::parse_str("c952cf7d-b503-4370-802e-cd2404f2160d").unwrap()); + let entry = ValidNotificationState { + timestamp: Some(args.timestamp.unwrap_or(now.value)), + node_id: Some(args.node_id.unwrap_or(default_node_id)), + live_query_id: Some(args.live_query_id.unwrap_or(default_lq_id)), + notification_id: Some(args.notification_id.unwrap_or(default_not_id)), + ..args + }; + let mut live_stm = LiveStatement::default(); + live_stm.id = entry.live_query_id.clone().unwrap().into(); + live_stm.node = entry.node_id.clone().unwrap().into(); + + // Create heartbeat + tx.set_hb(entry.timestamp.clone().unwrap().into(), entry.node_id.clone().unwrap().0).await?; + // Create cluster node entry + tx.set_nd(entry.node_id.clone().unwrap().0).await?; + // Create node live query registration + tx.putc_ndlq( + entry.node_id.clone().unwrap().0, + entry.live_query_id.clone().unwrap().0, + &entry.namespace, + &entry.database, + &entry.table, + None, + ) + .await?; + // Create table live query registration + tx.putc_tblq(&entry.namespace, &entry.database, &entry.table, live_stm, None).await?; + // TODO Create notification + // tx.putc_tbnt( + // ).await?; + Ok(entry) +} diff --git a/lib/tests/running.md b/lib/tests/running.md new file mode 100644 index 00000000..07f0de69 --- /dev/null +++ b/lib/tests/running.md @@ -0,0 +1,13 @@ +These tests are run with the following command + +Individual test files (not api module): +```bash +cargo test -p surrealdb --features kv-mem --test bootstrap -- --nocapture +``` + +Api module: +```bash +TODO +cargo test -p surrealdb --features kv-rocksdb --test api api_integration::file::delete_record_range +``` +