SUR-347 - Live Queries error when no change feed (#3772)

This commit is contained in:
Przemyslaw Hugh Kaznowski 2024-03-26 17:49:49 +00:00 committed by GitHub
parent e93649503c
commit 5d3e537abe
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 97 additions and 7 deletions

View file

@ -923,6 +923,10 @@ pub enum Error {
/// A node task has failed
#[error("A node task has failed: {0}")]
NodeAgent(&'static str),
/// An error related to live query occurred
#[error("Failed to process Live Query: {0}")]
LiveQueryError(LiveQueryCause),
}
impl From<Error> for String {
@ -1037,3 +1041,14 @@ impl Serialize for Error {
serializer.serialize_str(self.to_string().as_str())
}
}
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum LiveQueryCause {
#[doc(hidden)]
#[error("The Live Query must have a change feed for it it work")]
MissingChangeFeed,
#[doc(hidden)]
#[error("The Live Query must have a change feed that includes relative changes")]
ChangeFeedNoOriginal,
}

View file

@ -1,12 +1,13 @@
use crate::ctx::Context;
use crate::dbs::{Options, Transaction};
use crate::doc::CursorDoc;
use crate::err::Error;
use crate::err::{Error, LiveQueryCause};
use crate::fflags::FFLAGS;
use crate::iam::Auth;
use crate::kvs::lq_structs::{LqEntry, TrackedResult};
use crate::sql::{Cond, Fetchs, Fields, Uuid, Value};
use crate::sql::{Cond, Fetchs, Fields, Table, Uuid, Value};
use derive::Store;
use futures::lock::MutexGuard;
use revision::revisioned;
use serde::{Deserialize, Serialize};
use std::fmt;
@ -101,12 +102,15 @@ impl LiveStatement {
true => {
let mut run = txn.lock().await;
match stm.what.compute(ctx, opt, txn, doc).await? {
Value::Table(_tb) => {
Value::Table(tb) => {
let ns = opt.ns().to_string();
let db = opt.db().to_string();
self.validate_change_feed_valid(&mut run, &ns, &db, &tb).await?;
// Send the live query registration hook to the transaction pre-commit channel
run.pre_commit_register_async_event(TrackedResult::LiveQuery(LqEntry {
live_id: stm.id,
ns: opt.ns().to_string(),
db: opt.db().to_string(),
ns,
db,
stm,
}))?;
}
@ -143,6 +147,31 @@ impl LiveStatement {
}
}
async fn validate_change_feed_valid(
&self,
tx: &mut MutexGuard<'_, crate::kvs::Transaction>,
ns: &str,
db: &str,
tb: &Table,
) -> Result<(), Error> {
// Find the table definition
let tb_definition = tx.get_and_cache_tb(ns, db, tb).await.map_err(|e| match e {
Error::TbNotFound {
value: _tb,
} => Error::LiveQueryError(LiveQueryCause::MissingChangeFeed),
_ => e,
})?;
// check it has a change feed
let cf = tb_definition
.changefeed
.ok_or(Error::LiveQueryError(LiveQueryCause::MissingChangeFeed))?;
// check the change feed includes the original - required for differentiating between CREATE and UPDATE
if !cf.store_original {
return Err(Error::LiveQueryError(LiveQueryCause::ChangeFeedNoOriginal));
}
Ok(())
}
pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement {
self.archived = Some(node_id);
self

View file

@ -8,7 +8,29 @@ use surrealdb::fflags::FFLAGS;
use surrealdb::sql::Value;
#[tokio::test]
async fn live_query_sends_registered_lq_details() -> Result<(), Error> {
async fn live_query_fails_if_no_change_feed() -> Result<(), Error> {
if !FFLAGS.change_feed_live_queries.enabled() {
return Ok(());
}
let sql = "
LIVE SELECT * FROM lq_test_123;
";
let dbs = new_ds().await?;
let ses = Session::owner().with_ns("test").with_db("test").with_rt(true);
let res = &mut dbs.execute(sql, &ses, None).await?;
assert_eq!(res.len(), 1);
let res = res.remove(0).result;
assert!(res.is_err(), "{:?}", res);
let err = res.as_ref().err().unwrap();
assert_eq!(
format!("{}", err),
"Failed to process Live Query: The Live Query must have a change feed for it it work"
);
Ok(())
}
#[tokio::test]
async fn live_query_fails_if_change_feed_missing_diff() -> Result<(), Error> {
if !FFLAGS.change_feed_live_queries.enabled() {
return Ok(());
}
@ -20,13 +42,37 @@ async fn live_query_sends_registered_lq_details() -> Result<(), Error> {
let ses = Session::owner().with_ns("test").with_db("test").with_rt(true);
let res = &mut dbs.execute(sql, &ses, None).await?;
assert_eq!(res.len(), 2);
res.remove(0).result.unwrap();
let res = res.remove(0).result;
assert!(res.is_err(), "{:?}", res);
let err = res.as_ref().err().unwrap();
assert_eq!(
format!("{}", err),
"Failed to process Live Query: The Live Query must have a change feed that includes relative changes"
);
Ok(())
}
#[tokio::test]
async fn live_query_sends_registered_lq_details() -> Result<(), Error> {
if !FFLAGS.change_feed_live_queries.enabled() {
return Ok(());
}
let sql = "
DEFINE TABLE lq_test_123 CHANGEFEED 10m INCLUDE ORIGINAL;
LIVE SELECT * FROM lq_test_123;
";
let dbs = new_ds().await?;
let ses = Session::owner().with_ns("test").with_db("test").with_rt(true);
let res = &mut dbs.execute(sql, &ses, None).await?;
assert_eq!(res.len(), 2);
// Define table didnt fail
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
// Live query returned a valid uuid
let actual = res.remove(0).result?;
let actual = res.remove(0).result.unwrap();
let live_id = match actual {
Value::Uuid(live_id) => live_id,
_ => panic!("Expected a UUID"),