From 888184f50f40bd502be8e454145a892028c6a282 Mon Sep 17 00:00:00 2001 From: Przemyslaw Hugh Kaznowski Date: Tue, 27 Feb 2024 15:18:25 +0000 Subject: [PATCH] Add notifications for LQ v2 on CF (#3480) Co-authored-by: Mees Delzenne --- core/src/dbs/executor.rs | 84 +++++++++--- core/src/kvs/ds.rs | 199 ++++++++++++++++++++++++----- core/src/kvs/lq_structs.rs | 12 +- core/src/kvs/tests/mod.rs | 6 + core/src/kvs/tests/tx_test.rs | 36 ++++++ core/src/kvs/tx.rs | 16 ++- core/src/lib.rs | 1 + core/src/sql/v2/changefeed.rs | 1 - lib/src/api/engine/local/native.rs | 8 +- lib/src/api/engine/local/wasm.rs | 4 +- lib/tests/changefeeds.rs | 10 +- lib/tests/helpers.rs | 2 +- lib/tests/live.rs | 46 ++++--- src/cli/start.rs | 6 + src/node/mod.rs | 34 +++++ 15 files changed, 378 insertions(+), 87 deletions(-) create mode 100644 core/src/kvs/tests/tx_test.rs diff --git a/core/src/dbs/executor.rs b/core/src/dbs/executor.rs index cbb8c875..540e4453 100644 --- a/core/src/dbs/executor.rs +++ b/core/src/dbs/executor.rs @@ -1,3 +1,16 @@ +use std::sync::Arc; + +use channel::Receiver; +use futures::lock::Mutex; +use futures::StreamExt; +#[cfg(not(target_arch = "wasm32"))] +use tokio::spawn; +use tracing::instrument; +use trice::Instant; + +#[cfg(target_arch = "wasm32")] +use wasm_bindgen_futures::spawn_local as spawn; + use crate::ctx::Context; use crate::dbs::response::Response; use crate::dbs::Notification; @@ -7,6 +20,7 @@ use crate::dbs::Transaction; use crate::err::Error; use crate::iam::Action; use crate::iam::ResourceKind; +use crate::kvs::lq_structs::TrackedResult; use crate::kvs::TransactionType; use crate::kvs::{Datastore, LockType::*, TransactionType::*}; use crate::sql::paths::DB; @@ -15,16 +29,6 @@ use crate::sql::query::Query; use crate::sql::statement::Statement; use crate::sql::value::Value; use crate::sql::Base; -use channel::Receiver; -use futures::lock::Mutex; -use futures::StreamExt; -use std::sync::Arc; -#[cfg(not(target_arch = "wasm32"))] -use tokio::spawn; -use tracing::instrument; -use trice::Instant; -#[cfg(target_arch = "wasm32")] -use wasm_bindgen_futures::spawn_local as spawn; pub(crate) struct Executor<'a> { err: bool, @@ -83,7 +87,19 @@ impl<'a> Executor<'a> { let _ = txn.cancel().await; } else { let r = match txn.complete_changes(false).await { - Ok(_) => txn.commit().await, + Ok(_) => { + match txn.commit().await { + Ok(()) => { + // Commit succeeded, do post commit operations that do not matter to the tx + let lqs: Vec = + txn.consume_pending_live_queries(); + // Track the live queries in the data store + self.kvs.track_live_queries(&lqs).await?; + Ok(()) + } + Err(e) => Err(e), + } + } r => r, }; if let Err(e) = r { @@ -151,6 +167,7 @@ impl<'a> Executor<'a> { /// Flush notifications from a buffer channel (live queries) to the committed notification channel. /// This is because we don't want to broadcast notifications to the user for failed transactions. + /// TODO we can delete this once we migrate to lq v2 async fn flush(&self, ctx: &Context<'_>, mut rcv: Receiver) { let sender = ctx.notifications(); spawn(async move { @@ -164,6 +181,17 @@ impl<'a> Executor<'a> { }); } + /// A transaction collects created live queries which can then be consumed when a transaction is committed + /// We use this function to get these transactions and send them to the invoker without channels + async fn consume_committed_live_query_registrations(&self) -> Option> { + if let Some(txn) = self.txn.as_ref() { + let txn = txn.lock().await; + Some(txn.consume_pending_live_queries()) + } else { + None + } + } + async fn set_ns(&self, ctx: &mut Context<'_>, opt: &mut Options, ns: &str) { let mut session = ctx.value("session").unwrap_or(&Value::None).clone(); session.put(NS.as_ref(), ns.to_owned().into()); @@ -184,7 +212,7 @@ impl<'a> Executor<'a> { mut ctx: Context<'_>, opt: Options, qry: Query, - ) -> Result, Error> { + ) -> Result<(Vec, Vec), Error> { // Create a notification channel let (send, recv) = channel::unbounded(); // Set the notification channel @@ -193,6 +221,7 @@ impl<'a> Executor<'a> { let mut buf: Vec = vec![]; // Initialise array of responses let mut out: Vec = vec![]; + let mut live_queries: Vec = vec![]; // Process all statements in query for stm in qry.into_iter() { // Log the statement @@ -249,6 +278,9 @@ impl<'a> Executor<'a> { let commit_error = self.commit(true).await.err(); buf = buf.into_iter().map(|v| self.buf_commit(v, &commit_error)).collect(); self.flush(&ctx, recv.clone()).await; + if let Some(lqs) = self.consume_committed_live_query_registrations().await { + live_queries.extend(lqs); + } out.append(&mut buf); debug_assert!(self.txn.is_none(), "commit(true) should have unset txn"); self.txn = None; @@ -294,6 +326,12 @@ impl<'a> Executor<'a> { Ok(_) => { // Flush live query notifications self.flush(&ctx, recv.clone()).await; + if let Some(lqs) = self + .consume_committed_live_query_registrations() + .await + { + live_queries.extend(lqs); + } Ok(Value::None) } } @@ -366,7 +404,11 @@ impl<'a> Executor<'a> { } else { // Flush the live query change notifications self.flush(&ctx, recv.clone()).await; - // Successful, committed result + if let Some(lqs) = + self.consume_committed_live_query_registrations().await + { + live_queries.extend(lqs); + } res } } else { @@ -392,8 +434,18 @@ impl<'a> Executor<'a> { e }), query_type: match (is_stm_live, is_stm_kill) { - (true, _) => QueryType::Live, - (_, true) => QueryType::Kill, + (true, _) => { + if let Some(lqs) = self.consume_committed_live_query_registrations().await { + live_queries.extend(lqs); + } + QueryType::Live + } + (_, true) => { + if let Some(lqs) = self.consume_committed_live_query_registrations().await { + live_queries.extend(lqs); + } + QueryType::Kill + } _ => QueryType::Other, }, }; @@ -408,7 +460,7 @@ impl<'a> Executor<'a> { } } // Return responses - Ok(out) + Ok((out, live_queries)) } } diff --git a/core/src/kvs/ds.rs b/core/src/kvs/ds.rs index a2060aa5..0f8bc6f4 100644 --- a/core/src/kvs/ds.rs +++ b/core/src/kvs/ds.rs @@ -15,14 +15,15 @@ use tracing::trace; use wasmtimer::std::{SystemTime, UNIX_EPOCH}; use crate::cf; -use crate::cf::ChangeSet; +use crate::cf::{ChangeSet, TableMutation}; use crate::ctx::Context; #[cfg(feature = "jwks")] use crate::dbs::capabilities::NetTarget; use crate::dbs::{ node::Timestamp, Attach, Capabilities, Executor, Notification, Options, Response, Session, - Variables, + Statement, Variables, Workable, }; +use crate::doc::Document; use crate::err::Error; use crate::fflags::FFLAGS; use crate::iam::{Action, Auth, Error as IamError, Resource, Role}; @@ -32,7 +33,7 @@ use crate::kvs::clock::SizedClock; #[allow(unused_imports)] use crate::kvs::clock::SystemClock; use crate::kvs::lq_structs::{ - LqEntry, LqIndexKey, LqIndexValue, LqSelector, LqValue, UnreachableLqType, + LqEntry, LqIndexKey, LqIndexValue, LqSelector, LqValue, TrackedResult, UnreachableLqType, }; use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*}; use crate::sql::statements::show::ShowSince; @@ -76,10 +77,12 @@ pub struct Datastore { versionstamp_oracle: Arc>, // Whether this datastore enables live query notifications to subscribers notification_channel: Option<(Sender, Receiver)>, - // Map of Live Query ID to Live Query query - local_live_queries: Arc>>, - // Set of tracked change feeds - local_live_query_cfs: Arc>>, + // Map of Live Query identifier (ns+db+tb) for change feed tracking + // the mapping is to a list of affected live queries + local_live_queries: Arc>>>, + // Set of tracked change feeds with associated watermarks + // This is updated with new/removed live queries and improves cf request performance + cf_watermarks: Arc>>, // Clock for tracking time. It is read only and accessible to all transactions. It is behind a mutex as tests may write to it. clock: Arc, // The index store cache @@ -354,7 +357,7 @@ impl Datastore { clock, index_stores: IndexStores::default(), local_live_queries: Arc::new(RwLock::new(BTreeMap::new())), - local_live_query_cfs: Arc::new(RwLock::new(BTreeMap::new())), + cf_watermarks: Arc::new(RwLock::new(BTreeMap::new())), }) } @@ -841,26 +844,28 @@ impl Datastore { } } - /// This is a future that is from whatever is running the datastore as a SurrealDB instance (api WASM and native) - /// It's responsibility is to catch up all live queries based on changes to the relevant change feeds, - /// and send notifications after assessing authorisation. Live queries then have their watermarks updated. - pub async fn process_lq_notifications(&self) -> Result<(), Error> { + /// Poll change feeds for live query notifications + pub async fn process_lq_notifications(&self, opt: &Options) -> Result<(), Error> { // Runtime feature gate, as it is not production-ready if !FFLAGS.change_feed_live_queries.enabled() { return Ok(()); } // Return if there are no live queries if self.notification_channel.is_none() { + trace!("Channels is none, short-circuiting"); return Ok(()); } if self.local_live_queries.read().await.is_empty() { + trace!("No live queries, short-circuiting"); return Ok(()); } - // Find live queries that need to catch up + // Change map includes a mapping of selector to changesets, ordered by versionstamp let mut change_map: BTreeMap> = BTreeMap::new(); let mut tx = self.transaction(Read, Optimistic).await?; - for (selector, vs) in self.local_live_query_cfs.read().await.iter() { + let mut tracked_cfs = self.cf_watermarks.write().await; + let mut tracked_cfs_updates = Vec::with_capacity(tracked_cfs.len()); + for (selector, vs) in tracked_cfs.iter() { // Read the change feed for the selector let res = cf::read( &mut tx, @@ -874,14 +879,33 @@ impl Datastore { ) .await?; // Confirm we do need to change watermark - this is technically already handled by the cf range scan + if res.is_empty() { + trace!( + "There were no changes in the change feed for {:?} from versionstamp {:?}", + selector, + vs + ) + } if let Some(change_set) = res.last() { if conv::versionstamp_to_u64(&change_set.0) > conv::versionstamp_to_u64(vs) { + trace!("Adding a change set for lq notification processing"); + // Update the cf watermark so we can progress scans + // If the notifications fail from here-on, they are lost + // this is a separate vec that we later insert to because we are iterating immutably + // We shouldn't use a read lock because of consistency between watermark scans + tracked_cfs_updates.push((selector.clone(), change_set.0)); + // This does not guarantee a notification, as a changeset an include many tables and many changes change_map.insert(selector.clone(), res); } } } tx.cancel().await?; + // Now we update since we are no longer iterating immutably + for (selector, vs) in tracked_cfs_updates { + tracked_cfs.insert(selector, vs); + } + for (selector, change_sets) in change_map { // find matching live queries let lq_pairs: Vec<(LqIndexKey, LqIndexValue)> = { @@ -889,29 +913,86 @@ impl Datastore { lq_lock .iter() .filter(|(k, _)| k.selector == selector) - .map(|a| { - let (b, c) = (a.0.clone(), a.1.clone()); - (b, c) + .flat_map(|(lq_index, lq_values)| { + lq_values.iter().cloned().map(|x| (lq_index.clone(), x)) }) .to_owned() .collect() }; + // Find relevant changes + let tx = Arc::new(Mutex::new(self.transaction(Read, Optimistic).await?)); for change_set in change_sets { + // TODO(phughk): this loop can be on the inside so we are only checking lqs relavant to cf change for (lq_key, lq_value) in lq_pairs.iter() { + trace!( + "Processing live query for notification key={:?} and value={:?}", + lq_key, + lq_value + ); let change_vs = change_set.0; let database_mutation = &change_set.1; - for table_mutation in database_mutation.0.iter() { - if table_mutation.0 == lq_key.selector.tb { - // TODO(phughk): process live query logic - // TODO(SUR-291): enforce security - self.local_live_queries.write().await.insert( - (*lq_key).clone(), - LqIndexValue { - vs: change_vs, - ..(*lq_value).clone() - }, - ); + for table_mutations in database_mutation.0.iter() { + if table_mutations.0 == lq_key.selector.tb { + // Create a doc of the table value + // Run the 'lives' logic on the doc, while providing live queries instead of reading from storage + // This will generate and send notifications + for mutation in table_mutations.1.iter() { + if let Some(doc) = Self::construct_document(mutation) { + // We know we are only processing a single LQ at a time, so we can limit notifications to 1 + let notification_capacity = 1; + // We track notifications as a separate channel in case we want to process + // for the current state we only forward + let (sender, receiver) = + channel::bounded(notification_capacity); + doc.check_lqs_and_send_notifications( + opt, + &Statement::Live(&lq_value.stm), + &tx, + [&lq_value.stm].as_slice(), + &sender, + ) + .await + .map_err(|e| { + Error::Internal(format!( + "Error checking lqs for notifications: {:?}", + e + )) + })?; + + // Send the notifications to driver or api + // TODO: evaluate if we want channel directly instead of proxy + while let Ok(notification) = receiver.try_recv() { + trace!("Sending notification to client"); + self.notification_channel + .as_ref() + .unwrap() + .0 + .send(notification) + .await + .unwrap(); + } + trace!("Ended notification sending") + } + + // Update watermarks + trace!( + "Updating watermark to {:?} for index key {:?}", + change_vs, + lq_key + ); + + // For each live query we have processed we update the watermarks + self.local_live_queries.write().await.insert( + (*lq_key).clone(), + vec![LqIndexValue { + vs: change_vs, + ..lq_value.clone() + }], + ); + + // We also update the tracked_cfs with a minimum watermark + } } } } @@ -920,6 +1001,58 @@ impl Datastore { Ok(()) } + /// Construct a document from a Change Feed mutation + /// This is required to perform document operations such as live query notifications + fn construct_document(mutation: &TableMutation) -> Option { + match mutation { + TableMutation::Set(a, b) => { + let doc = Document::new(None, Some(a), None, b, Workable::Normal); + Some(doc) + } + TableMutation::Del(a) => { + let doc = Document::new(None, Some(a), None, &Value::None, Workable::Normal); + Some(doc) + } + TableMutation::Def(_) => None, + TableMutation::SetPrevious(id, _old, new) => { + let doc = Document::new(None, Some(id), None, new, Workable::Normal); + // TODO set previous value + Some(doc) + } + } + } + + /// Add live queries to track on the datastore + /// These get polled by the change feed tick + pub(crate) async fn track_live_queries(&self, lqs: &Vec) -> Result<(), Error> { + // Lock the local live queries + let mut lq_map = self.local_live_queries.write().await; + let mut cf_watermarks = self.cf_watermarks.write().await; + for lq in lqs { + match lq { + TrackedResult::LiveQuery(lq) => { + let lq_index_key: LqIndexKey = lq.as_key(); + let m = lq_map.get_mut(&lq_index_key); + match m { + Some(lq_index_value) => lq_index_value.push(lq.as_value()), + None => { + let lq_vec = vec![lq.as_value()]; + lq_map.insert(lq_index_key.clone(), lq_vec); + } + } + let selector = lq_index_key.selector; + // TODO(phughk): - read watermark for catchup + // We insert the current watermark. + cf_watermarks.entry(selector).or_insert_with(Versionstamp::default); + } + TrackedResult::KillQuery(_lq) => { + unimplemented!("Cannot kill queries yet") + } + } + } + Ok(()) + } + async fn save_timestamp_for_versionstamp_impl( &self, ts: u64, @@ -1167,7 +1300,15 @@ impl Datastore { // Store the query variables let ctx = vars.attach(ctx)?; // Process all statements - exe.execute(ctx, opt, ast).await + let res = exe.execute(ctx, opt, ast).await; + match res { + Ok((responses, lives)) => { + // Register live queries + self.track_live_queries(&lives).await?; + Ok(responses) + } + Err(e) => Err(e), + } } /// Ensure a SQL [`Value`] is fully computed diff --git a/core/src/kvs/lq_structs.rs b/core/src/kvs/lq_structs.rs index 4dae6276..9a62e6c6 100644 --- a/core/src/kvs/lq_structs.rs +++ b/core/src/kvs/lq_structs.rs @@ -83,26 +83,24 @@ pub(crate) struct LqIndexValue { #[derive(Debug)] #[cfg_attr(test, derive(PartialEq, Clone))] pub(crate) struct LqEntry { - #[allow(dead_code)] pub(crate) live_id: Uuid, - #[allow(dead_code)] pub(crate) ns: String, - #[allow(dead_code)] pub(crate) db: String, - #[allow(dead_code)] pub(crate) stm: LiveStatement, } /// This is a type representing information that is tracked outside of a datastore /// For example, live query IDs need to be tracked by websockets so they are closed correctly on closing a connection -#[allow(dead_code)] +#[derive(Debug)] +#[cfg_attr(test, derive(PartialEq, Clone))] pub(crate) enum TrackedResult { LiveQuery(LqEntry), + #[allow(dead_code)] + KillQuery(LqEntry), } impl LqEntry { /// Treat like an into from a borrow - #[allow(dead_code)] pub(crate) fn as_key(&self) -> LqIndexKey { let tb = self.stm.what.to_string(); LqIndexKey { @@ -115,8 +113,6 @@ impl LqEntry { } } - /// Treat like an into from a borrow - #[allow(dead_code)] pub(crate) fn as_value(&self) -> LqIndexValue { LqIndexValue { stm: self.stm.clone(), diff --git a/core/src/kvs/tests/mod.rs b/core/src/kvs/tests/mod.rs index 9665f200..40efd383 100644 --- a/core/src/kvs/tests/mod.rs +++ b/core/src/kvs/tests/mod.rs @@ -63,6 +63,7 @@ mod mem { include!("ndlq.rs"); include!("tblq.rs"); include!("tbnt.rs"); + include!("tx_test.rs"); } #[cfg(feature = "kv-rocksdb")] @@ -111,6 +112,7 @@ mod rocksdb { include!("ndlq.rs"); include!("tblq.rs"); include!("tbnt.rs"); + include!("tx_test.rs"); } #[cfg(feature = "kv-speedb")] @@ -157,6 +159,7 @@ mod speedb { include!("ndlq.rs"); include!("tblq.rs"); include!("tbnt.rs"); + include!("tx_test.rs"); } #[cfg(feature = "kv-tikv")] @@ -204,6 +207,7 @@ mod tikv { include!("ndlq.rs"); include!("tblq.rs"); include!("tbnt.rs"); + include!("tx_test.rs"); } #[cfg(feature = "kv-fdb")] @@ -251,6 +255,7 @@ mod fdb { include!("ndlq.rs"); include!("tblq.rs"); include!("tbnt.rs"); + include!("tx_test.rs"); } #[cfg(feature = "kv-surrealkv")] @@ -300,4 +305,5 @@ mod surrealkv { include!("ndlq.rs"); include!("tblq.rs"); include!("tbnt.rs"); + include!("tx_test.rs"); } diff --git a/core/src/kvs/tests/tx_test.rs b/core/src/kvs/tests/tx_test.rs new file mode 100644 index 00000000..2c6da4a4 --- /dev/null +++ b/core/src/kvs/tests/tx_test.rs @@ -0,0 +1,36 @@ +use crate::kvs::lq_structs::{LqEntry, TrackedResult}; + +#[tokio::test] +#[serial] +async fn live_queries_sent_to_tx_are_received() { + let node_id = uuid::uuid!("d0f1a200-e24e-44fe-98c1-2271a5781da7"); + let clock = Arc::new(SizedClock::Fake(FakeClock::new(Timestamp::default()))); + let test = init(node_id, clock).await.unwrap(); + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + + // Create live query data + let lq_entry = LqEntry { + live_id: sql::Uuid::new_v4(), + ns: "namespace".to_string(), + db: "database".to_string(), + stm: LiveStatement { + id: sql::Uuid::new_v4(), + node: sql::Uuid::from(node_id), + expr: Default::default(), + what: Default::default(), + cond: None, + fetch: None, + archived: None, + session: Some(Value::None), + auth: None, + }, + }; + tx.pre_commit_register_live_query(lq_entry.clone()).unwrap(); + + tx.commit().await.unwrap(); + + // Verify data + let live_queries = tx.consume_pending_live_queries(); + assert_eq!(live_queries.len(), 1); + assert_eq!(live_queries[0], TrackedResult::LiveQuery(lq_entry)); +} diff --git a/core/src/kvs/tx.rs b/core/src/kvs/tx.rs index ab538618..822d8a18 100644 --- a/core/src/kvs/tx.rs +++ b/core/src/kvs/tx.rs @@ -34,7 +34,7 @@ use crate::key::key_req::KeyRequirements; use crate::kvs::cache::Cache; use crate::kvs::cache::Entry; use crate::kvs::clock::SizedClock; -use crate::kvs::lq_structs::{LqEntry, LqValue}; +use crate::kvs::lq_structs::{LqEntry, LqValue, TrackedResult}; use crate::kvs::Check; use crate::sql; use crate::sql::paths::EDGE; @@ -328,17 +328,19 @@ impl Transaction { } } - #[allow(unused)] - pub(crate) fn consume_pending_live_queries(&self) -> Vec { - let mut lq: Vec = Vec::with_capacity(LQ_CAPACITY); + /// From the existing transaction, consume all the remaining live query registration events and return them synchronously + pub(crate) fn consume_pending_live_queries(&self) -> Vec { + let mut lq: Vec = Vec::with_capacity(LQ_CAPACITY); while let Ok(l) = self.prepared_live_queries.1.try_recv() { - lq.push(l); + lq.push(TrackedResult::LiveQuery(l)); } lq } /// Sends a live query to the transaction which is forwarded only once committed /// And removed once a transaction is aborted + // allow(dead_code) because this is used in v2, but not v1 + #[allow(dead_code)] pub(crate) fn pre_commit_register_live_query( &mut self, lq_entry: LqEntry, @@ -3136,7 +3138,7 @@ mod tests { #[cfg(all(test, feature = "kv-mem"))] mod tx_test { - use crate::kvs::lq_structs::LqEntry; + use crate::kvs::lq_structs::{LqEntry, TrackedResult}; use crate::kvs::Datastore; use crate::kvs::LockType::Optimistic; use crate::kvs::TransactionType::Write; @@ -3174,6 +3176,6 @@ mod tx_test { // Verify data let live_queries = tx.consume_pending_live_queries(); assert_eq!(live_queries.len(), 1); - assert_eq!(live_queries[0], lq_entry); + assert_eq!(live_queries[0], TrackedResult::LiveQuery(lq_entry)); } } diff --git a/core/src/lib.rs b/core/src/lib.rs index c09ecff0..7c4c36fe 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,5 +1,6 @@ #[macro_use] extern crate tracing; +extern crate core; #[macro_use] mod mac; diff --git a/core/src/sql/v2/changefeed.rs b/core/src/sql/v2/changefeed.rs index e641d0d3..bde873dc 100644 --- a/core/src/sql/v2/changefeed.rs +++ b/core/src/sql/v2/changefeed.rs @@ -10,7 +10,6 @@ use std::time; pub struct ChangeFeed { pub expiry: time::Duration, } - impl Display for ChangeFeed { fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!(f, "CHANGEFEED {}", Duration(self.expiry))?; diff --git a/lib/src/api/engine/local/native.rs b/lib/src/api/engine/local/native.rs index 5905213c..272929ce 100644 --- a/lib/src/api/engine/local/native.rs +++ b/lib/src/api/engine/local/native.rs @@ -34,6 +34,7 @@ use std::sync::Arc; use std::sync::OnceLock; use std::task::Poll; use std::time::Duration; +use surrealdb_core::dbs::Options; use tokio::time; use tokio::time::MissedTickBehavior; @@ -208,6 +209,7 @@ pub(crate) fn router( } fn run_maintenance(kvs: Arc, tick_interval: Duration, stop_signal: Receiver<()>) { + trace!("Starting maintenance"); // Some classic ownership shenanigans let kvs_two = kvs.clone(); let stop_signal_two = stop_signal.clone(); @@ -235,6 +237,7 @@ fn run_maintenance(kvs: Arc, tick_interval: Duration, stop_signal: Re }); if FFLAGS.change_feed_live_queries.enabled() { + trace!("Live queries v2 enabled"); // Spawn the live query change feed consumer, which is used for catching up on relevant change feeds tokio::spawn(async move { let kvs = kvs_two; @@ -251,12 +254,15 @@ fn run_maintenance(kvs: Arc, tick_interval: Duration, stop_signal: Re let mut stream = streams.merge(); + let opt = Options::default(); while let Some(Some(_)) = stream.next().await { - match kvs.process_lq_notifications().await { + match kvs.process_lq_notifications(&opt).await { Ok(()) => trace!("Live Query poll ran successfully"), Err(error) => error!("Error running live query poll: {error}"), } } }); + } else { + trace!("Live queries v2 disabled") } } diff --git a/lib/src/api/engine/local/wasm.rs b/lib/src/api/engine/local/wasm.rs index 3d14833f..61008adf 100644 --- a/lib/src/api/engine/local/wasm.rs +++ b/lib/src/api/engine/local/wasm.rs @@ -34,6 +34,7 @@ use std::sync::Arc; use std::sync::OnceLock; use std::task::Poll; use std::time::Duration; +use surrealdb_core::dbs::Options; use wasm_bindgen_futures::spawn_local; use wasmtimer::tokio as time; use wasmtimer::tokio::MissedTickBehavior; @@ -246,8 +247,9 @@ fn run_maintenance(kvs: Arc, tick_interval: Duration, stop_signal: Re let mut stream = streams.merge(); + let opt = Options::default(); while let Some(Some(_)) = stream.next().await { - match kvs.process_lq_notifications().await { + match kvs.process_lq_notifications(&opt).await { Ok(()) => trace!("Live Query poll ran successfully"), Err(error) => error!("Error running live query poll: {error}"), } diff --git a/lib/tests/changefeeds.rs b/lib/tests/changefeeds.rs index f3686097..aaa141fa 100644 --- a/lib/tests/changefeeds.rs +++ b/lib/tests/changefeeds.rs @@ -1,12 +1,14 @@ -mod parse; use chrono::DateTime; -use parse::Parse; -mod helpers; + use helpers::new_ds; +use parse::Parse; use surrealdb::dbs::Session; use surrealdb::err::Error; use surrealdb::sql::Value; -use surrealdb_core::fflags::{FFlags, FFLAGS}; +use surrealdb_core::fflags::FFLAGS; + +mod helpers; +mod parse; #[tokio::test] async fn database_change_feeds() -> Result<(), Error> { diff --git a/lib/tests/helpers.rs b/lib/tests/helpers.rs index 9da799b6..b64dfac7 100644 --- a/lib/tests/helpers.rs +++ b/lib/tests/helpers.rs @@ -10,7 +10,7 @@ use surrealdb::iam::{Auth, Level, Role}; use surrealdb::kvs::Datastore; pub async fn new_ds() -> Result { - Ok(Datastore::new("memory").await?.with_capabilities(Capabilities::all())) + Ok(Datastore::new("memory").await?.with_capabilities(Capabilities::all()).with_notifications()) } #[allow(dead_code)] diff --git a/lib/tests/live.rs b/lib/tests/live.rs index a39c434f..f7efdc18 100644 --- a/lib/tests/live.rs +++ b/lib/tests/live.rs @@ -6,8 +6,8 @@ use surrealdb::dbs::Session; use surrealdb::err::Error; use surrealdb::sql::Value; use surrealdb_core::fflags::FFLAGS; -use surrealdb_core::kvs::LockType::Optimistic; -use surrealdb_core::kvs::TransactionType::Write; + +// RUST_LOG=trace cargo test -p surrealdb --features kv-mem --test live -- --nocapture #[tokio::test] async fn live_query_sends_registered_lq_details() -> Result<(), Error> { @@ -19,27 +19,35 @@ async fn live_query_sends_registered_lq_details() -> Result<(), Error> { LIVE SELECT * FROM lq_test_123; "; let dbs = new_ds().await?; - let ses = Session::owner().with_ns("test").with_db("test"); + let ses = Session::owner().with_ns("test").with_db("test").with_rt(true); let res = &mut dbs.execute(sql, &ses, None).await?; assert_eq!(res.len(), 2); - // + + // Define table didnt fail let tmp = res.remove(0).result; assert!(tmp.is_ok()); - // + + // Live query returned a valid uuid let actual = res.remove(0).result?; - let expected = Value::parse("{}"); - assert_eq!(actual, expected); - // - let tmp = res.remove(0).result?; - let val = Value::parse("[12345]"); - assert_eq!(tmp, val); - // - let tmp = res.remove(0).result; - assert!(tmp.is_ok()); - // - let tmp = res.remove(0).result?; - let val = Value::parse("[56789]"); - assert_eq!(tmp, val); - // + let live_id = match actual { + Value::Uuid(live_id) => live_id, + _ => panic!("Expected a UUID"), + }; + assert!(!live_id.is_nil()); + + // Create some data + let res = &mut dbs.execute("CREATE lq_test_123", &ses, None).await?; + assert_eq!(res.len(), 1); + + let result = res.remove(0); + assert!(result.result.is_ok()); + + dbs.process_lq_notifications(&Default::default()).await?; + + let notifications_chan = dbs.notifications().unwrap(); + + assert!(notifications_chan.try_recv().is_ok()); + assert!(notifications_chan.try_recv().is_err()); + Ok(()) } diff --git a/src/cli/start.rs b/src/cli/start.rs index 943305ad..dd48dc55 100644 --- a/src/cli/start.rs +++ b/src/cli/start.rs @@ -182,7 +182,9 @@ pub async fn init( // Start the kvs server dbs::init(dbs).await?; // Start the node agent + // This is equivalent to run_maintenance in native/wasm drivers let nd = node::init(ct.clone()); + let lq = node::live_query_change_feed(ct.clone()); // Start the web server net::init(ct).await?; // Wait for the node agent to stop @@ -190,6 +192,10 @@ pub async fn init( error!("Node agent failed while running: {}", e); return Err(Error::NodeAgent); } + if let Err(e) = lq.await { + error!("Live query change feed failed while running: {}", e); + return Err(Error::NodeAgent); + } // All ok Ok(()) } diff --git a/src/node/mod.rs b/src/node/mod.rs index 7811a567..a251f5b5 100644 --- a/src/node/mod.rs +++ b/src/node/mod.rs @@ -1,6 +1,11 @@ +use std::time::Duration; + +use surrealdb::dbs::Options; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; +use surrealdb::fflags::FFLAGS; + use crate::cli::CF; const LOG: &str = "surrealdb::node"; @@ -36,3 +41,32 @@ pub fn init(ct: CancellationToken) -> JoinHandle<()> { info!(target: LOG, "Stopped node agent"); }) } + +// Start live query on change feeds notification processing +pub fn live_query_change_feed(ct: CancellationToken) -> JoinHandle<()> { + tokio::spawn(async move { + if !FFLAGS.change_feed_live_queries.enabled() { + return; + } + // Spawn the live query change feed consumer, which is used for catching up on relevant change feeds + tokio::spawn(async move { + let kvs = crate::dbs::DB.get().unwrap(); + let tick_interval = Duration::from_secs(1); + + let opt = Options::default(); + loop { + if let Err(e) = kvs.process_lq_notifications(&opt).await { + error!("Error running node agent live query tick: {}", e); + } + tokio::select! { + _ = ct.cancelled() => { + info!(target: LOG, "Gracefully stopping live query node agent"); + break; + } + _ = tokio::time::sleep(tick_interval) => {} + } + } + info!("Stopped live query node agent") + }); + }) +}