From 42cff67bd1cc25c37db0ff300df42bf94498a16d Mon Sep 17 00:00:00 2001 From: Przemyslaw Hugh Kaznowski Date: Thu, 26 Oct 2023 16:24:46 +0100 Subject: [PATCH] Fix cluster startup to handle stray heartbeats (#2891) --- lib/src/kvs/ds.rs | 8 +++++++- lib/src/kvs/tests/cluster_init.rs | 8 ++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index 437779d5..00ac763f 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -626,7 +626,13 @@ impl Datastore { let hbs = tx.scan_hb(&end_of_time, NO_LIMIT).await?; trace!("Found {} heartbeats", hbs.len()); for hb in hbs { - unreachable_nodes.remove(&hb.nd.to_string()).unwrap(); + match unreachable_nodes.remove(&hb.nd.to_string()) { + None => { + // Didnt exist in cluster and should be deleted + tx.del_hb(hb.hb, hb.nd).await?; + } + Some(_) => {} + } } // Remove unreachable nodes for (_, cl) in unreachable_nodes { diff --git a/lib/src/kvs/tests/cluster_init.rs b/lib/src/kvs/tests/cluster_init.rs index 0f0ddc72..120e3da8 100644 --- a/lib/src/kvs/tests/cluster_init.rs +++ b/lib/src/kvs/tests/cluster_init.rs @@ -31,6 +31,14 @@ async fn expired_nodes_are_garbage_collected() { test.db = test.db.with_node_id(sql::Uuid::from(old_node)); test.db.bootstrap().await.unwrap(); + // Throw in some stray nodes and heartbeats + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + let corrupt_node_1 = Uuid::parse_str("5a65fe57-7ac3-4b13-a31f-6376d3b484c8").unwrap(); + let corrupt_node_2 = Uuid::parse_str("eb94a0b4-70ea-482f-a7dd-dc02132be846").unwrap(); + tx.set_nd(corrupt_node_1).await.unwrap(); + tx.set_hb(old_time, corrupt_node_2).await.unwrap(); + tx.commit().await.unwrap(); + // Set up second node at a later timestamp let new_time = Timestamp { value: 567000,