diff --git a/core/src/err/mod.rs b/core/src/err/mod.rs index 5cd9a928..0a14b300 100644 --- a/core/src/err/mod.rs +++ b/core/src/err/mod.rs @@ -923,6 +923,10 @@ pub enum Error { /// A node task has failed #[error("A node task has failed: {0}")] NodeAgent(&'static str), + + /// An error related to live query occurred + #[error("Failed to process Live Query: {0}")] + LiveQueryError(LiveQueryCause), } impl From for String { @@ -1037,3 +1041,14 @@ impl Serialize for Error { serializer.serialize_str(self.to_string().as_str()) } } + +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum LiveQueryCause { + #[doc(hidden)] + #[error("The Live Query must have a change feed for it it work")] + MissingChangeFeed, + #[doc(hidden)] + #[error("The Live Query must have a change feed that includes relative changes")] + ChangeFeedNoOriginal, +} diff --git a/core/src/sql/statements/live.rs b/core/src/sql/statements/live.rs index 05f17f60..65d93dac 100644 --- a/core/src/sql/statements/live.rs +++ b/core/src/sql/statements/live.rs @@ -1,12 +1,13 @@ use crate::ctx::Context; use crate::dbs::{Options, Transaction}; use crate::doc::CursorDoc; -use crate::err::Error; +use crate::err::{Error, LiveQueryCause}; use crate::fflags::FFLAGS; use crate::iam::Auth; use crate::kvs::lq_structs::{LqEntry, TrackedResult}; -use crate::sql::{Cond, Fetchs, Fields, Uuid, Value}; +use crate::sql::{Cond, Fetchs, Fields, Table, Uuid, Value}; use derive::Store; +use futures::lock::MutexGuard; use revision::revisioned; use serde::{Deserialize, Serialize}; use std::fmt; @@ -101,12 +102,15 @@ impl LiveStatement { true => { let mut run = txn.lock().await; match stm.what.compute(ctx, opt, txn, doc).await? { - Value::Table(_tb) => { + Value::Table(tb) => { + let ns = opt.ns().to_string(); + let db = opt.db().to_string(); + self.validate_change_feed_valid(&mut run, &ns, &db, &tb).await?; // Send the live query registration hook to the transaction pre-commit channel run.pre_commit_register_async_event(TrackedResult::LiveQuery(LqEntry { live_id: stm.id, - ns: opt.ns().to_string(), - db: opt.db().to_string(), + ns, + db, stm, }))?; } @@ -143,6 +147,31 @@ impl LiveStatement { } } + async fn validate_change_feed_valid( + &self, + tx: &mut MutexGuard<'_, crate::kvs::Transaction>, + ns: &str, + db: &str, + tb: &Table, + ) -> Result<(), Error> { + // Find the table definition + let tb_definition = tx.get_and_cache_tb(ns, db, tb).await.map_err(|e| match e { + Error::TbNotFound { + value: _tb, + } => Error::LiveQueryError(LiveQueryCause::MissingChangeFeed), + _ => e, + })?; + // check it has a change feed + let cf = tb_definition + .changefeed + .ok_or(Error::LiveQueryError(LiveQueryCause::MissingChangeFeed))?; + // check the change feed includes the original - required for differentiating between CREATE and UPDATE + if !cf.store_original { + return Err(Error::LiveQueryError(LiveQueryCause::ChangeFeedNoOriginal)); + } + Ok(()) + } + pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement { self.archived = Some(node_id); self diff --git a/lib/tests/live.rs b/lib/tests/live.rs index 3f92bc9e..0161d459 100644 --- a/lib/tests/live.rs +++ b/lib/tests/live.rs @@ -8,7 +8,29 @@ use surrealdb::fflags::FFLAGS; use surrealdb::sql::Value; #[tokio::test] -async fn live_query_sends_registered_lq_details() -> Result<(), Error> { +async fn live_query_fails_if_no_change_feed() -> Result<(), Error> { + if !FFLAGS.change_feed_live_queries.enabled() { + return Ok(()); + } + let sql = " + LIVE SELECT * FROM lq_test_123; + "; + let dbs = new_ds().await?; + let ses = Session::owner().with_ns("test").with_db("test").with_rt(true); + let res = &mut dbs.execute(sql, &ses, None).await?; + assert_eq!(res.len(), 1); + let res = res.remove(0).result; + assert!(res.is_err(), "{:?}", res); + let err = res.as_ref().err().unwrap(); + assert_eq!( + format!("{}", err), + "Failed to process Live Query: The Live Query must have a change feed for it it work" + ); + Ok(()) +} + +#[tokio::test] +async fn live_query_fails_if_change_feed_missing_diff() -> Result<(), Error> { if !FFLAGS.change_feed_live_queries.enabled() { return Ok(()); } @@ -20,13 +42,37 @@ async fn live_query_sends_registered_lq_details() -> Result<(), Error> { let ses = Session::owner().with_ns("test").with_db("test").with_rt(true); let res = &mut dbs.execute(sql, &ses, None).await?; assert_eq!(res.len(), 2); + res.remove(0).result.unwrap(); + let res = res.remove(0).result; + assert!(res.is_err(), "{:?}", res); + let err = res.as_ref().err().unwrap(); + assert_eq!( + format!("{}", err), + "Failed to process Live Query: The Live Query must have a change feed that includes relative changes" + ); + Ok(()) +} + +#[tokio::test] +async fn live_query_sends_registered_lq_details() -> Result<(), Error> { + if !FFLAGS.change_feed_live_queries.enabled() { + return Ok(()); + } + let sql = " + DEFINE TABLE lq_test_123 CHANGEFEED 10m INCLUDE ORIGINAL; + LIVE SELECT * FROM lq_test_123; + "; + let dbs = new_ds().await?; + let ses = Session::owner().with_ns("test").with_db("test").with_rt(true); + let res = &mut dbs.execute(sql, &ses, None).await?; + assert_eq!(res.len(), 2); // Define table didnt fail let tmp = res.remove(0).result; assert!(tmp.is_ok()); // Live query returned a valid uuid - let actual = res.remove(0).result?; + let actual = res.remove(0).result.unwrap(); let live_id = match actual { Value::Uuid(live_id) => live_id, _ => panic!("Expected a UUID"),