Improve bootstrapping for finding invalid data from beta versions. (#2700)

Co-authored-by: Raphael Darley <raphael@raphaeldarley.com>
Przemyslaw Hugh Kaznowski 2023-10-04 14:06:58 +01:00 committed by GitHub
parent 8b2ae9fc99
commit c6f6ca8062
14 changed files with 675 additions and 151 deletions

View file

@ -24,6 +24,15 @@ pub struct ClusterMembership {
pub struct Timestamp {
pub value: u64,
}
impl From<u64> for Timestamp {
fn from(ts: u64) -> Self {
Timestamp {
value: ts,
}
}
}
// This struct is to be used only when storing keys, as the Store and Key
// derive macros currently conflict when combined.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Hash, Key)]
@ -44,7 +53,7 @@ impl Add<Duration> for Timestamp {
type Output = Timestamp;
fn add(self, rhs: Duration) -> Timestamp {
Timestamp {
value: self.value + rhs.as_secs(),
value: self.value + rhs.as_millis() as u64,
}
}
}
@ -52,7 +61,7 @@ impl Add<Duration> for Timestamp {
impl Sub<Duration> for Timestamp {
type Output = Result<Timestamp, Error>;
fn sub(self, rhs: Duration) -> Self::Output {
let millis = rhs.as_secs();
let millis = rhs.as_millis() as u64;
if self.value <= millis {
// Removing the duration from this timestamp will cause it to overflow
return Err(TimestampOverflow(format!(

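The unit change above is easy to get wrong, so here is a minimal, self-contained sketch (not part of this commit) of the intended millisecond semantics of `Timestamp` arithmetic:

```rust
use std::ops::Add;
use std::time::Duration;

// Timestamp.value holds Unix *milliseconds*, so a Duration must be
// converted with as_millis(), not as_secs().
#[derive(Debug, PartialEq)]
struct Timestamp {
    value: u64,
}

impl Add<Duration> for Timestamp {
    type Output = Timestamp;
    fn add(self, rhs: Duration) -> Timestamp {
        Timestamp {
            value: self.value + rhs.as_millis() as u64,
        }
    }
}

fn main() {
    let ts = Timestamp { value: 1_000 };
    // One second advances the timestamp by 1000ms; the old as_secs()
    // code would have advanced it by only 1.
    assert_eq!(ts + Duration::from_secs(1), Timestamp { value: 2_000 });
}
```

This is also why the test timestamps later in the diff change from values like `123` to `123000`.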
View file

@ -1726,7 +1726,6 @@ mod tests {
expected_min_objects: Option<usize>,
expected_max_objects: Option<usize>,
) {
println!("CheckTreeProperties");
let mut node_count = 0;
let mut max_depth = 0;
let mut min_leaf_depth = None;
@ -1743,13 +1742,6 @@ mod tests {
max_depth = depth;
}
let node = s.get_node(tx, node_id).await.unwrap();
println!(
"Node id: {} - depth: {} - len: {} - {:?}",
node.id,
depth,
node.n.len(),
node.n
);
match node.n {
MTreeNode::Internal(entries) => {
let next_depth = depth + 1;

View file

@ -14,7 +14,7 @@ use crate::err::Error;
use crate::iam::ResourceKind;
use crate::iam::{Action, Auth, Error as IamError, Role};
use crate::key::root::hb::Hb;
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*, NO_LIMIT};
use crate::opt::auth::Root;
use crate::sql;
use crate::sql::statements::DefineUserStatement;
@ -26,6 +26,8 @@ use channel::Receiver;
use channel::Sender;
use futures::lock::Mutex;
use futures::Future;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
@ -36,6 +38,10 @@ use tracing::trace;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::{SystemTime, UNIX_EPOCH};
// If there is an unbounded number of heartbeats, we want to process them batch-by-batch, spread over several checks
const HEARTBEAT_BATCH_SIZE: u32 = 1000;
const LQ_CHANNEL_SIZE: usize = 100;
/// Used by cluster logic to move LQ data to the LQ cleanup code.
/// Not a stored struct; used only in this module.
#[derive(Debug, Clone, Eq, PartialEq)]
@ -47,6 +53,41 @@ pub struct LqValue {
pub lq: Uuid,
}
#[derive(Debug)]
pub(crate) enum LqType {
Nd(LqValue),
Tb(LqValue),
}
impl LqType {
fn get_inner(&self) -> &LqValue {
match self {
LqType::Nd(lq) => lq,
LqType::Tb(lq) => lq,
}
}
}
impl PartialEq for LqType {
fn eq(&self, other: &Self) -> bool {
self.get_inner().lq == other.get_inner().lq
}
}
impl Eq for LqType {}
impl PartialOrd for LqType {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for LqType {
fn cmp(&self, other: &Self) -> Ordering {
self.get_inner().lq.cmp(&other.get_inner().lq)
}
}
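Equality and ordering for `LqType` go through the live-query id alone, so an `Nd` entry and a `Tb` entry for the same `lq` compare equal. That is what lets the `symmetric_difference` call in `clear_unreachable_state` below surface only the dangling registrations. A simplified, runnable sketch (illustrative only; plain integers stand in for full `LqValue`s):

```rust
use std::collections::BTreeSet;

// Simplified stand-in for LqType: identity is the live-query id alone.
#[derive(Debug)]
enum Lq {
    Nd(u64),
    Tb(u64),
}

impl Lq {
    fn id(&self) -> u64 {
        match self {
            Lq::Nd(id) | Lq::Tb(id) => *id,
        }
    }
}

impl PartialEq for Lq {
    fn eq(&self, other: &Self) -> bool {
        self.id() == other.id()
    }
}
impl Eq for Lq {}
impl PartialOrd for Lq {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for Lq {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.id().cmp(&other.id())
    }
}

fn main() {
    let node_side: BTreeSet<Lq> = [Lq::Nd(1), Lq::Nd(2)].into();
    let table_side: BTreeSet<Lq> = [Lq::Tb(2), Lq::Tb(3)].into();
    // Nd(2) and Tb(2) cancel out; only the dangling 1 and 3 remain.
    let dangling: Vec<u64> = node_side
        .symmetric_difference(&table_side)
        .map(|lq| lq.id())
        .collect();
    assert_eq!(dangling, vec![1, 3]);
}
```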
/// The underlying datastore instance which stores the dataset.
#[allow(dead_code)]
pub struct Datastore {
@ -163,7 +204,7 @@ impl Datastore {
v
}
#[cfg(not(feature = "kv-mem"))]
return Err(Error::Ds("Cannot connect to the `memory` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
return Err(Error::Ds("Cannot connect to the `memory` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
// Parse and initiate a File database
s if s.starts_with("file:") => {
@ -177,7 +218,7 @@ impl Datastore {
v
}
#[cfg(not(feature = "kv-rocksdb"))]
return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
// Parse and initiate a RocksDB database
s if s.starts_with("rocksdb:") => {
@ -191,7 +232,7 @@ impl Datastore {
v
}
#[cfg(not(feature = "kv-rocksdb"))]
return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
// Parse and initiate a SpeeDB database
s if s.starts_with("speedb:") => {
@ -205,7 +246,7 @@ impl Datastore {
v
}
#[cfg(not(feature = "kv-speedb"))]
return Err(Error::Ds("Cannot connect to the `speedb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
return Err(Error::Ds("Cannot connect to the `speedb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
// Parse and initiate an IndxDB database
s if s.starts_with("indxdb:") => {
@ -219,7 +260,7 @@ impl Datastore {
v
}
#[cfg(not(feature = "kv-indxdb"))]
return Err(Error::Ds("Cannot connect to the `indxdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
return Err(Error::Ds("Cannot connect to the `indxdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
// Parse and initiate a TiKV database
s if s.starts_with("tikv:") => {
@ -233,7 +274,7 @@ impl Datastore {
v
}
#[cfg(not(feature = "kv-tikv"))]
return Err(Error::Ds("Cannot connect to the `tikv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
return Err(Error::Ds("Cannot connect to the `tikv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
// Parse and initiate a FoundationDB database
s if s.starts_with("fdb:") => {
@ -247,7 +288,7 @@ impl Datastore {
v
}
#[cfg(not(feature = "kv-fdb"))]
return Err(Error::Ds("Cannot connect to the `foundationdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
return Err(Error::Ds("Cannot connect to the `foundationdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
// The datastore path is not valid
_ => {
@ -283,7 +324,7 @@ impl Datastore {
/// Specify whether this datastore should enable live query notifications
pub fn with_notifications(mut self) -> Self {
self.notification_channel = Some(channel::bounded(100));
self.notification_channel = Some(channel::bounded(LQ_CHANNEL_SIZE));
self
}
@ -364,14 +405,17 @@ impl Datastore {
// that weren't reversed, as it tries to bootstrap and garbage collect to the best of its
// ability.
pub async fn bootstrap(&self) -> Result<(), Error> {
trace!("Clearing cluster");
// First we clear unreachable state that could exist by upgrading from
// previous beta versions
trace!("Clearing unreachable state");
let mut tx = self.transaction(Write, Optimistic).await?;
match self.nuke_whole_cluster(&mut tx).await {
match self.clear_unreachable_state(&mut tx).await {
Ok(_) => tx.commit().await,
Err(e) => {
error!("Error nuking cluster at bootstrap: {:?}", e);
let msg = format!("Error clearing unreachable cluster state at bootstrap: {:?}", e);
error!(msg);
tx.cancel().await?;
Err(Error::Tx(format!("Error nuking cluster at bootstrap: {:?}", e).to_owned()))
Err(Error::Tx(msg))
}
}?;
@ -443,6 +487,7 @@ impl Datastore {
// Determine the timeout for when a cluster node is expired
let ts_expired = (timestamp.clone() - std::time::Duration::from_secs(5))?;
let dead = self.remove_dead_nodes(tx, &ts_expired).await?;
trace!("Archiving dead nodes: {:?}", dead);
self.archive_dead_lqs(tx, &dead, node_id).await
}
@ -453,7 +498,7 @@ impl Datastore {
node_id: &Uuid,
timestamp: &Timestamp,
) -> Result<(), Error> {
tx.set_cl(node_id.0).await?;
tx.set_nd(node_id.0).await?;
tx.set_hb(timestamp.clone(), node_id.0).await?;
Ok(())
}
@ -470,7 +515,7 @@ impl Datastore {
for hb in hbs {
trace!("Deleting node {}", &hb.nd);
// TODO should be delr in case of nested entries
tx.del_cl(hb.nd).await?;
tx.del_nd(hb.nd).await?;
nodes.push(crate::sql::uuid::Uuid::from(hb.nd));
}
Ok(nodes)
@ -492,7 +537,7 @@ impl Datastore {
for nd in nodes.iter() {
trace!("Archiving node {}", &nd);
// Scan on node prefix for LQ space
let node_lqs = tx.scan_ndlq(nd, 1000).await?;
let node_lqs = tx.scan_ndlq(nd, NO_LIMIT).await?;
trace!("Found {} LQ entries for {:?}", node_lqs.len(), nd);
for lq in node_lqs {
trace!("Archiving query {:?}", &lq);
@ -530,44 +575,66 @@ impl Datastore {
Ok(())
}
pub async fn nuke_whole_cluster(&self, tx: &mut Transaction) -> Result<(), Error> {
pub async fn clear_unreachable_state(&self, tx: &mut Transaction) -> Result<(), Error> {
// Scan nodes
let cls = tx.scan_cl(1000).await?;
trace!("Found {} nodes", cls.len());
for cl in cls {
tx.del_cl(
let cluster = tx.scan_nd(NO_LIMIT).await?;
trace!("Found {} nodes", cluster.len());
let mut unreachable_nodes = BTreeMap::new();
for cl in &cluster {
unreachable_nodes.insert(cl.name.clone(), cl.clone());
}
// Scan all heartbeats
let end_of_time = Timestamp {
// We remove one, because the scan range adds one
value: u64::MAX - 1,
};
let hbs = tx.scan_hb(&end_of_time, NO_LIMIT).await?;
trace!("Found {} heartbeats", hbs.len());
for hb in hbs {
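// Any node that still has a heartbeat is reachable; the unwrap assumes every heartbeat references a registered node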
unreachable_nodes.remove(&hb.nd.to_string()).unwrap();
}
// Remove unreachable nodes
for (_, cl) in unreachable_nodes {
trace!("Removing unreachable node {}", cl.name);
tx.del_nd(
uuid::Uuid::parse_str(&cl.name).map_err(|e| {
Error::Unimplemented(format!("cluster id was not uuid: {:?}", e))
})?,
)
.await?;
}
// Scan heartbeats
let hbs = tx
.scan_hb(
&Timestamp {
value: 0,
},
1000,
)
.await?;
trace!("Found {} heartbeats", hbs.len());
for hb in hbs {
tx.del_hb(hb.hb, hb.nd).await?;
// Scan node live queries for every node
let mut nd_lq_set: BTreeSet<LqType> = BTreeSet::new();
for cl in &cluster {
let nds = tx.scan_ndlq(&uuid::Uuid::parse_str(&cl.name).map_err(|e| {
Error::Unimplemented(format!("cluster id was not uuid when parsing to aggregate cluster live queries: {:?}", e))
})?, NO_LIMIT).await?;
nd_lq_set.extend(nds.into_iter().map(LqType::Nd));
}
// Scan node live queries
let ndlqs = tx.scan_ndlq(&self.id, 1000).await?;
trace!("Found {} node live queries", ndlqs.len());
for ndlq in ndlqs {
tx.del_ndlq(&ndlq.nd).await?;
// Scan table live queries
let tblqs = tx.scan_tblq(&ndlq.ns, &ndlq.db, &ndlq.tb, 1000).await?;
trace!("Found {} table live queries", tblqs.len());
for tblq in tblqs {
tx.del_tblq(&ndlq.ns, &ndlq.db, &ndlq.tb, tblq.lq.0).await?;
trace!("Found {} node live queries", nd_lq_set.len());
// Scan tables for all live queries
let mut tb_lq_set: BTreeSet<LqType> = BTreeSet::new();
for ndlq in &nd_lq_set {
let lq = ndlq.get_inner();
let tbs = tx.scan_tblq(&lq.ns, &lq.db, &lq.tb, NO_LIMIT).await?;
tb_lq_set.extend(tbs.into_iter().map(LqType::Tb));
}
trace!("Found {} table live queries", tb_lq_set.len());
// Find and delete missing
for missing in nd_lq_set.symmetric_difference(&tb_lq_set) {
match missing {
LqType::Nd(ndlq) => {
warn!("Deleting ndlq {:?}", &ndlq);
tx.del_ndlq(ndlq.nd.0, ndlq.lq.0, &ndlq.ns, &ndlq.db).await?;
}
LqType::Tb(tblq) => {
warn!("Deleting tblq {:?}", &tblq);
tx.del_tblq(&tblq.ns, &tblq.db, &tblq.tb, tblq.lq.0).await?;
}
}
}
trace!("Successfully completed nuke");
trace!("Successfully cleared cluster of unreachable state");
Ok(())
}
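The first phase of `clear_unreachable_state` is effectively a set subtraction: every registered node starts as a removal candidate and each heartbeat strikes one out. A small runnable sketch of that check (function and names are illustrative, not the crate's API):

```rust
use std::collections::BTreeMap;

// Sketch of the liveness check in clear_unreachable_state: nodes without
// a matching heartbeat are the unreachable ones to delete.
fn unreachable_nodes(nodes: &[&str], heartbeats: &[&str]) -> Vec<String> {
    let mut candidates: BTreeMap<String, ()> =
        nodes.iter().map(|n| (n.to_string(), ())).collect();
    for hb in heartbeats {
        // The real code unwraps here, assuming every heartbeat
        // references a registered node.
        candidates.remove(*hb);
    }
    candidates.into_keys().collect()
}

fn main() {
    let dead = unreachable_nodes(&["node-a", "node-b"], &["node-b"]);
    assert_eq!(dead, vec!["node-a".to_string()]);
}
```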
@ -582,7 +649,7 @@ impl Datastore {
// Find all the LQs we own, so that we can get the ns/ds from provided uuids
// We may improve this in future by tracking in web layer
let lqs = tx.scan_ndlq(&self.id, 1000).await?;
let lqs = tx.scan_ndlq(&self.id, NO_LIMIT).await?;
let mut hits = vec![];
for lq_value in lqs {
if live_queries.contains(&lq_value.lq) {
@ -641,12 +708,11 @@ impl Datastore {
tx: &mut Transaction,
ts: &Timestamp,
) -> Result<Vec<Hb>, Error> {
let limit = 1000;
let dead = tx.scan_hb(ts, limit).await?;
let dead = tx.scan_hb(ts, HEARTBEAT_BATCH_SIZE).await?;
// Delete the heartbeat and everything nested
tx.delr_hb(dead.clone(), 1000).await?;
tx.delr_hb(dead.clone(), NO_LIMIT).await?;
for dead_node in dead.clone() {
tx.del_cl(dead_node.nd).await?;
tx.del_nd(dead_node.nd).await?;
}
Ok::<Vec<Hb>, Error>(dead)
}

View file

@ -1,11 +1,12 @@
use futures::lock::Mutex;
use std::collections::BTreeSet;
use std::sync::Arc;
use crate::ctx::context;
use crate::dbs::{Options, Session};
use crate::iam::{Auth, Role};
use crate::kvs::{LockType::*, TransactionType::*};
use crate::kvs::{LockType::*, LqType, TransactionType::*};
use crate::sql;
use crate::sql::statements::LiveStatement;
use crate::sql::Value::Table;
@ -22,13 +23,13 @@ async fn expired_nodes_are_garbage_collected() {
// Set up the first node at an early timestamp
let old_time = Timestamp {
value: 123,
value: 123000,
};
test.bootstrap_at_time(sql::Uuid::from(old_node), old_time.clone()).await.unwrap();
// Set up second node at a later timestamp
let new_time = Timestamp {
value: 567,
value: 567000,
};
test.bootstrap_at_time(sql::Uuid::from(new_node), new_time.clone()).await.unwrap();
@ -41,7 +42,7 @@ async fn expired_nodes_are_garbage_collected() {
}
// And scan the nodes to verify its just the latest also
let scanned = tx.scan_cl(100).await.unwrap();
let scanned = tx.scan_nd(100).await.unwrap();
assert_eq!(scanned.len(), 1);
for cl in scanned.iter() {
assert_eq!(&cl.name, &new_node.to_string());
@ -58,7 +59,7 @@ async fn expired_nodes_get_live_queries_archived() {
// Set up the first node at an early timestamp
let old_time = Timestamp {
value: 123,
value: 123000,
};
test.bootstrap_at_time(sql::Uuid::from(old_node), old_time.clone()).await.unwrap();
@ -100,7 +101,7 @@ async fn expired_nodes_get_live_queries_archived() {
// Set up second node at a later timestamp
let new_node = Uuid::parse_str("04da7d4c-0086-4358-8318-49f0bb168fa7").unwrap();
let new_time = Timestamp {
value: 456,
value: 456000,
}; // TODO These timestamps are incorrect and should really be derived; also check timestamp errors
test.bootstrap_at_time(sql::Uuid::from(new_node), new_time.clone()).await.unwrap();
@ -122,7 +123,7 @@ async fn single_live_queries_are_garbage_collected() {
let node_id = Uuid::parse_str("b1a08614-a826-4581-938d-bea17f00e253").unwrap();
let test = init(node_id).await.unwrap();
let time = Timestamp {
value: 123,
value: 123000,
};
let namespace = "test_namespace";
let database = "test_db";
@ -203,7 +204,7 @@ async fn bootstrap_does_not_error_on_missing_live_queries() {
let old_node_id = Uuid::parse_str("5f644f02-7c1a-4f8b-babd-bd9e92c1836a").unwrap();
let test = init(old_node_id).await.unwrap();
let time = Timestamp {
value: 123,
value: 123000,
};
let namespace = "test_namespace_0A8BD08BE4F2457BB9F145557EF19605";
let database_owned = format!("test_db_{:?}", test.kvs);
@ -255,18 +256,10 @@ async fn bootstrap_does_not_error_on_missing_live_queries() {
let second_node = test.db.with_node_id(crate::sql::uuid::Uuid::from(new_node_id));
match second_node.bootstrap().await {
Ok(_) => {
panic!("Expected an error because of missing live query")
// The behaviour has now changed to remove all broken entries without raising errors
}
Err(Error::Tx(e)) => match e {
_ if e.contains("LvNotFound") => {
// This is what we want... an LvNotFound error, but it gets wrapped into a string so that Tx doesn't carry vecs
}
_ => {
panic!("Expected an LvNotFound error but got: {:?}", e);
}
},
Err(e) => {
panic!("Missing live query error: {:?}", e)
panic!("Bootstrapping should not generate errors: {:?}", e)
}
}
@ -294,3 +287,63 @@ async fn bootstrap_does_not_error_on_missing_live_queries() {
assert_eq!(0, found.len(), "Found: {:?}", found);
tx.cancel().await.unwrap();
}
#[test(tokio::test)]
async fn test_asymmetric_difference() {
let nd1 = Uuid::parse_str("7da0b3bb-1811-4c0e-8d8d-5fc08b8200a5").unwrap();
let nd2 = Uuid::parse_str("8fd394df-7f96-4395-9c9a-3abf1e2386ea").unwrap();
let nd3 = Uuid::parse_str("aa53cb74-1d6b-44df-b063-c495e240ae9e").unwrap();
let ns1 = "namespace_one";
let ns2 = "namespace_two";
let ns3 = "namespace_three";
let db1 = "database_one";
let db2 = "database_two";
let db3 = "database_three";
let tb1 = "table_one";
let tb2 = "table_two";
let tb3 = "table_three";
let lq1 = Uuid::parse_str("95f0e060-d301-4dfc-9d35-f150e802873b").unwrap();
let lq2 = Uuid::parse_str("acf60c04-5819-4a23-9874-aeb0ae1be425").unwrap();
let lq3 = Uuid::parse_str("5d591ae7-db79-4e4f-aa02-a83a4a25ce3f").unwrap();
let left_set = BTreeSet::from_iter(vec![
LqType::Nd(LqValue {
nd: nd1.into(),
ns: ns1.to_string(),
db: db1.to_string(),
tb: tb1.to_string(),
lq: lq1.into(),
}),
LqType::Nd(LqValue {
nd: nd2.into(),
ns: ns2.to_string(),
db: db2.to_string(),
tb: tb2.to_string(),
lq: lq2.into(),
}),
]);
let right_set = BTreeSet::from_iter(vec![
LqType::Tb(LqValue {
nd: nd2.into(),
ns: ns2.to_string(),
db: db2.to_string(),
tb: tb2.to_string(),
lq: lq2.into(),
}),
LqType::Tb(LqValue {
nd: nd3.into(),
ns: ns3.to_string(),
db: db3.to_string(),
tb: tb3.to_string(),
lq: lq3.into(),
}),
]);
// Matched pairs cancel out, so only the dangling registrations remain
let count = left_set.symmetric_difference(&right_set).count();
assert_ne!(count, 0);
}

34
lib/src/kvs/tests/hb.rs Normal file
View file

@ -0,0 +1,34 @@
#[tokio::test]
#[serial]
async fn write_scan_hb() {
let nd = uuid::Uuid::parse_str("e80540d4-2869-4bf3-ae27-790a538c53f3").unwrap();
let test = init(nd).await.unwrap();
// Add 2 nodes
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let t1 = tx.clock();
let t2 = Timestamp {
value: t1.value + 1,
};
let t3 = Timestamp {
value: t2.value + 1,
};
tx.set_hb(t1, Uuid::parse_str("6d1210a0-9224-4813-8090-ded787d51894").unwrap()).await.unwrap();
tx.set_hb(t2, Uuid::parse_str("b80ff454-c3e7-46a9-a0b0-7b40e9a62626").unwrap()).await.unwrap();
tx.commit().await.unwrap();
// Scan limit 1000
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let vals_lim = tx.scan_hb(&t3, 1000).await.unwrap();
tx.cancel().await.unwrap();
// Scan limit 0
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let vals_no_lim = tx.scan_hb(&t3, NO_LIMIT).await.unwrap();
tx.cancel().await.unwrap();
// Assert equal
assert_eq!(vals_lim, vals_no_lim);
assert_eq!(vals_lim.len(), 2);
assert_eq!(vals_no_lim.len(), 2);
}

View file

@ -20,8 +20,16 @@ impl TestContext {
) -> Result<(), Error> {
// TODO we shouldn't test bootstrapping manually
let mut tx = self.db.transaction(Write, Optimistic).await?;
let archived = self.db.register_remove_and_archive(&mut tx, &node_id, time).await?;
tx.commit().await?;
let archived = match self.db.register_remove_and_archive(&mut tx, &node_id, time).await {
Ok(v) => {
tx.commit().await?;
v
}
Err(e) => {
tx.cancel().await?;
return Err(e);
}
};
let mut errors = vec![];
let mut values = vec![];

View file

@ -34,6 +34,7 @@ mod mem {
}
include!("cluster_init.rs");
include!("hb.rs");
include!("helper.rs");
include!("lq.rs");
include!("nq.rs");
@ -42,6 +43,7 @@ mod mem {
include!("tb.rs");
include!("multireader.rs");
include!("timestamp_to_versionstamp.rs");
include!("nd.rs");
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
@ -77,6 +79,7 @@ mod rocksdb {
}
include!("cluster_init.rs");
include!("hb.rs");
include!("helper.rs");
include!("lq.rs");
include!("nq.rs");
@ -87,6 +90,7 @@ mod rocksdb {
include!("multiwriter_different_keys.rs");
include!("multiwriter_same_keys_conflict.rs");
include!("timestamp_to_versionstamp.rs");
include!("nd.rs");
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
@ -120,6 +124,7 @@ mod speedb {
}
include!("cluster_init.rs");
include!("hb.rs");
include!("helper.rs");
include!("lq.rs");
include!("nq.rs");
@ -130,6 +135,7 @@ mod speedb {
include!("multiwriter_different_keys.rs");
include!("multiwriter_same_keys_conflict.rs");
include!("timestamp_to_versionstamp.rs");
include!("nd.rs");
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
@ -164,6 +170,7 @@ mod tikv {
}
include!("cluster_init.rs");
include!("hb.rs");
include!("helper.rs");
include!("lq.rs");
include!("nq.rs");
@ -174,6 +181,7 @@ mod tikv {
include!("multiwriter_different_keys.rs");
include!("multiwriter_same_keys_conflict.rs");
include!("timestamp_to_versionstamp.rs");
include!("nd.rs");
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
@ -208,6 +216,7 @@ mod fdb {
}
include!("cluster_init.rs");
include!("hb.rs");
include!("helper.rs");
include!("lq.rs");
include!("nq.rs");
@ -218,6 +227,7 @@ mod fdb {
include!("multiwriter_different_keys.rs");
include!("multiwriter_same_keys_allow.rs");
include!("timestamp_to_versionstamp.rs");
include!("nd.rs");
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");

27
lib/src/kvs/tests/nd.rs Normal file
View file

@ -0,0 +1,27 @@
#[tokio::test]
#[serial]
async fn write_scan_nd() {
let nd = uuid::Uuid::parse_str("6a6a4e59-3e86-431d-884f-8f433781e4e9").unwrap();
let test = init(nd).await.unwrap();
// Add 2 nodes
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
tx.set_nd(Uuid::parse_str("83d9b3c0-f3c4-45be-9ef9-9d48502fecb1").unwrap()).await.unwrap();
tx.set_nd(Uuid::parse_str("cbefc4fe-8ba0-4898-ab69-782e3ebc06f9").unwrap()).await.unwrap();
tx.commit().await.unwrap();
// Scan limit 1000
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let vals_lim = tx.scan_nd(1000).await.unwrap();
tx.cancel().await.unwrap();
// Scan limit 0
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let vals_no_lim = tx.scan_nd(NO_LIMIT).await.unwrap();
tx.cancel().await.unwrap();
// Assert equal
assert_eq!(vals_lim, vals_no_lim);
assert_eq!(vals_lim.len(), 2);
assert_eq!(vals_no_lim.len(), 2);
}

View file

@ -1,4 +1,4 @@
use crate::kvs::LqValue;
use crate::kvs::{LqValue, NO_LIMIT};
#[tokio::test]
#[serial]
@ -18,9 +18,11 @@ async fn write_scan_ndlq() {
// Verify scan
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let res = tx.scan_ndlq(&nd, 100).await.unwrap();
let res_lim = tx.scan_ndlq(&nd, 100).await.unwrap();
let res_no_lim = tx.scan_ndlq(&nd, NO_LIMIT).await.unwrap();
tx.commit().await.unwrap();
assert_eq!(
res,
res_lim,
vec![LqValue {
nd: sql::Uuid::from(nd),
ns: ns.to_string(),
@ -29,5 +31,5 @@ async fn write_scan_ndlq() {
lq
}]
);
tx.commit().await.unwrap();
assert_eq!(res_lim, res_no_lim);
}

View file

@ -9,7 +9,7 @@ async fn archive_lv_for_node_archives() {
let namespace = "test_namespace";
let database = "test_database";
let table = "test_table";
tx.set_cl(node_id).await.unwrap();
tx.set_nd(node_id).await.unwrap();
let lv_id = crate::sql::uuid::Uuid::from(Uuid::from_bytes([
0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E,

View file

@ -28,6 +28,8 @@ async fn write_scan_tblq() {
// Verify scan
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let res = tx.scan_tblq(ns, db, tb, 100).await.unwrap();
let no_limit = tx.scan_tblq(ns, db, tb, NO_LIMIT).await.unwrap();
tx.commit().await.unwrap();
assert_eq!(
res,
vec![LqValue {
@ -38,5 +40,5 @@ async fn write_scan_tblq() {
lq: live_id
}]
);
tx.commit().await.unwrap();
assert_eq!(res, no_limit);
}

View file

@ -51,6 +51,8 @@ use uuid::Uuid;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::{SystemTime, UNIX_EPOCH};
pub(crate) const NO_LIMIT: u32 = 0;
/// A set of undoable updates and requests against a dataset.
#[allow(dead_code)]
pub struct Transaction {
@ -1011,7 +1013,7 @@ impl Transaction {
// Register cluster membership
// NOTE: Setting cluster membership sets the heartbeat
// Remember to set the heartbeat as well
pub async fn set_cl(&mut self, id: Uuid) -> Result<(), Error> {
pub async fn set_nd(&mut self, id: Uuid) -> Result<(), Error> {
let key = crate::key::root::nd::Nd::new(id);
match self.get_nd(id).await? {
Some(_) => Err(Error::ClAlreadyExists {
@ -1038,7 +1040,8 @@ impl Transaction {
}
}
pub(crate) fn clock(&self) -> Timestamp {
// Public for tests, but we might not want to expose this
pub fn clock(&self) -> Timestamp {
// Use a timestamp oracle if available
let now: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(duration) => duration.as_millis(),
@ -1053,9 +1056,10 @@ impl Transaction {
pub async fn set_hb(&mut self, timestamp: Timestamp, id: Uuid) -> Result<(), Error> {
let key = crate::key::root::hb::Hb::new(timestamp.clone(), id);
// We do not need to do a read, we always want to overwrite
let key_enc = key.encode()?;
self.put(
key.key_category(),
key,
key_enc,
ClusterMembership {
name: id.to_string(),
heartbeat: timestamp,
@ -1072,19 +1076,17 @@ impl Transaction {
}
// Delete a cluster registration entry
pub async fn del_cl(&mut self, node: Uuid) -> Result<(), Error> {
pub async fn del_nd(&mut self, node: Uuid) -> Result<(), Error> {
let key = crate::key::root::nd::Nd::new(node);
self.del(key).await
let key_enc = key.encode()?;
self.del(key_enc).await
}
// Delete the live query notification registry on the table
// Return the Table ID
pub async fn del_ndlq(&mut self, nd: &Uuid) -> Result<Uuid, Error> {
// This isn't implemented because it is covered by del_nd
// Will add later for remote node kill
Err(Error::NdNotFound {
value: format!("Missing cluster node {:?}", nd),
})
pub async fn del_ndlq(&mut self, nd: Uuid, lq: Uuid, ns: &str, db: &str) -> Result<(), Error> {
let key = crate::key::node::lq::Lq::new(nd, lq, ns, db);
let key_enc = key.encode()?;
self.del(key_enc).await
}
// Scans up until the heartbeat timestamp and returns the discovered nodes
@ -1101,21 +1103,23 @@ impl Transaction {
let mut num = limit;
let mut out: Vec<crate::key::root::hb::Hb> = vec![];
// Start processing
while num > 0 {
while limit == NO_LIMIT || num > 0 {
let batch_size = match num {
0 => 1000,
_ => std::cmp::min(1000, num),
};
// Get records batch
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
self.scan(min..max, batch_size).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
self.scan(min..max, batch_size).await?
}
};
// Get total results
@ -1132,14 +1136,18 @@ impl Transaction {
}
out.push(crate::key::root::hb::Hb::decode(k.as_slice())?);
// Count
num -= 1;
if limit > 0 {
num -= 1;
}
}
}
trace!("scan_hb: {:?}", out);
Ok(out)
}
pub async fn scan_cl(&mut self, limit: u32) -> Result<Vec<ClusterMembership>, Error> {
/// scan_nd will scan all the cluster membership registers.
/// Setting the limit to 0 (NO_LIMIT) will scan all entries.
pub async fn scan_nd(&mut self, limit: u32) -> Result<Vec<ClusterMembership>, Error> {
let beg = crate::key::root::nd::Nd::prefix();
let end = crate::key::root::nd::Nd::suffix();
trace!("Scan start: {} ({:?})", String::from_utf8_lossy(&beg).to_string(), &beg);
@ -1148,21 +1156,23 @@ impl Transaction {
let mut num = limit;
let mut out: Vec<ClusterMembership> = vec![];
// Start processing
while num > 0 {
while limit == NO_LIMIT || num > 0 {
let batch_size = match num {
0 => 1000,
_ => std::cmp::min(1000, num),
};
// Get records batch
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
self.scan(min..max, batch_size).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
self.scan(min..max, batch_size).await?
}
};
// Get total results
@ -1179,10 +1189,12 @@ impl Transaction {
}
out.push((&v).into());
// Count
num -= 1;
if limit > 0 {
num -= 1;
}
}
}
trace!("scan_hb: {:?}", out);
trace!("scan_nd: {:?}", out);
Ok(out)
}
@ -1206,30 +1218,64 @@ impl Transaction {
}
pub async fn scan_ndlq<'a>(&mut self, node: &Uuid, limit: u32) -> Result<Vec<LqValue>, Error> {
let pref = crate::key::node::lq::prefix_nd(node);
let suff = crate::key::node::lq::suffix_nd(node);
let beg = crate::key::node::lq::prefix_nd(node);
let end = crate::key::node::lq::suffix_nd(node);
trace!(
"Scanning range from pref={}, suff={}",
crate::key::debug::sprint_key(&pref),
crate::key::debug::sprint_key(&suff),
crate::key::debug::sprint_key(&beg),
crate::key::debug::sprint_key(&end),
);
let rng = pref..suff;
let scanned = self.scan(rng, limit).await?;
let mut res: Vec<LqValue> = vec![];
for (key, value) in scanned {
trace!("scan_lq: key={:?} value={:?}", &key, &value);
let lq = crate::key::node::lq::Lq::decode(key.as_slice())?;
let tb: String = String::from_utf8(value).unwrap();
trace!("scan_lq Found tb: {:?}", tb);
res.push(LqValue {
nd: lq.nd.into(),
ns: lq.ns.to_string(),
db: lq.db.to_string(),
tb,
lq: lq.lq.into(),
});
let mut nxt: Option<Key> = None;
let mut num = limit;
let mut out: Vec<LqValue> = vec![];
while limit == NO_LIMIT || num > 0 {
let batch_size = match num {
0 => 1000,
_ => std::cmp::min(1000, num),
};
// Get records batch
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
self.scan(min..max, batch_size).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
self.scan(min..max, batch_size).await?
}
};
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (key, value)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(key.clone());
}
let lq = crate::key::node::lq::Lq::decode(key.as_slice())?;
let tb: String = String::from_utf8(value).unwrap();
trace!("scan_lq Found tb: {:?}", tb);
out.push(LqValue {
nd: lq.nd.into(),
ns: lq.ns.to_string(),
db: lq.db.to_string(),
tb,
lq: lq.lq.into(),
});
// Count
if limit != NO_LIMIT {
num -= 1;
}
}
}
Ok(res)
Ok(out)
}
pub async fn scan_tblq<'a>(
@ -1239,29 +1285,63 @@ impl Transaction {
tb: &str,
limit: u32,
) -> Result<Vec<LqValue>, Error> {
let pref = crate::key::table::lq::prefix(ns, db, tb);
let suff = crate::key::table::lq::suffix(ns, db, tb);
let beg = crate::key::table::lq::prefix(ns, db, tb);
let end = crate::key::table::lq::suffix(ns, db, tb);
trace!(
"Scanning range from pref={}, suff={}",
crate::key::debug::sprint_key(&pref),
crate::key::debug::sprint_key(&suff),
crate::key::debug::sprint_key(&beg),
crate::key::debug::sprint_key(&end),
);
let rng = pref..suff;
let scanned = self.scan(rng, limit).await?;
let mut res: Vec<LqValue> = vec![];
for (key, value) in scanned {
trace!("scan_lv: key={:?} value={:?}", &key, &value);
let val: LiveStatement = value.into();
let lv = crate::key::table::lq::Lq::decode(key.as_slice())?;
res.push(LqValue {
nd: val.node,
ns: lv.ns.to_string(),
db: lv.db.to_string(),
tb: lv.tb.to_string(),
lq: val.id.clone(),
});
let mut nxt: Option<Key> = None;
let mut num = limit;
let mut out: Vec<LqValue> = vec![];
while limit == NO_LIMIT || num > 0 {
let batch_size = match num {
0 => 1000,
_ => std::cmp::min(1000, num),
};
// Get records batch
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
self.scan(min..max, batch_size).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
self.scan(min..max, batch_size).await?
}
};
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (key, value)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(key.clone());
}
let lv = crate::key::table::lq::Lq::decode(key.as_slice())?;
let val: LiveStatement = value.into();
out.push(LqValue {
nd: val.node,
ns: lv.ns.to_string(),
db: lv.db.to_string(),
tb: lv.tb.to_string(),
lq: val.id.clone(),
});
// Count
if limit != NO_LIMIT {
num -= 1;
}
}
}
Ok(res)
Ok(out)
}
pub async fn putc_tblq(
@ -1274,7 +1354,7 @@ impl Transaction {
) -> Result<(), Error> {
let key = crate::key::table::lq::new(ns, db, tb, live_stm.id.0);
let key_enc = crate::key::table::lq::Lq::encode(&key)?;
trace!("putc_lv ({:?}): key={:?}", &live_stm.id, crate::key::debug::sprint_key(&key_enc));
trace!("putc_tblq ({:?}): key={:?}", &live_stm.id, crate::key::debug::sprint_key(&key_enc));
self.putc(key_enc, live_stm, expected).await
}

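All four scan methods now share the same keyset-pagination shape: request at most 1000 entries per batch, remember the final key of each batch, append a `0x00` byte so the next batch starts strictly after it, and only decrement the remaining count when a real limit was given. A condensed, synchronous sketch of that loop (illustrative only; `fetch` stands in for `Transaction::scan`):

```rust
const NO_LIMIT: u32 = 0;

// Condensed sketch of the shared batching loop in scan_hb/scan_nd/
// scan_ndlq/scan_tblq. `fetch` plays the role of Transaction::scan.
fn scan_all(
    mut fetch: impl FnMut(Vec<u8>, Vec<u8>, u32) -> Vec<(Vec<u8>, Vec<u8>)>,
    beg: Vec<u8>,
    end: Vec<u8>,
    limit: u32,
) -> Vec<(Vec<u8>, Vec<u8>)> {
    let mut nxt: Option<Vec<u8>> = None;
    let mut num = limit;
    let mut out = Vec::new();
    while limit == NO_LIMIT || num > 0 {
        // With NO_LIMIT the remaining count stays 0, so ask for full batches
        let batch_size = match num {
            0 => 1000,
            _ => std::cmp::min(1000, num),
        };
        let min = match nxt.take() {
            None => beg.clone(),
            Some(mut last) => {
                // Resume strictly after the last key of the previous batch
                last.push(0x00);
                last
            }
        };
        let res = fetch(min, end.clone(), batch_size);
        let n = res.len();
        if n == 0 {
            break; // range exhausted
        }
        for (i, kv) in res.into_iter().enumerate() {
            if i + 1 == n {
                // Remember the batch's final key to continue from
                nxt = Some(kv.0.clone());
            }
            out.push(kv);
            if limit != NO_LIMIT {
                num -= 1;
            }
        }
    }
    out
}
```

Passing `NO_LIMIT` (0) walks the entire range; any positive limit stops after exactly that many entries.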
228
lib/tests/bootstrap.rs Normal file
View file

@ -0,0 +1,228 @@
/// The tests in this file check that bootstrapping of the database works correctly.
/// They test edge cases that may accidentally occur through bugs - we want to make sure
/// the system can recover in light of these issues.
///
/// We may want to move these tests to another suite, as they aren't testing statements like
/// the other tests are.
mod helpers;
mod parse;
use helpers::new_ds;
use serial_test::serial;
use surrealdb::err::Error;
use surrealdb::kvs::LockType::Optimistic;
use surrealdb::kvs::Transaction;
use surrealdb::kvs::TransactionType::Write;
use surrealdb::sql::statements::LiveStatement;
use surrealdb::sql::Uuid;
use tokio::time::sleep;
#[tokio::test]
#[serial]
async fn bootstrap_removes_unreachable_nodes() -> Result<(), Error> {
// Create the datastore
let dbs = new_ds().await.unwrap();
let mut tx = dbs.transaction(Write, Optimistic).await.unwrap();
// Introduce missing nodes (without heartbeats)
let bad_node = uuid::Uuid::parse_str("9d8e16e4-9f6a-4704-8cf1-7cd55b937c5b").unwrap();
tx.set_nd(bad_node).await.unwrap();
// Introduce a valid chain of data to confirm it is not removed from a cleanup
a_valid_notification(
&mut tx,
ValidNotificationState {
timestamp: None,
node_id: None,
live_query_id: None,
notification_id: None,
namespace: "testns".to_string(),
database: "testdb".to_string(),
table: "testtb".to_string(),
},
)
.await
.unwrap();
tx.commit().await.unwrap();
// Bootstrap
dbs.bootstrap().await.unwrap();
// Verify the incorrect node is deleted, but self and valid still exist
let mut tx = dbs.transaction(Write, Optimistic).await.unwrap();
let res = tx.scan_nd(1000).await.unwrap();
tx.commit().await.unwrap();
for node in &res {
assert_ne!(node.name, bad_node.to_string());
}
// {Node generated by bootstrap} + {valid node whose uuid we don't know}
assert_eq!(res.len(), 2);
Ok(())
}
#[tokio::test]
#[serial]
async fn bootstrap_removes_unreachable_node_live_queries() -> Result<(), Error> {
// Create the datastore
let dbs = new_ds().await.unwrap();
// Introduce an invalid node live query
let mut tx = dbs.transaction(Write, Optimistic).await.unwrap();
let valid_data = a_valid_notification(
&mut tx,
ValidNotificationState {
timestamp: None,
node_id: None,
live_query_id: None,
notification_id: None,
namespace: "testns".to_string(),
database: "testdb".to_string(),
table: "testtb".to_string(),
},
)
.await
.unwrap();
let bad_nd_lq_id = uuid::Uuid::parse_str("67b0f588-2b95-4b6e-87f3-73d0a49034be").unwrap();
tx.putc_ndlq(
valid_data.clone().node_id.unwrap().0,
bad_nd_lq_id,
&valid_data.namespace,
&valid_data.database,
&valid_data.table,
None,
)
.await
.unwrap();
tx.commit().await.unwrap();
// Bootstrap
dbs.bootstrap().await.unwrap();
// Verify node live query is deleted
let mut tx = dbs.transaction(Write, Optimistic).await.unwrap();
let res = tx.scan_ndlq(valid_data.node_id.as_ref().unwrap(), 1000).await.unwrap();
tx.commit().await.unwrap();
assert_eq!(res.len(), 1, "We expect the node to be available");
let tested_entry = res.get(0).unwrap();
assert_eq!(tested_entry.lq, valid_data.live_query_id.unwrap());
Ok(())
}
#[tokio::test]
#[serial]
async fn bootstrap_removes_unreachable_table_live_queries() -> Result<(), Error> {
// Create the datastore
let dbs = new_ds().await.unwrap();
// Introduce an invalid table live query
let mut tx = dbs.transaction(Write, Optimistic).await.unwrap();
let valid_data = a_valid_notification(
&mut tx,
ValidNotificationState {
timestamp: None,
node_id: None,
live_query_id: None,
notification_id: None,
namespace: "testns".to_string(),
database: "testdb".to_string(),
table: "testtb".to_string(),
},
)
.await
.unwrap();
let bad_tb_lq_id = uuid::Uuid::parse_str("97b8fbe4-a147-4420-95dc-97db3a46c491").unwrap();
let mut live_stm = LiveStatement::default();
live_stm.id = bad_tb_lq_id.into();
tx.putc_tblq(&valid_data.namespace, &valid_data.database, &valid_data.table, live_stm, None)
.await
.unwrap();
tx.commit().await.unwrap();
// Bootstrap
dbs.bootstrap().await.unwrap();
// Verify invalid table live query is deleted
let mut tx = dbs.transaction(Write, Optimistic).await.unwrap();
let res = tx
.scan_tblq(&valid_data.namespace, &valid_data.database, &valid_data.table, 1000)
.await
.unwrap();
tx.commit().await.unwrap();
assert_eq!(res.len(), 1, "Expected 1 table live query: {:?}", res);
let tested_entry = res.get(0).unwrap();
assert_eq!(tested_entry.lq, valid_data.live_query_id.unwrap());
Ok(())
}
#[tokio::test]
#[serial]
async fn bootstrap_removes_unreachable_live_query_notifications() -> Result<(), Error> {
Ok(())
}
/// ValidNotificationState is a representation of a chain of information that bootstrap is
/// concerned with. It is used for two reasons:
/// - sometimes we want to detect invalid data that has a valid path (a notification without a live query).
/// - sometimes we want to verify that existing valid data is not deleted.
#[derive(Debug, Clone)]
struct ValidNotificationState {
pub timestamp: Option<u64>,
pub node_id: Option<Uuid>,
pub live_query_id: Option<Uuid>,
pub notification_id: Option<Uuid>,
pub namespace: String,
pub database: String,
pub table: String,
}
/// Create a chain of valid state that bootstrapping should not remove.
/// As a general rule, there is no need to override the system defaults, since this code exists to place generic data.
/// If you see these IDs elsewhere, it is because you captured this entry.
/// So it's ok to share IDs between tests.
async fn a_valid_notification(
tx: &mut Transaction,
args: ValidNotificationState,
) -> Result<ValidNotificationState, Error> {
let now = tx.clock();
let default_node_id =
Uuid::from(uuid::Uuid::parse_str("123e9d92-c975-4daf-8080-3082e83cfa9b").unwrap());
let default_lq_id =
Uuid::from(uuid::Uuid::parse_str("ca02c2d0-31dd-4bf0-ada4-ee02b1191e0a").unwrap());
let default_not_id =
Uuid::from(uuid::Uuid::parse_str("c952cf7d-b503-4370-802e-cd2404f2160d").unwrap());
let entry = ValidNotificationState {
timestamp: Some(args.timestamp.unwrap_or(now.value)),
node_id: Some(args.node_id.unwrap_or(default_node_id)),
live_query_id: Some(args.live_query_id.unwrap_or(default_lq_id)),
notification_id: Some(args.notification_id.unwrap_or(default_not_id)),
..args
};
let mut live_stm = LiveStatement::default();
live_stm.id = entry.live_query_id.clone().unwrap().into();
live_stm.node = entry.node_id.clone().unwrap().into();
// Create heartbeat
tx.set_hb(entry.timestamp.clone().unwrap().into(), entry.node_id.clone().unwrap().0).await?;
// Create cluster node entry
tx.set_nd(entry.node_id.clone().unwrap().0).await?;
// Create node live query registration
tx.putc_ndlq(
entry.node_id.clone().unwrap().0,
entry.live_query_id.clone().unwrap().0,
&entry.namespace,
&entry.database,
&entry.table,
None,
)
.await?;
// Create table live query registration
tx.putc_tblq(&entry.namespace, &entry.database, &entry.table, live_stm, None).await?;
// TODO Create notification
// tx.putc_tbnt(
// ).await?;
Ok(entry)
}

13
lib/tests/running.md Normal file
View file

@ -0,0 +1,13 @@
These tests are run with the following commands.
Individual test files (not api module):
```bash
cargo test -p surrealdb --features kv-mem --test bootstrap -- --nocapture
```
api module:
```bash
TODO
cargo test -p surrealdb --features kv-rocksdb --test api api_integration::file::delete_record_range
```
```