From 7b0771acb784464ef746ab343bdec61d8686f6ae Mon Sep 17 00:00:00 2001 From: Przemyslaw Hugh Kaznowski Date: Tue, 20 Feb 2024 11:11:49 +0000 Subject: [PATCH] Change Feed stores original value of difference (#3420) --- core/src/cf/mutations.rs | 61 +++++++++- core/src/cf/writer.rs | 125 +++++++++++++------ core/src/doc/changefeeds.rs | 9 +- core/src/fflags.rs | 1 + core/src/kvs/tx.rs | 3 +- lib/tests/api/mod.rs | 94 +++++++++++---- lib/tests/changefeeds.rs | 234 ++++++++++++++++++++++++++++++------ tests/cli_integration.rs | 28 +++-- 8 files changed, 445 insertions(+), 110 deletions(-) diff --git a/core/src/cf/mutations.rs b/core/src/cf/mutations.rs index c381e551..897929b1 100644 --- a/core/src/cf/mutations.rs +++ b/core/src/cf/mutations.rs @@ -12,13 +12,16 @@ use std::fmt::{self, Display, Formatter}; // Mutation is a single mutation to a table. #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] -#[revisioned(revision = 1)] +#[revisioned(revision = 2)] pub enum TableMutation { // Although the Value is supposed to contain a field "id" of Thing, // we do include it in the first field for convenience. Set(Thing, Value), Del(Thing), Def(DefineTableStatement), + #[revision(start = 2)] + // Includes the previous value that may be None + SetPrevious(Thing, Value, Value), } impl From for Value { @@ -67,7 +70,10 @@ impl TableMutation { pub fn into_value(self) -> Value { let (k, v) = match self { TableMutation::Set(_t, v) => ("update".to_string(), v), + TableMutation::SetPrevious(_t, Value::None, v) => ("create".to_string(), v), + TableMutation::SetPrevious(_t, _previous, v) => ("update".to_string(), v), TableMutation::Del(t) => { + // TODO(phughk): Future PR for lq on cf feature, store update in delete for diff and notification let mut h = BTreeMap::::new(); h.insert("id".to_string(), Value::Thing(t)); let o = Object::from(h); @@ -110,6 +116,7 @@ impl Display for TableMutation { fn fmt(&self, f: &mut Formatter) -> fmt::Result { match self { TableMutation::Set(id, v) => write!(f, "SET {} {}", id, v), + TableMutation::SetPrevious(id, _previous, v) => write!(f, "SET {} {}", id, v), TableMutation::Del(id) => write!(f, "DEL {}", id), TableMutation::Def(t) => write!(f, "{}", t), } @@ -160,6 +167,8 @@ impl Default for WriteMutationSet { #[cfg(test)] mod tests { + use crate::sql::Strand; + #[test] fn serialization() { use super::*; @@ -194,4 +203,54 @@ mod tests { r#"{"changes":[{"update":{"id":"mytb:tobie","note":"surreal"}},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":1}"# ); } + + #[test] + fn serialization_rev2() { + use super::*; + use std::collections::HashMap; + let cs = ChangeSet( + [0, 0, 0, 0, 0, 0, 0, 0, 0, 1], + DatabaseMutation(vec![TableMutations( + "mytb".to_string(), + vec![ + TableMutation::SetPrevious( + Thing::from(("mytb".to_string(), "tobie".to_string())), + Value::None, + Value::Object(Object::from(HashMap::from([ + ( + "id", + Value::from(Thing::from(("mytb".to_string(), "tobie".to_string()))), + ), + ("note", Value::from("surreal")), + ]))), + ), + TableMutation::SetPrevious( + Thing::from(("mytb".to_string(), "tobie".to_string())), + Value::Strand(Strand::from("this would normally be an object")), + Value::Object(Object::from(HashMap::from([ + ( + "id", + Value::from(Thing::from(( + "mytb".to_string(), + "tobie2".to_string(), + ))), + ), + ("note", Value::from("surreal")), + ]))), + ), + TableMutation::Del(Thing::from(("mytb".to_string(), "tobie".to_string()))), + TableMutation::Def(DefineTableStatement { + name: "mytb".into(), + ..DefineTableStatement::default() + }), + ], + )]), + ); + let v = cs.into_value().into_json(); + let s = serde_json::to_string(&v).unwrap(); + assert_eq!( + s, + r#"{"changes":[{"create":{"id":"mytb:tobie","note":"surreal"}},{"update":{"id":"mytb:tobie2","note":"surreal"}},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":1}"# + ); + } } diff --git a/core/src/cf/writer.rs b/core/src/cf/writer.rs index 9700b26a..9a2acf36 100644 --- a/core/src/cf/writer.rs +++ b/core/src/cf/writer.rs @@ -1,4 +1,5 @@ use crate::cf::{TableMutation, TableMutations}; +use crate::fflags::FFLAGS; use crate::kvs::Key; use crate::sql::statements::DefineTableStatement; use crate::sql::thing::Thing; @@ -59,13 +60,24 @@ impl Writer { } } - pub(crate) fn update(&mut self, ns: &str, db: &str, tb: &str, id: Thing, v: Cow<'_, Value>) { + pub(crate) fn update( + &mut self, + ns: &str, + db: &str, + tb: &str, + id: Thing, + p: Cow<'_, Value>, + v: Cow<'_, Value>, + ) { if v.is_some() { self.buf.push( ns.to_string(), db.to_string(), tb.to_string(), - TableMutation::Set(id, v.into_owned()), + match FFLAGS.change_feed_live_queries.enabled() { + true => TableMutation::SetPrevious(id, p.into_owned(), v.into_owned()), + false => TableMutation::Set(id, v.into_owned()), + }, ); } else { self.buf.push(ns.to_string(), db.to_string(), tb.to_string(), TableMutation::Del(id)); @@ -111,6 +123,7 @@ mod tests { use std::time::Duration; use crate::cf::{ChangeSet, DatabaseMutation, TableMutation, TableMutations}; + use crate::fflags::FFLAGS; use crate::key::key_req::KeyRequirements; use crate::kvs::{Datastore, LockType::*, TransactionType::*}; use crate::sql::changefeed::ChangeFeed; @@ -178,7 +191,9 @@ mod tests { id: Id::String("A".to_string()), }; let value_a: super::Value = "a".into(); - tx1.record_change(ns, db, tb, &thing_a, Cow::Borrowed(&value_a)); + // TODO(for this PR): This was just added to resolve compile issues but test should be fixed + let previous = Cow::from(Value::None); + tx1.record_change(ns, db, tb, &thing_a, previous.clone(), Cow::Borrowed(&value_a)); tx1.complete_changes(true).await.unwrap(); tx1.commit().await.unwrap(); @@ -188,7 +203,7 @@ mod tests { id: Id::String("C".to_string()), }; let value_c: Value = "c".into(); - tx2.record_change(ns, db, tb, &thing_c, Cow::Borrowed(&value_c)); + tx2.record_change(ns, db, tb, &thing_c, previous.clone(), Cow::Borrowed(&value_c)); tx2.complete_changes(true).await.unwrap(); tx2.commit().await.unwrap(); @@ -199,13 +214,13 @@ mod tests { id: Id::String("B".to_string()), }; let value_b: Value = "b".into(); - tx3.record_change(ns, db, tb, &thing_b, Cow::Borrowed(&value_b)); + tx3.record_change(ns, db, tb, &thing_b, previous.clone(), Cow::Borrowed(&value_b)); let thing_c2 = Thing { tb: tb.to_owned(), id: Id::String("C".to_string()), }; let value_c2: Value = "c2".into(); - tx3.record_change(ns, db, tb, &thing_c2, Cow::Borrowed(&value_c2)); + tx3.record_change(ns, db, tb, &thing_c2, previous.clone(), Cow::Borrowed(&value_c2)); tx3.complete_changes(true).await.unwrap(); tx3.commit().await.unwrap(); @@ -227,36 +242,64 @@ mod tests { vs::u64_to_versionstamp(2), DatabaseMutation(vec![TableMutations( "mytb".to_string(), - vec![TableMutation::Set( - Thing::from(("mytb".to_string(), "A".to_string())), - Value::from("a"), - )], + match FFLAGS.change_feed_live_queries.enabled() { + true => vec![TableMutation::SetPrevious( + Thing::from(("mytb".to_string(), "A".to_string())), + Value::None, + Value::from("a"), + )], + false => vec![TableMutation::Set( + Thing::from(("mytb".to_string(), "A".to_string())), + Value::from("a"), + )], + }, )]), ), ChangeSet( vs::u64_to_versionstamp(3), DatabaseMutation(vec![TableMutations( "mytb".to_string(), - vec![TableMutation::Set( - Thing::from(("mytb".to_string(), "C".to_string())), - Value::from("c"), - )], + match FFLAGS.change_feed_live_queries.enabled() { + true => vec![TableMutation::SetPrevious( + Thing::from(("mytb".to_string(), "C".to_string())), + Value::None, + Value::from("c"), + )], + false => vec![TableMutation::Set( + Thing::from(("mytb".to_string(), "C".to_string())), + Value::from("c"), + )], + }, )]), ), ChangeSet( vs::u64_to_versionstamp(4), DatabaseMutation(vec![TableMutations( "mytb".to_string(), - vec![ - TableMutation::Set( - Thing::from(("mytb".to_string(), "B".to_string())), - Value::from("b"), - ), - TableMutation::Set( - Thing::from(("mytb".to_string(), "C".to_string())), - Value::from("c2"), - ), - ], + match FFLAGS.change_feed_live_queries.enabled() { + true => vec![ + TableMutation::SetPrevious( + Thing::from(("mytb".to_string(), "B".to_string())), + Value::None, + Value::from("b"), + ), + TableMutation::SetPrevious( + Thing::from(("mytb".to_string(), "C".to_string())), + Value::None, + Value::from("c2"), + ), + ], + false => vec![ + TableMutation::Set( + Thing::from(("mytb".to_string(), "B".to_string())), + Value::from("b"), + ), + TableMutation::Set( + Thing::from(("mytb".to_string(), "C".to_string())), + Value::from("c2"), + ), + ], + }, )]), ), ]; @@ -281,16 +324,30 @@ mod tests { vs::u64_to_versionstamp(4), DatabaseMutation(vec![TableMutations( "mytb".to_string(), - vec![ - TableMutation::Set( - Thing::from(("mytb".to_string(), "B".to_string())), - Value::from("b"), - ), - TableMutation::Set( - Thing::from(("mytb".to_string(), "C".to_string())), - Value::from("c2"), - ), - ], + match FFLAGS.change_feed_live_queries.enabled() { + true => vec![ + TableMutation::SetPrevious( + Thing::from(("mytb".to_string(), "B".to_string())), + Value::None, + Value::from("b"), + ), + TableMutation::SetPrevious( + Thing::from(("mytb".to_string(), "C".to_string())), + Value::None, + Value::from("c2"), + ), + ], + false => vec![ + TableMutation::Set( + Thing::from(("mytb".to_string(), "B".to_string())), + Value::from("b"), + ), + TableMutation::Set( + Thing::from(("mytb".to_string(), "C".to_string())), + Value::from("c2"), + ), + ], + }, )]), )]; assert_eq!(r, want); diff --git a/core/src/doc/changefeeds.rs b/core/src/doc/changefeeds.rs index 9fdeaf12..63501d57 100644 --- a/core/src/doc/changefeeds.rs +++ b/core/src/doc/changefeeds.rs @@ -31,7 +31,14 @@ impl<'a> Document<'a> { let tb = tb.name.as_str(); let id = self.id.as_ref().unwrap(); // Create the changefeed entry - run.record_change(opt.ns(), opt.db(), tb, id, self.current.doc.clone()); + run.record_change( + opt.ns(), + opt.db(), + tb, + id, + self.initial.doc.clone(), + self.current.doc.clone(), + ); } // Carry on Ok(()) diff --git a/core/src/fflags.rs b/core/src/fflags.rs index 11efd536..3e8b84c8 100644 --- a/core/src/fflags.rs +++ b/core/src/fflags.rs @@ -7,6 +7,7 @@ /// Use this while implementing features #[allow(dead_code)] pub static FFLAGS: FFlags = FFlags { + // TODO(fflag-lqcf): This TODO signature marks tests that are affected by the fflag that do not have access to the fflag (scope) change_feed_live_queries: FFlagEnabledStatus { enabled_release: false, enabled_debug: false, diff --git a/core/src/kvs/tx.rs b/core/src/kvs/tx.rs index e8b23607..19ccccfa 100644 --- a/core/src/kvs/tx.rs +++ b/core/src/kvs/tx.rs @@ -2571,9 +2571,10 @@ impl Transaction { db: &str, tb: &str, id: &Thing, + p: Cow<'_, Value>, v: Cow<'_, Value>, ) { - self.cf.update(ns, db, tb, id.clone(), v) + self.cf.update(ns, db, tb, id.clone(), p, v) } // Records the table (re)definition in the changefeed if enabled. diff --git a/lib/tests/api/mod.rs b/lib/tests/api/mod.rs index 1860e3f2..04c22b9d 100644 --- a/lib/tests/api/mod.rs +++ b/lib/tests/api/mod.rs @@ -1,5 +1,7 @@ // Tests common to all protocols and storage engines +use surrealdb_core::fflags::FFLAGS; + static PERMITS: Semaphore = Semaphore::const_new(1); #[test_log::test(tokio::test)] @@ -953,20 +955,40 @@ async fn changefeed() { unreachable!() }; let changes = a.get("changes").unwrap().to_owned(); - assert_eq!( - changes, - surrealdb::sql::value( - "[ - { - update: { - id: user:amos, - name: 'Amos' - } + match FFLAGS.change_feed_live_queries.enabled() { + true => { + assert_eq!( + changes, + surrealdb::sql::value( + r#"[ + { + create: { + id: user:amos, + name: 'Amos' + } + } + ]"# + ) + .unwrap() + ); } - ]" - ) - .unwrap() - ); + false => { + assert_eq!( + changes, + surrealdb::sql::value( + r#"[ + { + update: { + id: user:amos, + name: 'Amos' + } + } + ]"# + ) + .unwrap() + ); + } + } // UPDATE user:jane let a = array.get(2).unwrap(); let Value::Object(a) = a else { @@ -977,20 +999,40 @@ async fn changefeed() { }; assert!(versionstamp1 < versionstamp2); let changes = a.get("changes").unwrap().to_owned(); - assert_eq!( - changes, - surrealdb::sql::value( - "[ - { - update: { - id: user:jane, - name: 'Jane' - } + match FFLAGS.change_feed_live_queries.enabled() { + true => { + assert_eq!( + changes, + surrealdb::sql::value( + "[ + { + create: { + id: user:jane, + name: 'Jane' + } + } + ]" + ) + .unwrap() + ); } - ]" - ) - .unwrap() - ); + false => { + assert_eq!( + changes, + surrealdb::sql::value( + "[ + { + update: { + id: user:jane, + name: 'Jane' + } + } + ]" + ) + .unwrap() + ); + } + } // UPDATE user:amos let a = array.get(3).unwrap(); let Value::Object(a) = a else { diff --git a/lib/tests/changefeeds.rs b/lib/tests/changefeeds.rs index 7a2cebec..f3686097 100644 --- a/lib/tests/changefeeds.rs +++ b/lib/tests/changefeeds.rs @@ -6,6 +6,7 @@ use helpers::new_ds; use surrealdb::dbs::Session; use surrealdb::err::Error; use surrealdb::sql::Value; +use surrealdb_core::fflags::{FFlags, FFLAGS}; #[tokio::test] async fn database_change_feeds() -> Result<(), Error> { @@ -64,8 +65,34 @@ async fn database_change_feeds() -> Result<(), Error> { assert_eq!(tmp, val); // SHOW CHANGES let tmp = res.remove(0).result?; - let val = Value::parse( - "[ + let val = match FFLAGS.change_feed_live_queries.enabled() { + true => Value::parse( + "[ + { + versionstamp: 65536, + changes: [ + { + create: { + id: person:test, + name: 'Name: Tobie' + } + } + ] + }, + { + versionstamp: 131072, + changes: [ + { + delete: { + id: person:test + } + } + ] + } + ]", + ), + false => Value::parse( + "[ { versionstamp: 65536, changes: [ @@ -88,7 +115,8 @@ async fn database_change_feeds() -> Result<(), Error> { ] } ]", - ); + ), + }; assert_eq!(tmp, val); // Retain for 1h let sql = " @@ -202,8 +230,77 @@ async fn table_change_feeds() -> Result<(), Error> { let _tmp = res.remove(0).result?; // SHOW CHANGES let tmp = res.remove(0).result?; - let val = Value::parse( - "[ + let val = match FFLAGS.change_feed_live_queries.enabled() { + true => Value::parse( + "[ + { + versionstamp: 65536, + changes: [ + { + define_table: { + name: 'person' + } + } + ] + }, + { + versionstamp: 131072, + changes: [ + { + create: { + id: person:test, + name: 'Name: Tobie' + } + } + ] + }, + { + versionstamp: 196608, + changes: [ + { + update: { + id: person:test, + name: 'Name: Jaime' + } + } + ] + }, + { + versionstamp: 262144, + changes: [ + { + update: { + id: person:test, + name: 'Name: Tobie' + } + } + ] + }, + { + versionstamp: 327680, + changes: [ + { + delete: { + id: person:test + } + } + ] + }, + { + versionstamp: 393216, + changes: [ + { + create: { + id: person:1000, + name: 'Name: Yusuke' + } + } + ] + } + ]", + ), + false => Value::parse( + "[ { versionstamp: 65536, changes: [ @@ -269,7 +366,8 @@ async fn table_change_feeds() -> Result<(), Error> { ] } ]", - ); + ), + }; assert_eq!(tmp, val); // Retain for 1h let sql = " @@ -370,10 +468,28 @@ async fn changefeed_with_ts() -> Result<(), Error> { unreachable!() }; let changes = a.get("changes").unwrap().to_owned(); - assert_eq!( - changes, - surrealdb::sql::value( - "[ + match FFLAGS.change_feed_live_queries.enabled() { + true => { + assert_eq!( + changes, + surrealdb::sql::value( + "[ + { + create: { + id: user:amos, + name: 'Amos' + } + } + ]" + ) + .unwrap() + ); + } + false => { + assert_eq!( + changes, + surrealdb::sql::value( + "[ { update: { id: user:amos, @@ -381,9 +497,11 @@ async fn changefeed_with_ts() -> Result<(), Error> { } } ]" - ) - .unwrap() - ); + ) + .unwrap() + ); + } + } // UPDATE user:jane let a = array.get(2).unwrap(); let Value::Object(a) = a else { @@ -394,20 +512,40 @@ async fn changefeed_with_ts() -> Result<(), Error> { }; assert!(versionstamp2 < versionstamp3); let changes = a.get("changes").unwrap().to_owned(); - assert_eq!( - changes, - surrealdb::sql::value( - "[ - { - update: { - id: user:jane, - name: 'Jane' - } + match FFLAGS.change_feed_live_queries.enabled() { + true => { + assert_eq!( + changes, + surrealdb::sql::value( + "[ + { + create: { + id: user:jane, + name: 'Jane' + } + } + ]" + ) + .unwrap() + ); } + false => { + assert_eq!( + changes, + surrealdb::sql::value( + "[ + { + update: { + id: user:jane, + name: 'Jane' + } + } ]" - ) - .unwrap() - ); + ) + .unwrap() + ); + } + } // UPDATE user:amos let a = array.get(3).unwrap(); let Value::Object(a) = a else { @@ -485,20 +623,40 @@ async fn changefeed_with_ts() -> Result<(), Error> { }; assert!(versionstamp2 == versionstamp1b); let changes = a.get("changes").unwrap().to_owned(); - assert_eq!( - changes, - surrealdb::sql::value( - "[ - { - update: { - id: user:amos, - name: 'Amos' - } + match FFLAGS.change_feed_live_queries.enabled() { + true => { + assert_eq!( + changes, + surrealdb::sql::value( + "[ + { + create: { + id: user:amos, + name: 'Amos' + } + } + ]" + ) + .unwrap() + ); } - ]" - ) - .unwrap() - ); + false => { + assert_eq!( + changes, + surrealdb::sql::value( + "[ + { + update: { + id: user:amos, + name: 'Amos' + } + } + ]" + ) + .unwrap() + ); + } + } // Save timestamp 3 let ts3_dt = "2023-08-01T00:00:10Z"; let ts3 = DateTime::parse_from_rfc3339(ts3_dt).unwrap(); diff --git a/tests/cli_integration.rs b/tests/cli_integration.rs index c46d3609..b7e5cb6a 100644 --- a/tests/cli_integration.rs +++ b/tests/cli_integration.rs @@ -9,6 +9,7 @@ mod cli_integration { use std::fs; use std::fs::File; use std::time; + use surrealdb::fflags::FFLAGS; use test_log::test; use tokio::time::sleep; use tracing::info; @@ -676,15 +677,24 @@ mod cli_integration { let args = format!( "sql --conn http://{addr} {creds} --ns {ns} --db {db} --multi --hide-welcome" ); - assert_eq!( - common::run(&args) - .input("SHOW CHANGES FOR TABLE thing SINCE 0 LIMIT 10;\n") - .output(), - Ok("[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 65536 }, { changes: [{ update: { id: thing:one } }], versionstamp: 131072 }]]\n\n" - .to_owned()), - "failed to send sql: {args}" - ); - } + if FFLAGS.change_feed_live_queries.enabled() { + assert_eq!( + common::run(&args) + .input("SHOW CHANGES FOR TABLE thing SINCE 0 LIMIT 10;\n") + .output(), + Ok("[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 65536 }, { changes: [{ create: { id: thing:one } }], versionstamp: 131072 }]]\n\n" + .to_owned()), + "failed to send sql: {args}"); + } else { + assert_eq!( + common::run(&args) + .input("SHOW CHANGES FOR TABLE thing SINCE 0 LIMIT 10;\n") + .output(), + Ok("[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 65536 }, { changes: [{ update: { id: thing:one } }], versionstamp: 131072 }]]\n\n" + .to_owned()), + "failed to send sql: {args}" ); + } + }; sleep(TWO_SECS).await;