Make Change Feeds record patch values optionally (#3552)

Co-authored-by: Mees Delzenne <DelSkayn@users.noreply.github.com>
This commit is contained in:
Przemyslaw Hugh Kaznowski 2024-03-20 10:09:04 +00:00 committed by GitHub
parent fb6cfba3ac
commit 8b13546327
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 222 additions and 58 deletions

View file

@ -3,6 +3,7 @@ use crate::sql::object::Object;
use crate::sql::statements::DefineTableStatement; use crate::sql::statements::DefineTableStatement;
use crate::sql::thing::Thing; use crate::sql::thing::Thing;
use crate::sql::value::Value; use crate::sql::value::Value;
use crate::sql::Operation;
use crate::vs::versionstamp_to_u64; use crate::vs::versionstamp_to_u64;
use derive::Store; use derive::Store;
use revision::revisioned; use revision::revisioned;
@ -20,8 +21,10 @@ pub enum TableMutation {
Del(Thing), Del(Thing),
Def(DefineTableStatement), Def(DefineTableStatement),
#[revision(start = 2)] #[revision(start = 2)]
// Includes the previous value that may be None /// Includes the ID, current value, and changes that were applied to achieve this value
SetPrevious(Thing, Value, Value), /// Example, ("mytb:tobie", {{"note": "surreal"}}, [{"op": "add", "path": "/note", "value": "surreal"}])
/// Means that we have already applied the add "/note" operation to achieve the recorded result
SetWithDiff(Thing, Value, Vec<Operation>),
} }
impl From<DefineTableStatement> for Value { impl From<DefineTableStatement> for Value {
@ -61,29 +64,49 @@ impl Default for DatabaseMutation {
Self::new() Self::new()
} }
} }
// Change is a set of mutations made to a table at the specific timestamp. // Change is a set of mutations made to a table at the specific timestamp.
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
#[revisioned(revision = 1)] #[revisioned(revision = 1)]
pub struct ChangeSet(pub [u8; 10], pub DatabaseMutation); pub struct ChangeSet(pub [u8; 10], pub DatabaseMutation);
impl TableMutation { impl TableMutation {
/// Convert a stored change feed table mutation (record change) into a
/// Value that can be used in the storage of change feeds and their transmission to consumers
pub fn into_value(self) -> Value { pub fn into_value(self) -> Value {
let (k, v) = match self {
TableMutation::Set(_t, v) => ("update".to_string(), v),
TableMutation::SetPrevious(_t, Value::None, v) => ("create".to_string(), v),
TableMutation::SetPrevious(_t, _previous, v) => ("update".to_string(), v),
TableMutation::Del(t) => {
// TODO(phughk): Future PR for lq on cf feature, store update in delete for diff and notification
let mut h = BTreeMap::<String, Value>::new(); let mut h = BTreeMap::<String, Value>::new();
h.insert("id".to_string(), Value::Thing(t)); let h = match self {
let o = Object::from(h); TableMutation::Set(_thing, v) => {
("delete".to_string(), Value::Object(o)) h.insert("update".to_string(), v);
h
}
TableMutation::SetWithDiff(_thing, current, operations) => {
h.insert("current".to_string(), current);
h.insert(
"update".to_string(),
Value::Array(Array(
operations
.clone()
.into_iter()
.map(|x| Value::Object(Object::from(x)))
.collect(),
)),
);
h
}
TableMutation::Del(t) => {
// TODO(SUR-329): Store update in delete for diff and notification
let mut other = BTreeMap::<String, Value>::new();
other.insert("id".to_string(), Value::Thing(t));
let o = Object::from(other);
h.insert("delete".to_string(), Value::Object(o));
h
}
TableMutation::Def(t) => {
h.insert("define_table".to_string(), Value::from(t));
h
} }
TableMutation::Def(t) => ("define_table".to_string(), Value::from(t)),
}; };
let mut h = BTreeMap::<String, Value>::new();
h.insert(k, v);
let o = crate::sql::object::Object::from(h); let o = crate::sql::object::Object::from(h);
Value::Object(o) Value::Object(o)
} }
@ -116,7 +139,7 @@ impl Display for TableMutation {
fn fmt(&self, f: &mut Formatter) -> fmt::Result { fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self { match self {
TableMutation::Set(id, v) => write!(f, "SET {} {}", id, v), TableMutation::Set(id, v) => write!(f, "SET {} {}", id, v),
TableMutation::SetPrevious(id, _previous, v) => write!(f, "SET {} {}", id, v), TableMutation::SetWithDiff(id, _previous, v) => write!(f, "SET {} {:?}", id, v),
TableMutation::Del(id) => write!(f, "DEL {}", id), TableMutation::Del(id) => write!(f, "DEL {}", id),
TableMutation::Def(t) => write!(f, "{}", t), TableMutation::Def(t) => write!(f, "{}", t),
} }
@ -149,8 +172,8 @@ impl Display for ChangeSet {
} }
// WriteMutationSet is a set of mutations to be to a table at the specific timestamp. // WriteMutationSet is a set of mutations to be to a table at the specific timestamp.
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
#[revisioned(revision = 1)] #[revisioned(revision = 1)]
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
pub struct WriteMutationSet(pub Vec<TableMutations>); pub struct WriteMutationSet(pub Vec<TableMutations>);
impl WriteMutationSet { impl WriteMutationSet {
@ -213,9 +236,8 @@ mod tests {
DatabaseMutation(vec![TableMutations( DatabaseMutation(vec![TableMutations(
"mytb".to_string(), "mytb".to_string(),
vec![ vec![
TableMutation::SetPrevious( TableMutation::SetWithDiff(
Thing::from(("mytb".to_string(), "tobie".to_string())), Thing::from(("mytb".to_string(), "tobie".to_string())),
Value::None,
Value::Object(Object::from(HashMap::from([ Value::Object(Object::from(HashMap::from([
( (
"id", "id",
@ -223,10 +245,13 @@ mod tests {
), ),
("note", Value::from("surreal")), ("note", Value::from("surreal")),
]))), ]))),
vec![Operation::Add {
path: "/note".into(),
value: Value::from("surreal"),
}],
), ),
TableMutation::SetPrevious( TableMutation::SetWithDiff(
Thing::from(("mytb".to_string(), "tobie".to_string())), Thing::from(("mytb".to_string(), "tobie".to_string())),
Value::Strand(Strand::from("this would normally be an object")),
Value::Object(Object::from(HashMap::from([ Value::Object(Object::from(HashMap::from([
( (
"id", "id",
@ -237,6 +262,9 @@ mod tests {
), ),
("note", Value::from("surreal")), ("note", Value::from("surreal")),
]))), ]))),
vec![Operation::Remove {
path: "/temp".into(),
}],
), ),
TableMutation::Del(Thing::from(("mytb".to_string(), "tobie".to_string()))), TableMutation::Del(Thing::from(("mytb".to_string(), "tobie".to_string()))),
TableMutation::Def(DefineTableStatement { TableMutation::Def(DefineTableStatement {
@ -250,7 +278,7 @@ mod tests {
let s = serde_json::to_string(&v).unwrap(); let s = serde_json::to_string(&v).unwrap();
assert_eq!( assert_eq!(
s, s,
r#"{"changes":[{"create":{"id":"mytb:tobie","note":"surreal"}},{"update":{"id":"mytb:tobie2","note":"surreal"}},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":1}"# r#"{"changes":[{"current":{"id":"mytb:tobie","note":"surreal"},"update":[{"op":"add","path":"/`/note`","value":"surreal"}]},{"current":{"id":"mytb:tobie2","note":"surreal"},"update":[{"op":"remove","path":"/`/temp`"}]},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":1}"#
); );
} }
} }

View file

@ -1,9 +1,9 @@
use crate::cf::{TableMutation, TableMutations}; use crate::cf::{TableMutation, TableMutations};
use crate::fflags::FFLAGS;
use crate::kvs::Key; use crate::kvs::Key;
use crate::sql::statements::DefineTableStatement; use crate::sql::statements::DefineTableStatement;
use crate::sql::thing::Thing; use crate::sql::thing::Thing;
use crate::sql::value::Value; use crate::sql::value::Value;
use crate::sql::Idiom;
use std::borrow::Cow; use std::borrow::Cow;
use std::collections::HashMap; use std::collections::HashMap;
@ -60,23 +60,28 @@ impl Writer {
} }
} }
#[allow(clippy::too_many_arguments)]
pub(crate) fn update( pub(crate) fn update(
&mut self, &mut self,
ns: &str, ns: &str,
db: &str, db: &str,
tb: &str, tb: &str,
id: Thing, id: Thing,
p: Cow<'_, Value>, previous: Cow<'_, Value>,
v: Cow<'_, Value>, current: Cow<'_, Value>,
store_difference: bool,
) { ) {
if v.is_some() { if current.is_some() {
self.buf.push( self.buf.push(
ns.to_string(), ns.to_string(),
db.to_string(), db.to_string(),
tb.to_string(), tb.to_string(),
match FFLAGS.change_feed_live_queries.enabled() { match store_difference {
true => TableMutation::SetPrevious(id, p.into_owned(), v.into_owned()), true => {
false => TableMutation::Set(id, v.into_owned()), let patches = current.diff(&previous, Idiom(Vec::new()));
TableMutation::SetWithDiff(id, current.into_owned(), patches)
}
false => TableMutation::Set(id, current.into_owned()),
}, },
); );
} else { } else {
@ -136,6 +141,8 @@ mod tests {
use crate::sql::value::Value; use crate::sql::value::Value;
use crate::vs; use crate::vs;
const dont_store_previous: bool = false;
#[tokio::test] #[tokio::test]
async fn test_changefeed_read_write() { async fn test_changefeed_read_write() {
let ts = crate::sql::Datetime::default(); let ts = crate::sql::Datetime::default();
@ -193,9 +200,16 @@ mod tests {
id: Id::String("A".to_string()), id: Id::String("A".to_string()),
}; };
let value_a: super::Value = "a".into(); let value_a: super::Value = "a".into();
// TODO(for this PR): This was just added to resolve compile issues but test should be fixed let mut previous = Cow::from(Value::None);
let previous = Cow::from(Value::None); tx1.record_change(
tx1.record_change(ns, db, tb, &thing_a, previous.clone(), Cow::Borrowed(&value_a)); ns,
db,
tb,
&thing_a,
previous.clone(),
Cow::Borrowed(&value_a),
dont_store_previous,
);
tx1.complete_changes(true).await.unwrap(); tx1.complete_changes(true).await.unwrap();
tx1.commit().await.unwrap(); tx1.commit().await.unwrap();
@ -205,7 +219,15 @@ mod tests {
id: Id::String("C".to_string()), id: Id::String("C".to_string()),
}; };
let value_c: Value = "c".into(); let value_c: Value = "c".into();
tx2.record_change(ns, db, tb, &thing_c, previous.clone(), Cow::Borrowed(&value_c)); tx2.record_change(
ns,
db,
tb,
&thing_c,
previous.clone(),
Cow::Borrowed(&value_c),
dont_store_previous,
);
tx2.complete_changes(true).await.unwrap(); tx2.complete_changes(true).await.unwrap();
tx2.commit().await.unwrap(); tx2.commit().await.unwrap();
@ -216,13 +238,29 @@ mod tests {
id: Id::String("B".to_string()), id: Id::String("B".to_string()),
}; };
let value_b: Value = "b".into(); let value_b: Value = "b".into();
tx3.record_change(ns, db, tb, &thing_b, previous.clone(), Cow::Borrowed(&value_b)); tx3.record_change(
ns,
db,
tb,
&thing_b,
previous.clone(),
Cow::Borrowed(&value_b),
dont_store_previous,
);
let thing_c2 = Thing { let thing_c2 = Thing {
tb: tb.to_owned(), tb: tb.to_owned(),
id: Id::String("C".to_string()), id: Id::String("C".to_string()),
}; };
let value_c2: Value = "c2".into(); let value_c2: Value = "c2".into();
tx3.record_change(ns, db, tb, &thing_c2, previous.clone(), Cow::Borrowed(&value_c2)); tx3.record_change(
ns,
db,
tb,
&thing_c2,
previous.clone(),
Cow::Borrowed(&value_c2),
dont_store_previous,
);
tx3.complete_changes(true).await.unwrap(); tx3.complete_changes(true).await.unwrap();
tx3.commit().await.unwrap(); tx3.commit().await.unwrap();
@ -245,10 +283,10 @@ mod tests {
DatabaseMutation(vec![TableMutations( DatabaseMutation(vec![TableMutations(
"mytb".to_string(), "mytb".to_string(),
match FFLAGS.change_feed_live_queries.enabled() { match FFLAGS.change_feed_live_queries.enabled() {
true => vec![TableMutation::SetPrevious( true => vec![TableMutation::SetWithDiff(
Thing::from(("mytb".to_string(), "A".to_string())), Thing::from(("mytb".to_string(), "A".to_string())),
Value::None, Value::None,
Value::from("a"), vec![],
)], )],
false => vec![TableMutation::Set( false => vec![TableMutation::Set(
Thing::from(("mytb".to_string(), "A".to_string())), Thing::from(("mytb".to_string(), "A".to_string())),
@ -262,10 +300,10 @@ mod tests {
DatabaseMutation(vec![TableMutations( DatabaseMutation(vec![TableMutations(
"mytb".to_string(), "mytb".to_string(),
match FFLAGS.change_feed_live_queries.enabled() { match FFLAGS.change_feed_live_queries.enabled() {
true => vec![TableMutation::SetPrevious( true => vec![TableMutation::SetWithDiff(
Thing::from(("mytb".to_string(), "C".to_string())), Thing::from(("mytb".to_string(), "C".to_string())),
Value::None, Value::None,
Value::from("c"), vec![],
)], )],
false => vec![TableMutation::Set( false => vec![TableMutation::Set(
Thing::from(("mytb".to_string(), "C".to_string())), Thing::from(("mytb".to_string(), "C".to_string())),
@ -280,15 +318,15 @@ mod tests {
"mytb".to_string(), "mytb".to_string(),
match FFLAGS.change_feed_live_queries.enabled() { match FFLAGS.change_feed_live_queries.enabled() {
true => vec![ true => vec![
TableMutation::SetPrevious( TableMutation::SetWithDiff(
Thing::from(("mytb".to_string(), "B".to_string())), Thing::from(("mytb".to_string(), "B".to_string())),
Value::None, Value::None,
Value::from("b"), vec![],
), ),
TableMutation::SetPrevious( TableMutation::SetWithDiff(
Thing::from(("mytb".to_string(), "C".to_string())), Thing::from(("mytb".to_string(), "C".to_string())),
Value::None, Value::None,
Value::from("c2"), vec![],
), ),
], ],
false => vec![ false => vec![
@ -328,15 +366,15 @@ mod tests {
"mytb".to_string(), "mytb".to_string(),
match FFLAGS.change_feed_live_queries.enabled() { match FFLAGS.change_feed_live_queries.enabled() {
true => vec![ true => vec![
TableMutation::SetPrevious( TableMutation::SetWithDiff(
Thing::from(("mytb".to_string(), "B".to_string())), Thing::from(("mytb".to_string(), "B".to_string())),
Value::None, Value::None,
Value::from("b"), vec![],
), ),
TableMutation::SetPrevious( TableMutation::SetWithDiff(
Thing::from(("mytb".to_string(), "C".to_string())), Thing::from(("mytb".to_string(), "C".to_string())),
Value::None, Value::None,
Value::from("c2"), vec![],
), ),
], ],
false => vec![ false => vec![

View file

@ -26,7 +26,7 @@ impl<'a> Document<'a> {
// Get the database and the table for the record // Get the database and the table for the record
let db = run.add_and_cache_db(opt.ns(), opt.db(), opt.strict).await?; let db = run.add_and_cache_db(opt.ns(), opt.db(), opt.strict).await?;
// Check if changefeeds are enabled // Check if changefeeds are enabled
if db.changefeed.is_some() || tb.changefeed.is_some() { if let Some(cf) = db.as_ref().changefeed.as_ref().or(tb.as_ref().changefeed.as_ref()) {
// Get the arguments // Get the arguments
let tb = tb.name.as_str(); let tb = tb.name.as_str();
let id = self.id.as_ref().unwrap(); let id = self.id.as_ref().unwrap();
@ -38,6 +38,7 @@ impl<'a> Document<'a> {
id, id,
self.initial.doc.clone(), self.initial.doc.clone(),
self.current.doc.clone(), self.current.doc.clone(),
cf.store_original,
); );
} }
// Carry on // Carry on

View file

@ -1035,9 +1035,9 @@ impl Datastore {
Some(doc) Some(doc)
} }
TableMutation::Def(_) => None, TableMutation::Def(_) => None,
TableMutation::SetPrevious(id, _old, new) => { TableMutation::SetWithDiff(id, new, _operations) => {
let doc = Document::new(None, Some(id), None, new, Workable::Normal); let doc = Document::new(None, Some(id), None, new, Workable::Normal);
// TODO set previous value // TODO(SUR-328): reverse diff and apply to doc to retrieve original version of doc
Some(doc) Some(doc)
} }
} }

View file

@ -6,6 +6,9 @@ use std::cmp::Ordering;
/// Used for cluster logic to move LQ data to LQ cleanup code /// Used for cluster logic to move LQ data to LQ cleanup code
/// Not a stored struct; Used only in this module /// Not a stored struct; Used only in this module
///
/// This struct is public because it is used in Live Query errors for v1.
/// V1 is now deprecated and the struct can be made non-public
#[derive(Debug, Clone, Eq, PartialEq)] #[derive(Debug, Clone, Eq, PartialEq)]
pub struct LqValue { pub struct LqValue {
pub nd: Uuid, pub nd: Uuid,

View file

@ -2704,16 +2704,18 @@ impl Transaction {
// change will record the change in the changefeed if enabled. // change will record the change in the changefeed if enabled.
// To actually persist the record changes into the underlying kvs, // To actually persist the record changes into the underlying kvs,
// you must call the `complete_changes` function and then commit the transaction. // you must call the `complete_changes` function and then commit the transaction.
#[allow(clippy::too_many_arguments)]
pub(crate) fn record_change( pub(crate) fn record_change(
&mut self, &mut self,
ns: &str, ns: &str,
db: &str, db: &str,
tb: &str, tb: &str,
id: &Thing, id: &Thing,
p: Cow<'_, Value>, previous: Cow<'_, Value>,
v: Cow<'_, Value>, current: Cow<'_, Value>,
store_difference: bool,
) { ) {
self.cf.update(ns, db, tb, id.clone(), p, v) self.cf.update(ns, db, tb, id.clone(), previous, current, store_difference)
} }
// Records the table (re)definition in the changefeed if enabled. // Records the table (re)definition in the changefeed if enabled.

View file

@ -5,7 +5,7 @@ use std::fmt::{self, Display, Formatter};
use std::str; use std::str;
use std::time; use std::time;
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)] #[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
#[revisioned(revision = 2)] #[revisioned(revision = 2)]
pub struct ChangeFeed { pub struct ChangeFeed {
pub expiry: time::Duration, pub expiry: time::Duration,

View file

@ -62,6 +62,12 @@ impl From<String> for Idiom {
} }
} }
impl From<&str> for Idiom {
fn from(v: &str) -> Self {
Self(vec![Part::from(v)])
}
}
impl From<Vec<Part>> for Idiom { impl From<Vec<Part>> for Idiom {
fn from(v: Vec<Part>) -> Self { fn from(v: Vec<Part>) -> Self {
Self(v) Self(v)
@ -73,6 +79,7 @@ impl From<&[Part]> for Idiom {
Self(v.to_vec()) Self(v.to_vec())
} }
} }
impl From<Part> for Idiom { impl From<Part> for Idiom {
fn from(v: Part) -> Self { fn from(v: Part) -> Self {
Self(vec![v]) Self(vec![v])

View file

@ -65,6 +65,7 @@ macro_rules! expected {
} }
#[cfg(test)] #[cfg(test)]
#[macro_export]
macro_rules! test_parse { macro_rules! test_parse {
($func:ident$( ( $($e:expr),* $(,)? ))? , $t:literal) => {{ ($func:ident$( ( $($e:expr),* $(,)? ))? , $t:literal) => {{
let mut parser = $crate::syn::v2::parser::Parser::new($t.as_bytes()); let mut parser = $crate::syn::v2::parser::Parser::new($t.as_bytes());

View file

@ -1,3 +1,5 @@
mod parse;
use chrono::DateTime; use chrono::DateTime;
use helpers::new_ds; use helpers::new_ds;
@ -11,7 +13,6 @@ use surrealdb::kvs::TransactionType::Write;
use surrealdb::sql::Value; use surrealdb::sql::Value;
mod helpers; mod helpers;
mod parse;
#[test_log::test(tokio::test)] #[test_log::test(tokio::test)]
async fn database_change_feeds() -> Result<(), Error> { async fn database_change_feeds() -> Result<(), Error> {
@ -185,7 +186,6 @@ async fn database_change_feeds() -> Result<(), Error> {
let res = &mut dbs.execute(sql, &ses, None).await?; let res = &mut dbs.execute(sql, &ses, None).await?;
let tmp = res.remove(0).result?; let tmp = res.remove(0).result?;
assert_eq!(tmp, cf_val_arr); assert_eq!(tmp, cf_val_arr);
// GC after 1hs // GC after 1hs
let one_hour_in_secs = 3600; let one_hour_in_secs = 3600;
current_time += one_hour_in_secs; current_time += one_hour_in_secs;
@ -735,3 +735,87 @@ async fn changefeed_with_ts() -> Result<(), Error> {
assert_eq!(array.len(), 0); assert_eq!(array.len(), 0);
Ok(()) Ok(())
} }
#[tokio::test]
async fn changefeed_with_original() -> Result<(), Error> {
if !FFLAGS.change_feed_live_queries.enabled() {
return Ok(());
}
let db = new_ds().await?;
let ses = Session::owner().with_ns("test").with_db("test");
// Enable change feeds
db.execute("DEFINE TABLE user CHANGEFEED 1h INCLUDE ORIGINAL;", &ses, None)
.await?
.remove(0)
.result?;
db.execute("CREATE user CONTENT {'id': 'id_one'};", &ses, None).await?.remove(0).result?;
// Now validate original values are stored
let value: Value =
db.execute("SHOW CHANGES FOR TABLE user SINCE 0", &ses, None).await?.remove(0).result?;
let Value::Array(array) = value else {
unreachable!()
};
assert_eq!(array.len(), 2);
assert_eq!(
array.get(0).unwrap(),
&surrealdb::sql::value(
r#"{
"changes": [{
"define_table": {
"name": "user",
},
}],
"versionstamp": 65536
}"#
)
.unwrap()
);
assert_eq!(
array.get(1).unwrap(),
&surrealdb::sql::value(
r#"
{
"changes": [{
"create": {
"id": user:id_one,
},
"original": None,
}],
"versionstamp": 131072
}
"#
)
.unwrap()
);
db.execute("UPDATE user:id_one SET name = 'Raynor';", &ses, None).await?.remove(0).result?;
let array =
db.execute("SHOW CHANGES FOR TABLE user SINCE 0", &ses, None).await?.remove(0).result?;
let Value::Array(array) = array else {
unreachable!()
};
assert_eq!(array.len(), 3);
assert_eq!(
array.get(2).unwrap(),
&surrealdb::sql::value(
r#"
{
"changes": [{
"update": {
"id": user:id_one,
"name": "Raynor",
},
"original": {
"id": user:id_one,
},
}],
"versionstamp": 196608,
}"#
)
.unwrap()
);
Ok(())
}