From 8b13546327008a8a0d9c55c1ea0be1376e878cea Mon Sep 17 00:00:00 2001 From: Przemyslaw Hugh Kaznowski Date: Wed, 20 Mar 2024 10:09:04 +0000 Subject: [PATCH] Make Change Feeds record patch values optionally (#3552) Co-authored-by: Mees Delzenne --- core/src/cf/mutations.rs | 76 ++++++++++++++++++++---------- core/src/cf/writer.rs | 88 +++++++++++++++++++++++++---------- core/src/doc/changefeeds.rs | 3 +- core/src/kvs/ds.rs | 4 +- core/src/kvs/lq_structs.rs | 3 ++ core/src/kvs/tx.rs | 8 ++-- core/src/sql/changefeed.rs | 2 +- core/src/sql/idiom.rs | 7 +++ core/src/syn/v2/parser/mac.rs | 1 + lib/tests/changefeeds.rs | 88 ++++++++++++++++++++++++++++++++++- 10 files changed, 222 insertions(+), 58 deletions(-) diff --git a/core/src/cf/mutations.rs b/core/src/cf/mutations.rs index 6505b135..17d3dded 100644 --- a/core/src/cf/mutations.rs +++ b/core/src/cf/mutations.rs @@ -3,6 +3,7 @@ use crate::sql::object::Object; use crate::sql::statements::DefineTableStatement; use crate::sql::thing::Thing; use crate::sql::value::Value; +use crate::sql::Operation; use crate::vs::versionstamp_to_u64; use derive::Store; use revision::revisioned; @@ -20,8 +21,10 @@ pub enum TableMutation { Del(Thing), Def(DefineTableStatement), #[revision(start = 2)] - // Includes the previous value that may be None - SetPrevious(Thing, Value, Value), + /// Includes the ID, current value, and changes that were applied to achieve this value + /// Example, ("mytb:tobie", {{"note": "surreal"}}, [{"op": "add", "path": "/note", "value": "surreal"}]) + /// Means that we have already applied the add "/note" operation to achieve the recorded result + SetWithDiff(Thing, Value, Vec), } impl From for Value { @@ -61,29 +64,49 @@ impl Default for DatabaseMutation { Self::new() } } + // Change is a set of mutations made to a table at the specific timestamp. #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] #[revisioned(revision = 1)] pub struct ChangeSet(pub [u8; 10], pub DatabaseMutation); impl TableMutation { + /// Convert a stored change feed table mutation (record change) into a + /// Value that can be used in the storage of change feeds and their transmission to consumers pub fn into_value(self) -> Value { - let (k, v) = match self { - TableMutation::Set(_t, v) => ("update".to_string(), v), - TableMutation::SetPrevious(_t, Value::None, v) => ("create".to_string(), v), - TableMutation::SetPrevious(_t, _previous, v) => ("update".to_string(), v), - TableMutation::Del(t) => { - // TODO(phughk): Future PR for lq on cf feature, store update in delete for diff and notification - let mut h = BTreeMap::::new(); - h.insert("id".to_string(), Value::Thing(t)); - let o = Object::from(h); - ("delete".to_string(), Value::Object(o)) - } - TableMutation::Def(t) => ("define_table".to_string(), Value::from(t)), - }; - let mut h = BTreeMap::::new(); - h.insert(k, v); + let h = match self { + TableMutation::Set(_thing, v) => { + h.insert("update".to_string(), v); + h + } + TableMutation::SetWithDiff(_thing, current, operations) => { + h.insert("current".to_string(), current); + h.insert( + "update".to_string(), + Value::Array(Array( + operations + .clone() + .into_iter() + .map(|x| Value::Object(Object::from(x))) + .collect(), + )), + ); + h + } + TableMutation::Del(t) => { + // TODO(SUR-329): Store update in delete for diff and notification + let mut other = BTreeMap::::new(); + other.insert("id".to_string(), Value::Thing(t)); + let o = Object::from(other); + h.insert("delete".to_string(), Value::Object(o)); + h + } + TableMutation::Def(t) => { + h.insert("define_table".to_string(), Value::from(t)); + h + } + }; let o = crate::sql::object::Object::from(h); Value::Object(o) } @@ -116,7 +139,7 @@ impl Display for TableMutation { fn fmt(&self, f: &mut Formatter) -> fmt::Result { match self { TableMutation::Set(id, v) => write!(f, "SET {} {}", id, v), - TableMutation::SetPrevious(id, _previous, v) => write!(f, "SET {} {}", id, v), + TableMutation::SetWithDiff(id, _previous, v) => write!(f, "SET {} {:?}", id, v), TableMutation::Del(id) => write!(f, "DEL {}", id), TableMutation::Def(t) => write!(f, "{}", t), } @@ -149,8 +172,8 @@ impl Display for ChangeSet { } // WriteMutationSet is a set of mutations to be to a table at the specific timestamp. -#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] #[revisioned(revision = 1)] +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] pub struct WriteMutationSet(pub Vec); impl WriteMutationSet { @@ -213,9 +236,8 @@ mod tests { DatabaseMutation(vec![TableMutations( "mytb".to_string(), vec![ - TableMutation::SetPrevious( + TableMutation::SetWithDiff( Thing::from(("mytb".to_string(), "tobie".to_string())), - Value::None, Value::Object(Object::from(HashMap::from([ ( "id", @@ -223,10 +245,13 @@ mod tests { ), ("note", Value::from("surreal")), ]))), + vec![Operation::Add { + path: "/note".into(), + value: Value::from("surreal"), + }], ), - TableMutation::SetPrevious( + TableMutation::SetWithDiff( Thing::from(("mytb".to_string(), "tobie".to_string())), - Value::Strand(Strand::from("this would normally be an object")), Value::Object(Object::from(HashMap::from([ ( "id", @@ -237,6 +262,9 @@ mod tests { ), ("note", Value::from("surreal")), ]))), + vec![Operation::Remove { + path: "/temp".into(), + }], ), TableMutation::Del(Thing::from(("mytb".to_string(), "tobie".to_string()))), TableMutation::Def(DefineTableStatement { @@ -250,7 +278,7 @@ mod tests { let s = serde_json::to_string(&v).unwrap(); assert_eq!( s, - r#"{"changes":[{"create":{"id":"mytb:tobie","note":"surreal"}},{"update":{"id":"mytb:tobie2","note":"surreal"}},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":1}"# + r#"{"changes":[{"current":{"id":"mytb:tobie","note":"surreal"},"update":[{"op":"add","path":"/`/note`","value":"surreal"}]},{"current":{"id":"mytb:tobie2","note":"surreal"},"update":[{"op":"remove","path":"/`/temp`"}]},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":1}"# ); } } diff --git a/core/src/cf/writer.rs b/core/src/cf/writer.rs index 7756d2a4..7cf789dc 100644 --- a/core/src/cf/writer.rs +++ b/core/src/cf/writer.rs @@ -1,9 +1,9 @@ use crate::cf::{TableMutation, TableMutations}; -use crate::fflags::FFLAGS; use crate::kvs::Key; use crate::sql::statements::DefineTableStatement; use crate::sql::thing::Thing; use crate::sql::value::Value; +use crate::sql::Idiom; use std::borrow::Cow; use std::collections::HashMap; @@ -60,23 +60,28 @@ impl Writer { } } + #[allow(clippy::too_many_arguments)] pub(crate) fn update( &mut self, ns: &str, db: &str, tb: &str, id: Thing, - p: Cow<'_, Value>, - v: Cow<'_, Value>, + previous: Cow<'_, Value>, + current: Cow<'_, Value>, + store_difference: bool, ) { - if v.is_some() { + if current.is_some() { self.buf.push( ns.to_string(), db.to_string(), tb.to_string(), - match FFLAGS.change_feed_live_queries.enabled() { - true => TableMutation::SetPrevious(id, p.into_owned(), v.into_owned()), - false => TableMutation::Set(id, v.into_owned()), + match store_difference { + true => { + let patches = current.diff(&previous, Idiom(Vec::new())); + TableMutation::SetWithDiff(id, current.into_owned(), patches) + } + false => TableMutation::Set(id, current.into_owned()), }, ); } else { @@ -136,6 +141,8 @@ mod tests { use crate::sql::value::Value; use crate::vs; + const dont_store_previous: bool = false; + #[tokio::test] async fn test_changefeed_read_write() { let ts = crate::sql::Datetime::default(); @@ -193,9 +200,16 @@ mod tests { id: Id::String("A".to_string()), }; let value_a: super::Value = "a".into(); - // TODO(for this PR): This was just added to resolve compile issues but test should be fixed - let previous = Cow::from(Value::None); - tx1.record_change(ns, db, tb, &thing_a, previous.clone(), Cow::Borrowed(&value_a)); + let mut previous = Cow::from(Value::None); + tx1.record_change( + ns, + db, + tb, + &thing_a, + previous.clone(), + Cow::Borrowed(&value_a), + dont_store_previous, + ); tx1.complete_changes(true).await.unwrap(); tx1.commit().await.unwrap(); @@ -205,7 +219,15 @@ mod tests { id: Id::String("C".to_string()), }; let value_c: Value = "c".into(); - tx2.record_change(ns, db, tb, &thing_c, previous.clone(), Cow::Borrowed(&value_c)); + tx2.record_change( + ns, + db, + tb, + &thing_c, + previous.clone(), + Cow::Borrowed(&value_c), + dont_store_previous, + ); tx2.complete_changes(true).await.unwrap(); tx2.commit().await.unwrap(); @@ -216,13 +238,29 @@ mod tests { id: Id::String("B".to_string()), }; let value_b: Value = "b".into(); - tx3.record_change(ns, db, tb, &thing_b, previous.clone(), Cow::Borrowed(&value_b)); + tx3.record_change( + ns, + db, + tb, + &thing_b, + previous.clone(), + Cow::Borrowed(&value_b), + dont_store_previous, + ); let thing_c2 = Thing { tb: tb.to_owned(), id: Id::String("C".to_string()), }; let value_c2: Value = "c2".into(); - tx3.record_change(ns, db, tb, &thing_c2, previous.clone(), Cow::Borrowed(&value_c2)); + tx3.record_change( + ns, + db, + tb, + &thing_c2, + previous.clone(), + Cow::Borrowed(&value_c2), + dont_store_previous, + ); tx3.complete_changes(true).await.unwrap(); tx3.commit().await.unwrap(); @@ -245,10 +283,10 @@ mod tests { DatabaseMutation(vec![TableMutations( "mytb".to_string(), match FFLAGS.change_feed_live_queries.enabled() { - true => vec![TableMutation::SetPrevious( + true => vec![TableMutation::SetWithDiff( Thing::from(("mytb".to_string(), "A".to_string())), Value::None, - Value::from("a"), + vec![], )], false => vec![TableMutation::Set( Thing::from(("mytb".to_string(), "A".to_string())), @@ -262,10 +300,10 @@ mod tests { DatabaseMutation(vec![TableMutations( "mytb".to_string(), match FFLAGS.change_feed_live_queries.enabled() { - true => vec![TableMutation::SetPrevious( + true => vec![TableMutation::SetWithDiff( Thing::from(("mytb".to_string(), "C".to_string())), Value::None, - Value::from("c"), + vec![], )], false => vec![TableMutation::Set( Thing::from(("mytb".to_string(), "C".to_string())), @@ -280,15 +318,15 @@ mod tests { "mytb".to_string(), match FFLAGS.change_feed_live_queries.enabled() { true => vec![ - TableMutation::SetPrevious( + TableMutation::SetWithDiff( Thing::from(("mytb".to_string(), "B".to_string())), Value::None, - Value::from("b"), + vec![], ), - TableMutation::SetPrevious( + TableMutation::SetWithDiff( Thing::from(("mytb".to_string(), "C".to_string())), Value::None, - Value::from("c2"), + vec![], ), ], false => vec![ @@ -328,15 +366,15 @@ mod tests { "mytb".to_string(), match FFLAGS.change_feed_live_queries.enabled() { true => vec![ - TableMutation::SetPrevious( + TableMutation::SetWithDiff( Thing::from(("mytb".to_string(), "B".to_string())), Value::None, - Value::from("b"), + vec![], ), - TableMutation::SetPrevious( + TableMutation::SetWithDiff( Thing::from(("mytb".to_string(), "C".to_string())), Value::None, - Value::from("c2"), + vec![], ), ], false => vec![ diff --git a/core/src/doc/changefeeds.rs b/core/src/doc/changefeeds.rs index 63501d57..d39cb586 100644 --- a/core/src/doc/changefeeds.rs +++ b/core/src/doc/changefeeds.rs @@ -26,7 +26,7 @@ impl<'a> Document<'a> { // Get the database and the table for the record let db = run.add_and_cache_db(opt.ns(), opt.db(), opt.strict).await?; // Check if changefeeds are enabled - if db.changefeed.is_some() || tb.changefeed.is_some() { + if let Some(cf) = db.as_ref().changefeed.as_ref().or(tb.as_ref().changefeed.as_ref()) { // Get the arguments let tb = tb.name.as_str(); let id = self.id.as_ref().unwrap(); @@ -38,6 +38,7 @@ impl<'a> Document<'a> { id, self.initial.doc.clone(), self.current.doc.clone(), + cf.store_original, ); } // Carry on diff --git a/core/src/kvs/ds.rs b/core/src/kvs/ds.rs index 8e3f1e19..bd5d902c 100644 --- a/core/src/kvs/ds.rs +++ b/core/src/kvs/ds.rs @@ -1035,9 +1035,9 @@ impl Datastore { Some(doc) } TableMutation::Def(_) => None, - TableMutation::SetPrevious(id, _old, new) => { + TableMutation::SetWithDiff(id, new, _operations) => { let doc = Document::new(None, Some(id), None, new, Workable::Normal); - // TODO set previous value + // TODO(SUR-328): reverse diff and apply to doc to retrieve original version of doc Some(doc) } } diff --git a/core/src/kvs/lq_structs.rs b/core/src/kvs/lq_structs.rs index 6422fda5..43deaaf1 100644 --- a/core/src/kvs/lq_structs.rs +++ b/core/src/kvs/lq_structs.rs @@ -6,6 +6,9 @@ use std::cmp::Ordering; /// Used for cluster logic to move LQ data to LQ cleanup code /// Not a stored struct; Used only in this module +/// +/// This struct is public because it is used in Live Query errors for v1. +/// V1 is now deprecated and the struct can be made non-public #[derive(Debug, Clone, Eq, PartialEq)] pub struct LqValue { pub nd: Uuid, diff --git a/core/src/kvs/tx.rs b/core/src/kvs/tx.rs index 6f98c7e1..91842072 100644 --- a/core/src/kvs/tx.rs +++ b/core/src/kvs/tx.rs @@ -2704,16 +2704,18 @@ impl Transaction { // change will record the change in the changefeed if enabled. // To actually persist the record changes into the underlying kvs, // you must call the `complete_changes` function and then commit the transaction. + #[allow(clippy::too_many_arguments)] pub(crate) fn record_change( &mut self, ns: &str, db: &str, tb: &str, id: &Thing, - p: Cow<'_, Value>, - v: Cow<'_, Value>, + previous: Cow<'_, Value>, + current: Cow<'_, Value>, + store_difference: bool, ) { - self.cf.update(ns, db, tb, id.clone(), p, v) + self.cf.update(ns, db, tb, id.clone(), previous, current, store_difference) } // Records the table (re)definition in the changefeed if enabled. diff --git a/core/src/sql/changefeed.rs b/core/src/sql/changefeed.rs index 96c4c380..1b891662 100644 --- a/core/src/sql/changefeed.rs +++ b/core/src/sql/changefeed.rs @@ -5,7 +5,7 @@ use std::fmt::{self, Display, Formatter}; use std::str; use std::time; -#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)] +#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)] #[revisioned(revision = 2)] pub struct ChangeFeed { pub expiry: time::Duration, diff --git a/core/src/sql/idiom.rs b/core/src/sql/idiom.rs index aa40210c..025c597a 100644 --- a/core/src/sql/idiom.rs +++ b/core/src/sql/idiom.rs @@ -62,6 +62,12 @@ impl From for Idiom { } } +impl From<&str> for Idiom { + fn from(v: &str) -> Self { + Self(vec![Part::from(v)]) + } +} + impl From> for Idiom { fn from(v: Vec) -> Self { Self(v) @@ -73,6 +79,7 @@ impl From<&[Part]> for Idiom { Self(v.to_vec()) } } + impl From for Idiom { fn from(v: Part) -> Self { Self(vec![v]) diff --git a/core/src/syn/v2/parser/mac.rs b/core/src/syn/v2/parser/mac.rs index 8ca11fc3..64b289de 100644 --- a/core/src/syn/v2/parser/mac.rs +++ b/core/src/syn/v2/parser/mac.rs @@ -65,6 +65,7 @@ macro_rules! expected { } #[cfg(test)] +#[macro_export] macro_rules! test_parse { ($func:ident$( ( $($e:expr),* $(,)? ))? , $t:literal) => {{ let mut parser = $crate::syn::v2::parser::Parser::new($t.as_bytes()); diff --git a/lib/tests/changefeeds.rs b/lib/tests/changefeeds.rs index f8410949..f92d1439 100644 --- a/lib/tests/changefeeds.rs +++ b/lib/tests/changefeeds.rs @@ -1,3 +1,5 @@ +mod parse; + use chrono::DateTime; use helpers::new_ds; @@ -11,7 +13,6 @@ use surrealdb::kvs::TransactionType::Write; use surrealdb::sql::Value; mod helpers; -mod parse; #[test_log::test(tokio::test)] async fn database_change_feeds() -> Result<(), Error> { @@ -185,7 +186,6 @@ async fn database_change_feeds() -> Result<(), Error> { let res = &mut dbs.execute(sql, &ses, None).await?; let tmp = res.remove(0).result?; assert_eq!(tmp, cf_val_arr); - // GC after 1hs let one_hour_in_secs = 3600; current_time += one_hour_in_secs; @@ -735,3 +735,87 @@ async fn changefeed_with_ts() -> Result<(), Error> { assert_eq!(array.len(), 0); Ok(()) } + +#[tokio::test] +async fn changefeed_with_original() -> Result<(), Error> { + if !FFLAGS.change_feed_live_queries.enabled() { + return Ok(()); + } + let db = new_ds().await?; + let ses = Session::owner().with_ns("test").with_db("test"); + // Enable change feeds + db.execute("DEFINE TABLE user CHANGEFEED 1h INCLUDE ORIGINAL;", &ses, None) + .await? + .remove(0) + .result?; + db.execute("CREATE user CONTENT {'id': 'id_one'};", &ses, None).await?.remove(0).result?; + + // Now validate original values are stored + let value: Value = + db.execute("SHOW CHANGES FOR TABLE user SINCE 0", &ses, None).await?.remove(0).result?; + let Value::Array(array) = value else { + unreachable!() + }; + assert_eq!(array.len(), 2); + + assert_eq!( + array.get(0).unwrap(), + &surrealdb::sql::value( + r#"{ + "changes": [{ + "define_table": { + "name": "user", + }, + }], + "versionstamp": 65536 + }"# + ) + .unwrap() + ); + assert_eq!( + array.get(1).unwrap(), + &surrealdb::sql::value( + r#" + { + "changes": [{ + "create": { + "id": user:id_one, + }, + "original": None, + }], + "versionstamp": 131072 + } + "# + ) + .unwrap() + ); + + db.execute("UPDATE user:id_one SET name = 'Raynor';", &ses, None).await?.remove(0).result?; + let array = + db.execute("SHOW CHANGES FOR TABLE user SINCE 0", &ses, None).await?.remove(0).result?; + let Value::Array(array) = array else { + unreachable!() + }; + assert_eq!(array.len(), 3); + assert_eq!( + array.get(2).unwrap(), + &surrealdb::sql::value( + r#" + { + "changes": [{ + "update": { + "id": user:id_one, + "name": "Raynor", + }, + "original": { + "id": user:id_one, + }, + }], + "versionstamp": 196608, + }"# + ) + .unwrap() + ); + + Ok(()) +}