From 457adff85cb9889f63cf6306da9c04181c6093b0 Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Mon, 16 Sep 2024 21:51:29 +0100 Subject: [PATCH] Improve node membership processing and garbage collection (#4780) --- core/src/cf/gc.rs | 24 +- core/src/cf/writer.rs | 12 +- core/src/dbs/lifecycle.rs | 22 -- core/src/dbs/mod.rs | 2 - core/src/kvs/cf.rs | 53 ++++ core/src/kvs/ds.rs | 153 +++++------ core/src/kvs/mod.rs | 1 + core/src/kvs/node.rs | 92 ++++--- .../kvs/tests/timestamp_to_versionstamp.rs | 2 +- core/src/options.rs | 32 ++- sdk/Cargo.toml | 6 +- sdk/src/api/engine/local/mod.rs | 3 - sdk/src/api/engine/local/native.rs | 42 ++-- sdk/src/api/engine/local/wasm.rs | 34 ++- sdk/src/api/engine/tasks.rs | 237 +++++++++++------- sdk/src/api/opt/config.rs | 41 ++- sdk/tests/api.rs | 31 +-- sdk/tests/changefeeds.rs | 22 +- src/cli/config.rs | 5 +- src/cli/start.rs | 77 ++++-- src/telemetry/mod.rs | 17 +- tests/cli_integration.rs | 7 +- tests/common/server.rs | 8 - 23 files changed, 521 insertions(+), 402 deletions(-) delete mode 100644 core/src/dbs/lifecycle.rs create mode 100644 core/src/kvs/cf.rs diff --git a/core/src/cf/gc.rs b/core/src/cf/gc.rs index f68fb957..bb6b2ccc 100644 --- a/core/src/cf/gc.rs +++ b/core/src/cf/gc.rs @@ -1,6 +1,5 @@ use crate::err::Error; use crate::key::change; -#[cfg(debug_assertions)] use crate::key::debug::Sprintable; use crate::kvs::Transaction; use crate::vs; @@ -9,30 +8,30 @@ use std::str; // gc_all_at deletes all change feed entries that become stale at the given timestamp. #[allow(unused)] +#[instrument(level = "trace", target = "surrealdb::core::cfs", skip(tx))] pub async fn gc_all_at(tx: &Transaction, ts: u64) -> Result<(), Error> { // Fetch all namespaces let nss = tx.all_ns().await?; // Loop over each namespace for ns in nss.as_ref() { // Trace for debugging - #[cfg(debug_assertions)] - trace!("Performing garbage collection on {ns} for timestamp {ts}"); + trace!("Performing garbage collection on {} for timestamp {ts}", ns.name); // Process the namespace - gc_ns(tx, ts, ns.name.as_str()).await?; + gc_ns(tx, ts, &ns.name).await?; } Ok(()) } // gc_ns deletes all change feed entries in the given namespace that are older than the given watermark. #[allow(unused)] +#[instrument(level = "trace", target = "surrealdb::core::cfs", skip(tx))] pub async fn gc_ns(tx: &Transaction, ts: u64, ns: &str) -> Result<(), Error> { // Fetch all databases let dbs = tx.all_db(ns).await?; // Loop over each database for db in dbs.as_ref() { // Trace for debugging - #[cfg(debug_assertions)] - trace!("Performing garbage collection on {ns}:{db} for timestamp {ts}"); + trace!("Performing garbage collection on {ns}:{} for timestamp {ts}", db.name); // Fetch all tables let tbs = tx.all_tb(ns, &db.name, None).await?; // Get the database changefeed expiration @@ -68,19 +67,14 @@ pub async fn gc_ns(tx: &Transaction, ts: u64, ns: &str) -> Result<(), Error> { } // gc_db deletes all change feed entries in the given database that are older than the given watermark. 
-pub async fn gc_range( - tx: &Transaction, - ns: &str, - db: &str, - watermark: Versionstamp, -) -> Result<(), Error> { +#[instrument(level = "trace", target = "surrealdb::core::cfs", skip(tx))] +pub async fn gc_range(tx: &Transaction, ns: &str, db: &str, vt: Versionstamp) -> Result<(), Error> { // Calculate the range let beg = change::prefix_ts(ns, db, vs::u64_to_versionstamp(0)); - let end = change::prefix_ts(ns, db, watermark); + let end = change::prefix_ts(ns, db, vt); // Trace for debugging - #[cfg(debug_assertions)] trace!( - "Performing garbage collection on {ns}:{db} for watermark {watermark:?}, between {} and {}", + "Performing garbage collection on {ns}:{db} for watermark {vt:?}, between {} and {}", beg.sprint(), end.sprint() ); diff --git a/core/src/cf/writer.rs b/core/src/cf/writer.rs index 1e13b08d..58a75461 100644 --- a/core/src/cf/writer.rs +++ b/core/src/cf/writer.rs @@ -177,7 +177,7 @@ mod tests { // Let the db remember the timestamp for the current versionstamp // so that we can replay change feeds from the timestamp later. - ds.tick_at(ts.0.timestamp().try_into().unwrap()).await.unwrap(); + ds.changefeed_process_at(ts.0.timestamp().try_into().unwrap()).await.unwrap(); // // Write things to the table. @@ -379,7 +379,7 @@ mod tests { assert_eq!(r, want); // Now we should see the gc_all results - ds.tick_at((ts.0.timestamp() + 5).try_into().unwrap()).await.unwrap(); + ds.changefeed_process_at((ts.0.timestamp() + 5).try_into().unwrap()).await.unwrap(); let tx7 = ds.transaction(Write, Optimistic).await.unwrap(); let r = crate::cf::read(&tx7, NS, DB, Some(TB), ShowSince::Timestamp(ts), Some(10)) @@ -393,13 +393,13 @@ mod tests { async fn scan_picks_up_from_offset() { // Given we have 2 entries in change feeds let ds = init(false).await; - ds.tick_at(5).await.unwrap(); + ds.changefeed_process_at(5).await.unwrap(); let _id1 = record_change_feed_entry( ds.transaction(Write, Optimistic).await.unwrap(), "First".to_string(), ) .await; - ds.tick_at(10).await.unwrap(); + ds.changefeed_process_at(10).await.unwrap(); let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner(); let vs1 = tx.get_versionstamp_from_timestamp(5, NS, DB).await.unwrap().unwrap(); let vs2 = tx.get_versionstamp_from_timestamp(10, NS, DB).await.unwrap().unwrap(); @@ -431,7 +431,7 @@ mod tests { let ds = init(true).await; // Create a doc - ds.tick_at(ts.0.timestamp().try_into().unwrap()).await.unwrap(); + ds.changefeed_process_at(ts.0.timestamp().try_into().unwrap()).await.unwrap(); let thing = Thing { tb: TB.to_owned(), id: Id::from("A"), @@ -444,7 +444,7 @@ mod tests { res.result.unwrap(); // Now update it - ds.tick_at((ts.0.timestamp() + 10).try_into().unwrap()).await.unwrap(); + ds.changefeed_process_at((ts.0.timestamp() + 10).try_into().unwrap()).await.unwrap(); let res = ds .execute( format!("UPDATE {thing} SET value=100, new_field=\"new_value\"").as_str(), diff --git a/core/src/dbs/lifecycle.rs b/core/src/dbs/lifecycle.rs deleted file mode 100644 index 0a1b804e..00000000 --- a/core/src/dbs/lifecycle.rs +++ /dev/null @@ -1,22 +0,0 @@ -/// LoggingLifecycle is used to create log messages upon creation, and log messages when it is dropped -#[doc(hidden)] -#[non_exhaustive] -pub struct LoggingLifecycle { - identifier: String, -} - -impl LoggingLifecycle { - #[doc(hidden)] - pub fn new(identifier: String) -> Self { - debug!("Started {}", identifier); - Self { - identifier, - } - } -} - -impl Drop for LoggingLifecycle { - fn drop(&mut self) { - debug!("Stopped {}", self.identifier); - } -} diff --git 
a/core/src/dbs/mod.rs b/core/src/dbs/mod.rs index 25d74ac5..0ae6ea76 100644 --- a/core/src/dbs/mod.rs +++ b/core/src/dbs/mod.rs @@ -18,11 +18,9 @@ mod store; mod variables; pub mod capabilities; -pub mod lifecycle; pub mod node; pub use self::capabilities::Capabilities; -pub use self::lifecycle::*; pub use self::notification::*; pub use self::options::*; pub use self::response::*; diff --git a/core/src/kvs/cf.rs b/core/src/kvs/cf.rs new file mode 100644 index 00000000..7e54a38b --- /dev/null +++ b/core/src/kvs/cf.rs @@ -0,0 +1,53 @@ +use crate::err::Error; +use crate::kvs::Datastore; +use crate::kvs::{LockType::*, TransactionType::*}; +use crate::vs::Versionstamp; + +impl Datastore { + /// Saves the current timestamp for each database's current versionstamp. + #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))] + pub(crate) async fn changefeed_versionstamp( + &self, + ts: u64, + ) -> Result<Option<Versionstamp>, Error> { + // Store the latest versionstamp + let mut vs: Option<Versionstamp> = None; + // Create a new transaction + let txn = self.transaction(Write, Optimistic).await?; + // Fetch all namespaces + let nss = catch!(txn, txn.all_ns().await); + // Loop over all namespaces + for ns in nss.iter() { + // Get the namespace name + let ns = &ns.name; + // Fetch all databases + let dbs = catch!(txn, txn.all_db(ns).await); + // Loop over all databases + for db in dbs.iter() { + // Get the database name + let db = &db.name; + // TODO(SUR-341): This is incorrect, it's a [ns,db] to vs pair + // It's safe for now, as it is unused but either the signature must change + // to include {(ns, db): (ts, vs)} mapping, or we don't return it + vs = Some(txn.lock().await.set_timestamp_for_versionstamp(ts, ns, db).await?); + } + } + // Commit the changes + catch!(txn, txn.commit().await); + // Return the version + Ok(vs) + } + + /// Deletes all change feed entries that are older than the timestamp.
+ #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))] + pub(crate) async fn changefeed_cleanup(&self, ts: u64) -> Result<(), Error> { + // Create a new transaction + let txn = self.transaction(Write, Optimistic).await?; + // Perform the garbage collection + catch!(txn, crate::cf::gc_all_at(&txn, ts).await); + // Commit the changes + catch!(txn, txn.commit().await); + // Everything ok + Ok(()) + } +} diff --git a/core/src/kvs/ds.rs b/core/src/kvs/ds.rs index 5a7e4525..b6a1c0fd 100644 --- a/core/src/kvs/ds.rs +++ b/core/src/kvs/ds.rs @@ -22,7 +22,6 @@ use crate::kvs::index::IndexBuilder; use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*}; use crate::sql::{statements::DefineUserStatement, Base, Query, Value}; use crate::syn; -use crate::vs::{conv, Versionstamp}; use channel::{Receiver, Sender}; use futures::Future; use reblessive::TreeStack; @@ -41,7 +40,7 @@ use uuid::Uuid; #[cfg(target_arch = "wasm32")] use wasmtimer::std::{SystemTime, UNIX_EPOCH}; -const TARGET: &str = "surrealdb::core::kvs::tr"; +const TARGET: &str = "surrealdb::core::kvs::ds"; // If there are an infinite number of heartbeats, then we want to go batch-by-batch spread over several checks const LQ_CHANNEL_SIZE: usize = 100; @@ -601,118 +600,82 @@ impl Datastore { } } - // Initialise the cluster and run bootstrap utilities + /// Initialise the cluster and run bootstrap utilities #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)] pub async fn bootstrap(&self) -> Result<(), Error> { // Insert this node in the cluster self.insert_node(self.id).await?; - // Mark expired nodes as archived + // Mark inactive nodes as archived self.expire_nodes().await?; + // Remove archived nodes + self.remove_nodes().await?; // Everything ok Ok(()) } - // tick is called periodically to perform maintenance tasks. - // This is called every TICK_INTERVAL. - #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip(self))] - pub async fn tick(&self) -> Result<(), Error> { - let now = SystemTime::now().duration_since(UNIX_EPOCH).map_err(|e| { - Error::Internal(format!("Clock may have gone backwards: {:?}", e.duration())) - })?; - let ts = now.as_secs(); - self.tick_at(ts).await?; - Ok(()) - } - - // tick_at is the utility function that is called by tick. - // It is handy for testing, because it allows you to specify the timestamp, - // without depending on a system clock. + /// Run the background task to update node registration information #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))] - pub async fn tick_at(&self, ts: u64) -> Result<(), Error> { - trace!(target: TARGET, "Ticking at timestamp {ts} ({:?})", conv::u64_to_versionstamp(ts)); - let _vs = self.save_timestamp_for_versionstamp(ts).await?; - self.garbage_collect_stale_change_feeds(ts).await?; + pub async fn node_membership_update(&self) -> Result<(), Error> { + // Output function invocation details to logs + trace!(target: TARGET, "Updating node registration information"); // Update this node in the cluster self.update_node(self.id).await?; - // Mark expired nodes as archived - self.expire_nodes().await?; - // Cleanup expired nodes data - self.cleanup_nodes().await?; - // Garbage collect other data - self.garbage_collect().await?; // Everything ok Ok(()) } - // save_timestamp_for_versionstamp saves the current timestamp for the each database's current versionstamp. 
- // Note: the returned VS is flawed, as there are multiple {ts: vs} mappings per (ns, db) - pub(crate) async fn save_timestamp_for_versionstamp( - &self, - ts: u64, - ) -> Result<Option<Versionstamp>, Error> { - let tx = self.transaction(Write, Optimistic).await?; - match self.save_timestamp_for_versionstamp_impl(ts, &tx).await { - Ok(vs) => Ok(vs), - Err(e) => { - match tx.cancel().await { - Ok(_) => { - Err(e) - } - Err(txe) => { - Err(Error::Tx(format!("Error saving timestamp for versionstamp: {:?} and error cancelling transaction: {:?}", e, txe))) - } - } - } - } - } - - async fn save_timestamp_for_versionstamp_impl( - &self, - ts: u64, - tx: &Transaction, - ) -> Result<Option<Versionstamp>, Error> { - let mut vs: Option<Versionstamp> = None; - let nses = tx.all_ns().await?; - let nses = nses.as_ref(); - for ns in nses { - let ns = ns.name.as_str(); - let dbs = tx.all_db(ns).await?; - let dbs = dbs.as_ref(); - for db in dbs { - let db = db.name.as_str(); - // TODO(SUR-341): This is incorrect, it's a [ns,db] to vs pair - // It's safe for now, as it is unused but either the signature must change - // to include {(ns, db): (ts, vs)} mapping, or we don't return it - vs = Some(tx.lock().await.set_timestamp_for_versionstamp(ts, ns, db).await?); - } - } - tx.commit().await?; - Ok(vs) - } - - // garbage_collect_stale_change_feeds deletes all change feed entries that are older than the watermarks. - pub(crate) async fn garbage_collect_stale_change_feeds(&self, ts: u64) -> Result<(), Error> { - let tx = self.transaction(Write, Optimistic).await?; - if let Err(e) = self.garbage_collect_stale_change_feeds_impl(&tx, ts).await { - return match tx.cancel().await { - Ok(_) => { - Err(e) - } - Err(txe) => { - Err(Error::Tx(format!("Error garbage collecting stale change feeds: {:?} and error cancelling transaction: {:?}", e, txe))) - } - }; - } + /// Run the background task to process and archive inactive nodes + #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))] + pub async fn node_membership_expire(&self) -> Result<(), Error> { + // Output function invocation details to logs + trace!(target: TARGET, "Processing and archiving inactive nodes"); + // Mark expired nodes as archived + self.expire_nodes().await?; + // Everything ok Ok(()) } - async fn garbage_collect_stale_change_feeds_impl( - &self, - tx: &Transaction, - ts: u64, - ) -> Result<(), Error> { - cf::gc_all_at(tx, ts).await?; - tx.commit().await?; + /// Run the background task to process and cleanup archived nodes + #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))] + pub async fn node_membership_remove(&self) -> Result<(), Error> { + // Output function invocation details to logs + trace!(target: TARGET, "Processing and cleaning archived nodes"); + // Cleanup expired nodes data + self.remove_nodes().await?; + // Everything ok + Ok(()) + } + + /// Run the background task to perform changefeed garbage collection + #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))] + pub async fn changefeed_process(&self) -> Result<(), Error> { + // Output function invocation details to logs + trace!(target: TARGET, "Running changefeed garbage collection"); + // Calculate the current system time + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| { + Error::Internal(format!("Clock may have gone backwards: {:?}", e.duration())) + })?
+ .as_secs(); + // Save timestamps for current versionstamps + self.changefeed_versionstamp(ts).await?; + // Garbage collect old changefeed data from all databases + self.changefeed_cleanup(ts).await?; + // Everything ok + Ok(()) + } + + /// Run the background task to perform changefeed garbage collection + #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))] + pub async fn changefeed_process_at(&self, ts: u64) -> Result<(), Error> { + // Output function invocation details to logs + trace!(target: TARGET, "Running changefeed garbage collection"); + // Save timestamps for current versionstamps + self.changefeed_versionstamp(ts).await?; + // Garbage collect old changefeed data from all databases + self.changefeed_cleanup(ts).await?; + // Everything ok Ok(()) } diff --git a/core/src/kvs/mod.rs b/core/src/kvs/mod.rs index 71d8e818..3fa9e61e 100644 --- a/core/src/kvs/mod.rs +++ b/core/src/kvs/mod.rs @@ -15,6 +15,7 @@ mod api; mod batch; mod cache; +mod cf; mod clock; mod ds; mod export; diff --git a/core/src/kvs/node.rs b/core/src/kvs/node.rs index 3036001b..2fd30cb7 100644 --- a/core/src/kvs/node.rs +++ b/core/src/kvs/node.rs @@ -84,31 +84,46 @@ impl Datastore { pub async fn expire_nodes(&self) -> Result<(), Error> { // Log when this method is run trace!(target: TARGET, "Archiving expired nodes in the cluster"); - // Open transaction and fetch nodes - let txn = self.transaction(Write, Optimistic).await?; - let now = self.clock_now().await; - let nds = catch!(txn, txn.all_nodes().await); - for nd in nds.iter() { - // Check that the node is active - if nd.is_active() { - // Check if the node has expired - if nd.hb < now - Duration::from_secs(30) { - // Log the live query scanning - trace!(target: TARGET, id = %nd.id, "Archiving node in the cluster"); - // Mark the node as archived - let val = nd.archive(); - // Get the key for the node entry - let key = crate::key::root::nd::new(nd.id); - // Update the node entry - catch!(txn, txn.set(key, val, None).await); - } + // Fetch all of the inactive nodes + let inactive = { + let txn = self.transaction(Read, Optimistic).await?; + let nds = catch!(txn, txn.all_nodes().await); + let now = self.clock_now().await; + catch!(txn, txn.cancel().await); + // Filter the inactive nodes + nds.iter() + .filter_map(|n| { + // Check that the node is active and has expired + match n.is_active() && n.hb < now - Duration::from_secs(30) { + true => Some(n.to_owned()), + false => None, + } + }) + .collect::<Vec<Node>>() + }; + // Check if there are inactive nodes + if !inactive.is_empty() { + // Open a writeable transaction + let txn = self.transaction(Write, Optimistic).await?; + // Archive the inactive nodes + for nd in inactive.iter() { + // Log the live query scanning + trace!(target: TARGET, id = %nd.id, "Archiving node in the cluster"); + // Mark the node as archived + let val = nd.archive(); + // Get the key for the node entry + let key = crate::key::root::nd::new(nd.id); + // Update the node entry + catch!(txn, txn.set(key, val, None).await); } + // Commit the changes + catch!(txn, txn.commit().await); } - // Commit the changes - txn.commit().await + // Everything was successful + Ok(()) } - /// Cleans up nodes which are no longer in this cluster. + /// Removes and cleans up nodes which are no longer in this cluster. /// /// This function should be run periodically at an interval. /// @@ -116,23 +131,25 @@ impl Datastore { /// When a matching node is found, all node queries, and table queries are /// garbage collected, before the node itself is completely deleted.
#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))] - pub async fn cleanup_nodes(&self) -> Result<(), Error> { + pub async fn remove_nodes(&self) -> Result<(), Error> { // Log when this method is run trace!(target: TARGET, "Cleaning up archived nodes in the cluster"); - // Fetch all of the expired nodes - let expired = { + // Fetch all of the archived nodes + let archived = { let txn = self.transaction(Read, Optimistic).await?; let nds = catch!(txn, txn.all_nodes().await); + catch!(txn, txn.cancel().await); // Filter the archived nodes nds.iter().filter_map(Node::archived).collect::>() }; - // Delete the live queries - { - for id in expired.iter() { + // Loop over the archived nodes + for id in archived.iter() { + // Open a writeable transaction + let txn = self.transaction(Write, Optimistic).await?; + { // Log the live query scanning trace!(target: TARGET, id = %id, "Deleting live queries for node"); // Scan the live queries for this node - let txn = self.transaction(Write, Optimistic).await?; let beg = crate::key::node::lq::prefix(*id); let end = crate::key::node::lq::suffix(*id); let mut next = Some(beg..end); @@ -145,7 +162,7 @@ impl Datastore { // Get the key for this node live query let nlq = catch!(txn, crate::key::node::lq::Lq::decode(k)); // Check that the node for this query is archived - if expired.contains(&nlq.nd) { + if archived.contains(&nlq.nd) { // Get the key for this table live query let tlq = crate::key::table::lq::new(&val.ns, &val.db, &val.tb, nlq.lq); // Delete the table live query @@ -155,15 +172,8 @@ impl Datastore { } } } - // Commit the changes - txn.commit().await?; } - } - // Delete the expired nodes - { - let txn = self.transaction(Write, Optimistic).await?; - // Loop over the nodes and delete - for id in expired.iter() { + { // Log the node deletion trace!(target: TARGET, id = %id, "Deleting node from the cluster"); // Get the key for the node entry @@ -172,7 +182,7 @@ impl Datastore { catch!(txn, txn.del(key).await); } // Commit the changes - txn.commit().await?; + catch!(txn, txn.commit().await); } // Everything was successful Ok(()) @@ -192,8 +202,8 @@ impl Datastore { pub async fn garbage_collect(&self) -> Result<(), Error> { // Log the node deletion trace!(target: TARGET, "Garbage collecting all miscellaneous data"); - // Fetch expired nodes - let expired = { + // Fetch archived nodes + let archived = { let txn = self.transaction(Read, Optimistic).await?; let nds = catch!(txn, txn.all_nodes().await); // Filter the archived nodes @@ -240,7 +250,7 @@ impl Datastore { // Get the node id and the live query id let (nid, lid) = (stm.node.0, stm.id.0); // Check that the node for this query is archived - if expired.contains(&stm.node) { + if archived.contains(&stm.node) { // Get the key for this node live query let tlq = catch!(txn, crate::key::table::lq::Lq::decode(k)); // Get the key for this table live query diff --git a/core/src/kvs/tests/timestamp_to_versionstamp.rs b/core/src/kvs/tests/timestamp_to_versionstamp.rs index 5c4c46c2..77686252 100644 --- a/core/src/kvs/tests/timestamp_to_versionstamp.rs +++ b/core/src/kvs/tests/timestamp_to_versionstamp.rs @@ -88,7 +88,7 @@ async fn writing_ts_again_results_in_following_ts() { assert_eq!(scanned[1].0, crate::key::database::ts::new("myns", "mydb", 1).encode().unwrap()); // Repeating tick - ds.tick_at(1).await.unwrap(); + ds.changefeed_process_at(1).await.unwrap(); // Validate let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner(); diff --git a/core/src/options.rs 
b/core/src/options.rs index 22435777..a5a9c8f0 100644 --- a/core/src/options.rs +++ b/core/src/options.rs @@ -7,26 +7,38 @@ use std::time::Duration; #[doc(hidden)] #[non_exhaustive] pub struct EngineOptions { - /// The maximum number of live queries that can be created in a single transaction - pub new_live_queries_per_transaction: u32, - /// The size of batches being requested per update in order to catch up a live query - pub live_query_catchup_size: u32, - pub tick_interval: Duration, + pub node_membership_refresh_interval: Duration, + pub node_membership_check_interval: Duration, + pub node_membership_cleanup_interval: Duration, + pub changefeed_gc_interval: Duration, } impl Default for EngineOptions { fn default() -> Self { Self { - new_live_queries_per_transaction: 100, - live_query_catchup_size: 1000, - tick_interval: Duration::from_secs(10), + node_membership_refresh_interval: Duration::from_secs(3), + node_membership_check_interval: Duration::from_secs(15), + node_membership_cleanup_interval: Duration::from_secs(300), + changefeed_gc_interval: Duration::from_secs(10), } } } impl EngineOptions { - pub fn with_tick_interval(mut self, tick_interval: Duration) -> Self { - self.tick_interval = tick_interval; + pub fn with_node_membership_refresh_interval(mut self, interval: Duration) -> Self { + self.node_membership_refresh_interval = interval; + self + } + pub fn with_node_membership_check_interval(mut self, interval: Duration) -> Self { + self.node_membership_check_interval = interval; + self + } + pub fn with_node_membership_cleanup_interval(mut self, interval: Duration) -> Self { + self.node_membership_cleanup_interval = interval; + self + } + pub fn with_changefeed_gc_interval(mut self, interval: Duration) -> Self { + self.changefeed_gc_interval = interval; self } } diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 2c41023c..aee12a8f 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -24,7 +24,7 @@ resolver = "2" [features] # Public features default = ["protocol-ws", "rustls"] -protocol-http = ["dep:reqwest", "dep:tokio-util"] +protocol-http = ["dep:reqwest"] protocol-ws = ["dep:tokio-tungstenite", "dep:trice", "tokio/time"] kv-mem = ["surrealdb-core/kv-mem", "tokio/time"] kv-indxdb = ["surrealdb-core/kv-indxdb"] @@ -67,6 +67,7 @@ features = [ targets = [] [dependencies] +arrayvec = "=0.7.4" bincode = "1.3.3" channel = { version = "1.9.0", package = "async-channel" } chrono = { version = "0.4.38", features = ["serde"] } @@ -96,7 +97,6 @@ rustls = { version = "0.23.12", default-features = false, features = [ "std", "tls12", ], optional = true } -arrayvec = "=0.7.4" reblessive = { version = "0.4.0", features = ["tree"] } rustls-pki-types = { version = "1.7.0", features = ["web"] } semver = { version = "1.0.20", features = ["serde"] } @@ -105,7 +105,7 @@ serde_json = "1.0.127" serde-content = "0.1.0" surrealdb-core = { version = "=2.0.0", default-features = false, path = "../core", package = "surrealdb-core" } thiserror = "1.0.63" -tokio-util = { version = "0.7.11", optional = true, features = ["compat"] } +tokio-util = { version = "0.7.11", features = ["compat"] } tracing = "0.1.40" trice = { version = "0.4.0", optional = true } url = "2.5.0" diff --git a/sdk/src/api/engine/local/mod.rs b/sdk/src/api/engine/local/mod.rs index 123edee8..314b3c37 100644 --- a/sdk/src/api/engine/local/mod.rs +++ b/sdk/src/api/engine/local/mod.rs @@ -36,7 +36,6 @@ use std::{ marker::PhantomData, mem, sync::Arc, - time::Duration, }; use surrealdb_core::{ dbs::{Response, Session}, @@ -83,8 +82,6 @@ 
pub(crate) mod native; #[cfg(target_arch = "wasm32")] pub(crate) mod wasm; -const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(10); - /// In-memory database /// /// # Examples diff --git a/sdk/src/api/engine/local/native.rs b/sdk/src/api/engine/local/native.rs index 56ec5bb1..d67f8f44 100644 --- a/sdk/src/api/engine/local/native.rs +++ b/sdk/src/api/engine/local/native.rs @@ -6,7 +6,7 @@ use crate::{ opt::{Endpoint, EndpointKind}, ExtraFeatures, OnceLockExt, Result, Surreal, }, - engine::tasks::start_tasks, + engine::tasks, opt::{auth::Root, WaitFor}, value::Notification, Action, @@ -20,6 +20,7 @@ use std::{ }; use surrealdb_core::{dbs::Session, iam::Level, kvs::Datastore, options::EngineOptions}; use tokio::sync::watch; +use tokio_util::sync::CancellationToken; impl crate::api::Connection for Db {} @@ -116,15 +117,22 @@ pub(crate) async fn run_router( let mut live_queries = HashMap::new(); let mut session = Session::default().with_rt(true); - let opt = { - let mut engine_options = EngineOptions::default(); - engine_options.tick_interval = address - .config - .tick_interval - .unwrap_or(crate::api::engine::local::DEFAULT_TICK_INTERVAL); - engine_options - }; - let (tasks, task_chans) = start_tasks(&opt, kvs.clone()); + let canceller = CancellationToken::new(); + + let mut opt = EngineOptions::default(); + if let Some(interval) = address.config.node_membership_refresh_interval { + opt.node_membership_refresh_interval = interval; + } + if let Some(interval) = address.config.node_membership_check_interval { + opt.node_membership_check_interval = interval; + } + if let Some(interval) = address.config.node_membership_cleanup_interval { + opt.node_membership_cleanup_interval = interval; + } + if let Some(interval) = address.config.changefeed_gc_interval { + opt.changefeed_gc_interval = interval; + } + let tasks = tasks::init(kvs.clone(), canceller.clone(), &opt); let mut notifications = kvs.notifications(); let mut notification_stream = poll_fn(move |cx| match &mut notifications { @@ -179,12 +187,10 @@ pub(crate) async fn run_router( } } } - - // Stop maintenance tasks - for chan in task_chans { - if chan.send(()).is_err() { - error!("Error sending shutdown signal to task"); - } - } - tasks.resolve().await.unwrap(); + // Shutdown and stop closed tasks + canceller.cancel(); + // Wait for background tasks to finish + let _ = tasks.resolve().await; + // Delete this node from the cluster + let _ = kvs.delete_node(kvs.id()).await; } diff --git a/sdk/src/api/engine/local/wasm.rs b/sdk/src/api/engine/local/wasm.rs index 75d04e17..828208b0 100644 --- a/sdk/src/api/engine/local/wasm.rs +++ b/sdk/src/api/engine/local/wasm.rs @@ -2,7 +2,6 @@ use crate::api::conn::Connection; use crate::api::conn::Route; use crate::api::conn::Router; use crate::api::engine::local::Db; -use crate::api::engine::local::DEFAULT_TICK_INTERVAL; use crate::api::method::BoxFuture; use crate::api::opt::Endpoint; use crate::api::ExtraFeatures; @@ -10,7 +9,7 @@ use crate::api::OnceLockExt; use crate::api::Result; use crate::api::Surreal; use crate::dbs::Session; -use crate::engine::tasks::start_tasks; +use crate::engine::tasks; use crate::iam::Level; use crate::kvs::Datastore; use crate::opt::auth::Root; @@ -29,6 +28,7 @@ use std::sync::Arc; use std::sync::OnceLock; use std::task::Poll; use tokio::sync::watch; +use tokio_util::sync::CancellationToken; use wasm_bindgen_futures::spawn_local; impl crate::api::Connection for Db {} @@ -113,9 +113,22 @@ pub(crate) async fn run_router( let mut live_queries = HashMap::new(); let mut 
session = Session::default().with_rt(true); + let canceller = CancellationToken::new(); + let mut opt = EngineOptions::default(); - opt.tick_interval = address.config.tick_interval.unwrap_or(DEFAULT_TICK_INTERVAL); - let (_tasks, task_chans) = start_tasks(&opt, kvs.clone()); + if let Some(interval) = address.config.node_membership_refresh_interval { + opt.node_membership_refresh_interval = interval; + } + if let Some(interval) = address.config.node_membership_check_interval { + opt.node_membership_check_interval = interval; + } + if let Some(interval) = address.config.node_membership_cleanup_interval { + opt.node_membership_cleanup_interval = interval; + } + if let Some(interval) = address.config.changefeed_gc_interval { + opt.changefeed_gc_interval = interval; + } + let tasks = tasks::init(kvs.clone(), canceller.clone(), &opt); let mut notifications = kvs.notifications(); let mut notification_stream = poll_fn(move |cx| match &mut notifications { @@ -177,11 +190,10 @@ pub(crate) async fn run_router( } } } - - // Stop maintenance tasks - for chan in task_chans { - if chan.send(()).is_err() { - error!("Error sending shutdown signal to maintenance task"); - } - } + // Shutdown and stop closed tasks + canceller.cancel(); + // Wait for background tasks to finish + let _ = tasks.resolve().await; + // Delete this node from the cluster + let _ = kvs.delete_node(kvs.id()).await; } diff --git a/sdk/src/api/engine/tasks.rs b/sdk/src/api/engine/tasks.rs index 0045c829..89e506f5 100644 --- a/sdk/src/api/engine/tasks.rs +++ b/sdk/src/api/engine/tasks.rs @@ -1,110 +1,180 @@ use crate::engine::IntervalStream; +use crate::err::Error; +use core::future::Future; use futures::StreamExt; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; use surrealdb_core::{kvs::Datastore, options::EngineOptions}; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; #[cfg(not(target_arch = "wasm32"))] -use crate::Error as RootError; +use tokio::spawn; +#[cfg(target_arch = "wasm32")] +use wasm_bindgen_futures::spawn_local as spawn; + #[cfg(not(target_arch = "wasm32"))] -use tokio::{spawn as spawn_future, task::JoinHandle}; +type Task = Pin> + Send + 'static>>; #[cfg(target_arch = "wasm32")] -use std::sync::atomic::{AtomicBool, Ordering}; -#[cfg(target_arch = "wasm32")] -use wasm_bindgen_futures::spawn_local as spawn_future; +type Task = Pin>; -/// This will be true if a task has completed -#[cfg(not(target_arch = "wasm32"))] -type FutureTask = JoinHandle<()>; - -/// This will be true if a task has completed -#[cfg(target_arch = "wasm32")] -type FutureTask = Arc; - -pub struct Tasks { - pub nd: FutureTask, -} +pub struct Tasks(Vec); impl Tasks { + #[cfg(target_arch = "wasm32")] + pub async fn resolve(self) -> Result<(), Error> { + Ok(()) + } #[cfg(not(target_arch = "wasm32"))] - pub async fn resolve(self) -> Result<(), RootError> { - match self.nd.await { - // cancelling this task is fine, and can happen when surrealdb exits. 
- Ok(_) => {} - Err(e) if e.is_cancelled() => {} - Err(e) => { - error!("Node agent task failed: {}", e); - let inner_err = - surrealdb_core::err::Error::NodeAgent("node task failed and has been logged"); - return Err(RootError::Db(inner_err)); - } + pub async fn resolve(self) -> Result<(), Error> { + for task in self.0.into_iter() { + let _ = task.await; } Ok(()) } } -/// Starts tasks that are required for the correct running of the engine -pub fn start_tasks(opt: &EngineOptions, dbs: Arc<Datastore>) -> (Tasks, [oneshot::Sender<()>; 1]) { - let nd = init(opt, dbs.clone()); - let cancellation_channels = [nd.1]; - ( - Tasks { - nd: nd.0, - }, - cancellation_channels, - ) -} - // The init starts a long-running thread for periodically calling Datastore.tick. // Datastore.tick is responsible for running garbage collection and other // background tasks. // // This function needs to be called before after the dbs::init and before the net::init functions. // It needs to be before net::init because the net::init function blocks until the web server stops. -fn init(opt: &EngineOptions, dbs: Arc<Datastore>) -> (FutureTask, oneshot::Sender<()>) { - let _init = surrealdb_core::dbs::LoggingLifecycle::new("node agent initialisation".to_string()); - let tick_interval = opt.tick_interval; - - trace!("Ticker interval is {:?}", tick_interval); - #[cfg(target_arch = "wasm32")] - let completed_status = Arc::new(AtomicBool::new(false)); - #[cfg(target_arch = "wasm32")] - let ret_status = completed_status.clone(); - - // We create a channel that can be streamed that will indicate termination - let (tx, mut rx) = oneshot::channel(); - - let _fut = spawn_future(async move { - let _lifecycle = surrealdb_core::dbs::LoggingLifecycle::new("heartbeat task".to_string()); - let mut ticker = interval_ticker(tick_interval).await; +pub fn init(dbs: Arc<Datastore>, canceller: CancellationToken, opts: &EngineOptions) -> Tasks { + let task1 = spawn_task_node_membership_refresh(dbs.clone(), canceller.clone(), opts); + let task2 = spawn_task_node_membership_check(dbs.clone(), canceller.clone(), opts); + let task3 = spawn_task_node_membership_cleanup(dbs.clone(), canceller.clone(), opts); + let task4 = spawn_task_changefeed_cleanup(dbs.clone(), canceller.clone(), opts); + Tasks(vec![task1, task2, task3, task4]) +} +fn spawn_task_node_membership_refresh( + dbs: Arc<Datastore>, + canceller: CancellationToken, + opts: &EngineOptions, +) -> Task { + // Get the delay interval from the config + let delay = opts.node_membership_refresh_interval; + // Spawn a future + Box::pin(spawn(async move { + // Log the interval frequency + trace!("Updating node registration information every {delay:?}"); + // Create a new time-based interval ticker + let mut ticker = interval_ticker(delay).await; + // Loop continuously until the task is cancelled loop { tokio::select!
{ - v = ticker.next() => { - // ticker will never return None; - let i = v.unwrap(); - trace!("Node agent tick: {:?}", i); - if let Err(e) = dbs.tick().await { - error!("Error running node agent tick: {}", e); + biased; + // Check if this has shutdown + _ = canceller.cancelled() => break, + // Receive a notification on the channel + Some(_) = ticker.next() => { + if let Err(e) = dbs.node_membership_update().await { + error!("Error running node agent tick: {e}"); break; } } - _ = &mut rx => { - // termination requested - break + } + } + trace!("Background task exited: Updating node registration information"); + })) +} + +fn spawn_task_node_membership_check( + dbs: Arc<Datastore>, + canceller: CancellationToken, + opts: &EngineOptions, +) -> Task { + // Get the delay interval from the config + let delay = opts.node_membership_check_interval; + // Spawn a future + Box::pin(spawn(async move { + // Log the interval frequency + trace!("Processing and archiving inactive nodes every {delay:?}"); + // Create a new time-based interval ticker + let mut ticker = interval_ticker(delay).await; + // Loop continuously until the task is cancelled + loop { + tokio::select! { + biased; + // Check if this has shutdown + _ = canceller.cancelled() => break, + // Receive a notification on the channel + Some(_) = ticker.next() => { + if let Err(e) = dbs.node_membership_expire().await { + error!("Error running node agent tick: {e}"); + break; + } + } + } + } + trace!("Background task exited: Processing and archiving inactive nodes"); + })) +} - #[cfg(target_arch = "wasm32")] - completed_status.store(true, Ordering::Relaxed); - }); - #[cfg(not(target_arch = "wasm32"))] - return (_fut, tx); - #[cfg(target_arch = "wasm32")] - return (ret_status, tx); +fn spawn_task_node_membership_cleanup( + dbs: Arc<Datastore>, + canceller: CancellationToken, + opts: &EngineOptions, +) -> Task { + // Get the delay interval from the config + let delay = opts.node_membership_cleanup_interval; + // Spawn a future + Box::pin(spawn(async move { + // Log the interval frequency + trace!("Processing and cleaning archived nodes every {delay:?}"); + // Create a new time-based interval ticker + let mut ticker = interval_ticker(delay).await; + // Loop continuously until the task is cancelled + loop { + tokio::select! { + biased; + // Check if this has shutdown + _ = canceller.cancelled() => break, + // Receive a notification on the channel + Some(_) = ticker.next() => { + if let Err(e) = dbs.node_membership_remove().await { + error!("Error running node agent tick: {e}"); + break; + } + } + } + } + trace!("Background task exited: Processing and cleaning archived nodes"); + })) +} + +fn spawn_task_changefeed_cleanup( + dbs: Arc<Datastore>, + canceller: CancellationToken, + opts: &EngineOptions, +) -> Task { + // Get the delay interval from the config + let delay = opts.changefeed_gc_interval; + // Spawn a future + Box::pin(spawn(async move { + // Log the interval frequency + trace!("Running changefeed garbage collection every {delay:?}"); + // Create a new time-based interval ticker + let mut ticker = interval_ticker(delay).await; + // Loop continuously until the task is cancelled + loop { + tokio::select!
{ + biased; + // Check if this has shutdown + _ = canceller.cancelled() => break, + // Receive a notification on the channel + Some(_) = ticker.next() => { + if let Err(e) = dbs.changefeed_process().await { + error!("Error running node agent tick: {e}"); + break; + } + } + } + } + trace!("Background task exited: Running changefeed garbage collection"); + })) } async fn interval_ticker(interval: Duration) -> IntervalStream { @@ -112,7 +182,7 @@ async fn interval_ticker(interval: Duration) -> IntervalStream { use tokio::{time, time::MissedTickBehavior}; #[cfg(target_arch = "wasm32")] use wasmtimer::{tokio as time, tokio::MissedTickBehavior}; - + // Create a new interval timer let mut interval = time::interval(interval); // Don't bombard the database if we miss some ticks interval.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -123,31 +193,30 @@ async fn interval_ticker(interval: Duration) -> IntervalStream { #[cfg(test)] #[cfg(feature = "kv-mem")] mod test { - use crate::engine::tasks::start_tasks; + use crate::engine::tasks; use std::sync::Arc; use std::time::Duration; use surrealdb_core::{kvs::Datastore, options::EngineOptions}; + use tokio_util::sync::CancellationToken; #[test_log::test(tokio::test)] pub async fn tasks_complete() { + let can = CancellationToken::new(); let opt = EngineOptions::default(); let dbs = Arc::new(Datastore::new("memory").await.unwrap()); - let (val, chans) = start_tasks(&opt, dbs.clone()); - for chan in chans { - chan.send(()).unwrap(); - } - val.resolve().await.unwrap(); + let tasks = tasks::init(dbs.clone(), can.clone(), &opt); + can.cancel(); + tasks.resolve().await.unwrap(); } #[test_log::test(tokio::test)] pub async fn tasks_complete_channel_closed() { + let can = CancellationToken::new(); let opt = EngineOptions::default(); let dbs = Arc::new(Datastore::new("memory").await.unwrap()); - let val = { - let (val, _chans) = start_tasks(&opt, dbs.clone()); - val - }; - tokio::time::timeout(Duration::from_secs(10), val.resolve()) + let tasks = tasks::init(dbs.clone(), can.clone(), &opt); + can.cancel(); + tokio::time::timeout(Duration::from_secs(10), tasks.resolve()) .await .map_err(|e| format!("Timed out after {e}")) .unwrap() diff --git a/sdk/src/api/opt/config.rs b/sdk/src/api/opt/config.rs index c08562c0..16eb18e3 100644 --- a/sdk/src/api/opt/config.rs +++ b/sdk/src/api/opt/config.rs @@ -18,10 +18,13 @@ pub struct Config { pub(crate) auth: Level, pub(crate) username: String, pub(crate) password: String, - pub(crate) tick_interval: Option, pub(crate) capabilities: CoreCapabilities, #[cfg(storage)] pub(crate) temporary_directory: Option, + pub(crate) node_membership_refresh_interval: Option, + pub(crate) node_membership_check_interval: Option, + pub(crate) node_membership_cleanup_interval: Option, + pub(crate) changefeed_gc_interval: Option, } impl Config { @@ -98,12 +101,6 @@ impl Config { self } - /// Set the interval at which the database should run node maintenance tasks - pub fn tick_interval(mut self, interval: impl Into>) -> Self { - self.tick_interval = interval.into().filter(|x| !x.is_zero()); - self - } - /// Set the capabilities for the database pub fn capabilities(mut self, capabilities: Capabilities) -> Self { self.capabilities = capabilities.build(); @@ -115,4 +112,34 @@ impl Config { self.temporary_directory = path; self } + + /// Set the interval at which the database should run node maintenance tasks + pub fn node_membership_refresh_interval( + mut self, + interval: impl Into>, + ) -> Self { + self.node_membership_refresh_interval = 
interval.into().filter(|x| !x.is_zero()); + self + } + + /// Set the interval at which the database should run node maintenance tasks + pub fn node_membership_check_interval(mut self, interval: impl Into>) -> Self { + self.node_membership_check_interval = interval.into().filter(|x| !x.is_zero()); + self + } + + /// Set the interval at which the database should run node maintenance tasks + pub fn node_membership_cleanup_interval( + mut self, + interval: impl Into>, + ) -> Self { + self.node_membership_cleanup_interval = interval.into().filter(|x| !x.is_zero()); + self + } + + /// Set the interval at which the database should run node maintenance tasks + pub fn changefeed_gc_interval(mut self, interval: impl Into>) -> Self { + self.changefeed_gc_interval = interval.into().filter(|x| !x.is_zero()); + self + } } diff --git a/sdk/tests/api.rs b/sdk/tests/api.rs index dcef7697..b3c1f8a8 100644 --- a/sdk/tests/api.rs +++ b/sdk/tests/api.rs @@ -38,7 +38,6 @@ mod api_integration { const NS: &str = "test-ns"; const ROOT_USER: &str = "root"; const ROOT_PASS: &str = "root"; - const TICK_INTERVAL: Duration = Duration::from_secs(1); #[derive(Debug, Serialize)] struct Record { @@ -187,10 +186,7 @@ mod api_integration { username: ROOT_USER, password: ROOT_PASS, }; - let config = Config::new() - .user(root) - .tick_interval(TICK_INTERVAL) - .capabilities(Capabilities::all()); + let config = Config::new().user(root).capabilities(Capabilities::all()); let db = Surreal::new::(config).await.unwrap(); db.signin(root).await.unwrap(); (permit, db) @@ -284,10 +280,7 @@ mod api_integration { username: ROOT_USER, password: ROOT_PASS, }; - let config = Config::new() - .user(root) - .tick_interval(TICK_INTERVAL) - .capabilities(Capabilities::all()); + let config = Config::new().user(root).capabilities(Capabilities::all()); #[allow(deprecated)] let db = Surreal::new::((path, config)).await.unwrap(); db.signin(root).await.unwrap(); @@ -320,10 +313,7 @@ mod api_integration { username: ROOT_USER, password: ROOT_PASS, }; - let config = Config::new() - .user(root) - .tick_interval(TICK_INTERVAL) - .capabilities(Capabilities::all()); + let config = Config::new().user(root).capabilities(Capabilities::all()); let db = Surreal::new::((path, config)).await.unwrap(); db.signin(root).await.unwrap(); (permit, db) @@ -354,10 +344,7 @@ mod api_integration { username: ROOT_USER, password: ROOT_PASS, }; - let config = Config::new() - .user(root) - .tick_interval(TICK_INTERVAL) - .capabilities(Capabilities::all()); + let config = Config::new().user(root).capabilities(Capabilities::all()); let db = Surreal::new::(("127.0.0.1:2379", config)).await.unwrap(); db.signin(root).await.unwrap(); (permit, db) @@ -387,10 +374,7 @@ mod api_integration { username: ROOT_USER, password: ROOT_PASS, }; - let config = Config::new() - .user(root) - .tick_interval(TICK_INTERVAL) - .capabilities(Capabilities::all()); + let config = Config::new().user(root).capabilities(Capabilities::all()); let path = "/etc/foundationdb/fdb.cluster"; surrealdb::engine::any::connect((format!("fdb://{path}"), config.clone())) .await @@ -418,10 +402,7 @@ mod api_integration { username: ROOT_USER, password: ROOT_PASS, }; - let config = Config::new() - .user(root) - .tick_interval(TICK_INTERVAL) - .capabilities(Capabilities::all()); + let config = Config::new().user(root).capabilities(Capabilities::all()); let db = Surreal::new::((path, config)).await.unwrap(); db.signin(root).await.unwrap(); (permit, db) diff --git a/sdk/tests/changefeeds.rs b/sdk/tests/changefeeds.rs index 
735ff1c1..a224be38 100644 --- a/sdk/tests/changefeeds.rs +++ b/sdk/tests/changefeeds.rs @@ -49,11 +49,11 @@ async fn database_change_feeds() -> Result<(), Error> { let dbs = new_ds().await?; let ses = Session::owner().with_ns(ns.as_str()).with_db(db.as_str()); let mut current_time = 0u64; - dbs.tick_at(current_time).await?; + dbs.changefeed_process_at(current_time).await?; let res = &mut dbs.execute(sql.as_str(), &ses, None).await?; // Increment by a second (sic) current_time += 1; - dbs.tick_at(current_time).await?; + dbs.changefeed_process_at(current_time).await?; assert_eq!(res.len(), 3); // DEFINE DATABASE let tmp = res.remove(0).result; @@ -178,7 +178,7 @@ async fn database_change_feeds() -> Result<(), Error> { "; // This is neccessary to mark a point in time that can be GC'd current_time += 1; - dbs.tick_at(current_time).await?; + dbs.changefeed_process_at(current_time).await?; let tx = dbs.transaction(Write, Optimistic).await?; tx.cancel().await?; @@ -189,7 +189,7 @@ async fn database_change_feeds() -> Result<(), Error> { let one_hour_in_secs = 3600; current_time += one_hour_in_secs; current_time += 1; - dbs.tick_at(current_time).await?; + dbs.changefeed_process_at(current_time).await?; let res = &mut dbs.execute(sql, &ses, None).await?; let tmp = res.remove(0).result?; let val = Value::parse("[]"); @@ -229,9 +229,9 @@ async fn table_change_feeds() -> Result<(), Error> { let ses = Session::owner().with_ns("test-tb-cf").with_db("test-tb-cf"); let start_ts = 0u64; let end_ts = start_ts + 1; - dbs.tick_at(start_ts).await?; + dbs.changefeed_process_at(start_ts).await?; let res = &mut dbs.execute(sql, &ses, None).await?; - dbs.tick_at(end_ts).await?; + dbs.changefeed_process_at(end_ts).await?; assert_eq!(res.len(), 10); // DEFINE TABLE let tmp = res.remove(0).result; @@ -388,7 +388,7 @@ async fn table_change_feeds() -> Result<(), Error> { let sql = " SHOW CHANGES FOR TABLE person SINCE 0; "; - dbs.tick_at(end_ts + 3599).await?; + dbs.changefeed_process_at(end_ts + 3599).await?; let res = &mut dbs.execute(sql, &ses, None).await?; let tmp = res.remove(0).result?; assert!( @@ -402,7 +402,7 @@ async fn table_change_feeds() -> Result<(), Error> { .unwrap() ); // GC after 1hs - dbs.tick_at(end_ts + 3600).await?; + dbs.changefeed_process_at(end_ts + 3600).await?; let res = &mut dbs.execute(sql, &ses, None).await?; let tmp = res.remove(0).result?; let val = Value::parse("[]"); @@ -423,7 +423,7 @@ async fn changefeed_with_ts() -> Result<(), Error> { // Save timestamp 1 let ts1_dt = "2023-08-01T00:00:00Z"; let ts1 = DateTime::parse_from_rfc3339(ts1_dt).unwrap(); - db.tick_at(ts1.timestamp().try_into().unwrap()).await.unwrap(); + db.changefeed_process_at(ts1.timestamp().try_into().unwrap()).await.unwrap(); // Create and update users let sql = " CREATE user:amos SET name = 'Amos'; @@ -627,7 +627,7 @@ async fn changefeed_with_ts() -> Result<(), Error> { // Save timestamp 2 let ts2_dt = "2023-08-01T00:00:05Z"; let ts2 = DateTime::parse_from_rfc3339(ts2_dt).unwrap(); - db.tick_at(ts2.timestamp().try_into().unwrap()).await.unwrap(); + db.changefeed_process_at(ts2.timestamp().try_into().unwrap()).await.unwrap(); // // Show changes using timestamp 1 // @@ -684,7 +684,7 @@ async fn changefeed_with_ts() -> Result<(), Error> { // Save timestamp 3 let ts3_dt = "2023-08-01T00:00:10Z"; let ts3 = DateTime::parse_from_rfc3339(ts3_dt).unwrap(); - db.tick_at(ts3.timestamp().try_into().unwrap()).await.unwrap(); + db.changefeed_process_at(ts3.timestamp().try_into().unwrap()).await.unwrap(); // // Show changes using 
timestamp 3 // diff --git a/src/cli/config.rs b/src/cli/config.rs index e01e0cb1..4bf4b1cb 100644 --- a/src/cli/config.rs +++ b/src/cli/config.rs @@ -1,11 +1,10 @@ use crate::net::client_ip::ClientIp; use std::sync::OnceLock; use std::{net::SocketAddr, path::PathBuf}; +use surrealdb::options::EngineOptions; pub static CF: OnceLock = OnceLock::new(); -use surrealdb::options::EngineOptions; - #[derive(Clone, Debug)] pub struct Config { pub bind: SocketAddr, @@ -15,6 +14,6 @@ pub struct Config { pub pass: Option, pub crt: Option, pub key: Option, - pub engine: Option, + pub engine: EngineOptions, pub no_identification_headers: bool, } diff --git a/src/cli/start.rs b/src/cli/start.rs index 3493f24e..ef5380e2 100644 --- a/src/cli/start.rs +++ b/src/cli/start.rs @@ -1,5 +1,4 @@ -use super::config; -use super::config::Config; +use super::config::{Config, CF}; use crate::cli::validator::parser::env_filter::CustomEnvFilter; use crate::cli::validator::parser::env_filter::CustomEnvFilterParser; use crate::cnf::LOGO; @@ -14,7 +13,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use surrealdb::engine::any::IntoEndpoint; -use surrealdb::engine::tasks::start_tasks; +use surrealdb::engine::tasks; use surrealdb::options::EngineOptions; use tokio_util::sync::CancellationToken; @@ -43,12 +42,33 @@ pub struct StartCommandArguments { // Tasks // #[arg( - help = "The interval at which to run node agent tick (including garbage collection)", + help = "The interval at which to refresh node registration information", help_heading = "Database" )] - #[arg(env = "SURREAL_TICK_INTERVAL", long = "tick-interval", value_parser = super::validator::duration)] + #[arg(env = "SURREAL_NODE_MEMBERSHIP_REFRESH_INTERVAL", long = "node-membership-refresh-interval", value_parser = super::validator::duration)] + #[arg(default_value = "3s")] + node_membership_refresh_interval: Duration, + #[arg( + help = "The interval at which process and archive inactive nodes", + help_heading = "Database" + )] + #[arg(env = "SURREAL_NODE_MEMBERSHIP_CHECK_INTERVAL", long = "node-membership-check-interval", value_parser = super::validator::duration)] + #[arg(default_value = "15s")] + node_membership_check_interval: Duration, + #[arg( + help = "The interval at which to process and cleanup archived nodes", + help_heading = "Database" + )] + #[arg(env = "SURREAL_NODE_MEMBERSHIP_CLEANUP_INTERVAL", long = "node-membership-cleanup-interval", value_parser = super::validator::duration)] + #[arg(default_value = "300s")] + node_membership_cleanup_interval: Duration, + #[arg( + help = "The interval at which to perform changefeed garbage collection", + help_heading = "Database" + )] + #[arg(env = "SURREAL_CHANGEFEED_GC_INTERVAL", long = "changefeed-gc-interval", value_parser = super::validator::duration)] #[arg(default_value = "10s")] - tick_interval: Duration, + changefeed_gc_interval: Duration, // // Authentication // @@ -143,7 +163,10 @@ pub async fn init( dbs, web, log, - tick_interval, + node_membership_refresh_interval, + node_membership_check_interval, + node_membership_cleanup_interval, + changefeed_gc_interval, no_banner, no_identification_headers, .. 
@@ -168,38 +191,42 @@ pub async fn init( } else { (None, None) }; - // Setup the command-line options - let _ = config::CF.set(Config { + // Configure the engine + let engine = EngineOptions::default() + .with_node_membership_refresh_interval(node_membership_refresh_interval) + .with_node_membership_check_interval(node_membership_check_interval) + .with_node_membership_cleanup_interval(node_membership_cleanup_interval) + .with_changefeed_gc_interval(changefeed_gc_interval); + // Configure the config + let config = Config { bind: listen_addresses.first().cloned().unwrap(), client_ip, path, user, pass, no_identification_headers, - engine: Some(EngineOptions::default().with_tick_interval(tick_interval)), + engine, crt, key, - }); - // This is the cancellation token propagated down to - // all the async functions that needs to be stopped gracefully. - let ct = CancellationToken::new(); + }; + // Setup the command-line options + let _ = CF.set(config); // Initiate environment env::init().await?; + // Create a token to cancel tasks + let canceller = CancellationToken::new(); // Start the datastore - let ds = Arc::new(dbs::init(dbs).await?); + let datastore = Arc::new(dbs::init(dbs).await?); // Start the node agent - let (tasks, task_chans) = - start_tasks(&config::CF.get().unwrap().engine.unwrap_or_default(), ds.clone()); + let nodetasks = tasks::init(datastore.clone(), canceller.clone(), &CF.get().unwrap().engine); // Start the web server - net::init(ds, ct.clone()).await?; + net::init(datastore.clone(), canceller.clone()).await?; // Shutdown and stop closed tasks - task_chans.into_iter().for_each(|chan| { - if chan.send(()).is_err() { - error!("Failed to send shutdown signal to task"); - } - }); - ct.cancel(); - tasks.resolve().await?; + canceller.cancel(); + // Wait for background tasks to finish + nodetasks.resolve().await?; + // Delete this node from the cluster + datastore.delete_node(datastore.id()).await?; // All ok Ok(()) } diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs index a4e28109..ae7abb29 100644 --- a/src/telemetry/mod.rs +++ b/src/telemetry/mod.rs @@ -97,6 +97,8 @@ impl Builder { } pub fn shutdown() -> Result<(), Error> { + // Output information to logs + trace!("Shutting down telemetry service"); // Flush all telemetry data and block until done opentelemetry::global::shutdown_tracer_provider(); // Everything ok @@ -115,14 +117,17 @@ pub fn filter_from_value(v: &str) -> Result { // Otherwise, let's show info and above "info" => Ok(EnvFilter::default().add_directive(Level::INFO.into())), // Otherwise, let's show debugs and above - "debug" => EnvFilter::builder() - .parse("warn,surreal=debug,surrealdb=debug,surrealcs=warn,surrealdb::core::kvs=debug"), + "debug" => EnvFilter::builder().parse( + "warn,surreal=debug,surrealdb=debug,surrealcs=warn,surrealdb::core::kvs::tr=debug", + ), // Specify the log level for each code area - "trace" => EnvFilter::builder() - .parse("warn,surreal=trace,surrealdb=trace,surrealcs=warn,surrealdb::core::kvs=debug"), + "trace" => EnvFilter::builder().parse( + "warn,surreal=trace,surrealdb=trace,surrealcs=warn,surrealdb::core::kvs::tr=debug", + ), // Check if we should show all surreal logs - "full" => EnvFilter::builder() - .parse("debug,surreal=trace,surrealdb=trace,surrealcs=debug,surrealdb::core=trace"), + "full" => EnvFilter::builder().parse( + "debug,surreal=trace,surrealdb=trace,surrealcs=debug,surrealdb::core::kvs::tr=trace", + ), // Check if we should show all module logs "all" => 
Ok(EnvFilter::default().add_directive(Level::TRACE.into())), // Let's try to parse the custom log level diff --git a/tests/cli_integration.rs b/tests/cli_integration.rs index c4b181a0..5dfdc5e1 100644 --- a/tests/cli_integration.rs +++ b/tests/cli_integration.rs @@ -22,10 +22,6 @@ mod cli_integration { use super::common::{self, StartServerArguments, PASS, USER}; - /// This depends on the interval configuration that we cannot yet inject - const ONE_PERIOD: Duration = Duration::new(10, 0); - const TWO_PERIODS: Duration = Duration::new(20, 0); - #[test] fn version_command() { assert!(common::run("version").output().is_ok()); @@ -878,7 +874,6 @@ mod cli_integration { auth: false, tls: false, wait_is_ready: true, - tick_interval: ONE_PERIOD, ..Default::default() }) .await @@ -980,7 +975,7 @@ mod cli_integration { } }; - sleep(TWO_PERIODS).await; + sleep(Duration::from_secs(20)).await; info!("* Show changes after GC"); { diff --git a/tests/common/server.rs b/tests/common/server.rs index d58a97c1..66593167 100644 --- a/tests/common/server.rs +++ b/tests/common/server.rs @@ -159,7 +159,6 @@ pub struct StartServerArguments { pub auth: bool, pub tls: bool, pub wait_is_ready: bool, - pub tick_interval: time::Duration, pub temporary_directory: Option, pub args: String, pub vars: Option>, @@ -172,7 +171,6 @@ impl Default for StartServerArguments { auth: true, tls: false, wait_is_ready: true, - tick_interval: time::Duration::new(1, 0), temporary_directory: None, args: "".to_string(), vars: None, @@ -247,7 +245,6 @@ pub async fn start_server( auth, tls, wait_is_ready, - tick_interval, temporary_directory, args, vars, @@ -274,11 +271,6 @@ pub async fn start_server( extra_args.push_str(" --unauthenticated"); } - if !tick_interval.is_zero() { - let sec = tick_interval.as_secs(); - extra_args.push_str(format!(" --tick-interval {sec}s").as_str()); - } - if let Some(path) = temporary_directory { extra_args.push_str(format!(" --temporary-directory {path}").as_str()); }
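The background workers added in sdk/src/api/engine/tasks.rs all follow the same shape: each job owns an interval ticker that is polled inside tokio::select! alongside a shared CancellationToken, so a single cancel() call shuts every loop down. A minimal, self-contained sketch of that pattern is shown below; it assumes only the tokio and tokio-util crates, and the job body, interval and names here are illustrative placeholders rather than SurrealDB code.

use std::time::Duration;
use tokio::time::{interval, MissedTickBehavior};
use tokio_util::sync::CancellationToken;

// Stand-in for one of the periodic jobs (e.g. refreshing node registration).
async fn run_job() {
    println!("running periodic job");
}

// Spawn a periodic background task that exits as soon as the token is cancelled.
fn spawn_periodic(canceller: CancellationToken, every: Duration) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut ticker = interval(every);
        // Don't try to catch up on ticks missed while the system was busy
        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                biased;
                // Shutdown requested: leave the loop immediately
                _ = canceller.cancelled() => break,
                // Otherwise wait for the next tick and run the job
                _ = ticker.tick() => run_job().await,
            }
        }
    })
}

#[tokio::main]
async fn main() {
    let canceller = CancellationToken::new();
    // Every worker receives a clone of the same token, mirroring tasks::init above
    let task = spawn_periodic(canceller.clone(), Duration::from_millis(100));
    tokio::time::sleep(Duration::from_millis(350)).await;
    // One cancel() call stops every spawned loop
    canceller.cancel();
    // Wait for the worker to wind down, mirroring Tasks::resolve
    let _ = task.await;
}

Because every job owns its own interval, the node registration refresh (3s by default) can run far more frequently than archived-node cleanup (300s by default), and shutdown no longer needs a separate oneshot channel per task.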