Add notifications for LQ v2 on CF (#3480)

Co-authored-by: Mees Delzenne <DelSkayn@users.noreply.github.com>
Przemyslaw Hugh Kaznowski 2024-02-27 15:18:25 +00:00 committed by GitHub
parent 04c8b864cd
commit 888184f50f
15 changed files with 378 additions and 87 deletions

View file

@@ -1,3 +1,16 @@
use std::sync::Arc;
use channel::Receiver;
use futures::lock::Mutex;
use futures::StreamExt;
#[cfg(not(target_arch = "wasm32"))]
use tokio::spawn;
use tracing::instrument;
use trice::Instant;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures::spawn_local as spawn;
use crate::ctx::Context;
use crate::dbs::response::Response;
use crate::dbs::Notification;
@@ -7,6 +20,7 @@ use crate::dbs::Transaction;
use crate::err::Error;
use crate::iam::Action;
use crate::iam::ResourceKind;
use crate::kvs::lq_structs::TrackedResult;
use crate::kvs::TransactionType;
use crate::kvs::{Datastore, LockType::*, TransactionType::*};
use crate::sql::paths::DB;
@@ -15,16 +29,6 @@ use crate::sql::query::Query;
use crate::sql::statement::Statement;
use crate::sql::value::Value;
use crate::sql::Base;
use channel::Receiver;
use futures::lock::Mutex;
use futures::StreamExt;
use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use tokio::spawn;
use tracing::instrument;
use trice::Instant;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures::spawn_local as spawn;
pub(crate) struct Executor<'a> {
err: bool,
@@ -83,7 +87,19 @@ impl<'a> Executor<'a> {
let _ = txn.cancel().await;
} else {
let r = match txn.complete_changes(false).await {
Ok(_) => txn.commit().await,
Ok(_) => {
match txn.commit().await {
Ok(()) => {
// Commit succeeded, do post-commit operations that do not matter to the tx
let lqs: Vec<TrackedResult> =
txn.consume_pending_live_queries();
// Track the live queries in the data store
self.kvs.track_live_queries(&lqs).await?;
Ok(())
}
Err(e) => Err(e),
}
}
r => r,
};
if let Err(e) = r {
@@ -151,6 +167,7 @@ impl<'a> Executor<'a> {
/// Flush notifications from a buffer channel (live queries) to the committed notification channel.
/// This is because we don't want to broadcast notifications to the user for failed transactions.
/// TODO we can delete this once we migrate to lq v2
async fn flush(&self, ctx: &Context<'_>, mut rcv: Receiver<Notification>) {
let sender = ctx.notifications();
spawn(async move {
@@ -164,6 +181,17 @@ impl<'a> Executor<'a> {
});
}
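// A self-contained sketch (illustrative, not part of this commit) of the
// buffer-then-forward pattern used by `flush`, with tokio channels standing in
// for the internal `channel` crate and `String` standing in for `Notification`:
use tokio::sync::mpsc;
fn flush_sketch(mut rcv: mpsc::Receiver<String>, sender: Option<mpsc::Sender<String>>) {
    tokio::spawn(async move {
        // Drain the per-transaction buffer; forward only while a subscriber exists
        while let Some(notification) = rcv.recv().await {
            if let Some(chn) = &sender {
                if chn.send(notification).await.is_err() {
                    break; // the subscriber hung up
                }
            }
        }
    });
}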
/// A transaction collects the live queries it creates, which can then be consumed once the transaction is committed
/// We use this function to collect those live queries and hand them to the invoker without going through channels
async fn consume_committed_live_query_registrations(&self) -> Option<Vec<TrackedResult>> {
if let Some(txn) = self.txn.as_ref() {
let txn = txn.lock().await;
Some(txn.consume_pending_live_queries())
} else {
None
}
}
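// The helper above boils down to a lock-and-drain; a minimal sketch with
// stand-in types (a Vec<String> in place of the transaction's pending
// registrations), not part of this commit:
use std::sync::Arc;
use tokio::sync::Mutex;
async fn consume_sketch(txn: Option<&Arc<Mutex<Vec<String>>>>) -> Option<Vec<String>> {
    match txn {
        // Take the pending items, leaving an empty buffer behind
        Some(txn) => Some(std::mem::take(&mut *txn.lock().await)),
        None => None,
    }
}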
async fn set_ns(&self, ctx: &mut Context<'_>, opt: &mut Options, ns: &str) {
let mut session = ctx.value("session").unwrap_or(&Value::None).clone();
session.put(NS.as_ref(), ns.to_owned().into());
@@ -184,7 +212,7 @@ impl<'a> Executor<'a> {
mut ctx: Context<'_>,
opt: Options,
qry: Query,
) -> Result<Vec<Response>, Error> {
) -> Result<(Vec<Response>, Vec<TrackedResult>), Error> {
// Create a notification channel
let (send, recv) = channel::unbounded();
// Set the notification channel
@@ -193,6 +221,7 @@ impl<'a> Executor<'a> {
let mut buf: Vec<Response> = vec![];
// Initialise array of responses
let mut out: Vec<Response> = vec![];
let mut live_queries: Vec<TrackedResult> = vec![];
// Process all statements in query
for stm in qry.into_iter() {
// Log the statement
@@ -249,6 +278,9 @@
let commit_error = self.commit(true).await.err();
buf = buf.into_iter().map(|v| self.buf_commit(v, &commit_error)).collect();
self.flush(&ctx, recv.clone()).await;
if let Some(lqs) = self.consume_committed_live_query_registrations().await {
live_queries.extend(lqs);
}
out.append(&mut buf);
debug_assert!(self.txn.is_none(), "commit(true) should have unset txn");
self.txn = None;
@@ -294,6 +326,12 @@ impl<'a> Executor<'a> {
Ok(_) => {
// Flush live query notifications
self.flush(&ctx, recv.clone()).await;
if let Some(lqs) = self
.consume_committed_live_query_registrations()
.await
{
live_queries.extend(lqs);
}
Ok(Value::None)
}
}
@@ -366,7 +404,11 @@ impl<'a> Executor<'a> {
} else {
// Flush the live query change notifications
self.flush(&ctx, recv.clone()).await;
// Successful, committed result
if let Some(lqs) =
self.consume_committed_live_query_registrations().await
{
live_queries.extend(lqs);
}
res
}
} else {
@@ -392,8 +434,18 @@ impl<'a> Executor<'a> {
e
}),
query_type: match (is_stm_live, is_stm_kill) {
(true, _) => QueryType::Live,
(_, true) => QueryType::Kill,
(true, _) => {
if let Some(lqs) = self.consume_committed_live_query_registrations().await {
live_queries.extend(lqs);
}
QueryType::Live
}
(_, true) => {
if let Some(lqs) = self.consume_committed_live_query_registrations().await {
live_queries.extend(lqs);
}
QueryType::Kill
}
_ => QueryType::Other,
},
};
@@ -408,7 +460,7 @@ impl<'a> Executor<'a> {
}
}
// Return responses
Ok(out)
Ok((out, live_queries))
}
}

View file

@@ -15,14 +15,15 @@ use tracing::trace;
use wasmtimer::std::{SystemTime, UNIX_EPOCH};
use crate::cf;
use crate::cf::ChangeSet;
use crate::cf::{ChangeSet, TableMutation};
use crate::ctx::Context;
#[cfg(feature = "jwks")]
use crate::dbs::capabilities::NetTarget;
use crate::dbs::{
node::Timestamp, Attach, Capabilities, Executor, Notification, Options, Response, Session,
Variables,
Statement, Variables, Workable,
};
use crate::doc::Document;
use crate::err::Error;
use crate::fflags::FFLAGS;
use crate::iam::{Action, Auth, Error as IamError, Resource, Role};
@@ -32,7 +33,7 @@ use crate::kvs::clock::SizedClock;
#[allow(unused_imports)]
use crate::kvs::clock::SystemClock;
use crate::kvs::lq_structs::{
LqEntry, LqIndexKey, LqIndexValue, LqSelector, LqValue, UnreachableLqType,
LqEntry, LqIndexKey, LqIndexValue, LqSelector, LqValue, TrackedResult, UnreachableLqType,
};
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
use crate::sql::statements::show::ShowSince;
@@ -76,10 +77,12 @@ pub struct Datastore {
versionstamp_oracle: Arc<Mutex<Oracle>>,
// Whether this datastore enables live query notifications to subscribers
notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>,
// Map of Live Query ID to Live Query query
local_live_queries: Arc<RwLock<BTreeMap<LqIndexKey, LqIndexValue>>>,
// Set of tracked change feeds
local_live_query_cfs: Arc<RwLock<BTreeMap<LqSelector, Versionstamp>>>,
// Map of Live Query identifiers (ns+db+tb) used for change feed tracking;
// each key maps to the list of live queries it affects
local_live_queries: Arc<RwLock<BTreeMap<LqIndexKey, Vec<LqIndexValue>>>>,
// Set of tracked change feeds with their associated watermarks
// This is updated as live queries are added and removed, and improves change feed request performance
cf_watermarks: Arc<RwLock<BTreeMap<LqSelector, Versionstamp>>>,
// Clock for tracking time. It is read only and accessible to all transactions. It is behind a mutex as tests may write to it.
clock: Arc<SizedClock>,
// The index store cache
@@ -354,7 +357,7 @@ impl Datastore {
clock,
index_stores: IndexStores::default(),
local_live_queries: Arc::new(RwLock::new(BTreeMap::new())),
local_live_query_cfs: Arc::new(RwLock::new(BTreeMap::new())),
cf_watermarks: Arc::new(RwLock::new(BTreeMap::new())),
})
}
@@ -841,26 +844,28 @@ impl Datastore {
}
}
/// This is a future that is driven by whatever is running the datastore as a SurrealDB instance (api, WASM and native)
/// Its responsibility is to catch up all live queries based on changes to the relevant change feeds,
/// and to send notifications after assessing authorisation. Live queries then have their watermarks updated.
pub async fn process_lq_notifications(&self) -> Result<(), Error> {
/// Poll change feeds for live query notifications
pub async fn process_lq_notifications(&self, opt: &Options) -> Result<(), Error> {
// Runtime feature gate, as it is not production-ready
if !FFLAGS.change_feed_live_queries.enabled() {
return Ok(());
}
// Return if there are no live queries
if self.notification_channel.is_none() {
trace!("Channels is none, short-circuiting");
return Ok(());
}
if self.local_live_queries.read().await.is_empty() {
trace!("No live queries, short-circuiting");
return Ok(());
}
// Find live queries that need to catch up
// Change map includes a mapping of selector to changesets, ordered by versionstamp
let mut change_map: BTreeMap<LqSelector, Vec<ChangeSet>> = BTreeMap::new();
let mut tx = self.transaction(Read, Optimistic).await?;
for (selector, vs) in self.local_live_query_cfs.read().await.iter() {
let mut tracked_cfs = self.cf_watermarks.write().await;
let mut tracked_cfs_updates = Vec::with_capacity(tracked_cfs.len());
for (selector, vs) in tracked_cfs.iter() {
// Read the change feed for the selector
let res = cf::read(
&mut tx,
@@ -874,14 +879,33 @@
)
.await?;
// Confirm we do need to change watermark - this is technically already handled by the cf range scan
if res.is_empty() {
trace!(
"There were no changes in the change feed for {:?} from versionstamp {:?}",
selector,
vs
)
}
if let Some(change_set) = res.last() {
if conv::versionstamp_to_u64(&change_set.0) > conv::versionstamp_to_u64(vs) {
trace!("Adding a change set for lq notification processing");
// Update the cf watermark so we can progress scans
// If the notifications fail from here-on, they are lost
// this is a separate vec that we later insert to because we are iterating immutably
// We shouldn't use a read lock because of consistency between watermark scans
tracked_cfs_updates.push((selector.clone(), change_set.0));
// This does not guarantee a notification, as a changeset can include many tables and many changes
change_map.insert(selector.clone(), res);
}
}
}
tx.cancel().await?;
// Now we update since we are no longer iterating immutably
for (selector, vs) in tracked_cfs_updates {
tracked_cfs.insert(selector, vs);
}
for (selector, change_sets) in change_map {
// find matching live queries
let lq_pairs: Vec<(LqIndexKey, LqIndexValue)> = {
@@ -889,29 +913,86 @@
lq_lock
.iter()
.filter(|(k, _)| k.selector == selector)
.map(|a| {
let (b, c) = (a.0.clone(), a.1.clone());
(b, c)
.flat_map(|(lq_index, lq_values)| {
lq_values.iter().cloned().map(|x| (lq_index.clone(), x))
})
.to_owned()
.collect()
};
// Find relevant changes
let tx = Arc::new(Mutex::new(self.transaction(Read, Optimistic).await?));
for change_set in change_sets {
// TODO(phughk): this loop can be on the inside so we are only checking lqs relevant to the cf change
for (lq_key, lq_value) in lq_pairs.iter() {
trace!(
"Processing live query for notification key={:?} and value={:?}",
lq_key,
lq_value
);
let change_vs = change_set.0;
let database_mutation = &change_set.1;
for table_mutation in database_mutation.0.iter() {
if table_mutation.0 == lq_key.selector.tb {
// TODO(phughk): process live query logic
// TODO(SUR-291): enforce security
self.local_live_queries.write().await.insert(
(*lq_key).clone(),
LqIndexValue {
vs: change_vs,
..(*lq_value).clone()
},
);
for table_mutations in database_mutation.0.iter() {
if table_mutations.0 == lq_key.selector.tb {
// Create a doc of the table value
// Run the 'lives' logic on the doc, while providing live queries instead of reading from storage
// This will generate and send notifications
for mutation in table_mutations.1.iter() {
if let Some(doc) = Self::construct_document(mutation) {
// We know we are only processing a single LQ at a time, so we can limit notifications to 1
let notification_capacity = 1;
// We track notifications in a separate channel in case we want to process them;
// for the current state we only forward them
let (sender, receiver) =
channel::bounded(notification_capacity);
doc.check_lqs_and_send_notifications(
opt,
&Statement::Live(&lq_value.stm),
&tx,
[&lq_value.stm].as_slice(),
&sender,
)
.await
.map_err(|e| {
Error::Internal(format!(
"Error checking lqs for notifications: {:?}",
e
))
})?;
// Send the notifications to the driver or API
// TODO: evaluate if we want channel directly instead of proxy
while let Ok(notification) = receiver.try_recv() {
trace!("Sending notification to client");
self.notification_channel
.as_ref()
.unwrap()
.0
.send(notification)
.await
.unwrap();
}
trace!("Ended notification sending")
}
// Update watermarks
trace!(
"Updating watermark to {:?} for index key {:?}",
change_vs,
lq_key
);
// For each live query we have processed we update the watermarks
self.local_live_queries.write().await.insert(
(*lq_key).clone(),
vec![LqIndexValue {
vs: change_vs,
..lq_value.clone()
}],
);
// We also update the tracked_cfs with a minimum watermark
}
}
}
}
@@ -920,6 +1001,58 @@
Ok(())
}
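// In outline, the function above scans each tracked watermark, collects newer
// changesets, and only advances watermarks once the immutable scan is over.
// A condensed, self-contained sketch of that flow with simplified stand-in
// types (the real code reads change feeds through a transaction and uses
// 10-byte versionstamps), not part of this commit:
use std::collections::BTreeMap;
type Selector = String; // stand-in for LqSelector (ns + db + tb)
type Versionstamp = u64; // stand-in for the real 10-byte versionstamp
type ChangeSet = (Versionstamp, String); // (versionstamp, mutation payload)
fn poll_changes(
    watermarks: &mut BTreeMap<Selector, Versionstamp>,
    read_cf: impl Fn(&Selector, Versionstamp) -> Vec<ChangeSet>,
) -> BTreeMap<Selector, Vec<ChangeSet>> {
    let mut change_map: BTreeMap<Selector, Vec<ChangeSet>> = BTreeMap::new();
    let mut updates = Vec::with_capacity(watermarks.len());
    for (selector, vs) in watermarks.iter() {
        let changes = read_cf(selector, *vs);
        if let Some(last) = changes.last() {
            if last.0 > *vs {
                // Record the new watermark; applied only after iteration ends
                updates.push((selector.clone(), last.0));
                change_map.insert(selector.clone(), changes);
            }
        }
    }
    // Apply the watermark updates now that the immutable iteration is over
    for (selector, vs) in updates {
        watermarks.insert(selector, vs);
    }
    change_map
}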
/// Construct a document from a Change Feed mutation
/// This is required to perform document operations such as live query notifications
fn construct_document(mutation: &TableMutation) -> Option<Document> {
match mutation {
TableMutation::Set(a, b) => {
let doc = Document::new(None, Some(a), None, b, Workable::Normal);
Some(doc)
}
TableMutation::Del(a) => {
let doc = Document::new(None, Some(a), None, &Value::None, Workable::Normal);
Some(doc)
}
TableMutation::Def(_) => None,
TableMutation::SetPrevious(id, _old, new) => {
let doc = Document::new(None, Some(id), None, new, Workable::Normal);
// TODO set previous value
Some(doc)
}
}
}
/// Add live queries to track on the datastore
/// These get polled by the change feed tick
pub(crate) async fn track_live_queries(&self, lqs: &Vec<TrackedResult>) -> Result<(), Error> {
// Lock the local live queries
let mut lq_map = self.local_live_queries.write().await;
let mut cf_watermarks = self.cf_watermarks.write().await;
for lq in lqs {
match lq {
TrackedResult::LiveQuery(lq) => {
let lq_index_key: LqIndexKey = lq.as_key();
let m = lq_map.get_mut(&lq_index_key);
match m {
Some(lq_index_value) => lq_index_value.push(lq.as_value()),
None => {
let lq_vec = vec![lq.as_value()];
lq_map.insert(lq_index_key.clone(), lq_vec);
}
}
let selector = lq_index_key.selector;
// TODO(phughk): - read watermark for catchup
// We insert the current watermark.
cf_watermarks.entry(selector).or_insert_with(Versionstamp::default);
}
TrackedResult::KillQuery(_lq) => {
unimplemented!("Cannot kill queries yet")
}
}
}
Ok(())
}
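// Side note: the get_mut/match insertion above is equivalent to the entry API;
// a small sketch with stand-in key and value types (the real map is
// BTreeMap<LqIndexKey, Vec<LqIndexValue>>), not part of this commit:
use std::collections::BTreeMap;
fn track_sketch(map: &mut BTreeMap<String, Vec<u64>>, key: String, value: u64) {
    // Append to the existing list, or create a one-element list if absent
    map.entry(key).or_default().push(value);
}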
async fn save_timestamp_for_versionstamp_impl(
&self,
ts: u64,
@@ -1167,7 +1300,15 @@
// Store the query variables
let ctx = vars.attach(ctx)?;
// Process all statements
exe.execute(ctx, opt, ast).await
let res = exe.execute(ctx, opt, ast).await;
match res {
Ok((responses, lives)) => {
// Register live queries
self.track_live_queries(&lives).await?;
Ok(responses)
}
Err(e) => Err(e),
}
}
/// Ensure a SQL [`Value`] is fully computed

View file

@@ -83,26 +83,24 @@ pub(crate) struct LqIndexValue {
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Clone))]
pub(crate) struct LqEntry {
#[allow(dead_code)]
pub(crate) live_id: Uuid,
#[allow(dead_code)]
pub(crate) ns: String,
#[allow(dead_code)]
pub(crate) db: String,
#[allow(dead_code)]
pub(crate) stm: LiveStatement,
}
/// This is a type representing information that is tracked outside of the datastore
/// For example, live query IDs need to be tracked by websockets so they are closed correctly when a connection closes
#[allow(dead_code)]
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Clone))]
pub(crate) enum TrackedResult {
LiveQuery(LqEntry),
#[allow(dead_code)]
KillQuery(LqEntry),
}
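// A hypothetical sketch of the connection-side bookkeeping this type enables;
// the names below are invented here, and the websocket layer is not part of
// this diff:
use std::collections::HashSet;
use uuid::Uuid;
#[derive(Default)]
struct ConnectionLiveQueries {
    // Live queries registered by this connection, killed when it closes
    registered: HashSet<Uuid>,
}
impl ConnectionLiveQueries {
    fn apply(&mut self, live_id: Uuid, killed: bool) {
        if killed {
            self.registered.remove(&live_id);
        } else {
            self.registered.insert(live_id);
        }
    }
}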
impl LqEntry {
/// Treat like an into from a borrow
#[allow(dead_code)]
pub(crate) fn as_key(&self) -> LqIndexKey {
let tb = self.stm.what.to_string();
LqIndexKey {
@@ -115,8 +113,6 @@ impl LqEntry {
}
}
/// Treat like an into from a borrow
#[allow(dead_code)]
pub(crate) fn as_value(&self) -> LqIndexValue {
LqIndexValue {
stm: self.stm.clone(),

View file

@@ -63,6 +63,7 @@ mod mem {
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
include!("tx_test.rs");
}
#[cfg(feature = "kv-rocksdb")]
@@ -111,6 +112,7 @@ mod rocksdb {
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
include!("tx_test.rs");
}
#[cfg(feature = "kv-speedb")]
@@ -157,6 +159,7 @@ mod speedb {
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
include!("tx_test.rs");
}
#[cfg(feature = "kv-tikv")]
@@ -204,6 +207,7 @@ mod tikv {
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
include!("tx_test.rs");
}
#[cfg(feature = "kv-fdb")]
@@ -251,6 +255,7 @@ mod fdb {
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
include!("tx_test.rs");
}
#[cfg(feature = "kv-surrealkv")]
@@ -300,4 +305,5 @@ mod surrealkv {
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
include!("tx_test.rs");
}

View file

@@ -0,0 +1,36 @@
use crate::kvs::lq_structs::{LqEntry, TrackedResult};
#[tokio::test]
#[serial]
async fn live_queries_sent_to_tx_are_received() {
let node_id = uuid::uuid!("d0f1a200-e24e-44fe-98c1-2271a5781da7");
let clock = Arc::new(SizedClock::Fake(FakeClock::new(Timestamp::default())));
let test = init(node_id, clock).await.unwrap();
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
// Create live query data
let lq_entry = LqEntry {
live_id: sql::Uuid::new_v4(),
ns: "namespace".to_string(),
db: "database".to_string(),
stm: LiveStatement {
id: sql::Uuid::new_v4(),
node: sql::Uuid::from(node_id),
expr: Default::default(),
what: Default::default(),
cond: None,
fetch: None,
archived: None,
session: Some(Value::None),
auth: None,
},
};
tx.pre_commit_register_live_query(lq_entry.clone()).unwrap();
tx.commit().await.unwrap();
// Verify data
let live_queries = tx.consume_pending_live_queries();
assert_eq!(live_queries.len(), 1);
assert_eq!(live_queries[0], TrackedResult::LiveQuery(lq_entry));
}

View file

@@ -34,7 +34,7 @@ use crate::key::key_req::KeyRequirements;
use crate::kvs::cache::Cache;
use crate::kvs::cache::Entry;
use crate::kvs::clock::SizedClock;
use crate::kvs::lq_structs::{LqEntry, LqValue};
use crate::kvs::lq_structs::{LqEntry, LqValue, TrackedResult};
use crate::kvs::Check;
use crate::sql;
use crate::sql::paths::EDGE;
@@ -328,17 +328,19 @@ impl Transaction {
}
}
#[allow(unused)]
pub(crate) fn consume_pending_live_queries(&self) -> Vec<LqEntry> {
let mut lq: Vec<LqEntry> = Vec::with_capacity(LQ_CAPACITY);
/// From the existing transaction, consume all the remaining live query registration events and return them synchronously
pub(crate) fn consume_pending_live_queries(&self) -> Vec<TrackedResult> {
let mut lq: Vec<TrackedResult> = Vec::with_capacity(LQ_CAPACITY);
while let Ok(l) = self.prepared_live_queries.1.try_recv() {
lq.push(l);
lq.push(TrackedResult::LiveQuery(l));
}
lq
}
/// Sends a live query to the transaction, which is forwarded only once committed
/// and removed once the transaction is aborted
// allow(dead_code) because this is used in v2, but not v1
#[allow(dead_code)]
pub(crate) fn pre_commit_register_live_query(
&mut self,
lq_entry: LqEntry,
@@ -3136,7 +3138,7 @@
#[cfg(all(test, feature = "kv-mem"))]
mod tx_test {
use crate::kvs::lq_structs::LqEntry;
use crate::kvs::lq_structs::{LqEntry, TrackedResult};
use crate::kvs::Datastore;
use crate::kvs::LockType::Optimistic;
use crate::kvs::TransactionType::Write;
@@ -3174,6 +3176,6 @@
// Verify data
let live_queries = tx.consume_pending_live_queries();
assert_eq!(live_queries.len(), 1);
assert_eq!(live_queries[0], lq_entry);
assert_eq!(live_queries[0], TrackedResult::LiveQuery(lq_entry));
}
}

View file

@@ -1,5 +1,6 @@
#[macro_use]
extern crate tracing;
extern crate core;
#[macro_use]
mod mac;

View file

@@ -10,7 +10,6 @@ use std::time;
pub struct ChangeFeed {
pub expiry: time::Duration,
}
impl Display for ChangeFeed {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "CHANGEFEED {}", Duration(self.expiry))?;

View file

@@ -34,6 +34,7 @@ use std::sync::Arc;
use std::sync::OnceLock;
use std::task::Poll;
use std::time::Duration;
use surrealdb_core::dbs::Options;
use tokio::time;
use tokio::time::MissedTickBehavior;
@@ -208,6 +209,7 @@
}
fn run_maintenance(kvs: Arc<Datastore>, tick_interval: Duration, stop_signal: Receiver<()>) {
trace!("Starting maintenance");
// Some classic ownership shenanigans
let kvs_two = kvs.clone();
let stop_signal_two = stop_signal.clone();
@@ -235,6 +237,7 @@ fn run_maintenance(kvs: Arc<Datastore>, tick_interval: Duration, stop_signal: Receiver<()>) {
});
if FFLAGS.change_feed_live_queries.enabled() {
trace!("Live queries v2 enabled");
// Spawn the live query change feed consumer, which is used for catching up on relevant change feeds
tokio::spawn(async move {
let kvs = kvs_two;
@@ -251,12 +254,15 @@ fn run_maintenance(kvs: Arc<Datastore>, tick_interval: Duration, stop_signal: Receiver<()>) {
let mut stream = streams.merge();
let opt = Options::default();
while let Some(Some(_)) = stream.next().await {
match kvs.process_lq_notifications().await {
match kvs.process_lq_notifications(&opt).await {
Ok(()) => trace!("Live Query poll ran successfully"),
Err(error) => error!("Error running live query poll: {error}"),
}
}
});
} else {
trace!("Live queries v2 disabled")
}
}

View file

@@ -34,6 +34,7 @@ use std::sync::Arc;
use std::sync::OnceLock;
use std::task::Poll;
use std::time::Duration;
use surrealdb_core::dbs::Options;
use wasm_bindgen_futures::spawn_local;
use wasmtimer::tokio as time;
use wasmtimer::tokio::MissedTickBehavior;
@@ -246,8 +247,9 @@ fn run_maintenance(kvs: Arc<Datastore>, tick_interval: Duration, stop_signal: Receiver<()>) {
let mut stream = streams.merge();
let opt = Options::default();
while let Some(Some(_)) = stream.next().await {
match kvs.process_lq_notifications().await {
match kvs.process_lq_notifications(&opt).await {
Ok(()) => trace!("Live Query poll ran successfully"),
Err(error) => error!("Error running live query poll: {error}"),
}

View file

@@ -1,12 +1,14 @@
mod parse;
use chrono::DateTime;
use parse::Parse;
mod helpers;
use helpers::new_ds;
use parse::Parse;
use surrealdb::dbs::Session;
use surrealdb::err::Error;
use surrealdb::sql::Value;
use surrealdb_core::fflags::{FFlags, FFLAGS};
use surrealdb_core::fflags::FFLAGS;
mod helpers;
mod parse;
#[tokio::test]
async fn database_change_feeds() -> Result<(), Error> {

View file

@@ -10,7 +10,7 @@ use surrealdb::iam::{Auth, Level, Role};
use surrealdb::kvs::Datastore;
pub async fn new_ds() -> Result<Datastore, Error> {
Ok(Datastore::new("memory").await?.with_capabilities(Capabilities::all()))
Ok(Datastore::new("memory").await?.with_capabilities(Capabilities::all()).with_notifications())
}
#[allow(dead_code)]

View file

@@ -6,8 +6,8 @@ use surrealdb::dbs::Session;
use surrealdb::err::Error;
use surrealdb::sql::Value;
use surrealdb_core::fflags::FFLAGS;
use surrealdb_core::kvs::LockType::Optimistic;
use surrealdb_core::kvs::TransactionType::Write;
// RUST_LOG=trace cargo test -p surrealdb --features kv-mem --test live -- --nocapture
#[tokio::test]
async fn live_query_sends_registered_lq_details() -> Result<(), Error> {
@@ -19,27 +19,35 @@ async fn live_query_sends_registered_lq_details() -> Result<(), Error> {
LIVE SELECT * FROM lq_test_123;
";
let dbs = new_ds().await?;
let ses = Session::owner().with_ns("test").with_db("test");
let ses = Session::owner().with_ns("test").with_db("test").with_rt(true);
let res = &mut dbs.execute(sql, &ses, None).await?;
assert_eq!(res.len(), 2);
//
// Define table didn't fail
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
//
// Live query returned a valid uuid
let actual = res.remove(0).result?;
let expected = Value::parse("{}");
assert_eq!(actual, expected);
//
let tmp = res.remove(0).result?;
let val = Value::parse("[12345]");
assert_eq!(tmp, val);
//
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
//
let tmp = res.remove(0).result?;
let val = Value::parse("[56789]");
assert_eq!(tmp, val);
//
let live_id = match actual {
Value::Uuid(live_id) => live_id,
_ => panic!("Expected a UUID"),
};
assert!(!live_id.is_nil());
// Create some data
let res = &mut dbs.execute("CREATE lq_test_123", &ses, None).await?;
assert_eq!(res.len(), 1);
let result = res.remove(0);
assert!(result.result.is_ok());
dbs.process_lq_notifications(&Default::default()).await?;
let notifications_chan = dbs.notifications().unwrap();
assert!(notifications_chan.try_recv().is_ok());
assert!(notifications_chan.try_recv().is_err());
Ok(())
}

View file

@@ -182,7 +182,9 @@
// Start the kvs server
dbs::init(dbs).await?;
// Start the node agent
// This is equivalent to run_maintenance in native/wasm drivers
let nd = node::init(ct.clone());
let lq = node::live_query_change_feed(ct.clone());
// Start the web server
net::init(ct).await?;
// Wait for the node agent to stop
@@ -190,6 +192,10 @@
error!("Node agent failed while running: {}", e);
return Err(Error::NodeAgent);
}
if let Err(e) = lq.await {
error!("Live query change feed failed while running: {}", e);
return Err(Error::NodeAgent);
}
// All ok
Ok(())
}

View file

@@ -1,6 +1,11 @@
use std::time::Duration;
use surrealdb::dbs::Options;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use surrealdb::fflags::FFLAGS;
use crate::cli::CF;
const LOG: &str = "surrealdb::node";
@@ -36,3 +41,32 @@ pub fn init(ct: CancellationToken) -> JoinHandle<()> {
info!(target: LOG, "Stopped node agent");
})
}
// Start the live query notification processing that is driven by change feeds
pub fn live_query_change_feed(ct: CancellationToken) -> JoinHandle<()> {
tokio::spawn(async move {
if !FFLAGS.change_feed_live_queries.enabled() {
return;
}
// Spawn the live query change feed consumer, which is used for catching up on relevant change feeds
tokio::spawn(async move {
let kvs = crate::dbs::DB.get().unwrap();
let tick_interval = Duration::from_secs(1);
let opt = Options::default();
loop {
if let Err(e) = kvs.process_lq_notifications(&opt).await {
error!("Error running node agent live query tick: {}", e);
}
tokio::select! {
_ = ct.cancelled() => {
info!(target: LOG, "Gracefully stopping live query node agent");
break;
}
_ = tokio::time::sleep(tick_interval) => {}
}
}
info!("Stopped live query node agent")
});
})
}