Invert CF diffs to derive original ()

Co-authored-by: Emmanuel Keller <keller.emmanuel@gmail.com>
This commit is contained in:
Przemyslaw Hugh Kaznowski 2024-04-29 10:37:21 +01:00 committed by GitHub
parent 6783f5ee11
commit 22aec455b5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 161 additions and 50 deletions
core/src

View file

@ -23,8 +23,8 @@ pub enum TableMutation {
Del(Thing),
Def(DefineTableStatement),
#[revision(start = 2)]
/// Includes the ID, current value (after change), changes that were applied to achieve this
/// value, and if this is a new record (i.e. create = true vs update = false)
/// Includes the ID, current value (after change), changes that can be applied to get the original
/// value
/// Example, ("mytb:tobie", {{"note": "surreal"}}, [{"op": "add", "path": "/note", "value": "surreal"}], false)
/// Means that we have already applied the add "/note" operation to achieve the recorded result
SetWithDiff(Thing, Value, Vec<Operation>),

View file

@ -82,13 +82,18 @@ impl Writer {
tb.to_string(),
match store_difference {
true => {
let patches = current.diff(&previous, Idiom(Vec::new()));
let new_record = !previous.is_some();
trace!("The record is new_record={new_record} because previous is {previous:?}");
if previous.is_none() {
TableMutation::Set(id, current.into_owned())
} else {
TableMutation::SetWithDiff(id, current.into_owned(), patches)
// We intentionally record the patches in reverse (current -> previous)
// because we cannot otherwise resolve operations such as "replace" and "remove".
let patches_to_create_previous =
current.diff(&previous, Idiom::default());
TableMutation::SetWithDiff(
id,
current.into_owned(),
patches_to_create_previous,
)
}
}
false => TableMutation::Set(id, current.into_owned()),
@ -138,6 +143,7 @@ mod tests {
use std::time::Duration;
use crate::cf::{ChangeSet, DatabaseMutation, TableMutation, TableMutations};
use crate::dbs::Session;
use crate::fflags::FFLAGS;
use crate::key::key_req::KeyRequirements;
use crate::kvs::{Datastore, LockType::*, Transaction, TransactionType::*};
@ -149,7 +155,7 @@ mod tests {
};
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use crate::sql::Datetime;
use crate::sql::{Datetime, Idiom, Number, Object, Operation, Strand};
use crate::vs;
use crate::vs::{conv, Versionstamp};
@ -160,9 +166,9 @@ mod tests {
const TB: &str = "mytb";
#[tokio::test]
async fn test_changefeed_read_write() {
async fn changefeed_read_write() {
let ts = Datetime::default();
let ds = init().await;
let ds = init(false).await;
// Let the db remember the timestamp for the current versionstamp
// so that we can replay change feeds from the timestamp later.
@ -382,9 +388,9 @@ mod tests {
}
#[test_log::test(tokio::test)]
async fn test_scan_picks_up_from_offset() {
async fn scan_picks_up_from_offset() {
// Given we have 2 entries in change feeds
let ds = init().await;
let ds = init(false).await;
ds.tick_at(5).await.unwrap();
let _id1 = record_change_feed_entry(
ds.transaction(Write, Optimistic).await.unwrap(),
@ -403,18 +409,110 @@ mod tests {
.await;
// When we scan from the versionstamp between the changes
let r = change_feed(ds.transaction(Write, Optimistic).await.unwrap(), &vs2).await;
let r = change_feed_vs(ds.transaction(Write, Optimistic).await.unwrap(), &vs2).await;
// Then there is only 1 change
assert_eq!(r.len(), 1);
assert!(r[0].0 >= vs2, "{:?}", r);
// And scanning with previous offset includes both values (without table definitions)
let r = change_feed(ds.transaction(Write, Optimistic).await.unwrap(), &vs1).await;
let r = change_feed_vs(ds.transaction(Write, Optimistic).await.unwrap(), &vs1).await;
assert_eq!(r.len(), 2);
}
async fn change_feed(mut tx: Transaction, vs: &Versionstamp) -> Vec<ChangeSet> {
#[test_log::test(tokio::test)]
async fn set_with_diff_records_diff_to_achieve_original() {
if !FFLAGS.change_feed_live_queries.enabled() {
return;
}
let ts = Datetime::default();
let ds = init(true).await;
// Create a doc
ds.tick_at(ts.0.timestamp().try_into().unwrap()).await.unwrap();
let thing = Thing {
tb: TB.to_owned(),
id: Id::String("A".to_string()),
};
let ses = Session::owner().with_ns(NS).with_db(DB);
let res =
ds.execute(format!("CREATE {thing} SET value=50").as_str(), &ses, None).await.unwrap();
assert_eq!(res.len(), 1, "{:?}", res);
let res = res.into_iter().next().unwrap();
res.result.unwrap();
// Now update it
ds.tick_at((ts.0.timestamp() + 10).try_into().unwrap()).await.unwrap();
let res = ds
.execute(
format!("UPDATE {thing} SET value=100, new_field=\"new_value\"").as_str(),
&ses,
None,
)
.await
.unwrap();
assert_eq!(res.len(), 1, "{:?}", res);
let res = res.into_iter().next().unwrap();
res.result.unwrap();
// Now read the change feed
let tx = ds.transaction(Write, Optimistic).await.unwrap();
let r = change_feed_ts(tx, &ts).await;
let expected_obj_first = Value::Object(Object::from(map! {
"id".to_string() => Value::Thing(thing.clone()),
"value".to_string() => Value::Number(Number::Int(50)),
}));
let expected_obj_second = Value::Object(Object::from(map! {
"id".to_string() => Value::Thing(thing.clone()),
"value".to_string() => Value::Number(Number::Int(100)),
"new_field".to_string() => Value::Strand(Strand::from("new_value")),
}));
assert_eq!(r.len(), 2, "{:?}", r);
let expected: Vec<ChangeSet> = vec![
ChangeSet(
vs::u64_to_versionstamp(2),
DatabaseMutation(vec![TableMutations(
TB.to_string(),
vec![TableMutation::Set(
Thing::from((TB.to_string(), "A".to_string())),
expected_obj_first,
)],
)]),
),
ChangeSet(
vs::u64_to_versionstamp(4),
DatabaseMutation(vec![TableMutations(
TB.to_string(),
vec![TableMutation::SetWithDiff(
Thing::from((TB.to_string(), "A".to_string())),
expected_obj_second,
vec![
// We need to remove the field to achieve the previous value
Operation::Remove {
path: Idiom::from("new_field"),
},
Operation::Replace {
path: Idiom::from("value"),
value: Value::Number(Number::Int(50)),
},
],
)],
)]),
),
];
assert_eq!(r, expected);
}
async fn change_feed_ts(mut tx: Transaction, ts: &Datetime) -> Vec<ChangeSet> {
let r =
crate::cf::read(&mut tx, NS, DB, Some(TB), ShowSince::Timestamp(ts.clone()), Some(10))
.await
.unwrap();
tx.cancel().await.unwrap();
r
}
async fn change_feed_vs(mut tx: Transaction, vs: &Versionstamp) -> Vec<ChangeSet> {
let r = crate::cf::read(
&mut tx,
NS,
@ -450,7 +548,7 @@ mod tests {
thing
}
async fn init() -> Datastore {
async fn init(store_diff: bool) -> Datastore {
let dns = DefineNamespaceStatement {
name: crate::sql::Ident(NS.to_string()),
..Default::default()
@ -459,15 +557,15 @@ mod tests {
name: crate::sql::Ident(DB.to_string()),
changefeed: Some(ChangeFeed {
expiry: Duration::from_secs(10),
store_original: false,
store_diff,
}),
..Default::default()
};
let dtb = DefineTableStatement {
name: TB.into(),
changefeed: Some(ChangeFeed {
expiry: Duration::from_secs(10),
store_original: false,
expiry: Duration::from_secs(10 * 60),
store_diff,
}),
..Default::default()
};

View file

@ -38,7 +38,7 @@ impl<'a> Document<'a> {
id,
self.initial.doc.clone(),
self.current.doc.clone(),
cf.store_original,
cf.store_diff,
);
}
// Carry on

View file

@ -12,10 +12,7 @@ impl From<js::CaughtError<'_>> for Error {
Some(file) => format!(" at {file}:{line}"),
None => String::default(),
},
match e.message() {
Some(message) => message,
None => String::default(),
},
e.message().unwrap_or_default(),
match e.stack() {
Some(stack) => format!("\n{stack}"),
None => String::default(),

View file

@ -150,12 +150,7 @@ impl Vector {
let norm_a = Self::normalize(a);
let norm_b = Self::normalize(b);
let mut s = Self::dot(&norm_a, &norm_b);
if s < -1.0 {
s = -1.0;
}
if s > 1.0 {
s = 1.0;
}
s = s.clamp(-1.0, 1.0);
1.0 - s
}

View file

@ -68,7 +68,7 @@ mod test {
use crate::cf::TableMutation;
use crate::kvs::lq_v2_doc::construct_document;
use crate::sql::statements::DefineTableStatement;
use crate::sql::{Strand, Thing, Value};
use crate::sql::{Idiom, Object, Operation, Strand, Thing, Value};
#[test]
fn test_construct_document_create() {
@ -99,14 +99,35 @@ mod test {
#[test]
fn test_construct_document_update() {
let thing = Thing::from(("table", "id"));
let value = Value::Strand(Strand::from("value"));
let operations = vec![];
let tb_mutation = TableMutation::SetWithDiff(thing.clone(), value, operations);
let current_value = Value::Object(Object(map! {
"first_field".to_string() => Value::Strand(Strand::from("first_value")),
"second_field".to_string() => Value::Strand(Strand::from("second_value")),
}));
let operations = vec![
Operation::Remove {
path: Idiom::from("first_field"),
},
Operation::Replace {
path: Idiom::from("second_field"),
value: Value::Strand(Strand::from("original_value")),
},
Operation::Add {
path: Idiom::from("third_field"),
value: Value::Strand(Strand::from("third_value")),
},
];
let expected_original = Value::Object(Object(map! {
"second_field".to_string() => Value::Strand(Strand::from("original_value")),
"third_field".to_string() => Value::Strand(Strand::from("third_value")),
}));
let tb_mutation =
TableMutation::SetWithDiff(thing.clone(), current_value.clone(), operations);
let doc = construct_document(&tb_mutation).unwrap();
let doc = doc.unwrap();
assert!(!doc.is_new());
assert!(doc.initial_doc().is_strand(), "{:?}", doc.initial_doc());
assert!(doc.current_doc().is_strand(), "{:?}", doc.current_doc());
assert!(!doc.is_delete());
assert_eq!(doc.initial_doc(), &expected_original, "{:?}", doc.initial_doc());
assert_eq!(doc.current_doc(), &current_value, "{:?}", doc.current_doc());
}
#[test]

View file

@ -32,7 +32,7 @@ impl<'a> Arbitrary<'a> for ChangeFeed {
fn arbitrary(u: &mut Unstructured<'a>) -> Result<Self> {
Ok(Self {
expiry: time::Duration::new(u64::arbitrary(u)?, u32::arbitrary(u)?),
store_original: bool::arbitrary(u)?,
store_diff: bool::arbitrary(u)?,
})
}
}

View file

@ -13,12 +13,12 @@ use std::time;
pub struct ChangeFeed {
pub expiry: time::Duration,
#[revision(start = 2)]
pub store_original: bool,
pub store_diff: bool,
}
impl Display for ChangeFeed {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "CHANGEFEED {}", Duration(self.expiry))?;
if self.store_original {
if self.store_diff {
write!(f, " INCLUDE ORIGINAL")?;
};
Ok(())
@ -29,7 +29,7 @@ impl Default for ChangeFeed {
fn default() -> Self {
Self {
expiry: time::Duration::from_secs(0),
store_original: false,
store_diff: false,
}
}
}

View file

@ -176,7 +176,7 @@ impl LiveStatement {
.changefeed
.ok_or(Error::LiveQueryError(LiveQueryCause::MissingChangeFeed))?;
// check the change feed includes the original - required for differentiating between CREATE and UPDATE
if !cf.store_original {
if !cf.store_diff {
return Err(Error::LiveQueryError(LiveQueryCause::ChangeFeedNoOriginal));
}
Ok(())

View file

@ -40,7 +40,7 @@ impl ser::Serializer for Serializer {
#[non_exhaustive]
pub struct SerializeChangeFeed {
expiry: Duration,
store_original: bool,
store_diff: bool,
}
impl serde::ser::SerializeStruct for SerializeChangeFeed {
@ -55,8 +55,8 @@ impl serde::ser::SerializeStruct for SerializeChangeFeed {
"expiry" => {
self.expiry = value.serialize(ser::duration::Serializer.wrap())?;
}
"store_original" => {
self.store_original = value.serialize(ser::primitive::bool::Serializer.wrap())?;
"store_diff" => {
self.store_diff = value.serialize(ser::primitive::bool::Serializer.wrap())?;
}
key => {
return Err(Error::custom(format!("unexpected field `ChangeFeed::{key}`")));
@ -68,7 +68,7 @@ impl serde::ser::SerializeStruct for SerializeChangeFeed {
fn end(self) -> Result<Self::Ok, Error> {
Ok(ChangeFeed {
expiry: self.expiry,
store_original: self.store_original,
store_diff: self.store_diff,
})
}
}

View file

@ -341,7 +341,7 @@ impl Parser<'_> {
/// Expects the parser to have already eating the `CHANGEFEED` keyword
pub fn parse_changefeed(&mut self) -> ParseResult<ChangeFeed> {
let expiry = self.next_token_value::<Duration>()?.0;
let store_original = if self.eat(t!("INCLUDE")) {
let store_diff = if self.eat(t!("INCLUDE")) {
expected!(self, TokenKind::ChangeFeedInclude(ChangeFeedInclude::Original));
true
} else {
@ -350,7 +350,7 @@ impl Parser<'_> {
Ok(ChangeFeed {
expiry,
store_original,
store_diff,
})
}

View file

@ -151,7 +151,7 @@ fn parse_define_database() {
comment: Some(Strand("test".to_string())),
changefeed: Some(ChangeFeed {
expiry: std::time::Duration::from_secs(60) * 10,
store_original: true,
store_diff: true,
}),
if_not_exists: false,
}))
@ -323,7 +323,7 @@ fn parse_define_table() {
},
changefeed: Some(ChangeFeed {
expiry: std::time::Duration::from_secs(1),
store_original: true,
store_diff: true,
}),
comment: None,
if_not_exists: false,

View file

@ -166,7 +166,7 @@ fn statements() -> Vec<Statement> {
comment: Some(Strand("test".to_string())),
changefeed: Some(ChangeFeed {
expiry: std::time::Duration::from_secs(60) * 10,
store_original: false,
store_diff: false,
}),
if_not_exists: false,
})),
@ -244,7 +244,7 @@ fn statements() -> Vec<Statement> {
},
changefeed: Some(ChangeFeed {
expiry: std::time::Duration::from_secs(1),
store_original: false,
store_diff: false,
}),
comment: None,
if_not_exists: false,