From ac213d69bb27af5ff2b389414001df8249812c7d Mon Sep 17 00:00:00 2001
From: Przemyslaw Hugh Kaznowski <hugh@surrealdb.com>
Date: Thu, 13 Jul 2023 14:44:54 +0100
Subject: [PATCH] Sur 191 self garbage collection 2 (#2137)

Co-authored-by: Tobie Morgan Hitchcock <tobie@surrealdb.com>
---
 lib/src/dbs/cl.rs                 |  48 +++++-
 lib/src/dbs/notification.rs       |  15 +-
 lib/src/doc/lives.rs              |   6 +-
 lib/src/err/mod.rs                |  26 ++++
 lib/src/key/cl.rs                 |  37 ++++-
 lib/src/key/debug.rs              |  13 ++
 lib/src/key/hb.rs                 |  63 ++++++--
 lib/src/key/lq.rs                 |  53 ++++++-
 lib/src/key/lv.rs                 |  28 +++-
 lib/src/key/mod.rs                |  11 +-
 lib/src/kvs/ds.rs                 | 228 ++++++++++++++++++++++++++--
 lib/src/kvs/tests/cluster_init.rs | 114 ++++++++++++++
 lib/src/kvs/tests/helper.rs       |  63 ++++++++
 lib/src/kvs/tests/lq.rs           |  42 ++++++
 lib/src/kvs/tests/lv.rs           |  52 +++++++
 lib/src/kvs/tests/mod.rs          |  25 ++++
 lib/src/kvs/tests/tb.rs           | 181 +++++++++++-----------
 lib/src/kvs/tx.rs                 | 241 +++++++++++++++++++++++++++++-
 lib/src/sql/constant.rs           |   1 -
 lib/src/sql/statements/live.rs    |  20 ++-
 src/dbs/mod.rs                    |   1 +
 21 files changed, 1105 insertions(+), 163 deletions(-)
 create mode 100644 lib/src/key/debug.rs
 create mode 100644 lib/src/kvs/tests/cluster_init.rs
 create mode 100644 lib/src/kvs/tests/helper.rs
 create mode 100644 lib/src/kvs/tests/lq.rs
 create mode 100644 lib/src/kvs/tests/lv.rs

diff --git a/lib/src/dbs/cl.rs b/lib/src/dbs/cl.rs
index 11a3b043..2e7b94fc 100644
--- a/lib/src/dbs/cl.rs
+++ b/lib/src/dbs/cl.rs
@@ -1,5 +1,9 @@
-use derive::Store;
+use crate::err::Error;
+use crate::err::Error::TimestampOverflow;
+use derive::{Key, Store};
 use serde::{Deserialize, Serialize};
+use std::ops::{Add, Sub};
+use std::time::Duration;
 
 // NOTE: This is not a statement, but as per layering, keeping it here till we
 // have a better structure.
@@ -17,3 +21,45 @@ pub struct ClusterMembership {
 pub struct Timestamp {
 	pub value: u64,
 }
+// This struct is to be used only when storing keys as the macro currently
+// conflicts when you have Store and Key derive macros.
+#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Hash, Key)]
+pub struct KeyTimestamp {
+	pub value: u64,
+}
+
+impl From<&Timestamp> for KeyTimestamp {
+	fn from(ts: &Timestamp) -> Self {
+		KeyTimestamp {
+			value: ts.value,
+		}
+	}
+}
+
+impl Add<Duration> for Timestamp {
+	type Output = Timestamp;
+	fn add(self, rhs: Duration) -> Timestamp {
+		Timestamp {
+			value: self.value + rhs.as_secs(),
+		}
+	}
+}
+
+impl Sub<Duration> for Timestamp {
+	type Output = Result<Timestamp, Error>;
+	fn sub(self, rhs: Duration) -> Self::Output {
+		let millis = rhs.as_secs();
+		if self.value <= millis {
+			// Removing the duration from this timestamp will cause it to overflow
+			return Err(TimestampOverflow(format!(
+				"Failed to subtract {} from {}",
+				&millis, &self.value
+			)));
+		}
+		Ok(Timestamp {
+			value: self.value - millis,
+		})
+	}
+}
+
+// TODO test
diff --git a/lib/src/dbs/notification.rs b/lib/src/dbs/notification.rs
index f75d8060..916e9343 100644
--- a/lib/src/dbs/notification.rs
+++ b/lib/src/dbs/notification.rs
@@ -1,6 +1,7 @@
-use crate::sql::Value;
+use crate::sql::{Object, Value};
 use serde::{Deserialize, Serialize};
 use std::fmt;
+use std::fmt::Debug;
 use uuid::Uuid;
 
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -12,11 +13,13 @@ pub struct Notification {
 
 impl fmt::Display for Notification {
 	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-		write!(
-			f,
-			"Notification {{ id: {}, action: {}, result: {} }}",
-			self.id, self.action, self.result
-		)
+		let obj: Object = map! {
+			"id".to_string() => self.id.to_string().into(),
+			"action".to_string() => self.action.to_string().into(),
+			"result".to_string() => self.result.clone(),
+		}
+		.into();
+		write!(f, "{}", obj)
 	}
 }
 
diff --git a/lib/src/doc/lives.rs b/lib/src/doc/lives.rs
index 480d33f1..8fb3c35b 100644
--- a/lib/src/doc/lives.rs
+++ b/lib/src/doc/lives.rs
@@ -36,7 +36,7 @@ impl<'a> Document<'a> {
 				// Check what type of data change this is
 				if stm.is_delete() {
 					// Send a DELETE notification
-					if opt.id()? == lv.node.0 {
+					if opt.id()? == lv.node {
 						let thing = (*rid).clone();
 						chn.send(Notification {
 							id: lv.id.0,
@@ -49,7 +49,7 @@ impl<'a> Document<'a> {
 					}
 				} else if self.is_new() {
 					// Send a CREATE notification
-					if opt.id()? == lv.node.0 {
+					if opt.id()? == lv.node {
 						chn.send(Notification {
 							id: lv.id.0,
 							action: Action::Create,
@@ -61,7 +61,7 @@ impl<'a> Document<'a> {
 					}
 				} else {
 					// Send a UPDATE notification
-					if opt.id()? == lv.node.0 {
+					if opt.id()? == lv.node {
 						chn.send(Notification {
 							id: lv.id.0,
 							action: Action::Update,
diff --git a/lib/src/err/mod.rs b/lib/src/err/mod.rs
index e9aa1ae9..9f13003d 100644
--- a/lib/src/err/mod.rs
+++ b/lib/src/err/mod.rs
@@ -278,6 +278,18 @@ pub enum Error {
 		value: String,
 	},
 
+	/// The requested live query does not exist
+	#[error("The live query '{value}' does not exist")]
+	LvNotFound {
+		value: String,
+	},
+
+	/// The requested cluster live query does not exist
+	#[error("The cluster live query '{value}' does not exist")]
+	LqNotFound {
+		value: String,
+	},
+
 	/// The requested analyzer does not exist
 	#[error("The analyzer '{value}' does not exist")]
 	AzNotFound {
@@ -504,6 +516,20 @@ pub enum Error {
 	DuplicatedMatchRef {
 		mr: MatchRef,
 	},
+
+	/// Represents a failure in timestamp arithmetic related to database internals
+	#[error("Timestamp arithmetic error: {0}")]
+	TimestampOverflow(String),
+
+	/// Internal server error
+	/// This should be used extremely sporadically, since we lose the type of error as a consequence
+	/// There will be times when it is useful, such as with unusual type conversion errors
+	#[error("Internal database error: {0}")]
+	Internal(String),
+
+	/// Unimplemented functionality
+	#[error("Unimplemented functionality: {0}")]
+	Unimplemented(String),
 }
 
 impl From<Error> for String {
diff --git a/lib/src/key/cl.rs b/lib/src/key/cl.rs
index 936fa143..483ef270 100644
--- a/lib/src/key/cl.rs
+++ b/lib/src/key/cl.rs
@@ -17,13 +17,25 @@ pub struct Cl {
 impl Cl {
 	pub fn new(nd: Uuid) -> Self {
 		Self {
-			__: 0x2f, // /
-			_a: 0x21, // !
-			_b: 0x63, // c
-			_c: 0x6c, // l
+			__: b'/',
+			_a: b'!',
+			_b: b'c',
+			_c: b'l',
 			nd,
 		}
 	}
+
+	pub fn prefix() -> Vec<u8> {
+		let mut k = super::kv::new().encode().unwrap();
+		k.extend_from_slice(&[b'!', b'c', b'l', 0x00]);
+		k
+	}
+
+	pub fn suffix() -> Vec<u8> {
+		let mut k = super::kv::new().encode().unwrap();
+		k.extend_from_slice(&[b'!', b'c', b'l', 0xff]);
+		k
+	}
 }
 
 #[cfg(test)]
@@ -31,12 +43,21 @@ mod tests {
 	#[test]
 	fn key() {
 		use super::*;
-		#[rustfmt::skip]
-            let val = Cl::new(
-            Uuid::default(),
-        );
+		let val = Cl::new(Uuid::default());
 		let enc = Cl::encode(&val).unwrap();
 		let dec = Cl::decode(&enc).unwrap();
 		assert_eq!(val, dec);
 	}
+
+	#[test]
+	fn test_prefix() {
+		let val = super::Cl::prefix();
+		assert_eq!(val, b"/!cl\0")
+	}
+
+	#[test]
+	fn test_suffix() {
+		let val = super::Cl::suffix();
+		assert_eq!(val, b"/!cl\xff")
+	}
 }
diff --git a/lib/src/key/debug.rs b/lib/src/key/debug.rs
new file mode 100644
index 00000000..d1f1d4f8
--- /dev/null
+++ b/lib/src/key/debug.rs
@@ -0,0 +1,13 @@
+use crate::kvs::Key;
+
+/// Helpers for debugging keys
+
+/// sprint_key converts a key to an escaped string.
+/// This is used for logging and debugging tests and should not be used in implementation code.
+pub fn sprint_key(key: &Key) -> String {
+	key.clone()
+		.iter()
+		.flat_map(|&byte| std::ascii::escape_default(byte))
+		.map(|byte| byte as char)
+		.collect::<String>()
+}
diff --git a/lib/src/key/hb.rs b/lib/src/key/hb.rs
index 09b9a089..18f0b698 100644
--- a/lib/src/key/hb.rs
+++ b/lib/src/key/hb.rs
@@ -1,4 +1,4 @@
-use crate::dbs::cl::Timestamp;
+use crate::dbs::cl::{KeyTimestamp, Timestamp};
 use derive::Key;
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
@@ -9,8 +9,8 @@ pub struct Hb {
 	_a: u8,
 	_b: u8,
 	_c: u8,
-	_d: u8,
 	pub hb: Timestamp,
+	_d: u8,
 	#[serde(with = "uuid::serde::compact")]
 	pub nd: Uuid,
 }
@@ -18,15 +18,40 @@ pub struct Hb {
 impl Hb {
 	pub fn new(hb: Timestamp, nd: Uuid) -> Self {
 		Self {
-			__: 0x2f, // /
-			_a: 0x21, // !
-			_b: 0x68, // h
-			_c: 0x62, // b
+			__: b'/',
+			_a: b'!',
+			_b: b'h',
+			_c: b'b',
 			hb,
-			_d: 0x2f, // /
+			_d: b'/',
 			nd,
 		}
 	}
+
+	pub fn prefix() -> Vec<u8> {
+		let mut k = super::kv::new().encode().unwrap();
+		k.extend_from_slice(&[b'!', b'h', b'b', 0x00]);
+		k
+	}
+
+	pub fn suffix(ts: &Timestamp) -> Vec<u8> {
+		// Add one to timestamp so we get a complete range inclusive of the provided timestamp
+		// Also convert type
+		let tskey: KeyTimestamp = KeyTimestamp {
+			value: ts.value + 1,
+		};
+		let mut k = super::kv::new().encode().unwrap();
+		k.extend_from_slice(&[b'!', b'h', b'b']);
+		k.extend_from_slice(tskey.encode().unwrap().as_ref());
+		k
+	}
+}
+
+impl From<Timestamp> for Hb {
+	fn from(ts: Timestamp) -> Self {
+		let empty_uuid = uuid::Uuid::nil();
+		Self::new(ts, empty_uuid)
+	}
 }
 
 #[cfg(test)]
@@ -35,12 +60,32 @@ mod tests {
 	fn key() {
 		use super::*;
 		#[rustfmt::skip]
-            let val = Hb::new(
+		let val = Hb::new(
             Timestamp { value: 123 },
-            Uuid::default(),
+            Uuid::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16])
         );
 		let enc = Hb::encode(&val).unwrap();
+		assert_eq!(
+            enc,
+			b"/!hb\x00\x00\x00\x00\x00\x00\x00\x7b/\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10");
 		let dec = Hb::decode(&enc).unwrap();
 		assert_eq!(val, dec);
 	}
+
+	#[test]
+	fn prefix() {
+		use super::*;
+		let actual = Hb::prefix();
+		assert_eq!(actual, b"/!hb\x00")
+	}
+
+	#[test]
+	fn suffix() {
+		use super::*;
+		let ts: Timestamp = Timestamp {
+			value: 456,
+		};
+		let actual = Hb::suffix(&ts);
+		assert_eq!(actual, b"/!hb\x00\x00\x00\x00\x00\x00\x01\xc9") // 457, because we add 1 to the timestamp
+	}
 }
diff --git a/lib/src/key/lq.rs b/lib/src/key/lq.rs
index 81344117..a491081c 100644
--- a/lib/src/key/lq.rs
+++ b/lib/src/key/lq.rs
@@ -8,6 +8,7 @@ pub struct Lq<'a> {
 	_a: u8,
 	_b: u8,
 	_c: u8,
+	#[serde(with = "uuid::serde::compact")]
 	pub nd: Uuid,
 	_d: u8,
 	pub ns: &'a str,
@@ -24,6 +25,20 @@ pub fn new<'a>(nd: Uuid, ns: &'a str, db: &'a str, lq: Uuid) -> Lq<'a> {
 	Lq::new(nd, ns, db, lq)
 }
 
+pub fn prefix_nd(nd: &Uuid) -> Vec<u8> {
+	let mut k = [b'/', b'!', b'n', b'd'].to_vec();
+	k.extend_from_slice(nd.as_bytes());
+	k.extend_from_slice(&[0x00]);
+	k
+}
+
+pub fn suffix_nd(nd: &Uuid) -> Vec<u8> {
+	let mut k = [b'/', b'!', b'n', b'd'].to_vec();
+	k.extend_from_slice(nd.as_bytes());
+	k.extend_from_slice(&[0xff]);
+	k
+}
+
 impl<'a> Lq<'a> {
 	pub fn new(nd: Uuid, ns: &'a str, db: &'a str, lq: Uuid) -> Self {
 		Self {
@@ -46,18 +61,44 @@ impl<'a> Lq<'a> {
 
 #[cfg(test)]
 mod tests {
+
 	#[test]
 	fn key() {
 		use super::*;
 		#[rustfmt::skip]
-		let val = Lq::new(
-			Uuid::default(),
-			"test",
-			"test",
-			Uuid::default(),
-		);
+		let nd = Uuid::from_bytes([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10]);
+		#[rustfmt::skip]
+		let lv = Uuid::from_bytes([0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f, 0x20]);
+		let val = Lq::new(nd, "testns", "testdb", lv);
 		let enc = Lq::encode(&val).unwrap();
+		assert_eq!(enc, b"/!nd\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10*testns\x00*testdb\x00!lv\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f\x20" );
+
 		let dec = Lq::decode(&enc).unwrap();
 		assert_eq!(val, dec);
 	}
+
+	#[test]
+	fn prefix_nd() {
+		use super::*;
+		let nd = Uuid::from_bytes([
+			0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e,
+			0x0f, 0x10,
+		]);
+		let val = prefix_nd(&nd);
+		assert_eq!(
+			val,
+			b"/!nd\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10\x00"
+		);
+	}
+
+	#[test]
+	fn suffix_nd() {
+		use super::*;
+		let nd = Uuid::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);
+		let val = suffix_nd(&nd);
+		assert_eq!(
+			val,
+			b"/!nd\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10\xff"
+		);
+	}
 }
diff --git a/lib/src/key/lv.rs b/lib/src/key/lv.rs
index c10661c9..cb52f6ec 100644
--- a/lib/src/key/lv.rs
+++ b/lib/src/key/lv.rs
@@ -54,18 +54,34 @@ impl<'a> Lv<'a> {
 
 #[cfg(test)]
 mod tests {
+	use crate::key::debug;
+
 	#[test]
 	fn key() {
 		use super::*;
 		#[rustfmt::skip]
-		let val = Lv::new(
-			"test",
-			"test",
-			"test",
-			Uuid::default(),
-		);
+		let live_query_id = Uuid::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);
+		let val = Lv::new("testns", "testdb", "testtb", live_query_id);
 		let enc = Lv::encode(&val).unwrap();
+		println!("{:?}", debug::sprint_key(&enc));
+		assert_eq!(
+			enc,
+			b"/*testns\x00*testdb\x00*testtb\x00!lv\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10"
+		);
+
 		let dec = Lv::decode(&enc).unwrap();
 		assert_eq!(val, dec);
 	}
+
+	#[test]
+	fn prefix() {
+		let val = super::prefix("testns", "testdb", "testtb");
+		assert_eq!(val, b"/*testns\x00*testdb\x00*testtb\x00!lv\x00")
+	}
+
+	#[test]
+	fn suffix() {
+		let val = super::suffix("testns", "testdb", "testtb");
+		assert_eq!(val, b"/*testns\x00*testdb\x00*testtb\x00!lv\xff")
+	}
 }
diff --git a/lib/src/key/mod.rs b/lib/src/key/mod.rs
index fc508778..1a960eec 100644
--- a/lib/src/key/mod.rs
+++ b/lib/src/key/mod.rs
@@ -2,11 +2,11 @@
 ///
 /// KV              /
 ///
-/// ND              /!nd{nd}
-/// LQ              /!nd{nd}*{ns}*{db}!lq{lq}
-///
 /// HB              /!hb{ts}/{nd}
 ///
+/// ND              /!nd{nd}
+/// NQ              /!nd{nd}*{ns}*{db}!lq{lq}
+///
 /// NS              /!ns{ns}
 ///
 /// Namespace       /*{ns}
@@ -16,7 +16,7 @@
 ///
 /// Database        /*{ns}*{db}
 /// AZ              /*{ns}*{db}!az{az}
-/// CF /*{ns}*{db}!cf{ts}
+/// CF              /*{ns}*{db}!cf{ts}
 /// DL              /*{ns}*{db}!dl{us}
 /// DT              /*{ns}*{db}!dt{tk}
 /// PA              /*{ns}*{db}!pa{pa}
@@ -67,6 +67,7 @@ pub mod cf; // Stores change feeds
 pub mod cl; // Stores cluster membership information
 pub mod database; // Stores the key prefix for all keys under a database
 pub mod db; // Stores a DEFINE DATABASE config definition
+pub mod debug; // Debug purposes only. It may be used in logs. Not for key handling in implementation code.
 pub mod dl; // Stores a DEFINE LOGIN ON DATABASE config definition
 pub mod dt; // Stores a DEFINE LOGIN ON DATABASE config definition
 pub mod dv; // Stores database versionstamps
@@ -91,7 +92,7 @@ pub mod scope; // Stores the key prefix for all keys under a scope
 pub mod st; // Stores a DEFINE TOKEN ON SCOPE config definition
 pub mod table; // Stores the key prefix for all keys under a table
 pub mod tb; // Stores a DEFINE TABLE config definition
-pub mod thing;
+pub mod thing; // Stores a record id
 
 const CHAR_PATH: u8 = 0xb1; // ±
 const CHAR_INDEX: u8 = 0xa4; // ¤
diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs
index 4bb714ea..34ba15e3 100644
--- a/lib/src/kvs/ds.rs
+++ b/lib/src/kvs/ds.rs
@@ -1,5 +1,5 @@
-use super::tx::Transaction;
 use crate::ctx::Context;
+use crate::dbs::cl::Timestamp;
 use crate::dbs::Attach;
 use crate::dbs::Executor;
 use crate::dbs::Notification;
@@ -8,6 +8,9 @@ use crate::dbs::Response;
 use crate::dbs::Session;
 use crate::dbs::Variables;
 use crate::err::Error;
+use crate::key::hb::Hb;
+use crate::key::lq;
+use crate::key::lv::Lv;
 use crate::sql;
 use crate::sql::Query;
 use crate::sql::Value;
@@ -18,8 +21,22 @@ use std::fmt;
 use std::sync::Arc;
 use std::time::Duration;
 use tracing::instrument;
+use tracing::trace;
 use uuid::Uuid;
 
+use super::tx::Transaction;
+
+/// Used for cluster logic to move LQ data to LQ cleanup code
+/// Not a stored struct; Used only in this module
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct LqValue {
+	pub cl: Uuid,
+	pub ns: String,
+	pub db: String,
+	pub tb: String,
+	pub lq: Uuid,
+}
+
 /// The underlying datastore instance which stores the dataset.
 #[allow(dead_code)]
 pub struct Datastore {
@@ -114,6 +131,13 @@ impl Datastore {
 	/// # }
 	/// ```
 	pub async fn new(path: &str) -> Result<Datastore, Error> {
+		let id = Uuid::new_v4();
+		Self::new_full(path, id).await
+	}
+
+	// For testing
+	pub async fn new_full(path: &str, node_id: Uuid) -> Result<Datastore, Error> {
+		// Initiate the desired datastore
 		let inner = match path {
 			"memory" => {
 				#[cfg(feature = "kv-mem")]
@@ -218,7 +242,7 @@ impl Datastore {
 		};
 		// Set the properties on the datastore
 		inner.map(|inner| Self {
-			id: Uuid::default(),
+			id: node_id,
 			inner,
 			strict: false,
 			query_timeout: None,
@@ -251,24 +275,204 @@ impl Datastore {
 		self
 	}
 
-	// Adds entries to the KV store indicating membership information
-	pub async fn register_membership(&self) -> Result<(), Error> {
+	/// Creates a new datastore instance
+	///
+	/// Use this for clustered environments.
+	pub async fn new_with_bootstrap(path: &str) -> Result<Datastore, Error> {
+		let ds = Datastore::new(path).await?;
+		ds.bootstrap().await?;
+		Ok(ds)
+	}
+
+	// Initialise bootstrap with implicit values intended for runtime
+	pub async fn bootstrap(&self) -> Result<(), Error> {
+		self.bootstrap_full(&self.id).await
+	}
+
+	// Initialise bootstrap with artificial values, intended for testing
+	pub async fn bootstrap_full(&self, node_id: &Uuid) -> Result<(), Error> {
+		trace!("Bootstrapping {}", self.id);
 		let mut tx = self.transaction(true, false).await?;
-		tx.set_cl(self.id).await?;
-		tx.set_hb(self.id).await?;
+		let now = tx.clock();
+		let archived = self.register_remove_and_archive(&mut tx, node_id, now).await?;
 		tx.commit().await?;
+
+		let mut tx = self.transaction(true, false).await?;
+		self.remove_archived(&mut tx, archived).await?;
+		tx.commit().await
+	}
+
+	// Node registration + "mark" stage of mark-and-sweep gc
+	pub async fn register_remove_and_archive(
+		&self,
+		tx: &mut Transaction,
+		node_id: &Uuid,
+		timestamp: Timestamp,
+	) -> Result<Vec<LqValue>, Error> {
+		trace!("Registering node {}", node_id);
+		self.register_membership(tx, node_id, &timestamp).await?;
+		// Determine the timeout for when a cluster node is expired
+		let ts_expired = (timestamp.clone() - std::time::Duration::from_secs(5))?;
+		let dead = self.remove_dead_nodes(tx, &ts_expired).await?;
+		self.archive_dead_lqs(tx, &dead, node_id).await
+	}
+
+	// Adds entries to the KV store indicating membership information
+	pub async fn register_membership(
+		&self,
+		tx: &mut Transaction,
+		node_id: &Uuid,
+		timestamp: &Timestamp,
+	) -> Result<(), Error> {
+		tx.set_cl(*node_id).await?;
+		tx.set_hb(timestamp.clone(), *node_id).await?;
 		Ok(())
 	}
 
-	// Creates a heartbeat entry for the member indicating to the cluster
-	// that the node is alive
-	pub async fn heartbeat(&self) -> Result<(), Error> {
-		let mut tx = self.transaction(true, false).await?;
-		tx.set_hb(self.id).await?;
-		tx.commit().await?;
+	/// Delete dead heartbeats and nodes
+	/// Returns node IDs
+	pub async fn remove_dead_nodes(
+		&self,
+		tx: &mut Transaction,
+		ts: &Timestamp,
+	) -> Result<Vec<Uuid>, Error> {
+		let hbs = self.delete_dead_heartbeats(tx, ts).await?;
+		let mut nodes = vec![];
+		for hb in hbs {
+			trace!("Deleting node {}", &hb.nd);
+			tx.del_cl(hb.nd).await?;
+			nodes.push(hb.nd);
+		}
+		Ok(nodes)
+	}
+
+	/// Accepts cluster IDs
+	/// Archives related live queries
+	/// Returns live query keys that can be used for deletes
+	///
+	/// The reason we archive first is to stop other nodes from picking it up for further updates
+	/// This means it will be easier to wipe the range in a subsequent transaction
+	pub async fn archive_dead_lqs(
+		&self,
+		tx: &mut Transaction,
+		nodes: &[Uuid],
+		this_node_id: &Uuid,
+	) -> Result<Vec<LqValue>, Error> {
+		let mut archived = vec![];
+		for nd in nodes.iter() {
+			trace!("Archiving node {}", &nd);
+			// Scan on node prefix for LQ space
+			let node_lqs = tx.scan_lq(nd, 1000).await?;
+			trace!("Found {} LQ entries for {:?}", node_lqs.len(), nd);
+			for lq in node_lqs {
+				trace!("Archiving query {:?}", &lq);
+				let node_archived_lqs = self.archive_lv_for_node(tx, &lq.cl, this_node_id).await?;
+				for lq_value in node_archived_lqs {
+					archived.push(lq_value);
+				}
+			}
+		}
+		Ok(archived)
+	}
+
+	pub async fn remove_archived(
+		&self,
+		tx: &mut Transaction,
+		archived: Vec<LqValue>,
+	) -> Result<(), Error> {
+		for lq in archived {
+			// Delete the cluster key, used for finding LQ associated with a node
+			tx.del(lq::new(lq.cl, lq.ns.as_str(), lq.db.as_str(), lq.lq)).await?;
+			// Delete the table key, used for finding LQ associated with a table
+			tx.del(Lv::new(lq.ns.as_str(), lq.db.as_str(), lq.tb.as_str(), lq.lq)).await?;
+		}
 		Ok(())
 	}
 
+	pub async fn _garbage_collect(
+		// TODO not invoked
+		// But this is garbage collection outside of bootstrap
+		&self,
+		tx: &mut Transaction,
+		watermark: &Timestamp,
+		this_node_id: &Uuid,
+	) -> Result<(), Error> {
+		let dead_heartbeats = self.delete_dead_heartbeats(tx, watermark).await?;
+		trace!("Found dead hbs: {:?}", dead_heartbeats);
+		let mut archived: Vec<LqValue> = vec![];
+		for hb in dead_heartbeats {
+			let new_archived = self.archive_lv_for_node(tx, &hb.nd, this_node_id).await?;
+			tx.del_cl(hb.nd).await?;
+			trace!("Deleted node {}", hb.nd);
+			for lq_value in new_archived {
+				archived.push(lq_value);
+			}
+		}
+		Ok(())
+	}
+
+	// Returns a list of live query IDs
+	pub async fn archive_lv_for_node(
+		&self,
+		tx: &mut Transaction,
+		nd: &Uuid,
+		this_node_id: &Uuid,
+	) -> Result<Vec<LqValue>, Error> {
+		let lqs = tx.all_lq(nd).await?;
+		trace!("Archiving lqs and found {} LQ entries for {}", lqs.len(), nd);
+		let mut ret = vec![];
+		for lq in lqs {
+			let lvs = tx.get_lv(lq.ns.as_str(), lq.db.as_str(), lq.tb.as_str(), &lq.lq).await?;
+			let archived_lvs = lvs.clone().archive(*this_node_id);
+			tx.putc_lv(&lq.ns, &lq.db, &lq.tb, archived_lvs, Some(lvs)).await?;
+			ret.push(lq);
+		}
+		Ok(ret)
+	}
+
+	/// Given a timestamp, delete all the heartbeats that have expired
+	/// Return the removed heartbeats as they will contain node information
+	pub async fn delete_dead_heartbeats(
+		&self,
+		tx: &mut Transaction,
+		ts: &Timestamp,
+	) -> Result<Vec<Hb>, Error> {
+		let limit = 1000;
+		let dead = tx.scan_hb(ts, limit).await?;
+		tx.delr_hb(dead.clone(), 1000).await?;
+		for dead_node in dead.clone() {
+			tx.del_cl(dead_node.nd).await?;
+		}
+		Ok::<Vec<Hb>, Error>(dead)
+	}
+
+	// Creates a heartbeat entry for the member indicating to the cluster
+	// that the node is alive.
+	// This is the preferred way of creating heartbeats inside the database, so try to use this.
+	pub async fn heartbeat(&self) -> Result<(), Error> {
+		let mut tx = self.transaction(true, false).await?;
+		let timestamp = tx.clock();
+		self.heartbeat_full(&mut tx, timestamp, self.id).await?;
+		tx.commit().await
+	}
+
+	// Creates a heartbeat entry for the member indicating to the cluster
+	// that the node is alive. Intended for testing.
+	// This includes all dependencies that are hard to control and is done in such a way for testing.
+	// Inside the database, try to use the heartbeat() function instead.
+	pub async fn heartbeat_full(
+		&self,
+		tx: &mut Transaction,
+		timestamp: Timestamp,
+		node_id: Uuid,
+	) -> Result<(), Error> {
+		tx.set_hb(timestamp, node_id).await
+	}
+
+	// -----
+	// End cluster helpers, storage functions here
+	// -----
+
 	/// Create a new transaction on this datastore
 	///
 	/// ```rust,no_run
diff --git a/lib/src/kvs/tests/cluster_init.rs b/lib/src/kvs/tests/cluster_init.rs
new file mode 100644
index 00000000..00e2e234
--- /dev/null
+++ b/lib/src/kvs/tests/cluster_init.rs
@@ -0,0 +1,114 @@
+use futures::lock::Mutex;
+use std::sync::Arc;
+
+use crate::ctx::context;
+
+use crate::dbs::{Options, Session};
+use crate::sql;
+use crate::sql::statements::LiveStatement;
+use crate::sql::Value::Table;
+use crate::sql::{Fields, Value};
+use uuid;
+
+#[tokio::test]
+#[serial]
+async fn expired_nodes_are_garbage_collected() {
+	let test = match init().await {
+		Ok(test) => test,
+		Err(e) => panic!("{}", e),
+	};
+
+	// Set up the first node at an early timestamp
+	let old_node = uuid::Uuid::new_v4();
+	let old_time = Timestamp {
+		value: 123,
+	};
+	test.bootstrap_at_time(&old_node, old_time.clone()).await.unwrap();
+
+	// Set up second node at a later timestamp
+	let new_node = uuid::Uuid::new_v4();
+	let new_time = Timestamp {
+		value: 456,
+	};
+	test.bootstrap_at_time(&new_node, new_time.clone()).await.unwrap();
+
+	// Now scan the heartbeats to validate there is only one node left
+	let mut tx = test.db.transaction(true, false).await.unwrap();
+	let scanned = tx.scan_hb(&new_time, 100).await.unwrap();
+	assert_eq!(scanned.len(), 1);
+	for hb in scanned.iter() {
+		assert_eq!(&hb.nd, &new_node);
+	}
+
+	// And scan the nodes to verify its just the latest also
+	let scanned = tx.scan_cl(100).await.unwrap();
+	assert_eq!(scanned.len(), 1);
+	for cl in scanned.iter() {
+		assert_eq!(&cl.name, &new_node.to_string());
+	}
+
+	tx.commit().await.unwrap();
+}
+
+#[tokio::test]
+#[serial]
+async fn expired_nodes_get_live_queries_archived() {
+	let test = match init().await {
+		Ok(test) => test,
+		Err(e) => panic!("{}", e),
+	};
+
+	// Set up the first node at an early timestamp
+	let old_node = uuid::Uuid::from_fields(0, 1, 2, &[3, 4, 5, 6, 7, 8, 9, 10]);
+	let old_time = Timestamp {
+		value: 123,
+	};
+	test.bootstrap_at_time(&old_node, old_time.clone()).await.unwrap();
+
+	// Set up live query
+	let ses = Session::for_kv()
+		.with_ns(test.test_str("testns").as_str())
+		.with_db(test.test_str("testdb").as_str());
+	let table = "my_table";
+	let lq = LiveStatement {
+		id: sql::Uuid(uuid::Uuid::new_v4()),
+		node: Uuid::new_v4(),
+		expr: Fields(vec![sql::Field::All], false),
+		what: Table(sql::Table::from(table)),
+		cond: None,
+		fetch: None,
+		archived: Some(old_node),
+	};
+	let ctx = context::Context::background();
+	let (sender, _) = channel::unbounded();
+	let opt = Options::new()
+		.with_ns(ses.ns())
+		.with_db(ses.db())
+		.with_auth(Arc::new(Default::default()))
+		.with_live(true)
+		.with_id(old_node.clone());
+	let opt = Options::new_with_sender(&opt, sender);
+	let tx = Arc::new(Mutex::new(test.db.transaction(true, false).await.unwrap()));
+	let res = lq.compute(&ctx, &opt, &tx, None).await.unwrap();
+	match res {
+		Value::Uuid(_) => {}
+		_ => {
+			panic!("Not a uuid: {:?}", res);
+		}
+	}
+	tx.lock().await.commit().await.unwrap();
+
+	// Set up second node at a later timestamp
+	let new_node = uuid::Uuid::from_fields(16, 17, 18, &[19, 20, 21, 22, 23, 24, 25, 26]);
+	let new_time = Timestamp {
+		value: 456,
+	}; // TODO These timestamps are incorrect and should really be derived; also check timestamp errors
+	test.bootstrap_at_time(&new_node, new_time.clone()).await.unwrap();
+
+	// Now validate lq was removed
+	let mut tx = test.db.transaction(true, false).await.unwrap();
+	let scanned =
+		tx.all_lv(ses.ns().unwrap().as_ref(), ses.db().unwrap().as_ref(), table).await.unwrap();
+	assert_eq!(scanned.len(), 0);
+	tx.commit().await.unwrap();
+}
diff --git a/lib/src/kvs/tests/helper.rs b/lib/src/kvs/tests/helper.rs
new file mode 100644
index 00000000..83aa4be8
--- /dev/null
+++ b/lib/src/kvs/tests/helper.rs
@@ -0,0 +1,63 @@
+use crate::dbs::cl::Timestamp;
+use crate::err::Error;
+use std::sync::Once;
+use tracing::Level;
+use tracing_subscriber;
+
+pub struct TestContext {
+	pub(crate) db: Datastore,
+	// A string identifier for this context.
+	// It will usually be a uuid or combination of uuid and fixed string identifier.
+	// It is useful for separating test setups when environments are shared.
+	pub(crate) context_id: String,
+}
+
+static INIT: Once = Once::new();
+
+/// TestContext is a container for an initialised test context
+/// Anything stateful (such as storage layer and logging) can be tied with this
+impl TestContext {
+	pub(crate) async fn bootstrap_at_time(
+		&self,
+		node_id: &Uuid,
+		time: Timestamp,
+	) -> Result<(), Error> {
+		let mut tx = self.db.transaction(true, true).await?;
+		let archived = self.db.register_remove_and_archive(&mut tx, node_id, time).await?;
+		tx.commit().await?;
+		let mut tx = self.db.transaction(true, true).await?;
+		self.db.remove_archived(&mut tx, archived).await?;
+		Ok(tx.commit().await?)
+	}
+
+	// Use this to generate strings that have the test uuid associated with it
+	pub fn test_str(&self, prefix: &str) -> String {
+		return format!("{}-{}", prefix, self.context_id);
+	}
+}
+
+/// Initialise logging and prepare a useable datastore
+/// In the future it would be nice to handle multiple datastores
+pub(crate) async fn init() -> Result<TestContext, Error> {
+	// Set tracing for tests for debug, but only do it once
+	INIT.call_once(|| {
+		let _subscriber = tracing_subscriber::fmt().with_max_level(Level::TRACE).try_init();
+	});
+
+	let db = new_ds().await;
+	return Ok(TestContext {
+		db,
+		context_id: Uuid::new_v4().to_string(), // The context does not always have to be a uuid
+	});
+}
+
+/// Scan the entire storage layer displaying keys
+/// Useful to debug scans ;)
+async fn _debug_scan(tx: &mut Transaction, message: &str) {
+	let r = tx.scan(vec![0]..vec![u8::MAX], 100000).await.unwrap();
+	println!("START OF RANGE SCAN - {}", message);
+	for (k, _v) in r.iter() {
+		println!("{}", crate::key::debug::sprint_key(k.as_ref()));
+	}
+	println!("END OF RANGE SCAN - {}", message);
+}
diff --git a/lib/src/kvs/tests/lq.rs b/lib/src/kvs/tests/lq.rs
new file mode 100644
index 00000000..178d53d2
--- /dev/null
+++ b/lib/src/kvs/tests/lq.rs
@@ -0,0 +1,42 @@
+use uuid::Uuid;
+
+#[tokio::test]
+#[serial]
+async fn scan_node_lq() {
+	let test = init().await.unwrap();
+	let mut tx = test.db.transaction(true, true).await.unwrap();
+	let node_id = Uuid::from_bytes([
+		0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E,
+		0x0F,
+	]);
+	let namespace = "test_namespace";
+	let database = "test_database";
+	let live_query_id = Uuid::from_bytes([
+		0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E,
+		0x1F,
+	]);
+	let key = crate::key::lq::new(node_id, namespace, database, live_query_id);
+	trace!(
+		"Inserting key: {}",
+		key.encode()
+			.unwrap()
+			.iter()
+			.flat_map(|byte| std::ascii::escape_default(byte.clone()))
+			.map(|byte| byte as char)
+			.collect::<String>()
+	);
+	let _ = tx.putc(key, "value", None).await.unwrap();
+	tx.commit().await.unwrap();
+	let mut tx = test.db.transaction(true, true).await.unwrap();
+
+	let res = tx.scan_lq(&node_id, 100).await.unwrap();
+	assert_eq!(res.len(), 1);
+	for val in res {
+		assert_eq!(val.cl, node_id);
+		assert_eq!(val.ns, namespace);
+		assert_eq!(val.db, database);
+		assert_eq!(val.lq, live_query_id);
+	}
+
+	tx.commit().await.unwrap();
+}
diff --git a/lib/src/kvs/tests/lv.rs b/lib/src/kvs/tests/lv.rs
new file mode 100644
index 00000000..0d57a14f
--- /dev/null
+++ b/lib/src/kvs/tests/lv.rs
@@ -0,0 +1,52 @@
+use crate::sql::statements::live::live;
+
+#[tokio::test]
+#[serial]
+async fn archive_lv_for_node_archives() {
+	let test = init().await.unwrap();
+	let mut tx = test.db.transaction(true, true).await.unwrap();
+	let namespace = "test_namespace";
+	let database = "test_database";
+	let table = "test_table";
+	let node_id = Uuid::from_bytes([
+		0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E,
+		0x0F,
+	]);
+	tx.set_cl(node_id).await.unwrap();
+
+	let lv_id = Uuid::from_bytes([
+		0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E,
+		0x1F,
+	]);
+
+	let key = crate::key::lq::new(node_id, namespace, database, lv_id);
+	tx.putc(key, table, None).await.unwrap();
+
+	let (_, mut stm) = live(format!("LIVE SELECT * FROM {}", table).as_str()).unwrap();
+	stm.id = crate::sql::Uuid::from(lv_id);
+	tx.putc_lv(namespace, database, table, stm, None).await.unwrap();
+
+	let this_node_id = Uuid::from_bytes([
+		0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A, 0x2B, 0x2C, 0x2D, 0x2E,
+		0x2F,
+	]);
+	// We commit after setup because otherwise the in-memory store does not support read-your-own-writes,
+	// i.e. the setup data would be part of the same transaction as the implementation checks that follow
+	tx.commit().await.unwrap();
+
+	let mut tx = test.db.transaction(true, false).await.unwrap();
+	let results = test.db.archive_lv_for_node(&mut tx, &node_id, &this_node_id).await.unwrap();
+	assert_eq!(results.len(), 1);
+	assert_eq!(results[0].cl, node_id);
+	assert_eq!(results[0].ns, namespace);
+	assert_eq!(results[0].db, database);
+	assert_eq!(results[0].tb, table);
+	assert_eq!(results[0].lq, lv_id);
+	tx.commit().await.unwrap();
+
+	let mut tx = test.db.transaction(true, false).await.unwrap();
+	let lv = tx.all_lv(namespace, database, table).await.unwrap();
+	assert_eq!(lv.len(), 1, "{:?}", lv);
+	assert_eq!(lv[0].archived, Some(this_node_id));
+	tx.commit().await.unwrap();
+}
diff --git a/lib/src/kvs/tests/mod.rs b/lib/src/kvs/tests/mod.rs
index 0be29946..deeb16e9 100644
--- a/lib/src/kvs/tests/mod.rs
+++ b/lib/src/kvs/tests/mod.rs
@@ -13,8 +13,13 @@ mod mem {
 		new_ds().await.transaction(write, lock).await.unwrap()
 	}
 
+	include!("helper.rs");
+	include!("cluster_init.rs");
+	include!("lq.rs");
+	include!("lv.rs");
 	include!("raw.rs");
 	include!("snapshot.rs");
+	include!("tb.rs");
 	include!("multireader.rs");
 }
 
@@ -35,8 +40,13 @@ mod rocksdb {
 		new_ds().await.transaction(write, lock).await.unwrap()
 	}
 
+	include!("helper.rs");
+	include!("cluster_init.rs");
+	include!("lq.rs");
+	include!("lv.rs");
 	include!("raw.rs");
 	include!("snapshot.rs");
+	include!("tb.rs");
 	include!("multireader.rs");
 	include!("multiwriter_different_keys.rs");
 	include!("multiwriter_same_keys_conflict.rs");
@@ -59,8 +69,13 @@ mod speedb {
 		new_ds().await.transaction(write, lock).await.unwrap()
 	}
 
+	include!("helper.rs");
+	include!("cluster_init.rs");
+	include!("lq.rs");
+	include!("lv.rs");
 	include!("raw.rs");
 	include!("snapshot.rs");
+	include!("tb.rs");
 	include!("multireader.rs");
 	include!("multiwriter_different_keys.rs");
 	include!("multiwriter_same_keys_conflict.rs");
@@ -87,8 +102,13 @@ mod tikv {
 		new_ds().await.transaction(write, lock).await.unwrap()
 	}
 
+	include!("cluster_init.rs");
+	include!("helper.rs");
+	include!("lq.rs");
+	include!("lv.rs");
 	include!("raw.rs");
 	include!("snapshot.rs");
+	include!("tb.rs");
 	include!("multireader.rs");
 	include!("multiwriter_different_keys.rs");
 	include!("multiwriter_same_keys_conflict.rs");
@@ -115,8 +135,13 @@ mod fdb {
 		new_ds().await.transaction(write, lock).await.unwrap()
 	}
 
+	include!("cluster_init.rs");
+	include!("helper.rs");
+	include!("lq.rs");
+	include!("lv.rs");
 	include!("raw.rs");
 	include!("snapshot.rs");
+	include!("tb.rs");
 	include!("multireader.rs");
 	include!("multiwriter_different_keys.rs");
 	include!("multiwriter_same_keys_allow.rs");
diff --git a/lib/src/kvs/tests/tb.rs b/lib/src/kvs/tests/tb.rs
index b168b846..91c51f50 100644
--- a/lib/src/kvs/tests/tb.rs
+++ b/lib/src/kvs/tests/tb.rs
@@ -1,103 +1,90 @@
-#[cfg(feature = "kv-mem")]
-pub(crate) mod table {
-	use crate::err::Error;
-	use crate::key::tb;
-	use crate::key::tb::Tb;
-	use crate::kvs::Datastore;
-	use crate::sql::statements::DefineTableStatement;
+use crate::key::tb;
+use crate::key::tb::Tb;
+use crate::sql::statements::DefineTableStatement;
 
-	struct TestContext {
-		db: Datastore,
-	}
+#[tokio::test]
+#[serial]
+async fn table_definitions_can_be_scanned() {
+	// Setup
+	let test = match init().await {
+		Ok(ctx) => ctx,
+		Err(e) => panic!("{:?}", e),
+	};
+	let mut tx = match test.db.transaction(true, false).await {
+		Ok(tx) => tx,
+		Err(e) => panic!("{:?}", e),
+	};
 
-	async fn init() -> Result<TestContext, Error> {
-		let db = Datastore::new("memory").await?;
-		return Ok(TestContext {
-			db,
-		});
-	}
+	// Create a table definition
+	let namespace = "test_namespace";
+	let database = "test_database";
+	let table = "test_table";
+	let key = Tb::new(namespace, database, table);
+	let value = DefineTableStatement {
+		name: Default::default(),
+		drop: false,
+		full: false,
+		view: None,
+		permissions: Default::default(),
+		changefeed: None,
+	};
+	match tx.set(&key, &value).await {
+		Ok(_) => {}
+		Err(e) => panic!("{:?}", e),
+	};
 
-	#[tokio::test]
-	#[rustfmt::skip]
-	async fn table_definitions_can_be_scanned() {
-		// Setup
-		let test = match init().await {
-			Ok(ctx) => ctx,
-			Err(e) => panic!("{:?}", e),
-		};
-		let mut tx = match test.db.transaction(true, false).await {
-			Ok(tx) => tx,
-			Err(e) => panic!("{:?}", e),
-		};
-
-		// Create a table definition
-		let namespace = "test_namespace";
-		let database = "test_database";
-		let table = "test_table";
-		let key = Tb::new(namespace, database, table);
-		let value = DefineTableStatement {
-			name: Default::default(),
-			drop: false,
-			full: false,
-			view: None,
-			permissions: Default::default(),
-		};
-		match tx.set(&key, &value).await {
-			Ok(_) => {}
-			Err(e) => panic!("{:?}", e),
-		};
-
-		// Validate with scan
-		match tx.scan(tb::prefix(namespace, database)..tb::suffix(namespace, database), 1000).await {
-			Ok(scan) => {
-				assert_eq!(scan.len(), 1);
-				let read = DefineTableStatement::from(&scan[0].1);
-				assert_eq!(&read, &value);
-			}
-			Err(e) => panic!("{:?}", e),
+	// Validate with scan
+	match tx.scan(tb::prefix(namespace, database)..tb::suffix(namespace, database), 1000).await {
+		Ok(scan) => {
+			assert_eq!(scan.len(), 1);
+			let read = DefineTableStatement::from(&scan[0].1);
+			assert_eq!(&read, &value);
 		}
-	}
-
-	#[tokio::test]
-	async fn table_definitions_can_be_deleted() {
-		// Setup
-		let test = match init().await {
-			Ok(ctx) => ctx,
-			Err(e) => panic!("{:?}", e),
-		};
-		let mut tx = match test.db.transaction(true, false).await {
-			Ok(tx) => tx,
-			Err(e) => panic!("{:?}", e),
-		};
-
-		// Create a table definition
-		let namespace = "test_namespace";
-		let database = "test_database";
-		let table = "test_table";
-		let key = Tb::new(namespace, database, table);
-		let value = DefineTableStatement {
-			name: Default::default(),
-			drop: false,
-			full: false,
-			view: None,
-			permissions: Default::default(),
-		};
-		match tx.set(&key, &value).await {
-			Ok(_) => {}
-			Err(e) => panic!("{:?}", e),
-		};
-
-		// Validate delete
-		match tx.del(&key).await {
-			Ok(_) => {}
-			Err(e) => panic!("{:?}", e),
-		};
-
-		// Should not exist
-		match tx.get(&key).await {
-			Ok(None) => {}
-			Ok(Some(o)) => panic!("Should not exist but was {:?}", o),
-			Err(e) => panic!("Unexpected error on get {:?}", e),
-		};
+		Err(e) => panic!("{:?}", e),
 	}
 }
+
+#[tokio::test]
+#[serial]
+async fn table_definitions_can_be_deleted() {
+	// Setup
+	let test = match init().await {
+		Ok(ctx) => ctx,
+		Err(e) => panic!("{:?}", e),
+	};
+	let mut tx = match test.db.transaction(true, false).await {
+		Ok(tx) => tx,
+		Err(e) => panic!("{:?}", e),
+	};
+
+	// Create a table definition
+	let namespace = "test_namespace";
+	let database = "test_database";
+	let table = "test_table";
+	let key = Tb::new(namespace, database, table);
+	let value = DefineTableStatement {
+		name: Default::default(),
+		drop: false,
+		full: false,
+		view: None,
+		permissions: Default::default(),
+		changefeed: None,
+	};
+	match tx.set(&key, &value).await {
+		Ok(_) => {}
+		Err(e) => panic!("{:?}", e),
+	};
+
+	// Validate delete
+	match tx.del(&key).await {
+		Ok(_) => {}
+		Err(e) => panic!("{:?}", e),
+	};
+
+	// Should not exist
+	match tx.get(&key).await {
+		Ok(None) => {}
+		Ok(Some(o)) => panic!("Should not exist but was {:?}", o),
+		Err(e) => panic!("Unexpected error on get {:?}", e),
+	};
+}
diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs
index 4221d8fd..e461fc8b 100644
--- a/lib/src/kvs/tx.rs
+++ b/lib/src/kvs/tx.rs
@@ -5,15 +5,20 @@ use super::Val;
 use crate::dbs::cl::ClusterMembership;
 use crate::dbs::cl::Timestamp;
 use crate::err::Error;
-use crate::key::thing;
+use crate::key::hb::Hb;
+use crate::key::lq::Lq;
+use crate::key::lv::Lv;
+use crate::key::{lq, thing};
 use crate::kvs::cache::Cache;
 use crate::kvs::cache::Entry;
+use crate::kvs::LqValue;
 use crate::sql;
 use crate::sql::paths::EDGE;
 use crate::sql::paths::IN;
 use crate::sql::paths::OUT;
 use crate::sql::thing::Thing;
 use crate::sql::value::Value;
+use crate::sql::Strand;
 use channel::Sender;
 use sql::permission::Permissions;
 use sql::statements::DefineAnalyzerStatement;
@@ -820,7 +825,7 @@ impl Transaction {
 		}
 	}
 
-	fn clock(&self) -> Timestamp {
+	pub(crate) fn clock(&self) -> Timestamp {
 		// Use a timestamp oracle if available
 		let now: u128 = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
 		Timestamp {
@@ -829,21 +834,190 @@ impl Transaction {
 	}
 
 	// Set heartbeat
-	pub async fn set_hb(&mut self, id: Uuid) -> Result<(), Error> {
-		let now = self.clock();
-		let key = crate::key::hb::Hb::new(now.clone(), id);
+	pub async fn set_hb(&mut self, timestamp: Timestamp, id: Uuid) -> Result<(), Error> {
+		let key = Hb::new(timestamp.clone(), id);
 		// We do not need to do a read, we always want to overwrite
 		self.put(
 			key,
 			ClusterMembership {
 				name: id.to_string(),
-				heartbeat: now,
+				heartbeat: timestamp,
 			},
 		)
 		.await?;
 		Ok(())
 	}
 
+	// Delete a cluster registration entry
+	pub async fn del_cl(&mut self, node: uuid::Uuid) -> Result<(), Error> {
+		let key = crate::key::cl::Cl::new(node);
+		self.del(key).await
+	}
+
+	// Delete the live query notification registry on the table
+	// Return the Table ID
+	pub async fn del_cllv(&mut self, cl: &Uuid) -> Result<Uuid, Error> {
+		// This isn't implemented because it is covered by del_cl
+		// Will add later for remote node kill
+		Err(Error::ClNotFound {
+			value: format!("Missing cluster {:?}", cl),
+		})
+	}
+
+	// Scans up until the heartbeat timestamp and returns the discovered nodes
+	pub async fn scan_hb(&mut self, time_to: &Timestamp, limit: u32) -> Result<Vec<Hb>, Error> {
+		let beg = crate::key::hb::Hb::prefix();
+		let end = crate::key::hb::Hb::suffix(time_to);
+		trace!("Scan start: {} ({:?})", String::from_utf8_lossy(&beg), &beg);
+		trace!("Scan end: {} ({:?})", String::from_utf8_lossy(&end), &end);
+		let mut nxt: Option<Key> = None;
+		let mut num = limit;
+		let mut out: Vec<Hb> = vec![];
+		// Start processing
+		while num > 0 {
+			// Get records batch
+			let res = match nxt {
+				None => {
+					let min = beg.clone();
+					let max = end.clone();
+					let num = std::cmp::min(1000, num);
+					self.scan(min..max, num).await?
+				}
+				Some(ref mut beg) => {
+					beg.push(0x00);
+					let min = beg.clone();
+					let max = end.clone();
+					let num = std::cmp::min(1000, num);
+					self.scan(min..max, num).await?
+				}
+			};
+			// Get total results
+			let n = res.len();
+			// Exit when settled
+			if n == 0 {
+				break;
+			}
+			// Loop over results
+			for (i, (k, _)) in res.into_iter().enumerate() {
+				// Ready the next
+				if n == i + 1 {
+					nxt = Some(k.clone());
+				}
+				out.push(Hb::decode(k.as_slice())?);
+				// Count
+				num -= 1;
+			}
+		}
+		trace!("scan_hb: {:?}", out);
+		Ok(out)
+	}
+
+	pub async fn scan_cl(&mut self, limit: u32) -> Result<Vec<ClusterMembership>, Error> {
+		let beg = crate::key::cl::Cl::prefix();
+		let end = crate::key::cl::Cl::suffix();
+		trace!("Scan start: {} ({:?})", String::from_utf8_lossy(&beg), &beg);
+		trace!("Scan end: {} ({:?})", String::from_utf8_lossy(&end), &end);
+		let mut nxt: Option<Key> = None;
+		let mut num = limit;
+		let mut out: Vec<ClusterMembership> = vec![];
+		// Start processing
+		while num > 0 {
+			// Get records batch
+			let res = match nxt {
+				None => {
+					let min = beg.clone();
+					let max = end.clone();
+					let num = std::cmp::min(1000, num);
+					self.scan(min..max, num).await?
+				}
+				Some(ref mut beg) => {
+					beg.push(0x00);
+					let min = beg.clone();
+					let max = end.clone();
+					let num = std::cmp::min(1000, num);
+					self.scan(min..max, num).await?
+				}
+			};
+			// Get total results
+			let n = res.len();
+			// Exit when settled
+			if n == 0 {
+				break;
+			}
+			// Loop over results
+			for (i, (k, v)) in res.into_iter().enumerate() {
+				// Ready the next
+				if n == i + 1 {
+					nxt = Some(k.clone());
+				}
+				out.push((&v).into());
+				// Count
+				num -= 1;
+			}
+		}
+		trace!("scan_cl: {:?}", out);
+		Ok(out)
+	}
+
+	pub async fn delr_hb(&mut self, ts: Vec<Hb>, limit: u32) -> Result<(), Error> {
+		trace!("delr_hb: ts={:?} limit={:?}", ts, limit);
+		for hb in ts.into_iter() {
+			self.del(hb).await?;
+		}
+		Ok(())
+	}
+
+	pub async fn del_lv(&mut self, ns: &str, db: &str, tb: &str, lv: Uuid) -> Result<(), Error> {
+		trace!("del_lv: ns={:?} db={:?} tb={:?} lv={:?}", ns, db, tb, lv);
+		let key = crate::key::lv::new(ns, db, tb, lv);
+		self.cache.del(&key.clone().into());
+		self.del(key).await
+	}
+
+	pub async fn scan_lq<'a>(
+		&mut self,
+		node: &uuid::Uuid,
+		limit: u32,
+	) -> Result<Vec<LqValue>, Error> {
+		let pref = lq::prefix_nd(node);
+		let suff = lq::suffix_nd(node);
+		trace!(
+			"Scanning range from pref={}, suff={}",
+			crate::key::debug::sprint_key(&pref),
+			crate::key::debug::sprint_key(&suff),
+		);
+		let rng = pref..suff;
+		let scanned = self.scan(rng, limit).await?;
+		let mut res: Vec<LqValue> = vec![];
+		for (key, value) in scanned {
+			trace!("scan_lq: key={:?} value={:?}", &key, &value);
+			let lq = Lq::decode(key.as_slice())?;
+			let tb: String = String::from_utf8(value).map_err(|e| Error::Internal(format!("Failed to decode a value while reading LQ: {}", e)))?;
+			res.push(LqValue {
+				cl: lq.nd,
+				ns: lq.ns.to_string(),
+				db: lq.db.to_string(),
+				tb,
+				lq: lq.lq,
+			});
+		}
+		Ok(res)
+	}
+
+	pub async fn putc_lv(
+		&mut self,
+		ns: &str,
+		db: &str,
+		tb: &str,
+		live_stm: LiveStatement,
+		expected: Option<LiveStatement>,
+	) -> Result<(), Error> {
+		let key = crate::key::lv::new(ns, db, tb, live_stm.id.0);
+		let key_enc = Lv::encode(&key)?;
+		trace!("putc_lv ({:?}): key={:?}", &live_stm.id, crate::key::debug::sprint_key(&key_enc));
+		self.putc(key_enc, live_stm, expected).await
+	}
+
 	/// Retrieve all namespace definitions in a datastore.
 	pub async fn all_ns(&mut self) -> Result<Arc<[DefineNamespaceStatement]>, Error> {
 		let key = crate::key::ns::prefix();
@@ -1201,6 +1375,30 @@ impl Transaction {
 			val
 		})
 	}
+
+	pub async fn all_lq(&mut self, nd: &uuid::Uuid) -> Result<Vec<LqValue>, Error> {
+		let beg = crate::key::lq::prefix_nd(nd);
+		let end = crate::key::lq::suffix_nd(nd);
+		let lq_pairs = self.getr(beg..end, u32::MAX).await?;
+		let mut lqs = vec![];
+		for (key, value) in lq_pairs {
+			let lq_key = Lq::decode(key.as_slice())?;
+			trace!("Value is {:?}", &value);
+			let lq_value = String::from_utf8(value).map_err(|e| {
+				Error::Internal(format!("Failed to decode a value while reading LQ: {}", e))
+			})?;
+			let lqv = LqValue {
+				cl: *nd,
+				ns: lq_key.ns.to_string(),
+				db: lq_key.db.to_string(),
+				tb: lq_value,
+				lq: lq_key.lq,
+			};
+			lqs.push(lqv);
+		}
+		Ok(lqs)
+	}
+
 	/// Retrieve all analyzer definitions for a specific database.
 	pub async fn all_az(
 		&mut self,
@@ -1331,6 +1529,37 @@ impl Transaction {
 		Ok(val.into())
 	}
 
+	/// Return the table stored at the lq address
+	pub async fn get_lq(
+		&mut self,
+		nd: Uuid,
+		ns: &str,
+		db: &str,
+		lq: Uuid,
+	) -> Result<Strand, Error> {
+		let key = lq::new(nd, ns, db, lq);
+		let val = self.get(key).await?.ok_or(Error::LqNotFound {
+			value: lq.to_string(),
+		})?;
+		Value::from(val).convert_to_strand()
+	}
+
+	pub async fn get_lv(
+		&mut self,
+		ns: &str,
+		db: &str,
+		tb: &str,
+		lv: &Uuid,
+	) -> Result<LiveStatement, Error> {
+		let key = crate::key::lv::new(ns, db, tb, *lv);
+		let key_enc = Lv::encode(&key)?;
+		trace!("Getting lv ({:?}) {:?}", lv, crate::key::debug::sprint_key(&key_enc));
+		let val = self.get(key_enc).await?.ok_or(Error::LvNotFound {
+			value: lv.to_string(),
+		})?;
+		Ok(val.into())
+	}
+
 	/// Retrieve a specific param definition.
 	pub async fn get_pa(
 		&mut self,
diff --git a/lib/src/sql/constant.rs b/lib/src/sql/constant.rs
index c4d8f1ab..bf4d9690 100644
--- a/lib/src/sql/constant.rs
+++ b/lib/src/sql/constant.rs
@@ -77,7 +77,6 @@ impl Constant {
 			Self::TimeEpoch => ConstantValue::Datetime(Datetime(Utc.timestamp_nanos(0))),
 		}
 	}
-
 	/// Process this type returning a computed simple Value
 	pub(crate) async fn compute(
 		&self,
diff --git a/lib/src/sql/statements/live.rs b/lib/src/sql/statements/live.rs
index ce21a8ac..2616009d 100644
--- a/lib/src/sql/statements/live.rs
+++ b/lib/src/sql/statements/live.rs
@@ -24,11 +24,16 @@ use std::fmt;
 #[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store, Hash)]
 pub struct LiveStatement {
 	pub id: Uuid,
-	pub node: Uuid,
+	pub node: uuid::Uuid,
 	pub expr: Fields,
 	pub what: Value,
 	pub cond: Option<Cond>,
 	pub fetch: Option<Fetchs>,
+
+	// Non-query properties that are necessary for storage or for carrying additional state
+
+	// When a live query is archived, this should be the node ID that archived the query.
+	pub archived: Option<uuid::Uuid>,
 }
 
 impl LiveStatement {
@@ -54,7 +59,10 @@ impl LiveStatement {
 				// Clone the current statement
 				let mut stm = self.clone();
 				// Store the current Node ID
-				stm.node = Uuid(opt.id()?);
+				if let Err(e) = opt.id() {
+					trace!("No ID for live query {:?}, error={:?}", stm, e)
+				}
+				stm.node = opt.id()?;
 				// Insert the node live query
 				let key = crate::key::lq::new(opt.id()?, opt.ns(), opt.db(), self.id.0);
 				run.putc(key, tb.as_str(), None).await?;
@@ -71,6 +79,11 @@ impl LiveStatement {
 		// Return the query id
 		Ok(self.id.clone().into())
 	}
+
+	pub(crate) fn archive(mut self, node_id: uuid::Uuid) -> LiveStatement {
+		self.archived = Some(node_id);
+		self
+	}
 }
 
 impl fmt::Display for LiveStatement {
@@ -100,11 +113,12 @@ pub fn live(i: &str) -> IResult<&str, LiveStatement> {
 		i,
 		LiveStatement {
 			id: Uuid::new_v4(),
-			node: Uuid::default(),
+			node: uuid::Uuid::new_v4(),
 			expr,
 			what,
 			cond,
 			fetch,
+			archived: None,
 		},
 	))
 }
diff --git a/src/dbs/mod.rs b/src/dbs/mod.rs
index 3ad28bed..568b9a3b 100644
--- a/src/dbs/mod.rs
+++ b/src/dbs/mod.rs
@@ -49,6 +49,7 @@ pub async fn init(
 		.with_strict_mode(strict_mode)
 		.with_query_timeout(query_timeout)
 		.with_transaction_timeout(transaction_timeout);
+	dbs.bootstrap().await?;
 	// Store database instance
 	let _ = DB.set(dbs);
 	// All ok