Change Feed stores original value of difference (#3420)
This commit is contained in:
parent
0ad7e8c363
commit
7b0771acb7
8 changed files with 445 additions and 110 deletions
|
@ -12,13 +12,16 @@ use std::fmt::{self, Display, Formatter};
|
||||||
|
|
||||||
// Mutation is a single mutation to a table.
|
// Mutation is a single mutation to a table.
|
||||||
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
|
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
|
||||||
#[revisioned(revision = 1)]
|
#[revisioned(revision = 2)]
|
||||||
pub enum TableMutation {
|
pub enum TableMutation {
|
||||||
// Although the Value is supposed to contain a field "id" of Thing,
|
// Although the Value is supposed to contain a field "id" of Thing,
|
||||||
// we do include it in the first field for convenience.
|
// we do include it in the first field for convenience.
|
||||||
Set(Thing, Value),
|
Set(Thing, Value),
|
||||||
Del(Thing),
|
Del(Thing),
|
||||||
Def(DefineTableStatement),
|
Def(DefineTableStatement),
|
||||||
|
#[revision(start = 2)]
|
||||||
|
// Includes the previous value that may be None
|
||||||
|
SetPrevious(Thing, Value, Value),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<DefineTableStatement> for Value {
|
impl From<DefineTableStatement> for Value {
|
||||||
|
@ -67,7 +70,10 @@ impl TableMutation {
|
||||||
pub fn into_value(self) -> Value {
|
pub fn into_value(self) -> Value {
|
||||||
let (k, v) = match self {
|
let (k, v) = match self {
|
||||||
TableMutation::Set(_t, v) => ("update".to_string(), v),
|
TableMutation::Set(_t, v) => ("update".to_string(), v),
|
||||||
|
TableMutation::SetPrevious(_t, Value::None, v) => ("create".to_string(), v),
|
||||||
|
TableMutation::SetPrevious(_t, _previous, v) => ("update".to_string(), v),
|
||||||
TableMutation::Del(t) => {
|
TableMutation::Del(t) => {
|
||||||
|
// TODO(phughk): Future PR for lq on cf feature, store update in delete for diff and notification
|
||||||
let mut h = BTreeMap::<String, Value>::new();
|
let mut h = BTreeMap::<String, Value>::new();
|
||||||
h.insert("id".to_string(), Value::Thing(t));
|
h.insert("id".to_string(), Value::Thing(t));
|
||||||
let o = Object::from(h);
|
let o = Object::from(h);
|
||||||
|
@ -110,6 +116,7 @@ impl Display for TableMutation {
|
||||||
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
TableMutation::Set(id, v) => write!(f, "SET {} {}", id, v),
|
TableMutation::Set(id, v) => write!(f, "SET {} {}", id, v),
|
||||||
|
TableMutation::SetPrevious(id, _previous, v) => write!(f, "SET {} {}", id, v),
|
||||||
TableMutation::Del(id) => write!(f, "DEL {}", id),
|
TableMutation::Del(id) => write!(f, "DEL {}", id),
|
||||||
TableMutation::Def(t) => write!(f, "{}", t),
|
TableMutation::Def(t) => write!(f, "{}", t),
|
||||||
}
|
}
|
||||||
|
@ -160,6 +167,8 @@ impl Default for WriteMutationSet {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use crate::sql::Strand;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn serialization() {
|
fn serialization() {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -194,4 +203,54 @@ mod tests {
|
||||||
r#"{"changes":[{"update":{"id":"mytb:tobie","note":"surreal"}},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":1}"#
|
r#"{"changes":[{"update":{"id":"mytb:tobie","note":"surreal"}},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":1}"#
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn serialization_rev2() {
|
||||||
|
use super::*;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
let cs = ChangeSet(
|
||||||
|
[0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
|
||||||
|
DatabaseMutation(vec![TableMutations(
|
||||||
|
"mytb".to_string(),
|
||||||
|
vec![
|
||||||
|
TableMutation::SetPrevious(
|
||||||
|
Thing::from(("mytb".to_string(), "tobie".to_string())),
|
||||||
|
Value::None,
|
||||||
|
Value::Object(Object::from(HashMap::from([
|
||||||
|
(
|
||||||
|
"id",
|
||||||
|
Value::from(Thing::from(("mytb".to_string(), "tobie".to_string()))),
|
||||||
|
),
|
||||||
|
("note", Value::from("surreal")),
|
||||||
|
]))),
|
||||||
|
),
|
||||||
|
TableMutation::SetPrevious(
|
||||||
|
Thing::from(("mytb".to_string(), "tobie".to_string())),
|
||||||
|
Value::Strand(Strand::from("this would normally be an object")),
|
||||||
|
Value::Object(Object::from(HashMap::from([
|
||||||
|
(
|
||||||
|
"id",
|
||||||
|
Value::from(Thing::from((
|
||||||
|
"mytb".to_string(),
|
||||||
|
"tobie2".to_string(),
|
||||||
|
))),
|
||||||
|
),
|
||||||
|
("note", Value::from("surreal")),
|
||||||
|
]))),
|
||||||
|
),
|
||||||
|
TableMutation::Del(Thing::from(("mytb".to_string(), "tobie".to_string()))),
|
||||||
|
TableMutation::Def(DefineTableStatement {
|
||||||
|
name: "mytb".into(),
|
||||||
|
..DefineTableStatement::default()
|
||||||
|
}),
|
||||||
|
],
|
||||||
|
)]),
|
||||||
|
);
|
||||||
|
let v = cs.into_value().into_json();
|
||||||
|
let s = serde_json::to_string(&v).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
s,
|
||||||
|
r#"{"changes":[{"create":{"id":"mytb:tobie","note":"surreal"}},{"update":{"id":"mytb:tobie2","note":"surreal"}},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":1}"#
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use crate::cf::{TableMutation, TableMutations};
|
use crate::cf::{TableMutation, TableMutations};
|
||||||
|
use crate::fflags::FFLAGS;
|
||||||
use crate::kvs::Key;
|
use crate::kvs::Key;
|
||||||
use crate::sql::statements::DefineTableStatement;
|
use crate::sql::statements::DefineTableStatement;
|
||||||
use crate::sql::thing::Thing;
|
use crate::sql::thing::Thing;
|
||||||
|
@ -59,13 +60,24 @@ impl Writer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn update(&mut self, ns: &str, db: &str, tb: &str, id: Thing, v: Cow<'_, Value>) {
|
pub(crate) fn update(
|
||||||
|
&mut self,
|
||||||
|
ns: &str,
|
||||||
|
db: &str,
|
||||||
|
tb: &str,
|
||||||
|
id: Thing,
|
||||||
|
p: Cow<'_, Value>,
|
||||||
|
v: Cow<'_, Value>,
|
||||||
|
) {
|
||||||
if v.is_some() {
|
if v.is_some() {
|
||||||
self.buf.push(
|
self.buf.push(
|
||||||
ns.to_string(),
|
ns.to_string(),
|
||||||
db.to_string(),
|
db.to_string(),
|
||||||
tb.to_string(),
|
tb.to_string(),
|
||||||
TableMutation::Set(id, v.into_owned()),
|
match FFLAGS.change_feed_live_queries.enabled() {
|
||||||
|
true => TableMutation::SetPrevious(id, p.into_owned(), v.into_owned()),
|
||||||
|
false => TableMutation::Set(id, v.into_owned()),
|
||||||
|
},
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
self.buf.push(ns.to_string(), db.to_string(), tb.to_string(), TableMutation::Del(id));
|
self.buf.push(ns.to_string(), db.to_string(), tb.to_string(), TableMutation::Del(id));
|
||||||
|
@ -111,6 +123,7 @@ mod tests {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use crate::cf::{ChangeSet, DatabaseMutation, TableMutation, TableMutations};
|
use crate::cf::{ChangeSet, DatabaseMutation, TableMutation, TableMutations};
|
||||||
|
use crate::fflags::FFLAGS;
|
||||||
use crate::key::key_req::KeyRequirements;
|
use crate::key::key_req::KeyRequirements;
|
||||||
use crate::kvs::{Datastore, LockType::*, TransactionType::*};
|
use crate::kvs::{Datastore, LockType::*, TransactionType::*};
|
||||||
use crate::sql::changefeed::ChangeFeed;
|
use crate::sql::changefeed::ChangeFeed;
|
||||||
|
@ -178,7 +191,9 @@ mod tests {
|
||||||
id: Id::String("A".to_string()),
|
id: Id::String("A".to_string()),
|
||||||
};
|
};
|
||||||
let value_a: super::Value = "a".into();
|
let value_a: super::Value = "a".into();
|
||||||
tx1.record_change(ns, db, tb, &thing_a, Cow::Borrowed(&value_a));
|
// TODO(for this PR): This was just added to resolve compile issues but test should be fixed
|
||||||
|
let previous = Cow::from(Value::None);
|
||||||
|
tx1.record_change(ns, db, tb, &thing_a, previous.clone(), Cow::Borrowed(&value_a));
|
||||||
tx1.complete_changes(true).await.unwrap();
|
tx1.complete_changes(true).await.unwrap();
|
||||||
tx1.commit().await.unwrap();
|
tx1.commit().await.unwrap();
|
||||||
|
|
||||||
|
@ -188,7 +203,7 @@ mod tests {
|
||||||
id: Id::String("C".to_string()),
|
id: Id::String("C".to_string()),
|
||||||
};
|
};
|
||||||
let value_c: Value = "c".into();
|
let value_c: Value = "c".into();
|
||||||
tx2.record_change(ns, db, tb, &thing_c, Cow::Borrowed(&value_c));
|
tx2.record_change(ns, db, tb, &thing_c, previous.clone(), Cow::Borrowed(&value_c));
|
||||||
tx2.complete_changes(true).await.unwrap();
|
tx2.complete_changes(true).await.unwrap();
|
||||||
tx2.commit().await.unwrap();
|
tx2.commit().await.unwrap();
|
||||||
|
|
||||||
|
@ -199,13 +214,13 @@ mod tests {
|
||||||
id: Id::String("B".to_string()),
|
id: Id::String("B".to_string()),
|
||||||
};
|
};
|
||||||
let value_b: Value = "b".into();
|
let value_b: Value = "b".into();
|
||||||
tx3.record_change(ns, db, tb, &thing_b, Cow::Borrowed(&value_b));
|
tx3.record_change(ns, db, tb, &thing_b, previous.clone(), Cow::Borrowed(&value_b));
|
||||||
let thing_c2 = Thing {
|
let thing_c2 = Thing {
|
||||||
tb: tb.to_owned(),
|
tb: tb.to_owned(),
|
||||||
id: Id::String("C".to_string()),
|
id: Id::String("C".to_string()),
|
||||||
};
|
};
|
||||||
let value_c2: Value = "c2".into();
|
let value_c2: Value = "c2".into();
|
||||||
tx3.record_change(ns, db, tb, &thing_c2, Cow::Borrowed(&value_c2));
|
tx3.record_change(ns, db, tb, &thing_c2, previous.clone(), Cow::Borrowed(&value_c2));
|
||||||
tx3.complete_changes(true).await.unwrap();
|
tx3.complete_changes(true).await.unwrap();
|
||||||
tx3.commit().await.unwrap();
|
tx3.commit().await.unwrap();
|
||||||
|
|
||||||
|
@ -227,36 +242,64 @@ mod tests {
|
||||||
vs::u64_to_versionstamp(2),
|
vs::u64_to_versionstamp(2),
|
||||||
DatabaseMutation(vec![TableMutations(
|
DatabaseMutation(vec![TableMutations(
|
||||||
"mytb".to_string(),
|
"mytb".to_string(),
|
||||||
vec![TableMutation::Set(
|
match FFLAGS.change_feed_live_queries.enabled() {
|
||||||
Thing::from(("mytb".to_string(), "A".to_string())),
|
true => vec![TableMutation::SetPrevious(
|
||||||
Value::from("a"),
|
Thing::from(("mytb".to_string(), "A".to_string())),
|
||||||
)],
|
Value::None,
|
||||||
|
Value::from("a"),
|
||||||
|
)],
|
||||||
|
false => vec![TableMutation::Set(
|
||||||
|
Thing::from(("mytb".to_string(), "A".to_string())),
|
||||||
|
Value::from("a"),
|
||||||
|
)],
|
||||||
|
},
|
||||||
)]),
|
)]),
|
||||||
),
|
),
|
||||||
ChangeSet(
|
ChangeSet(
|
||||||
vs::u64_to_versionstamp(3),
|
vs::u64_to_versionstamp(3),
|
||||||
DatabaseMutation(vec![TableMutations(
|
DatabaseMutation(vec![TableMutations(
|
||||||
"mytb".to_string(),
|
"mytb".to_string(),
|
||||||
vec![TableMutation::Set(
|
match FFLAGS.change_feed_live_queries.enabled() {
|
||||||
Thing::from(("mytb".to_string(), "C".to_string())),
|
true => vec![TableMutation::SetPrevious(
|
||||||
Value::from("c"),
|
Thing::from(("mytb".to_string(), "C".to_string())),
|
||||||
)],
|
Value::None,
|
||||||
|
Value::from("c"),
|
||||||
|
)],
|
||||||
|
false => vec![TableMutation::Set(
|
||||||
|
Thing::from(("mytb".to_string(), "C".to_string())),
|
||||||
|
Value::from("c"),
|
||||||
|
)],
|
||||||
|
},
|
||||||
)]),
|
)]),
|
||||||
),
|
),
|
||||||
ChangeSet(
|
ChangeSet(
|
||||||
vs::u64_to_versionstamp(4),
|
vs::u64_to_versionstamp(4),
|
||||||
DatabaseMutation(vec![TableMutations(
|
DatabaseMutation(vec![TableMutations(
|
||||||
"mytb".to_string(),
|
"mytb".to_string(),
|
||||||
vec![
|
match FFLAGS.change_feed_live_queries.enabled() {
|
||||||
TableMutation::Set(
|
true => vec![
|
||||||
Thing::from(("mytb".to_string(), "B".to_string())),
|
TableMutation::SetPrevious(
|
||||||
Value::from("b"),
|
Thing::from(("mytb".to_string(), "B".to_string())),
|
||||||
),
|
Value::None,
|
||||||
TableMutation::Set(
|
Value::from("b"),
|
||||||
Thing::from(("mytb".to_string(), "C".to_string())),
|
),
|
||||||
Value::from("c2"),
|
TableMutation::SetPrevious(
|
||||||
),
|
Thing::from(("mytb".to_string(), "C".to_string())),
|
||||||
],
|
Value::None,
|
||||||
|
Value::from("c2"),
|
||||||
|
),
|
||||||
|
],
|
||||||
|
false => vec![
|
||||||
|
TableMutation::Set(
|
||||||
|
Thing::from(("mytb".to_string(), "B".to_string())),
|
||||||
|
Value::from("b"),
|
||||||
|
),
|
||||||
|
TableMutation::Set(
|
||||||
|
Thing::from(("mytb".to_string(), "C".to_string())),
|
||||||
|
Value::from("c2"),
|
||||||
|
),
|
||||||
|
],
|
||||||
|
},
|
||||||
)]),
|
)]),
|
||||||
),
|
),
|
||||||
];
|
];
|
||||||
|
@ -281,16 +324,30 @@ mod tests {
|
||||||
vs::u64_to_versionstamp(4),
|
vs::u64_to_versionstamp(4),
|
||||||
DatabaseMutation(vec![TableMutations(
|
DatabaseMutation(vec![TableMutations(
|
||||||
"mytb".to_string(),
|
"mytb".to_string(),
|
||||||
vec![
|
match FFLAGS.change_feed_live_queries.enabled() {
|
||||||
TableMutation::Set(
|
true => vec![
|
||||||
Thing::from(("mytb".to_string(), "B".to_string())),
|
TableMutation::SetPrevious(
|
||||||
Value::from("b"),
|
Thing::from(("mytb".to_string(), "B".to_string())),
|
||||||
),
|
Value::None,
|
||||||
TableMutation::Set(
|
Value::from("b"),
|
||||||
Thing::from(("mytb".to_string(), "C".to_string())),
|
),
|
||||||
Value::from("c2"),
|
TableMutation::SetPrevious(
|
||||||
),
|
Thing::from(("mytb".to_string(), "C".to_string())),
|
||||||
],
|
Value::None,
|
||||||
|
Value::from("c2"),
|
||||||
|
),
|
||||||
|
],
|
||||||
|
false => vec![
|
||||||
|
TableMutation::Set(
|
||||||
|
Thing::from(("mytb".to_string(), "B".to_string())),
|
||||||
|
Value::from("b"),
|
||||||
|
),
|
||||||
|
TableMutation::Set(
|
||||||
|
Thing::from(("mytb".to_string(), "C".to_string())),
|
||||||
|
Value::from("c2"),
|
||||||
|
),
|
||||||
|
],
|
||||||
|
},
|
||||||
)]),
|
)]),
|
||||||
)];
|
)];
|
||||||
assert_eq!(r, want);
|
assert_eq!(r, want);
|
||||||
|
|
|
@ -31,7 +31,14 @@ impl<'a> Document<'a> {
|
||||||
let tb = tb.name.as_str();
|
let tb = tb.name.as_str();
|
||||||
let id = self.id.as_ref().unwrap();
|
let id = self.id.as_ref().unwrap();
|
||||||
// Create the changefeed entry
|
// Create the changefeed entry
|
||||||
run.record_change(opt.ns(), opt.db(), tb, id, self.current.doc.clone());
|
run.record_change(
|
||||||
|
opt.ns(),
|
||||||
|
opt.db(),
|
||||||
|
tb,
|
||||||
|
id,
|
||||||
|
self.initial.doc.clone(),
|
||||||
|
self.current.doc.clone(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
// Carry on
|
// Carry on
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
/// Use this while implementing features
|
/// Use this while implementing features
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub static FFLAGS: FFlags = FFlags {
|
pub static FFLAGS: FFlags = FFlags {
|
||||||
|
// TODO(fflag-lqcf): This TODO signature marks tests that are affected by the fflag that do not have access to the fflag (scope)
|
||||||
change_feed_live_queries: FFlagEnabledStatus {
|
change_feed_live_queries: FFlagEnabledStatus {
|
||||||
enabled_release: false,
|
enabled_release: false,
|
||||||
enabled_debug: false,
|
enabled_debug: false,
|
||||||
|
|
|
@ -2571,9 +2571,10 @@ impl Transaction {
|
||||||
db: &str,
|
db: &str,
|
||||||
tb: &str,
|
tb: &str,
|
||||||
id: &Thing,
|
id: &Thing,
|
||||||
|
p: Cow<'_, Value>,
|
||||||
v: Cow<'_, Value>,
|
v: Cow<'_, Value>,
|
||||||
) {
|
) {
|
||||||
self.cf.update(ns, db, tb, id.clone(), v)
|
self.cf.update(ns, db, tb, id.clone(), p, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Records the table (re)definition in the changefeed if enabled.
|
// Records the table (re)definition in the changefeed if enabled.
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
// Tests common to all protocols and storage engines
|
// Tests common to all protocols and storage engines
|
||||||
|
|
||||||
|
use surrealdb_core::fflags::FFLAGS;
|
||||||
|
|
||||||
static PERMITS: Semaphore = Semaphore::const_new(1);
|
static PERMITS: Semaphore = Semaphore::const_new(1);
|
||||||
|
|
||||||
#[test_log::test(tokio::test)]
|
#[test_log::test(tokio::test)]
|
||||||
|
@ -953,20 +955,40 @@ async fn changefeed() {
|
||||||
unreachable!()
|
unreachable!()
|
||||||
};
|
};
|
||||||
let changes = a.get("changes").unwrap().to_owned();
|
let changes = a.get("changes").unwrap().to_owned();
|
||||||
assert_eq!(
|
match FFLAGS.change_feed_live_queries.enabled() {
|
||||||
changes,
|
true => {
|
||||||
surrealdb::sql::value(
|
assert_eq!(
|
||||||
"[
|
changes,
|
||||||
{
|
surrealdb::sql::value(
|
||||||
update: {
|
r#"[
|
||||||
id: user:amos,
|
{
|
||||||
name: 'Amos'
|
create: {
|
||||||
}
|
id: user:amos,
|
||||||
|
name: 'Amos'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]"#
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
]"
|
false => {
|
||||||
)
|
assert_eq!(
|
||||||
.unwrap()
|
changes,
|
||||||
);
|
surrealdb::sql::value(
|
||||||
|
r#"[
|
||||||
|
{
|
||||||
|
update: {
|
||||||
|
id: user:amos,
|
||||||
|
name: 'Amos'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]"#
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
// UPDATE user:jane
|
// UPDATE user:jane
|
||||||
let a = array.get(2).unwrap();
|
let a = array.get(2).unwrap();
|
||||||
let Value::Object(a) = a else {
|
let Value::Object(a) = a else {
|
||||||
|
@ -977,20 +999,40 @@ async fn changefeed() {
|
||||||
};
|
};
|
||||||
assert!(versionstamp1 < versionstamp2);
|
assert!(versionstamp1 < versionstamp2);
|
||||||
let changes = a.get("changes").unwrap().to_owned();
|
let changes = a.get("changes").unwrap().to_owned();
|
||||||
assert_eq!(
|
match FFLAGS.change_feed_live_queries.enabled() {
|
||||||
changes,
|
true => {
|
||||||
surrealdb::sql::value(
|
assert_eq!(
|
||||||
"[
|
changes,
|
||||||
{
|
surrealdb::sql::value(
|
||||||
update: {
|
"[
|
||||||
id: user:jane,
|
{
|
||||||
name: 'Jane'
|
create: {
|
||||||
}
|
id: user:jane,
|
||||||
|
name: 'Jane'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]"
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
]"
|
false => {
|
||||||
)
|
assert_eq!(
|
||||||
.unwrap()
|
changes,
|
||||||
);
|
surrealdb::sql::value(
|
||||||
|
"[
|
||||||
|
{
|
||||||
|
update: {
|
||||||
|
id: user:jane,
|
||||||
|
name: 'Jane'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]"
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
// UPDATE user:amos
|
// UPDATE user:amos
|
||||||
let a = array.get(3).unwrap();
|
let a = array.get(3).unwrap();
|
||||||
let Value::Object(a) = a else {
|
let Value::Object(a) = a else {
|
||||||
|
|
|
@ -6,6 +6,7 @@ use helpers::new_ds;
|
||||||
use surrealdb::dbs::Session;
|
use surrealdb::dbs::Session;
|
||||||
use surrealdb::err::Error;
|
use surrealdb::err::Error;
|
||||||
use surrealdb::sql::Value;
|
use surrealdb::sql::Value;
|
||||||
|
use surrealdb_core::fflags::{FFlags, FFLAGS};
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn database_change_feeds() -> Result<(), Error> {
|
async fn database_change_feeds() -> Result<(), Error> {
|
||||||
|
@ -64,8 +65,34 @@ async fn database_change_feeds() -> Result<(), Error> {
|
||||||
assert_eq!(tmp, val);
|
assert_eq!(tmp, val);
|
||||||
// SHOW CHANGES
|
// SHOW CHANGES
|
||||||
let tmp = res.remove(0).result?;
|
let tmp = res.remove(0).result?;
|
||||||
let val = Value::parse(
|
let val = match FFLAGS.change_feed_live_queries.enabled() {
|
||||||
"[
|
true => Value::parse(
|
||||||
|
"[
|
||||||
|
{
|
||||||
|
versionstamp: 65536,
|
||||||
|
changes: [
|
||||||
|
{
|
||||||
|
create: {
|
||||||
|
id: person:test,
|
||||||
|
name: 'Name: Tobie'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
versionstamp: 131072,
|
||||||
|
changes: [
|
||||||
|
{
|
||||||
|
delete: {
|
||||||
|
id: person:test
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]",
|
||||||
|
),
|
||||||
|
false => Value::parse(
|
||||||
|
"[
|
||||||
{
|
{
|
||||||
versionstamp: 65536,
|
versionstamp: 65536,
|
||||||
changes: [
|
changes: [
|
||||||
|
@ -88,7 +115,8 @@ async fn database_change_feeds() -> Result<(), Error> {
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
]",
|
]",
|
||||||
);
|
),
|
||||||
|
};
|
||||||
assert_eq!(tmp, val);
|
assert_eq!(tmp, val);
|
||||||
// Retain for 1h
|
// Retain for 1h
|
||||||
let sql = "
|
let sql = "
|
||||||
|
@ -202,8 +230,77 @@ async fn table_change_feeds() -> Result<(), Error> {
|
||||||
let _tmp = res.remove(0).result?;
|
let _tmp = res.remove(0).result?;
|
||||||
// SHOW CHANGES
|
// SHOW CHANGES
|
||||||
let tmp = res.remove(0).result?;
|
let tmp = res.remove(0).result?;
|
||||||
let val = Value::parse(
|
let val = match FFLAGS.change_feed_live_queries.enabled() {
|
||||||
"[
|
true => Value::parse(
|
||||||
|
"[
|
||||||
|
{
|
||||||
|
versionstamp: 65536,
|
||||||
|
changes: [
|
||||||
|
{
|
||||||
|
define_table: {
|
||||||
|
name: 'person'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
versionstamp: 131072,
|
||||||
|
changes: [
|
||||||
|
{
|
||||||
|
create: {
|
||||||
|
id: person:test,
|
||||||
|
name: 'Name: Tobie'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
versionstamp: 196608,
|
||||||
|
changes: [
|
||||||
|
{
|
||||||
|
update: {
|
||||||
|
id: person:test,
|
||||||
|
name: 'Name: Jaime'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
versionstamp: 262144,
|
||||||
|
changes: [
|
||||||
|
{
|
||||||
|
update: {
|
||||||
|
id: person:test,
|
||||||
|
name: 'Name: Tobie'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
versionstamp: 327680,
|
||||||
|
changes: [
|
||||||
|
{
|
||||||
|
delete: {
|
||||||
|
id: person:test
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
versionstamp: 393216,
|
||||||
|
changes: [
|
||||||
|
{
|
||||||
|
create: {
|
||||||
|
id: person:1000,
|
||||||
|
name: 'Name: Yusuke'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]",
|
||||||
|
),
|
||||||
|
false => Value::parse(
|
||||||
|
"[
|
||||||
{
|
{
|
||||||
versionstamp: 65536,
|
versionstamp: 65536,
|
||||||
changes: [
|
changes: [
|
||||||
|
@ -269,7 +366,8 @@ async fn table_change_feeds() -> Result<(), Error> {
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
]",
|
]",
|
||||||
);
|
),
|
||||||
|
};
|
||||||
assert_eq!(tmp, val);
|
assert_eq!(tmp, val);
|
||||||
// Retain for 1h
|
// Retain for 1h
|
||||||
let sql = "
|
let sql = "
|
||||||
|
@ -370,10 +468,28 @@ async fn changefeed_with_ts() -> Result<(), Error> {
|
||||||
unreachable!()
|
unreachable!()
|
||||||
};
|
};
|
||||||
let changes = a.get("changes").unwrap().to_owned();
|
let changes = a.get("changes").unwrap().to_owned();
|
||||||
assert_eq!(
|
match FFLAGS.change_feed_live_queries.enabled() {
|
||||||
changes,
|
true => {
|
||||||
surrealdb::sql::value(
|
assert_eq!(
|
||||||
"[
|
changes,
|
||||||
|
surrealdb::sql::value(
|
||||||
|
"[
|
||||||
|
{
|
||||||
|
create: {
|
||||||
|
id: user:amos,
|
||||||
|
name: 'Amos'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]"
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
false => {
|
||||||
|
assert_eq!(
|
||||||
|
changes,
|
||||||
|
surrealdb::sql::value(
|
||||||
|
"[
|
||||||
{
|
{
|
||||||
update: {
|
update: {
|
||||||
id: user:amos,
|
id: user:amos,
|
||||||
|
@ -381,9 +497,11 @@ async fn changefeed_with_ts() -> Result<(), Error> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]"
|
]"
|
||||||
)
|
)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
// UPDATE user:jane
|
// UPDATE user:jane
|
||||||
let a = array.get(2).unwrap();
|
let a = array.get(2).unwrap();
|
||||||
let Value::Object(a) = a else {
|
let Value::Object(a) = a else {
|
||||||
|
@ -394,20 +512,40 @@ async fn changefeed_with_ts() -> Result<(), Error> {
|
||||||
};
|
};
|
||||||
assert!(versionstamp2 < versionstamp3);
|
assert!(versionstamp2 < versionstamp3);
|
||||||
let changes = a.get("changes").unwrap().to_owned();
|
let changes = a.get("changes").unwrap().to_owned();
|
||||||
assert_eq!(
|
match FFLAGS.change_feed_live_queries.enabled() {
|
||||||
changes,
|
true => {
|
||||||
surrealdb::sql::value(
|
assert_eq!(
|
||||||
"[
|
changes,
|
||||||
{
|
surrealdb::sql::value(
|
||||||
update: {
|
"[
|
||||||
id: user:jane,
|
{
|
||||||
name: 'Jane'
|
create: {
|
||||||
}
|
id: user:jane,
|
||||||
|
name: 'Jane'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]"
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
false => {
|
||||||
|
assert_eq!(
|
||||||
|
changes,
|
||||||
|
surrealdb::sql::value(
|
||||||
|
"[
|
||||||
|
{
|
||||||
|
update: {
|
||||||
|
id: user:jane,
|
||||||
|
name: 'Jane'
|
||||||
|
}
|
||||||
|
}
|
||||||
]"
|
]"
|
||||||
)
|
)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
// UPDATE user:amos
|
// UPDATE user:amos
|
||||||
let a = array.get(3).unwrap();
|
let a = array.get(3).unwrap();
|
||||||
let Value::Object(a) = a else {
|
let Value::Object(a) = a else {
|
||||||
|
@ -485,20 +623,40 @@ async fn changefeed_with_ts() -> Result<(), Error> {
|
||||||
};
|
};
|
||||||
assert!(versionstamp2 == versionstamp1b);
|
assert!(versionstamp2 == versionstamp1b);
|
||||||
let changes = a.get("changes").unwrap().to_owned();
|
let changes = a.get("changes").unwrap().to_owned();
|
||||||
assert_eq!(
|
match FFLAGS.change_feed_live_queries.enabled() {
|
||||||
changes,
|
true => {
|
||||||
surrealdb::sql::value(
|
assert_eq!(
|
||||||
"[
|
changes,
|
||||||
{
|
surrealdb::sql::value(
|
||||||
update: {
|
"[
|
||||||
id: user:amos,
|
{
|
||||||
name: 'Amos'
|
create: {
|
||||||
}
|
id: user:amos,
|
||||||
|
name: 'Amos'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]"
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
]"
|
false => {
|
||||||
)
|
assert_eq!(
|
||||||
.unwrap()
|
changes,
|
||||||
);
|
surrealdb::sql::value(
|
||||||
|
"[
|
||||||
|
{
|
||||||
|
update: {
|
||||||
|
id: user:amos,
|
||||||
|
name: 'Amos'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]"
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
// Save timestamp 3
|
// Save timestamp 3
|
||||||
let ts3_dt = "2023-08-01T00:00:10Z";
|
let ts3_dt = "2023-08-01T00:00:10Z";
|
||||||
let ts3 = DateTime::parse_from_rfc3339(ts3_dt).unwrap();
|
let ts3 = DateTime::parse_from_rfc3339(ts3_dt).unwrap();
|
||||||
|
|
|
@ -9,6 +9,7 @@ mod cli_integration {
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::time;
|
use std::time;
|
||||||
|
use surrealdb::fflags::FFLAGS;
|
||||||
use test_log::test;
|
use test_log::test;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
@ -676,15 +677,24 @@ mod cli_integration {
|
||||||
let args = format!(
|
let args = format!(
|
||||||
"sql --conn http://{addr} {creds} --ns {ns} --db {db} --multi --hide-welcome"
|
"sql --conn http://{addr} {creds} --ns {ns} --db {db} --multi --hide-welcome"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
if FFLAGS.change_feed_live_queries.enabled() {
|
||||||
common::run(&args)
|
assert_eq!(
|
||||||
.input("SHOW CHANGES FOR TABLE thing SINCE 0 LIMIT 10;\n")
|
common::run(&args)
|
||||||
.output(),
|
.input("SHOW CHANGES FOR TABLE thing SINCE 0 LIMIT 10;\n")
|
||||||
Ok("[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 65536 }, { changes: [{ update: { id: thing:one } }], versionstamp: 131072 }]]\n\n"
|
.output(),
|
||||||
.to_owned()),
|
Ok("[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 65536 }, { changes: [{ create: { id: thing:one } }], versionstamp: 131072 }]]\n\n"
|
||||||
"failed to send sql: {args}"
|
.to_owned()),
|
||||||
);
|
"failed to send sql: {args}");
|
||||||
}
|
} else {
|
||||||
|
assert_eq!(
|
||||||
|
common::run(&args)
|
||||||
|
.input("SHOW CHANGES FOR TABLE thing SINCE 0 LIMIT 10;\n")
|
||||||
|
.output(),
|
||||||
|
Ok("[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 65536 }, { changes: [{ update: { id: thing:one } }], versionstamp: 131072 }]]\n\n"
|
||||||
|
.to_owned()),
|
||||||
|
"failed to send sql: {args}" );
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
sleep(TWO_SECS).await;
|
sleep(TWO_SECS).await;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue