diff --git a/core/src/dbs/node.rs b/core/src/dbs/node.rs index 5ddd91a6..4f1907d0 100644 --- a/core/src/dbs/node.rs +++ b/core/src/dbs/node.rs @@ -20,7 +20,7 @@ pub struct ClusterMembership { // events in a cluster. It should be derived from a timestamp oracle, such as the // one available in TiKV via the client `TimestampExt` implementation. #[derive( - Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Hash, Store, Default, + Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize, Ord, PartialOrd, Hash, Store, Default, )] #[revisioned(revision = 1)] pub struct Timestamp { diff --git a/core/src/doc/lives.rs b/core/src/doc/lives.rs index 62af8683..2652a5d8 100644 --- a/core/src/doc/lives.rs +++ b/core/src/doc/lives.rs @@ -6,6 +6,7 @@ use crate::dbs::{Action, Transaction}; use crate::doc::CursorDoc; use crate::doc::Document; use crate::err::Error; +use crate::fflags::FFLAGS; use crate::sql::paths::META; use crate::sql::paths::SC; use crate::sql::paths::SD; @@ -27,6 +28,11 @@ impl<'a> Document<'a> { if !opt.force && !self.changed() { return Ok(()); } + // Under the new mechanism, live query notifications only come from polling the change feed + // This check can be moved up the call stack, as this entire method will become unnecessary + if FFLAGS.change_feed_live_queries.enabled() { + return Ok(()); + } // Check if we can send notifications if let Some(chn) = &opt.sender { // Loop through all index statements diff --git a/core/src/fflags.rs b/core/src/fflags.rs index 181c35c1..11efd536 100644 --- a/core/src/fflags.rs +++ b/core/src/fflags.rs @@ -1,5 +1,12 @@ +//! Feature flags for SurrealDB +//! This is a public scope module that is not for external use +//! It is public for API access +/// + +/// FeatureFlags set for the project +/// Use this while implementing features #[allow(dead_code)] -pub(crate) static FFLAGS: FFlags = FFlags { +pub static FFLAGS: FFlags = FFlags { change_feed_live_queries: FFlagEnabledStatus { enabled_release: false, enabled_debug: false, @@ -17,15 +24,15 @@ pub(crate) static FFLAGS: FFlags = FFlags { #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] #[non_exhaustive] #[allow(dead_code)] -pub(crate) struct FFlags { - pub(crate) change_feed_live_queries: FFlagEnabledStatus, +pub struct FFlags { + pub change_feed_live_queries: FFlagEnabledStatus, } /// This struct is not used in the implementation; /// All the fields are here as information for people investigating the feature flag. #[allow(dead_code)] #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] -pub(crate) struct FFlagEnabledStatus { +pub struct FFlagEnabledStatus { pub(crate) enabled_release: bool, pub(crate) enabled_debug: bool, pub(crate) enabled_test: bool, @@ -40,7 +47,7 @@ pub(crate) struct FFlagEnabledStatus { impl FFlagEnabledStatus { #[allow(dead_code)] - pub(crate) fn enabled(&self) -> bool { + pub fn enabled(&self) -> bool { let mut enabled = false; if let Ok(env_var) = std::env::var(self.env_override) { if env_var.trim() == "true" { diff --git a/core/src/kvs/ds.rs b/core/src/kvs/ds.rs index dcd24693..8ee20116 100644 --- a/core/src/kvs/ds.rs +++ b/core/src/kvs/ds.rs @@ -1,5 +1,21 @@ -use super::tx::Transaction; +use std::cmp::Ordering; +use std::collections::{BTreeMap, BTreeSet}; +use std::fmt; +use std::sync::Arc; +use std::time::Duration; +#[cfg(not(target_arch = "wasm32"))] +use std::time::{SystemTime, UNIX_EPOCH}; + +use channel::{Receiver, Sender}; +use futures::{lock::Mutex, Future}; +use tokio::sync::RwLock; +use tracing::instrument; +use tracing::trace; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::{SystemTime, UNIX_EPOCH}; + use crate::cf; +use crate::cf::ChangeSet; use crate::ctx::Context; #[cfg(feature = "jwks")] use crate::dbs::capabilities::NetTarget; @@ -8,6 +24,7 @@ use crate::dbs::{ Variables, }; use crate::err::Error; +use crate::fflags::FFLAGS; use crate::iam::{Action, Auth, Error as IamError, Resource, Role}; use crate::idx::trees::store::IndexStores; use crate::key::root::hb::Hb; @@ -15,22 +32,13 @@ use crate::kvs::clock::SizedClock; #[allow(unused_imports)] use crate::kvs::clock::SystemClock; use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*}; +use crate::sql::statements::show::ShowSince; +use crate::sql::statements::LiveStatement; use crate::sql::{self, statements::DefineUserStatement, Base, Query, Uuid, Value}; use crate::syn; -use crate::vs::Oracle; -use channel::{Receiver, Sender}; -use futures::{lock::Mutex, Future}; -use std::cmp::Ordering; -use std::collections::{BTreeMap, BTreeSet}; -use std::fmt; -use std::sync::Arc; -use std::time::Duration; -#[cfg(not(target_arch = "wasm32"))] -use std::time::{SystemTime, UNIX_EPOCH}; -use tracing::instrument; -use tracing::trace; -#[cfg(target_arch = "wasm32")] -use wasmtimer::std::{SystemTime, UNIX_EPOCH}; +use crate::vs::{conv, Oracle, Versionstamp}; + +use super::tx::Transaction; // If there are an infinite number of heartbeats, then we want to go batch-by-batch spread over several checks const HEARTBEAT_BATCH_SIZE: u32 = 1000; @@ -38,6 +46,8 @@ const LQ_CHANNEL_SIZE: usize = 100; // The batch size used for non-paged operations (i.e. if there are more results, they are ignored) const NON_PAGED_BATCH_SIZE: u32 = 100_000; +// In the future we will have proper pagination +const TEMPORARY_LQ_CF_BATCH_SIZE_TILL_WE_HAVE_PAGINATION: u32 = 1000; /// Used for cluster logic to move LQ data to LQ cleanup code /// Not a stored struct; Used only in this module @@ -85,6 +95,30 @@ impl Ord for LqType { } } +#[derive(Ord, PartialOrd, Eq, PartialEq, Clone)] +struct LqSelector { + ns: String, + db: String, + tb: String, +} + +/// This is an internal-only helper struct for organising the keys of how live queries are accessed +/// Because we want immutable keys, we cannot put mutable things in such as ts and vs +#[derive(Ord, PartialOrd, Eq, PartialEq, Clone)] +struct LqIndexKey { + selector: LqSelector, + lq: Uuid, +} + +/// Internal only struct +/// This can be assumed to have a mutable reference +#[derive(Eq, PartialEq, Clone)] +struct LqIndexValue { + query: LiveStatement, + vs: Versionstamp, + ts: Timestamp, +} + /// The underlying datastore instance which stores the dataset. #[allow(dead_code)] pub struct Datastore { @@ -110,6 +144,10 @@ pub struct Datastore { versionstamp_oracle: Arc>, // Whether this datastore enables live query notifications to subscribers notification_channel: Option<(Sender, Receiver)>, + // Map of Live Query ID to Live Query query + local_live_queries: Arc>>, + // Set of tracked change feeds + local_live_query_cfs: Arc>>, // Clock for tracking time. It is read only and accessible to all transactions. It is behind a mutex as tests may write to it. clock: Arc, // The index store cache @@ -362,6 +400,8 @@ impl Datastore { versionstamp_oracle: Arc::new(Mutex::new(Oracle::systime_counter())), clock, index_stores: IndexStores::default(), + local_live_queries: Arc::new(RwLock::new(BTreeMap::new())), + local_live_query_cfs: Arc::new(RwLock::new(BTreeMap::new())), }) } @@ -820,7 +860,7 @@ impl Datastore { // It is handy for testing, because it allows you to specify the timestamp, // without depending on a system clock. pub async fn tick_at(&self, ts: u64) -> Result<(), Error> { - self.save_timestamp_for_versionstamp(ts).await?; + let _vs = self.save_timestamp_for_versionstamp(ts).await?; self.garbage_collect_stale_change_feeds(ts).await?; // TODO Add LQ GC // TODO Add Node GC? @@ -828,17 +868,101 @@ impl Datastore { } // save_timestamp_for_versionstamp saves the current timestamp for the each database's current versionstamp. - pub(crate) async fn save_timestamp_for_versionstamp(&self, ts: u64) -> Result<(), Error> { + pub(crate) async fn save_timestamp_for_versionstamp( + &self, + ts: u64, + ) -> Result, Error> { let mut tx = self.transaction(Write, Optimistic).await?; - if let Err(e) = self.save_timestamp_for_versionstamp_impl(ts, &mut tx).await { - return match tx.cancel().await { - Ok(_) => { - Err(e) + match self.save_timestamp_for_versionstamp_impl(ts, &mut tx).await { + Ok(vs) => Ok(vs), + Err(e) => { + match tx.cancel().await { + Ok(_) => { + Err(e) + } + Err(txe) => { + Err(Error::Tx(format!("Error saving timestamp for versionstamp: {:?} and error cancelling transaction: {:?}", e, txe))) + } } - Err(txe) => { - Err(Error::Tx(format!("Error saving timestamp for versionstamp: {:?} and error cancelling transaction: {:?}", e, txe))) + } + } + } + + /// This is a future that is from whatever is running the datastore as a SurrealDB instance (api WASM and native) + /// It's responsibility is to catch up all live queries based on changes to the relevant change feeds, + /// and send notifications after assessing authorisation. Live queries then have their watermarks updated. + pub async fn process_lq_notifications(&self) -> Result<(), Error> { + // Runtime feature gate, as it is not production-ready + if !FFLAGS.change_feed_live_queries.enabled() { + return Ok(()); + } + // Return if there are no live queries + if self.notification_channel.is_none() { + return Ok(()); + } + if self.local_live_queries.read().await.is_empty() { + return Ok(()); + } + + // Find live queries that need to catch up + let mut change_map: BTreeMap> = BTreeMap::new(); + let mut tx = self.transaction(Read, Optimistic).await?; + for (selector, vs) in self.local_live_query_cfs.read().await.iter() { + // Read the change feed for the selector + let res = cf::read( + &mut tx, + &selector.ns, + &selector.db, + // Technically, we can not fetch by table and do the per-table filtering this side. + // That is an improvement though + Some(&selector.tb), + ShowSince::versionstamp(vs), + Some(TEMPORARY_LQ_CF_BATCH_SIZE_TILL_WE_HAVE_PAGINATION), + ) + .await?; + // Confirm we do need to change watermark - this is technically already handled by the cf range scan + if let Some(change_set) = res.last() { + if conv::versionstamp_to_u64(&change_set.0) > conv::versionstamp_to_u64(vs) { + change_map.insert(selector.clone(), res); } + } + } + tx.cancel().await?; + + for (selector, change_sets) in change_map { + // find matching live queries + let lq_pairs: Vec<(LqIndexKey, LqIndexValue)> = { + let lq_lock = self.local_live_queries.read().await; + lq_lock + .iter() + .filter(|(k, _)| k.selector == selector) + .map(|a| { + let (b, c) = (a.0.clone(), a.1.clone()); + (b, c) + }) + .to_owned() + .collect() }; + + for change_set in change_sets { + for (lq_key, lq_value) in lq_pairs.iter() { + let change_vs = change_set.0; + let database_mutation = &change_set.1; + for table_mutation in database_mutation.0.iter() { + if table_mutation.0 == lq_key.selector.tb { + // TODO(phughk): process live query logic + // TODO(SUR-291): enforce security + self.local_live_queries.write().await.insert( + (*lq_key).clone(), + LqIndexValue { + vs: change_vs, + ..(*lq_value).clone() + }, + ); + } + } + } + } } Ok(()) } @@ -847,7 +971,8 @@ impl Datastore { &self, ts: u64, tx: &mut Transaction, - ) -> Result<(), Error> { + ) -> Result, Error> { + let mut vs: Option = None; let nses = tx.all_ns().await?; let nses = nses.as_ref(); for ns in nses { @@ -856,11 +981,11 @@ impl Datastore { let dbs = dbs.as_ref(); for db in dbs { let db = db.name.as_str(); - tx.set_timestamp_for_versionstamp(ts, ns, db, true).await?; + vs = Some(tx.set_timestamp_for_versionstamp(ts, ns, db, true).await?); } } tx.commit().await?; - Ok(()) + Ok(vs) } // garbage_collect_stale_change_feeds deletes all change feed entries that are older than the watermarks. diff --git a/core/src/kvs/tx.rs b/core/src/kvs/tx.rs index 80c2237d..86206097 100644 --- a/core/src/kvs/tx.rs +++ b/core/src/kvs/tx.rs @@ -2674,7 +2674,7 @@ impl Transaction { ns: &str, db: &str, lock: bool, - ) -> Result<(), Error> { + ) -> Result { // This also works as an advisory lock on the ts keys so that there is // on other concurrent transactions that can write to the ts_key or the keys after it. let vs = self.get_timestamp(crate::key::database::vs::new(ns, db), lock).await?; @@ -2696,7 +2696,7 @@ impl Transaction { } } self.set(ts_key, vs).await?; - Ok(()) + Ok(vs) } pub(crate) async fn get_versionstamp_from_timestamp( diff --git a/core/src/lib.rs b/core/src/lib.rs index 551f4be7..fd8ad75d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -21,7 +21,8 @@ pub mod dbs; pub mod env; #[doc(hidden)] pub mod err; -pub(crate) mod fflags; +#[doc(hidden)] +pub mod fflags; #[doc(hidden)] pub mod iam; #[doc(hidden)] diff --git a/core/src/sql/v1/statements/live.rs b/core/src/sql/v1/statements/live.rs index 2850f57b..d4f01f28 100644 --- a/core/src/sql/v1/statements/live.rs +++ b/core/src/sql/v1/statements/live.rs @@ -2,6 +2,7 @@ use crate::ctx::Context; use crate::dbs::{Options, Transaction}; use crate::doc::CursorDoc; use crate::err::Error; +use crate::fflags::FFLAGS; use crate::iam::Auth; use crate::sql::{Cond, Fetchs, Fields, Uuid, Value}; use derive::Store; @@ -100,6 +101,10 @@ impl LiveStatement { // Process the live query table match stm.what.compute(ctx, opt, txn, doc).await? { Value::Table(tb) => { + if FFLAGS.change_feed_live_queries.enabled() { + // We no longer need to write, as LQs are only computed locally from CF + return Ok(id.into()); + } // Store the current Node ID stm.node = nid.into(); // Insert the node live query diff --git a/core/src/sql/v1/statements/show.rs b/core/src/sql/v1/statements/show.rs index 1b176bc0..f2309e76 100644 --- a/core/src/sql/v1/statements/show.rs +++ b/core/src/sql/v1/statements/show.rs @@ -4,6 +4,7 @@ use crate::doc::CursorDoc; use crate::err::Error; use crate::iam::{Action, ResourceKind}; use crate::sql::{Base, Datetime, Table, Value}; +use crate::vs::{conv, Versionstamp}; use derive::Store; use revision::revisioned; use serde::{Deserialize, Serialize}; @@ -17,6 +18,19 @@ pub enum ShowSince { Versionstamp(u64), } +impl ShowSince { + pub fn versionstamp(vs: &Versionstamp) -> ShowSince { + ShowSince::Versionstamp(conv::versionstamp_to_u64(vs)) + } + + pub fn as_versionstamp(&self) -> Option { + match self { + ShowSince::Timestamp(_) => None, + ShowSince::Versionstamp(v) => Some(conv::u64_to_versionstamp(*v)), + } + } +} + // ShowStatement is used to show changes in a table or database via // the SHOW CHANGES statement. #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] @@ -82,3 +96,33 @@ impl fmt::Display for ShowStatement { Ok(()) } } + +#[cfg(test)] +mod test { + use crate::sql::Datetime; + + #[test] + fn timestamps_are_not_versionstamps() { + // given + let sql_dt = Datetime::try_from("2020-01-01T00:00:00Z").unwrap(); + + // when + let since = super::ShowSince::Timestamp(sql_dt); + + // then + assert_eq!(since.as_versionstamp(), None); + } + + #[test] + fn versionstamp_can_be_converted() { + // given + let versionstamp = crate::vs::conv::u64_to_versionstamp(1234567890); + let since = super::ShowSince::Versionstamp(1234567890); + + // when + let converted = since.as_versionstamp().unwrap(); + + // then + assert_eq!(converted, versionstamp); + } +} diff --git a/core/src/sql/v2/statements/show.rs b/core/src/sql/v2/statements/show.rs index 1b176bc0..fff36f1d 100644 --- a/core/src/sql/v2/statements/show.rs +++ b/core/src/sql/v2/statements/show.rs @@ -4,6 +4,7 @@ use crate::doc::CursorDoc; use crate::err::Error; use crate::iam::{Action, ResourceKind}; use crate::sql::{Base, Datetime, Table, Value}; +use crate::vs::{conv, Versionstamp}; use derive::Store; use revision::revisioned; use serde::{Deserialize, Serialize}; @@ -17,6 +18,19 @@ pub enum ShowSince { Versionstamp(u64), } +impl ShowSince { + pub fn versionstamp(vs: &Versionstamp) -> ShowSince { + ShowSince::Versionstamp(conv::versionstamp_to_u64(vs)) + } + + pub fn as_versionstamp(&self) -> Option { + match self { + ShowSince::Timestamp(_) => None, + ShowSince::Versionstamp(v) => Some(conv::u64_to_versionstamp(*v)), + } + } +} + // ShowStatement is used to show changes in a table or database via // the SHOW CHANGES statement. #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] diff --git a/core/src/vs/conv.rs b/core/src/vs/conv.rs index 923fb484..fab9b112 100644 --- a/core/src/vs/conv.rs +++ b/core/src/vs/conv.rs @@ -1,3 +1,4 @@ +use crate::vs::Versionstamp; use std::fmt; use thiserror::Error; @@ -74,6 +75,9 @@ pub fn try_u128_to_versionstamp(v: u128) -> Result<[u8; 10], Error> { Ok(buf) } +pub fn versionstamp_to_u64(vs: &Versionstamp) -> u64 { + u64::from_be_bytes(vs[..8].try_into().unwrap()) +} // to_u128_be converts a 10-byte versionstamp to a u128 assuming big-endian. // This is handy for human comparing versionstamps. #[allow(unused)] diff --git a/core/src/vs/mod.rs b/core/src/vs/mod.rs index 1c020234..ddfc94c2 100644 --- a/core/src/vs/mod.rs +++ b/core/src/vs/mod.rs @@ -3,6 +3,9 @@ //! by applications. //! This module might be migrated into the kvs or kvs::tx module in the future. +/// Versionstamp is a 10-byte array used to identify a specific version of a key. +/// The first 8 bytes are significant (the u64), and the remaining 2 bytes are not significant, but used for extra precision. +/// To convert to and from this module, see the conv module in this same directory. pub type Versionstamp = [u8; 10]; pub(crate) mod conv; diff --git a/lib/src/api/engine/local/native.rs b/lib/src/api/engine/local/native.rs index afa7f701..5905213c 100644 --- a/lib/src/api/engine/local/native.rs +++ b/lib/src/api/engine/local/native.rs @@ -13,6 +13,7 @@ use crate::api::Result; use crate::api::Surreal; use crate::dbs::Session; use crate::engine::IntervalStream; +use crate::fflags::FFLAGS; use crate::iam::Level; use crate::kvs::Datastore; use crate::opt::auth::Root; @@ -207,6 +208,11 @@ pub(crate) fn router( } fn run_maintenance(kvs: Arc, tick_interval: Duration, stop_signal: Receiver<()>) { + // Some classic ownership shenanigans + let kvs_two = kvs.clone(); + let stop_signal_two = stop_signal.clone(); + + // Spawn the ticker, which is used for tracking versionstamps and heartbeats across databases tokio::spawn(async move { let mut interval = time::interval(tick_interval); // Don't bombard the database if we miss some ticks @@ -221,10 +227,36 @@ fn run_maintenance(kvs: Arc, tick_interval: Duration, stop_signal: Re let mut stream = streams.merge(); while let Some(Some(_)) = stream.next().await { - match kvs.tick().await { + match kvs.clone().tick().await { Ok(()) => trace!("Node agent tick ran successfully"), Err(error) => error!("Error running node agent tick: {error}"), } } }); + + if FFLAGS.change_feed_live_queries.enabled() { + // Spawn the live query change feed consumer, which is used for catching up on relevant change feeds + tokio::spawn(async move { + let kvs = kvs_two; + let stop_signal = stop_signal_two; + let mut interval = time::interval(tick_interval); + // Don't bombard the database if we miss some ticks + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + // Delay sending the first tick + interval.tick().await; + + let ticker = IntervalStream::new(interval); + + let streams = (ticker.map(Some), stop_signal.into_stream().map(|_| None)); + + let mut stream = streams.merge(); + + while let Some(Some(_)) = stream.next().await { + match kvs.process_lq_notifications().await { + Ok(()) => trace!("Live Query poll ran successfully"), + Err(error) => error!("Error running live query poll: {error}"), + } + } + }); + } } diff --git a/lib/src/api/engine/local/wasm.rs b/lib/src/api/engine/local/wasm.rs index 4dcd49b9..3d14833f 100644 --- a/lib/src/api/engine/local/wasm.rs +++ b/lib/src/api/engine/local/wasm.rs @@ -13,6 +13,7 @@ use crate::api::Result; use crate::api::Surreal; use crate::dbs::Session; use crate::engine::IntervalStream; +use crate::fflags::FFLAGS; use crate::iam::Level; use crate::kvs::Datastore; use crate::opt::auth::Root; @@ -202,6 +203,11 @@ pub(crate) fn router( } fn run_maintenance(kvs: Arc, tick_interval: Duration, stop_signal: Receiver<()>) { + // Some classic ownership shenanigans + let kvs_two = kvs.clone(); + let stop_signal_two = stop_signal.clone(); + + // Spawn the ticker, which is used for tracking versionstamps and heartbeats across databases spawn_local(async move { let mut interval = time::interval(tick_interval); // Don't bombard the database if we miss some ticks @@ -222,4 +228,30 @@ fn run_maintenance(kvs: Arc, tick_interval: Duration, stop_signal: Re } } }); + + if FFLAGS.change_feed_live_queries.enabled() { + // Spawn the live query change feed consumer, which is used for catching up on relevant change feeds + spawn_local(async move { + let kvs = kvs_two; + let stop_signal = stop_signal_two; + let mut interval = time::interval(tick_interval); + // Don't bombard the database if we miss some ticks + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + // Delay sending the first tick + interval.tick().await; + + let ticker = IntervalStream::new(interval); + + let streams = (ticker.map(Some), stop_signal.into_stream().map(|_| None)); + + let mut stream = streams.merge(); + + while let Some(Some(_)) = stream.next().await { + match kvs.process_lq_notifications().await { + Ok(()) => trace!("Live Query poll ran successfully"), + Err(error) => error!("Error running live query poll: {error}"), + } + } + }) + } }