Introduce guidepost code for live queries on change feeds (#3392)

Co-authored-by: Rushmore Mushambi <rushmore@webenchanter.com>
Co-authored-by: Mees Delzenne <DelSkayn@users.noreply.github.com>
This commit is contained in:
Przemyslaw Hugh Kaznowski 2024-02-06 14:46:44 +00:00 committed by GitHub
parent 9cd6a5034a
commit 8bce4d7789
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 309 additions and 36 deletions

View file

@ -20,7 +20,7 @@ pub struct ClusterMembership {
// events in a cluster. It should be derived from a timestamp oracle, such as the
// one available in TiKV via the client `TimestampExt` implementation.
#[derive(
Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Hash, Store, Default,
Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize, Ord, PartialOrd, Hash, Store, Default,
)]
#[revisioned(revision = 1)]
pub struct Timestamp {

View file

@ -6,6 +6,7 @@ use crate::dbs::{Action, Transaction};
use crate::doc::CursorDoc;
use crate::doc::Document;
use crate::err::Error;
use crate::fflags::FFLAGS;
use crate::sql::paths::META;
use crate::sql::paths::SC;
use crate::sql::paths::SD;
@ -27,6 +28,11 @@ impl<'a> Document<'a> {
if !opt.force && !self.changed() {
return Ok(());
}
// Under the new mechanism, live query notifications only come from polling the change feed
// This check can be moved up the call stack, as this entire method will become unnecessary
if FFLAGS.change_feed_live_queries.enabled() {
return Ok(());
}
// Check if we can send notifications
if let Some(chn) = &opt.sender {
// Loop through all index statements

View file

@ -1,5 +1,12 @@
//! Feature flags for SurrealDB
//! This is a public scope module that is not for external use
//! It is public for API access
///
/// FeatureFlags set for the project
/// Use this while implementing features
#[allow(dead_code)]
pub(crate) static FFLAGS: FFlags = FFlags {
pub static FFLAGS: FFlags = FFlags {
change_feed_live_queries: FFlagEnabledStatus {
enabled_release: false,
enabled_debug: false,
@ -17,15 +24,15 @@ pub(crate) static FFLAGS: FFlags = FFlags {
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[non_exhaustive]
#[allow(dead_code)]
pub(crate) struct FFlags {
pub(crate) change_feed_live_queries: FFlagEnabledStatus,
pub struct FFlags {
pub change_feed_live_queries: FFlagEnabledStatus,
}
/// This struct is not used in the implementation;
/// All the fields are here as information for people investigating the feature flag.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct FFlagEnabledStatus {
pub struct FFlagEnabledStatus {
pub(crate) enabled_release: bool,
pub(crate) enabled_debug: bool,
pub(crate) enabled_test: bool,
@ -40,7 +47,7 @@ pub(crate) struct FFlagEnabledStatus {
impl FFlagEnabledStatus {
#[allow(dead_code)]
pub(crate) fn enabled(&self) -> bool {
pub fn enabled(&self) -> bool {
let mut enabled = false;
if let Ok(env_var) = std::env::var(self.env_override) {
if env_var.trim() == "true" {

View file

@ -1,5 +1,21 @@
use super::tx::Transaction;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use std::time::{SystemTime, UNIX_EPOCH};
use channel::{Receiver, Sender};
use futures::{lock::Mutex, Future};
use tokio::sync::RwLock;
use tracing::instrument;
use tracing::trace;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::{SystemTime, UNIX_EPOCH};
use crate::cf;
use crate::cf::ChangeSet;
use crate::ctx::Context;
#[cfg(feature = "jwks")]
use crate::dbs::capabilities::NetTarget;
@ -8,6 +24,7 @@ use crate::dbs::{
Variables,
};
use crate::err::Error;
use crate::fflags::FFLAGS;
use crate::iam::{Action, Auth, Error as IamError, Resource, Role};
use crate::idx::trees::store::IndexStores;
use crate::key::root::hb::Hb;
@ -15,22 +32,13 @@ use crate::kvs::clock::SizedClock;
#[allow(unused_imports)]
use crate::kvs::clock::SystemClock;
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
use crate::sql::statements::show::ShowSince;
use crate::sql::statements::LiveStatement;
use crate::sql::{self, statements::DefineUserStatement, Base, Query, Uuid, Value};
use crate::syn;
use crate::vs::Oracle;
use channel::{Receiver, Sender};
use futures::{lock::Mutex, Future};
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::instrument;
use tracing::trace;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::{SystemTime, UNIX_EPOCH};
use crate::vs::{conv, Oracle, Versionstamp};
use super::tx::Transaction;
// If there are an infinite number of heartbeats, then we want to go batch-by-batch spread over several checks
const HEARTBEAT_BATCH_SIZE: u32 = 1000;
@ -38,6 +46,8 @@ const LQ_CHANNEL_SIZE: usize = 100;
// The batch size used for non-paged operations (i.e. if there are more results, they are ignored)
const NON_PAGED_BATCH_SIZE: u32 = 100_000;
// In the future we will have proper pagination
const TEMPORARY_LQ_CF_BATCH_SIZE_TILL_WE_HAVE_PAGINATION: u32 = 1000;
/// Used for cluster logic to move LQ data to LQ cleanup code
/// Not a stored struct; Used only in this module
@ -85,6 +95,30 @@ impl Ord for LqType {
}
}
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone)]
struct LqSelector {
ns: String,
db: String,
tb: String,
}
/// This is an internal-only helper struct for organising the keys of how live queries are accessed
/// Because we want immutable keys, we cannot put mutable things in such as ts and vs
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone)]
struct LqIndexKey {
selector: LqSelector,
lq: Uuid,
}
/// Internal only struct
/// This can be assumed to have a mutable reference
#[derive(Eq, PartialEq, Clone)]
struct LqIndexValue {
query: LiveStatement,
vs: Versionstamp,
ts: Timestamp,
}
/// The underlying datastore instance which stores the dataset.
#[allow(dead_code)]
pub struct Datastore {
@ -110,6 +144,10 @@ pub struct Datastore {
versionstamp_oracle: Arc<Mutex<Oracle>>,
// Whether this datastore enables live query notifications to subscribers
notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>,
// Map of Live Query ID to Live Query query
local_live_queries: Arc<RwLock<BTreeMap<LqIndexKey, LqIndexValue>>>,
// Set of tracked change feeds
local_live_query_cfs: Arc<RwLock<BTreeMap<LqSelector, Versionstamp>>>,
// Clock for tracking time. It is read only and accessible to all transactions. It is behind a mutex as tests may write to it.
clock: Arc<SizedClock>,
// The index store cache
@ -362,6 +400,8 @@ impl Datastore {
versionstamp_oracle: Arc::new(Mutex::new(Oracle::systime_counter())),
clock,
index_stores: IndexStores::default(),
local_live_queries: Arc::new(RwLock::new(BTreeMap::new())),
local_live_query_cfs: Arc::new(RwLock::new(BTreeMap::new())),
})
}
@ -820,7 +860,7 @@ impl Datastore {
// It is handy for testing, because it allows you to specify the timestamp,
// without depending on a system clock.
pub async fn tick_at(&self, ts: u64) -> Result<(), Error> {
self.save_timestamp_for_versionstamp(ts).await?;
let _vs = self.save_timestamp_for_versionstamp(ts).await?;
self.garbage_collect_stale_change_feeds(ts).await?;
// TODO Add LQ GC
// TODO Add Node GC?
@ -828,17 +868,101 @@ impl Datastore {
}
// save_timestamp_for_versionstamp saves the current timestamp for the each database's current versionstamp.
pub(crate) async fn save_timestamp_for_versionstamp(&self, ts: u64) -> Result<(), Error> {
pub(crate) async fn save_timestamp_for_versionstamp(
&self,
ts: u64,
) -> Result<Option<Versionstamp>, Error> {
let mut tx = self.transaction(Write, Optimistic).await?;
if let Err(e) = self.save_timestamp_for_versionstamp_impl(ts, &mut tx).await {
return match tx.cancel().await {
Ok(_) => {
Err(e)
match self.save_timestamp_for_versionstamp_impl(ts, &mut tx).await {
Ok(vs) => Ok(vs),
Err(e) => {
match tx.cancel().await {
Ok(_) => {
Err(e)
}
Err(txe) => {
Err(Error::Tx(format!("Error saving timestamp for versionstamp: {:?} and error cancelling transaction: {:?}", e, txe)))
}
}
Err(txe) => {
Err(Error::Tx(format!("Error saving timestamp for versionstamp: {:?} and error cancelling transaction: {:?}", e, txe)))
}
}
}
/// This is a future that is from whatever is running the datastore as a SurrealDB instance (api WASM and native)
/// It's responsibility is to catch up all live queries based on changes to the relevant change feeds,
/// and send notifications after assessing authorisation. Live queries then have their watermarks updated.
pub async fn process_lq_notifications(&self) -> Result<(), Error> {
// Runtime feature gate, as it is not production-ready
if !FFLAGS.change_feed_live_queries.enabled() {
return Ok(());
}
// Return if there are no live queries
if self.notification_channel.is_none() {
return Ok(());
}
if self.local_live_queries.read().await.is_empty() {
return Ok(());
}
// Find live queries that need to catch up
let mut change_map: BTreeMap<LqSelector, Vec<ChangeSet>> = BTreeMap::new();
let mut tx = self.transaction(Read, Optimistic).await?;
for (selector, vs) in self.local_live_query_cfs.read().await.iter() {
// Read the change feed for the selector
let res = cf::read(
&mut tx,
&selector.ns,
&selector.db,
// Technically, we can not fetch by table and do the per-table filtering this side.
// That is an improvement though
Some(&selector.tb),
ShowSince::versionstamp(vs),
Some(TEMPORARY_LQ_CF_BATCH_SIZE_TILL_WE_HAVE_PAGINATION),
)
.await?;
// Confirm we do need to change watermark - this is technically already handled by the cf range scan
if let Some(change_set) = res.last() {
if conv::versionstamp_to_u64(&change_set.0) > conv::versionstamp_to_u64(vs) {
change_map.insert(selector.clone(), res);
}
}
}
tx.cancel().await?;
for (selector, change_sets) in change_map {
// find matching live queries
let lq_pairs: Vec<(LqIndexKey, LqIndexValue)> = {
let lq_lock = self.local_live_queries.read().await;
lq_lock
.iter()
.filter(|(k, _)| k.selector == selector)
.map(|a| {
let (b, c) = (a.0.clone(), a.1.clone());
(b, c)
})
.to_owned()
.collect()
};
for change_set in change_sets {
for (lq_key, lq_value) in lq_pairs.iter() {
let change_vs = change_set.0;
let database_mutation = &change_set.1;
for table_mutation in database_mutation.0.iter() {
if table_mutation.0 == lq_key.selector.tb {
// TODO(phughk): process live query logic
// TODO(SUR-291): enforce security
self.local_live_queries.write().await.insert(
(*lq_key).clone(),
LqIndexValue {
vs: change_vs,
..(*lq_value).clone()
},
);
}
}
}
}
}
Ok(())
}
@ -847,7 +971,8 @@ impl Datastore {
&self,
ts: u64,
tx: &mut Transaction,
) -> Result<(), Error> {
) -> Result<Option<Versionstamp>, Error> {
let mut vs: Option<Versionstamp> = None;
let nses = tx.all_ns().await?;
let nses = nses.as_ref();
for ns in nses {
@ -856,11 +981,11 @@ impl Datastore {
let dbs = dbs.as_ref();
for db in dbs {
let db = db.name.as_str();
tx.set_timestamp_for_versionstamp(ts, ns, db, true).await?;
vs = Some(tx.set_timestamp_for_versionstamp(ts, ns, db, true).await?);
}
}
tx.commit().await?;
Ok(())
Ok(vs)
}
// garbage_collect_stale_change_feeds deletes all change feed entries that are older than the watermarks.

View file

@ -2674,7 +2674,7 @@ impl Transaction {
ns: &str,
db: &str,
lock: bool,
) -> Result<(), Error> {
) -> Result<Versionstamp, Error> {
// This also works as an advisory lock on the ts keys so that there is
// on other concurrent transactions that can write to the ts_key or the keys after it.
let vs = self.get_timestamp(crate::key::database::vs::new(ns, db), lock).await?;
@ -2696,7 +2696,7 @@ impl Transaction {
}
}
self.set(ts_key, vs).await?;
Ok(())
Ok(vs)
}
pub(crate) async fn get_versionstamp_from_timestamp(

View file

@ -21,7 +21,8 @@ pub mod dbs;
pub mod env;
#[doc(hidden)]
pub mod err;
pub(crate) mod fflags;
#[doc(hidden)]
pub mod fflags;
#[doc(hidden)]
pub mod iam;
#[doc(hidden)]

View file

@ -2,6 +2,7 @@ use crate::ctx::Context;
use crate::dbs::{Options, Transaction};
use crate::doc::CursorDoc;
use crate::err::Error;
use crate::fflags::FFLAGS;
use crate::iam::Auth;
use crate::sql::{Cond, Fetchs, Fields, Uuid, Value};
use derive::Store;
@ -100,6 +101,10 @@ impl LiveStatement {
// Process the live query table
match stm.what.compute(ctx, opt, txn, doc).await? {
Value::Table(tb) => {
if FFLAGS.change_feed_live_queries.enabled() {
// We no longer need to write, as LQs are only computed locally from CF
return Ok(id.into());
}
// Store the current Node ID
stm.node = nid.into();
// Insert the node live query

View file

@ -4,6 +4,7 @@ use crate::doc::CursorDoc;
use crate::err::Error;
use crate::iam::{Action, ResourceKind};
use crate::sql::{Base, Datetime, Table, Value};
use crate::vs::{conv, Versionstamp};
use derive::Store;
use revision::revisioned;
use serde::{Deserialize, Serialize};
@ -17,6 +18,19 @@ pub enum ShowSince {
Versionstamp(u64),
}
impl ShowSince {
pub fn versionstamp(vs: &Versionstamp) -> ShowSince {
ShowSince::Versionstamp(conv::versionstamp_to_u64(vs))
}
pub fn as_versionstamp(&self) -> Option<Versionstamp> {
match self {
ShowSince::Timestamp(_) => None,
ShowSince::Versionstamp(v) => Some(conv::u64_to_versionstamp(*v)),
}
}
}
// ShowStatement is used to show changes in a table or database via
// the SHOW CHANGES statement.
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
@ -82,3 +96,33 @@ impl fmt::Display for ShowStatement {
Ok(())
}
}
#[cfg(test)]
mod test {
use crate::sql::Datetime;
#[test]
fn timestamps_are_not_versionstamps() {
// given
let sql_dt = Datetime::try_from("2020-01-01T00:00:00Z").unwrap();
// when
let since = super::ShowSince::Timestamp(sql_dt);
// then
assert_eq!(since.as_versionstamp(), None);
}
#[test]
fn versionstamp_can_be_converted() {
// given
let versionstamp = crate::vs::conv::u64_to_versionstamp(1234567890);
let since = super::ShowSince::Versionstamp(1234567890);
// when
let converted = since.as_versionstamp().unwrap();
// then
assert_eq!(converted, versionstamp);
}
}

View file

@ -4,6 +4,7 @@ use crate::doc::CursorDoc;
use crate::err::Error;
use crate::iam::{Action, ResourceKind};
use crate::sql::{Base, Datetime, Table, Value};
use crate::vs::{conv, Versionstamp};
use derive::Store;
use revision::revisioned;
use serde::{Deserialize, Serialize};
@ -17,6 +18,19 @@ pub enum ShowSince {
Versionstamp(u64),
}
impl ShowSince {
pub fn versionstamp(vs: &Versionstamp) -> ShowSince {
ShowSince::Versionstamp(conv::versionstamp_to_u64(vs))
}
pub fn as_versionstamp(&self) -> Option<Versionstamp> {
match self {
ShowSince::Timestamp(_) => None,
ShowSince::Versionstamp(v) => Some(conv::u64_to_versionstamp(*v)),
}
}
}
// ShowStatement is used to show changes in a table or database via
// the SHOW CHANGES statement.
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]

View file

@ -1,3 +1,4 @@
use crate::vs::Versionstamp;
use std::fmt;
use thiserror::Error;
@ -74,6 +75,9 @@ pub fn try_u128_to_versionstamp(v: u128) -> Result<[u8; 10], Error> {
Ok(buf)
}
pub fn versionstamp_to_u64(vs: &Versionstamp) -> u64 {
u64::from_be_bytes(vs[..8].try_into().unwrap())
}
// to_u128_be converts a 10-byte versionstamp to a u128 assuming big-endian.
// This is handy for human comparing versionstamps.
#[allow(unused)]

View file

@ -3,6 +3,9 @@
//! by applications.
//! This module might be migrated into the kvs or kvs::tx module in the future.
/// Versionstamp is a 10-byte array used to identify a specific version of a key.
/// The first 8 bytes are significant (the u64), and the remaining 2 bytes are not significant, but used for extra precision.
/// To convert to and from this module, see the conv module in this same directory.
pub type Versionstamp = [u8; 10];
pub(crate) mod conv;

View file

@ -13,6 +13,7 @@ use crate::api::Result;
use crate::api::Surreal;
use crate::dbs::Session;
use crate::engine::IntervalStream;
use crate::fflags::FFLAGS;
use crate::iam::Level;
use crate::kvs::Datastore;
use crate::opt::auth::Root;
@ -207,6 +208,11 @@ pub(crate) fn router(
}
fn run_maintenance(kvs: Arc<Datastore>, tick_interval: Duration, stop_signal: Receiver<()>) {
// Some classic ownership shenanigans
let kvs_two = kvs.clone();
let stop_signal_two = stop_signal.clone();
// Spawn the ticker, which is used for tracking versionstamps and heartbeats across databases
tokio::spawn(async move {
let mut interval = time::interval(tick_interval);
// Don't bombard the database if we miss some ticks
@ -221,10 +227,36 @@ fn run_maintenance(kvs: Arc<Datastore>, tick_interval: Duration, stop_signal: Re
let mut stream = streams.merge();
while let Some(Some(_)) = stream.next().await {
match kvs.tick().await {
match kvs.clone().tick().await {
Ok(()) => trace!("Node agent tick ran successfully"),
Err(error) => error!("Error running node agent tick: {error}"),
}
}
});
if FFLAGS.change_feed_live_queries.enabled() {
// Spawn the live query change feed consumer, which is used for catching up on relevant change feeds
tokio::spawn(async move {
let kvs = kvs_two;
let stop_signal = stop_signal_two;
let mut interval = time::interval(tick_interval);
// Don't bombard the database if we miss some ticks
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
// Delay sending the first tick
interval.tick().await;
let ticker = IntervalStream::new(interval);
let streams = (ticker.map(Some), stop_signal.into_stream().map(|_| None));
let mut stream = streams.merge();
while let Some(Some(_)) = stream.next().await {
match kvs.process_lq_notifications().await {
Ok(()) => trace!("Live Query poll ran successfully"),
Err(error) => error!("Error running live query poll: {error}"),
}
}
});
}
}

View file

@ -13,6 +13,7 @@ use crate::api::Result;
use crate::api::Surreal;
use crate::dbs::Session;
use crate::engine::IntervalStream;
use crate::fflags::FFLAGS;
use crate::iam::Level;
use crate::kvs::Datastore;
use crate::opt::auth::Root;
@ -202,6 +203,11 @@ pub(crate) fn router(
}
fn run_maintenance(kvs: Arc<Datastore>, tick_interval: Duration, stop_signal: Receiver<()>) {
// Some classic ownership shenanigans
let kvs_two = kvs.clone();
let stop_signal_two = stop_signal.clone();
// Spawn the ticker, which is used for tracking versionstamps and heartbeats across databases
spawn_local(async move {
let mut interval = time::interval(tick_interval);
// Don't bombard the database if we miss some ticks
@ -222,4 +228,30 @@ fn run_maintenance(kvs: Arc<Datastore>, tick_interval: Duration, stop_signal: Re
}
}
});
if FFLAGS.change_feed_live_queries.enabled() {
// Spawn the live query change feed consumer, which is used for catching up on relevant change feeds
spawn_local(async move {
let kvs = kvs_two;
let stop_signal = stop_signal_two;
let mut interval = time::interval(tick_interval);
// Don't bombard the database if we miss some ticks
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
// Delay sending the first tick
interval.tick().await;
let ticker = IntervalStream::new(interval);
let streams = (ticker.map(Some), stop_signal.into_stream().map(|_| None));
let mut stream = streams.merge();
while let Some(Some(_)) = stream.next().await {
match kvs.process_lq_notifications().await {
Ok(()) => trace!("Live Query poll ran successfully"),
Err(error) => error!("Error running live query poll: {error}"),
}
}
})
}
}