Add kv unit tests verify write+scan (#2682)

This commit is contained in:
Przemyslaw Hugh Kaznowski 2023-09-12 17:45:46 +01:00 committed by GitHub
parent 34fc0cc6e6
commit 1a85f4967a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 173 additions and 14 deletions

View file

@ -360,6 +360,17 @@ impl Datastore {
// that weren't reversed, as it tries to bootstrap and garbage collect to the best of its // that weren't reversed, as it tries to bootstrap and garbage collect to the best of its
// ability. // ability.
pub async fn bootstrap(&self) -> Result<(), Error> { pub async fn bootstrap(&self) -> Result<(), Error> {
trace!("Clearing cluster");
let mut tx = self.transaction(true, false).await?;
match self.nuke_whole_cluster(&mut tx).await {
Ok(_) => tx.commit().await,
Err(e) => {
error!("Error nuking cluster at bootstrap: {:?}", e);
tx.cancel().await?;
Err(Error::Tx(format!("Error nuking cluster at bootstrap: {:?}", e).to_owned()))
}
}?;
trace!("Bootstrapping {}", self.id); trace!("Bootstrapping {}", self.id);
let mut tx = self.transaction(true, false).await?; let mut tx = self.transaction(true, false).await?;
let now = tx.clock(); let now = tx.clock();
@ -438,7 +449,7 @@ impl Datastore {
node_id: &Uuid, node_id: &Uuid,
timestamp: &Timestamp, timestamp: &Timestamp,
) -> Result<(), Error> { ) -> Result<(), Error> {
tx.set_nd(node_id.0).await?; tx.set_cl(node_id.0).await?;
tx.set_hb(timestamp.clone(), node_id.0).await?; tx.set_hb(timestamp.clone(), node_id.0).await?;
Ok(()) Ok(())
} }
@ -455,7 +466,7 @@ impl Datastore {
for hb in hbs { for hb in hbs {
trace!("Deleting node {}", &hb.nd); trace!("Deleting node {}", &hb.nd);
// TODO should be delr in case of nested entries // TODO should be delr in case of nested entries
tx.del_nd(hb.nd).await?; tx.del_cl(hb.nd).await?;
nodes.push(crate::sql::uuid::Uuid::from(hb.nd)); nodes.push(crate::sql::uuid::Uuid::from(hb.nd));
} }
Ok(nodes) Ok(nodes)
@ -515,6 +526,47 @@ impl Datastore {
Ok(()) Ok(())
} }
pub async fn nuke_whole_cluster(&self, tx: &mut Transaction) -> Result<(), Error> {
// Scan nodes
let cls = tx.scan_cl(1000).await?;
trace!("Found {} nodes", cls.len());
for cl in cls {
tx.del_cl(
uuid::Uuid::parse_str(&cl.name).map_err(|e| {
Error::Unimplemented(format!("cluster id was not uuid: {:?}", e))
})?,
)
.await?;
}
// Scan heartbeats
let hbs = tx
.scan_hb(
&Timestamp {
value: 0,
},
1000,
)
.await?;
trace!("Found {} heartbeats", hbs.len());
for hb in hbs {
tx.del_hb(hb.hb, hb.nd).await?;
}
// Scan node live queries
let ndlqs = tx.scan_ndlq(&self.id, 1000).await?;
trace!("Found {} node live queries", ndlqs.len());
for ndlq in ndlqs {
tx.del_ndlq(&ndlq.nd).await?;
// Scan table live queries
let tblqs = tx.scan_tblq(&ndlq.ns, &ndlq.db, &ndlq.tb, 1000).await?;
trace!("Found {} table live queries", tblqs.len());
for tblq in tblqs {
tx.del_tblq(&ndlq.ns, &ndlq.db, &ndlq.tb, tblq.lq.0).await?;
}
}
trace!("Successfully completed nuke");
Ok(())
}
// Garbage collection task to run when a client disconnects from a surrealdb node // Garbage collection task to run when a client disconnects from a surrealdb node
// i.e. we know the node, we are not performing a full wipe on the node // i.e. we know the node, we are not performing a full wipe on the node
// and the wipe must be fully performed by this node // and the wipe must be fully performed by this node
@ -590,7 +642,7 @@ impl Datastore {
// Delete the heartbeat and everything nested // Delete the heartbeat and everything nested
tx.delr_hb(dead.clone(), 1000).await?; tx.delr_hb(dead.clone(), 1000).await?;
for dead_node in dead.clone() { for dead_node in dead.clone() {
tx.del_nd(dead_node.nd).await?; tx.del_cl(dead_node.nd).await?;
} }
Ok::<Vec<Hb>, Error>(dead) Ok::<Vec<Hb>, Error>(dead)
} }

View file

@ -40,6 +40,9 @@ mod mem {
include!("tb.rs"); include!("tb.rs");
include!("multireader.rs"); include!("multireader.rs");
include!("timestamp_to_versionstamp.rs"); include!("timestamp_to_versionstamp.rs");
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
} }
#[cfg(feature = "kv-rocksdb")] #[cfg(feature = "kv-rocksdb")]
@ -80,6 +83,9 @@ mod rocksdb {
include!("multiwriter_different_keys.rs"); include!("multiwriter_different_keys.rs");
include!("multiwriter_same_keys_conflict.rs"); include!("multiwriter_same_keys_conflict.rs");
include!("timestamp_to_versionstamp.rs"); include!("timestamp_to_versionstamp.rs");
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
} }
#[cfg(feature = "kv-speedb")] #[cfg(feature = "kv-speedb")]
@ -120,6 +126,9 @@ mod speedb {
include!("multiwriter_different_keys.rs"); include!("multiwriter_different_keys.rs");
include!("multiwriter_same_keys_conflict.rs"); include!("multiwriter_same_keys_conflict.rs");
include!("timestamp_to_versionstamp.rs"); include!("timestamp_to_versionstamp.rs");
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
} }
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
@ -161,6 +170,9 @@ mod tikv {
include!("multiwriter_different_keys.rs"); include!("multiwriter_different_keys.rs");
include!("multiwriter_same_keys_conflict.rs"); include!("multiwriter_same_keys_conflict.rs");
include!("timestamp_to_versionstamp.rs"); include!("timestamp_to_versionstamp.rs");
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
} }
#[cfg(feature = "kv-fdb")] #[cfg(feature = "kv-fdb")]
@ -202,4 +214,7 @@ mod fdb {
include!("multiwriter_different_keys.rs"); include!("multiwriter_different_keys.rs");
include!("multiwriter_same_keys_allow.rs"); include!("multiwriter_same_keys_allow.rs");
include!("timestamp_to_versionstamp.rs"); include!("timestamp_to_versionstamp.rs");
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
} }

33
lib/src/kvs/tests/ndlq.rs Normal file
View file

@ -0,0 +1,33 @@
use crate::kvs::LqValue;
#[tokio::test]
#[serial]
async fn write_scan_ndlq() {
let nd = uuid::Uuid::parse_str("7a17446f-721f-4855-8fc7-81086752ca44").unwrap();
let test = init(nd).await.unwrap();
// Write some data
let mut tx = test.db.transaction(true, false).await.unwrap();
let ns = "namespace";
let db = "database";
let tb = "table";
let lq =
sql::Uuid::from(uuid::Uuid::parse_str("4c3dca4b-ec08-4e3e-b23a-6b03b5cdc3fc").unwrap());
tx.putc_ndlq(nd, lq.clone().0, ns, db, tb, None).await.unwrap();
tx.commit().await.unwrap();
// Verify scan
let mut tx = test.db.transaction(true, false).await.unwrap();
let res = tx.scan_ndlq(&nd, 100).await.unwrap();
assert_eq!(
res,
vec![LqValue {
nd: sql::Uuid::from(nd),
ns: ns.to_string(),
db: db.to_string(),
tb: tb.to_string(),
lq
}]
);
tx.commit().await.unwrap();
}

View file

@ -9,7 +9,7 @@ async fn archive_lv_for_node_archives() {
let namespace = "test_namespace"; let namespace = "test_namespace";
let database = "test_database"; let database = "test_database";
let table = "test_table"; let table = "test_table";
tx.set_nd(node_id).await.unwrap(); tx.set_cl(node_id).await.unwrap();
let lv_id = crate::sql::uuid::Uuid::from(Uuid::from_bytes([ let lv_id = crate::sql::uuid::Uuid::from(Uuid::from_bytes([
0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E,

42
lib/src/kvs/tests/tblq.rs Normal file
View file

@ -0,0 +1,42 @@
#[tokio::test]
#[serial]
async fn write_scan_tblq() {
let node_id = uuid::Uuid::parse_str("0bee25e0-34d7-448c-abc0-48cdf3db3a53").unwrap();
let test = init(node_id).await.unwrap();
// Write some data
let mut tx = test.db.transaction(true, false).await.unwrap();
let ns = "namespace";
let db = "database";
let tb = "table";
let live_id =
sql::Uuid::from(uuid::Uuid::parse_str("b5aab54e-d1ef-4a14-b537-9206dcde2209").unwrap());
let live_stm = LiveStatement {
id: live_id.clone(),
node: sql::Uuid::from(node_id),
expr: Default::default(),
what: Default::default(),
cond: None,
fetch: None,
archived: None,
session: Some(Value::None),
auth: None,
};
tx.putc_tblq(ns, db, tb, live_stm, None).await.unwrap();
tx.commit().await.unwrap();
// Verify scan
let mut tx = test.db.transaction(true, false).await.unwrap();
let res = tx.scan_tblq(ns, db, tb, 100).await.unwrap();
assert_eq!(
res,
vec![LqValue {
nd: sql::Uuid::from(node_id),
ns: ns.to_string(),
db: db.to_string(),
tb: tb.to_string(),
lq: live_id
}]
);
tx.commit().await.unwrap();
}

View file

@ -0,0 +1 @@
// When we add notifications we can add put+scan tests here

View file

@ -979,7 +979,7 @@ impl Transaction {
// Register cluster membership // Register cluster membership
// NOTE: Setting cluster membership sets the heartbeat // NOTE: Setting cluster membership sets the heartbeat
// Remember to set the heartbeat as well // Remember to set the heartbeat as well
pub async fn set_nd(&mut self, id: Uuid) -> Result<(), Error> { pub async fn set_cl(&mut self, id: Uuid) -> Result<(), Error> {
let key = crate::key::root::nd::Nd::new(id); let key = crate::key::root::nd::Nd::new(id);
match self.get_nd(id).await? { match self.get_nd(id).await? {
Some(_) => Err(Error::ClAlreadyExists { Some(_) => Err(Error::ClAlreadyExists {
@ -1032,15 +1032,21 @@ impl Transaction {
Ok(()) Ok(())
} }
pub async fn del_hb(&mut self, timestamp: Timestamp, id: Uuid) -> Result<(), Error> {
let key = crate::key::root::hb::Hb::new(timestamp.clone(), id);
self.del(key).await?;
Ok(())
}
// Delete a cluster registration entry // Delete a cluster registration entry
pub async fn del_nd(&mut self, node: Uuid) -> Result<(), Error> { pub async fn del_cl(&mut self, node: Uuid) -> Result<(), Error> {
let key = crate::key::root::nd::Nd::new(node); let key = crate::key::root::nd::Nd::new(node);
self.del(key).await self.del(key).await
} }
// Delete the live query notification registry on the table // Delete the live query notification registry on the table
// Return the Table ID // Return the Table ID
pub async fn del_ndlv(&mut self, nd: &Uuid) -> Result<Uuid, Error> { pub async fn del_ndlq(&mut self, nd: &Uuid) -> Result<Uuid, Error> {
// This isn't implemented because it is covered by del_nd // This isn't implemented because it is covered by del_nd
// Will add later for remote node kill // Will add later for remote node kill
Err(Error::NdNotFound { Err(Error::NdNotFound {
@ -1159,7 +1165,7 @@ impl Transaction {
Ok(()) Ok(())
} }
pub async fn del_lv(&mut self, ns: &str, db: &str, tb: &str, lv: Uuid) -> Result<(), Error> { pub async fn del_tblq(&mut self, ns: &str, db: &str, tb: &str, lv: Uuid) -> Result<(), Error> {
trace!("del_lv: ns={:?} db={:?} tb={:?} lv={:?}", ns, db, tb, lv); trace!("del_lv: ns={:?} db={:?} tb={:?} lv={:?}", ns, db, tb, lv);
let key = crate::key::table::lq::new(ns, db, tb, lv); let key = crate::key::table::lq::new(ns, db, tb, lv);
self.cache.del(&key.clone().into()); self.cache.del(&key.clone().into());
@ -1239,6 +1245,19 @@ impl Transaction {
self.putc(key_enc, live_stm, expected).await self.putc(key_enc, live_stm, expected).await
} }
pub async fn putc_ndlq(
&mut self,
nd: Uuid,
lq: Uuid,
ns: &str,
db: &str,
tb: &str,
chk: Option<&str>,
) -> Result<(), Error> {
let key = crate::key::node::lq::new(nd, lq, ns, db);
self.putc(key, tb, chk).await
}
/// Retrieve all ROOT users. /// Retrieve all ROOT users.
pub async fn all_root_users(&mut self) -> Result<Arc<[DefineUserStatement]>, Error> { pub async fn all_root_users(&mut self) -> Result<Arc<[DefineUserStatement]>, Error> {
let beg = crate::key::root::us::prefix(); let beg = crate::key::root::us::prefix();

View file

@ -91,11 +91,9 @@ impl LiveStatement {
// Store the current Node ID // Store the current Node ID
stm.node = nid.into(); stm.node = nid.into();
// Insert the node live query // Insert the node live query
let key = crate::key::node::lq::new(opt.id()?, id, opt.ns(), opt.db()); run.putc_ndlq(nid, id, opt.ns(), opt.db(), tb.as_str(), None).await?;
run.putc(key, tb.as_str(), None).await?;
// Insert the table live query // Insert the table live query
let key = crate::key::table::lq::new(opt.ns(), opt.db(), &tb, id); run.putc_tblq(opt.ns(), opt.db(), &tb, stm, None).await?;
run.putc(key, stm, None).await?;
} }
v => { v => {
return Err(Error::LiveStatement { return Err(Error::LiveStatement {

View file

@ -279,7 +279,7 @@ pub async fn ws_recv_all_msgs(
_ = time::sleep(timeout) => { _ = time::sleep(timeout) => {
debug!("Waited for {:?} and received {} messages", timeout, res.len()); debug!("Waited for {:?} and received {} messages", timeout, res.len());
if res.len() != expected { if res.len() != expected {
return Err(format!("Expected {} messages but got {} after {:?}", expected, res.len(), timeout).into()); return Err(format!("Expected {} messages but got {} after {:?}: {:?}", expected, res.len(), timeout, res).into());
} }
} }
msg = ws_recv_msg(socket) => { msg = ws_recv_msg(socket) => {

View file

@ -610,7 +610,6 @@ mod ws_integration {
"method": "query", "method": "query",
"params": [query], "params": [query],
}); });
common::ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?; common::ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?;
// Wait some time for all messages to arrive, and then search for the notification message // Wait some time for all messages to arrive, and then search for the notification message