Add original value to Del in change feeds (#3969)
This commit is contained in:
parent
0d67160200
commit
63a7caf40e
7 changed files with 104 additions and 18 deletions
core/src
lib/tests/api
|
@ -28,6 +28,9 @@ pub enum TableMutation {
|
|||
/// Example, ("mytb:tobie", {{"note": "surreal"}}, [{"op": "add", "path": "/note", "value": "surreal"}], false)
|
||||
/// Means that we have already applied the add "/note" operation to achieve the recorded result
|
||||
SetWithDiff(Thing, Value, Vec<Operation>),
|
||||
#[revision(start = 2)]
|
||||
/// Delete a record where the ID is stored, and the now-deleted value
|
||||
DelWithOriginal(Thing, Value),
|
||||
}
|
||||
|
||||
impl From<DefineTableStatement> for Value {
|
||||
|
@ -105,17 +108,27 @@ impl TableMutation {
|
|||
h
|
||||
}
|
||||
TableMutation::Del(t) => {
|
||||
// TODO(SUR-329): Store update in delete for diff and notification
|
||||
let mut other = BTreeMap::<String, Value>::new();
|
||||
other.insert("id".to_string(), Value::Thing(t));
|
||||
let o = Object::from(other);
|
||||
h.insert("delete".to_string(), Value::Object(o));
|
||||
h.insert(
|
||||
"delete".to_string(),
|
||||
Value::Object(Object::from(map! {
|
||||
"id".to_string() => Value::Thing(t)
|
||||
})),
|
||||
);
|
||||
h
|
||||
}
|
||||
TableMutation::Def(t) => {
|
||||
h.insert("define_table".to_string(), Value::from(t));
|
||||
h
|
||||
}
|
||||
TableMutation::DelWithOriginal(id, _val) => {
|
||||
h.insert(
|
||||
"delete".to_string(),
|
||||
Value::Object(Object::from(map! {
|
||||
"id".to_string() => Value::Thing(id),
|
||||
})),
|
||||
);
|
||||
h
|
||||
}
|
||||
};
|
||||
let o = crate::sql::object::Object::from(h);
|
||||
Value::Object(o)
|
||||
|
@ -151,6 +164,7 @@ impl Display for TableMutation {
|
|||
TableMutation::Set(id, v) => write!(f, "SET {} {}", id, v),
|
||||
TableMutation::SetWithDiff(id, _previous, v) => write!(f, "SET {} {:?}", id, v),
|
||||
TableMutation::Del(id) => write!(f, "DEL {}", id),
|
||||
TableMutation::DelWithOriginal(id, _) => write!(f, "DEL {}", id),
|
||||
TableMutation::Def(t) => write!(f, "{}", t),
|
||||
}
|
||||
}
|
||||
|
@ -276,6 +290,13 @@ mod tests {
|
|||
}],
|
||||
),
|
||||
TableMutation::Del(Thing::from(("mytb".to_string(), "tobie".to_string()))),
|
||||
TableMutation::DelWithOriginal(
|
||||
Thing::from(("mytb".to_string(), "tobie".to_string())),
|
||||
Value::Object(Object::from(map! {
|
||||
"id" => Value::from(Thing::from(("mytb".to_string(), "tobie".to_string()))),
|
||||
"note" => Value::from("surreal"),
|
||||
})),
|
||||
),
|
||||
TableMutation::Def(DefineTableStatement {
|
||||
name: "mytb".into(),
|
||||
..DefineTableStatement::default()
|
||||
|
@ -287,7 +308,7 @@ mod tests {
|
|||
let s = serde_json::to_string(&v).unwrap();
|
||||
assert_eq!(
|
||||
s,
|
||||
r#"{"changes":[{"current":{"id":"mytb:tobie","note":"surreal"},"update":[{"op":"add","path":"/`/note`","value":"surreal"}]},{"current":{"id":"mytb:tobie2","note":"surreal"},"update":[{"op":"remove","path":"/`/temp`"}]},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":65536}"#
|
||||
r#"{"changes":[{"current":{"id":"mytb:tobie","note":"surreal"},"update":[{"op":"add","path":"/`/note`","value":"surreal"}]},{"current":{"id":"mytb:tobie2","note":"surreal"},"update":[{"op":"remove","path":"/`/temp`"}]},{"delete":{"id":"mytb:tobie"}},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":65536}"#
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -100,7 +100,15 @@ impl Writer {
|
|||
},
|
||||
);
|
||||
} else {
|
||||
self.buf.push(ns.to_string(), db.to_string(), tb.to_string(), TableMutation::Del(id));
|
||||
self.buf.push(
|
||||
ns.to_string(),
|
||||
db.to_string(),
|
||||
tb.to_string(),
|
||||
match store_difference {
|
||||
true => TableMutation::DelWithOriginal(id, previous.into_owned()),
|
||||
false => TableMutation::Del(id),
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -120,10 +120,14 @@ impl<'a> Document<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Get the current document, as it is being modified
|
||||
#[allow(unused)]
|
||||
pub(crate) fn current_doc(&self) -> &Value {
|
||||
self.current.doc.as_ref()
|
||||
}
|
||||
|
||||
/// Get the initial version of the document before it is modified
|
||||
#[allow(unused)]
|
||||
pub(crate) fn initial_doc(&self) -> &Value {
|
||||
self.initial.doc.as_ref()
|
||||
}
|
||||
|
|
|
@ -60,6 +60,17 @@ pub(in crate::kvs) fn construct_document(
|
|||
// TODO(SUR-328): reverse diff and apply to doc to retrieve original version of doc
|
||||
Ok(Some(doc))
|
||||
}
|
||||
TableMutation::DelWithOriginal(id, val) => {
|
||||
let doc = Document::new_artificial(
|
||||
None,
|
||||
Some(id),
|
||||
None,
|
||||
Cow::Owned(Value::None),
|
||||
Cow::Borrowed(val),
|
||||
Workable::Normal,
|
||||
);
|
||||
Ok(Some(doc))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -151,6 +162,31 @@ mod test {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_construct_document_delete_with_original() {
|
||||
let thing = Thing::from(("table", "id"));
|
||||
let original = Value::Object(Object(map! {
|
||||
"id".to_string() => Value::Thing(thing.clone()),
|
||||
"some_key".to_string() => Value::Strand(Strand::from("some_value")),
|
||||
}));
|
||||
let tb_mutation = TableMutation::DelWithOriginal(thing.clone(), original);
|
||||
let doc = construct_document(&tb_mutation).unwrap();
|
||||
let doc = doc.unwrap();
|
||||
// The previous and current doc values are "None", so technically this is a new doc as per
|
||||
// current == None
|
||||
assert!(!doc.is_new(), "{:?}", doc);
|
||||
assert!(doc.is_delete(), "{:?}", doc);
|
||||
assert!(doc.current_doc().is_none());
|
||||
assert!(doc.initial_doc().is_some());
|
||||
match doc.initial_doc() {
|
||||
Value::Object(o) => {
|
||||
assert!(o.contains_key("id"));
|
||||
assert_eq!(o.get("id").unwrap(), &Value::Thing(thing));
|
||||
}
|
||||
_ => panic!("Initial doc should be an object"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_construct_document_none_for_schema() {
|
||||
let tb_mutation = TableMutation::Def(DefineTableStatement::default());
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use crate::cf;
|
||||
use crate::cf::{ChangeSet, TableMutation};
|
||||
use crate::cf::ChangeSet;
|
||||
use crate::dbs::{Options, Statement};
|
||||
use crate::err::Error;
|
||||
use crate::fflags::FFLAGS;
|
||||
|
@ -167,13 +167,6 @@ async fn process_change_set_for_notifications(
|
|||
// for the current state we only forward
|
||||
let (local_notification_channel_sender, local_notification_channel_recv) =
|
||||
channel::bounded(notification_capacity);
|
||||
if doc.initial_doc().is_none()
|
||||
&& doc.current_doc().is_none()
|
||||
&& !matches!(mutation, TableMutation::Del(_))
|
||||
{
|
||||
// If we have a None to None mutation, and it isn't delete, then it indicates a bad document
|
||||
panic!("Doc was wrong and the mutation was {:?}", mutation);
|
||||
}
|
||||
doc.check_lqs_and_send_notifications(
|
||||
stk,
|
||||
opt,
|
||||
|
|
|
@ -43,7 +43,7 @@ async fn scan_node_lq() {
|
|||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn live_params_are_evaluated() {
|
||||
if !FFLAGS.change_feed_live_queries.enabled() {
|
||||
if !crate::fflags::FFLAGS.change_feed_live_queries.enabled() {
|
||||
return;
|
||||
}
|
||||
let node_id = Uuid::parse_str("9cb22db9-1851-4781-8847-d781a3f373ae").unwrap();
|
||||
|
|
|
@ -204,9 +204,11 @@ async fn live_select_record_ranges() {
|
|||
|
||||
// Delete the record
|
||||
let _: Option<RecordId> = db.delete(¬ification.data.id).await.unwrap();
|
||||
|
||||
// Pull the notification
|
||||
let notification: Notification<RecordId> =
|
||||
tokio::time::timeout(LQ_TIMEOUT, users.next()).await.unwrap().unwrap().unwrap();
|
||||
|
||||
// It should be deleted
|
||||
assert_eq!(notification.action, Action::Delete);
|
||||
}
|
||||
|
@ -226,7 +228,11 @@ async fn live_select_record_ranges() {
|
|||
db.select(Resource::from(&table)).range("jane".."john").live().await.unwrap();
|
||||
|
||||
// Create a record
|
||||
db.create(Resource::from((table, "job"))).await.unwrap();
|
||||
let created_value = match db.create(Resource::from((table, "job"))).await.unwrap() {
|
||||
Value::Object(created_value) => created_value,
|
||||
_ => panic!("Expected an object"),
|
||||
};
|
||||
|
||||
// Pull the notification
|
||||
let notification: Notification<Value> =
|
||||
tokio::time::timeout(LQ_TIMEOUT, users.next()).await.unwrap().unwrap();
|
||||
|
@ -234,6 +240,25 @@ async fn live_select_record_ranges() {
|
|||
assert!(notification.data.is_object());
|
||||
// It should be newly created
|
||||
assert_eq!(notification.action, Action::Create);
|
||||
|
||||
// Delete the record
|
||||
let thing = match created_value.0.get("id").unwrap() {
|
||||
Value::Thing(thing) => thing,
|
||||
_ => panic!("Expected a thing"),
|
||||
};
|
||||
db.query("DELETE $item").bind(("item", thing.clone())).await.unwrap();
|
||||
|
||||
// Pull the notification
|
||||
let notification: Notification<Value> =
|
||||
tokio::time::timeout(LQ_TIMEOUT, users.next()).await.unwrap().unwrap();
|
||||
|
||||
// It should be deleted
|
||||
assert_eq!(notification.action, Action::Delete);
|
||||
let notification = match notification.data {
|
||||
Value::Object(notification) => notification,
|
||||
_ => panic!("Expected an object"),
|
||||
};
|
||||
assert_eq!(notification.0, created_value.0);
|
||||
}
|
||||
|
||||
drop(permit);
|
||||
|
@ -244,7 +269,6 @@ async fn live_select_query() {
|
|||
let (permit, db) = new_db().await;
|
||||
|
||||
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
|
||||
|
||||
{
|
||||
let table = format!("table_{}", Ulid::new());
|
||||
if FFLAGS.change_feed_live_queries.enabled() {
|
||||
|
|
Loading…
Reference in a new issue