f93f4999ed
Co-authored-by: Mees Delzenne <mees.delzenne@gmail.com>
737 lines
14 KiB
Rust
737 lines
14 KiB
Rust
use chrono::DateTime;
|
|
|
|
use helpers::new_ds;
|
|
use parse::Parse;
|
|
use surrealdb::dbs::Session;
|
|
use surrealdb::err::Error;
|
|
use surrealdb::fflags::FFLAGS;
|
|
use surrealdb::kvs::Datastore;
|
|
use surrealdb::kvs::LockType::Optimistic;
|
|
use surrealdb::kvs::TransactionType::Write;
|
|
use surrealdb::sql::Value;
|
|
|
|
mod helpers;
|
|
mod parse;
|
|
|
|
#[test_log::test(tokio::test)]
|
|
async fn database_change_feeds() -> Result<(), Error> {
|
|
// This is a unique shared identifier
|
|
let identifier = "alpaca";
|
|
let ns = format!("namespace_{identifier}");
|
|
let db = format!("database_{identifier}");
|
|
let sql = format!(
|
|
"
|
|
DEFINE DATABASE {db} CHANGEFEED 1h;
|
|
DEFINE TABLE person;
|
|
DEFINE FIELD name ON TABLE person
|
|
ASSERT
|
|
IF $input THEN
|
|
$input = /^[A-Z]{{1}}[a-z]+$/
|
|
ELSE
|
|
true
|
|
END
|
|
VALUE
|
|
IF $input THEN
|
|
'Name: ' + $input
|
|
ELSE
|
|
$value
|
|
END
|
|
;
|
|
"
|
|
);
|
|
let sql2 = "
|
|
UPDATE person:test CONTENT { name: 'Tobie' };
|
|
DELETE person:test;
|
|
SHOW CHANGES FOR TABLE person SINCE 0;
|
|
";
|
|
let dbs = new_ds().await?;
|
|
let ses = Session::owner().with_ns(ns.as_str()).with_db(db.as_str());
|
|
let mut current_time = 0u64;
|
|
dbs.tick_at(current_time).await?;
|
|
let res = &mut dbs.execute(sql.as_str(), &ses, None).await?;
|
|
// Increment by a second (sic)
|
|
current_time += 1;
|
|
dbs.tick_at(current_time).await?;
|
|
assert_eq!(res.len(), 3);
|
|
// DEFINE DATABASE
|
|
let tmp = res.remove(0).result;
|
|
assert!(tmp.is_ok());
|
|
// DEFINE TABLE
|
|
let tmp = res.remove(0).result;
|
|
assert!(tmp.is_ok());
|
|
// DEFINE FIELD
|
|
let tmp = res.remove(0).result;
|
|
assert!(tmp.is_ok());
|
|
|
|
let cf_val_arr = match FFLAGS.change_feed_live_queries.enabled() {
|
|
true => Value::parse(
|
|
"[
|
|
{
|
|
versionstamp: 2,
|
|
changes: [
|
|
{
|
|
create: {
|
|
id: person:test,
|
|
name: 'Name: Tobie'
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
versionstamp: 3,
|
|
changes: [
|
|
{
|
|
delete: {
|
|
id: person:test
|
|
}
|
|
}
|
|
]
|
|
}
|
|
]",
|
|
),
|
|
false => Value::parse(
|
|
"[
|
|
{
|
|
versionstamp: 2,
|
|
changes: [
|
|
{
|
|
update: {
|
|
id: person:test,
|
|
name: 'Name: Tobie'
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
versionstamp: 3,
|
|
changes: [
|
|
{
|
|
delete: {
|
|
id: person:test
|
|
}
|
|
}
|
|
]
|
|
}
|
|
]",
|
|
),
|
|
};
|
|
|
|
// Declare check that is repeatable
|
|
async fn check_test(
|
|
dbs: &Datastore,
|
|
sql2: &str,
|
|
ses: &Session,
|
|
cf_val_arr: &Value,
|
|
) -> Result<(), String> {
|
|
let res = &mut dbs.execute(sql2, ses, None).await?;
|
|
assert_eq!(res.len(), 3);
|
|
// UPDATE CONTENT
|
|
let tmp = res.remove(0).result?;
|
|
let val = Value::parse(
|
|
"[
|
|
{
|
|
id: person:test,
|
|
name: 'Name: Tobie',
|
|
}
|
|
]",
|
|
);
|
|
Some(&tmp)
|
|
.filter(|x| *x == &val)
|
|
.map(|v| ())
|
|
.ok_or(format!("Expected UPDATE value:\nleft: {}\nright: {}", tmp, val))?;
|
|
// DELETE
|
|
let tmp = res.remove(0).result?;
|
|
let val = Value::parse("[]");
|
|
Some(&tmp)
|
|
.filter(|x| *x == &val)
|
|
.map(|v| ())
|
|
.ok_or(format!("Expected DELETE value:\nleft: {}\nright: {}", tmp, val))?;
|
|
// SHOW CHANGES
|
|
let tmp = res.remove(0).result?;
|
|
Some(&tmp)
|
|
.filter(|x| *x == cf_val_arr)
|
|
.map(|v| ())
|
|
.ok_or(format!("Expected SHOW CHANGES value:\nleft: {}\nright: {}", tmp, cf_val_arr))?;
|
|
Ok(())
|
|
}
|
|
|
|
// Check the validation with repeats
|
|
let limit = 1;
|
|
for i in 0..limit {
|
|
let test_result = check_test(&dbs, sql2, &ses, &cf_val_arr).await;
|
|
match test_result {
|
|
Ok(_) => break,
|
|
Err(e) => {
|
|
if i == limit - 1 {
|
|
panic!("Failed after retries: {}", e);
|
|
}
|
|
println!("Failed after retry {}:\n{}", i, e);
|
|
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
|
|
}
|
|
}
|
|
}
|
|
// Retain for 1h
|
|
let sql = "
|
|
SHOW CHANGES FOR TABLE person SINCE 0;
|
|
";
|
|
// This is neccessary to mark a point in time that can be GC'd
|
|
current_time += 1;
|
|
dbs.tick_at(current_time).await?;
|
|
let mut tx = dbs.transaction(Write, Optimistic).await?;
|
|
#[cfg(feature = "sql2")]
|
|
tx.print_all().await;
|
|
tx.cancel().await?;
|
|
|
|
let res = &mut dbs.execute(sql, &ses, None).await?;
|
|
let tmp = res.remove(0).result?;
|
|
assert_eq!(tmp, cf_val_arr);
|
|
|
|
// GC after 1hs
|
|
let one_hour_in_secs = 3600;
|
|
current_time += one_hour_in_secs;
|
|
current_time += 1;
|
|
dbs.tick_at(current_time).await?;
|
|
let res = &mut dbs.execute(sql, &ses, None).await?;
|
|
let tmp = res.remove(0).result?;
|
|
let val = Value::parse("[]");
|
|
assert_eq!(tmp, val);
|
|
//
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn table_change_feeds() -> Result<(), Error> {
|
|
let sql = "
|
|
DEFINE TABLE person CHANGEFEED 1h;
|
|
DEFINE FIELD name ON TABLE person
|
|
ASSERT
|
|
IF $input THEN
|
|
$input = /^[A-Z]{1}[a-z]+$/
|
|
ELSE
|
|
true
|
|
END
|
|
VALUE
|
|
IF $input THEN
|
|
'Name: ' + $input
|
|
ELSE
|
|
$value
|
|
END
|
|
;
|
|
UPDATE person:test CONTENT { name: 'Tobie' };
|
|
UPDATE person:test REPLACE { name: 'jaime' };
|
|
UPDATE person:test MERGE { name: 'Jaime' };
|
|
UPDATE person:test SET name = 'tobie';
|
|
UPDATE person:test SET name = 'Tobie';
|
|
DELETE person:test;
|
|
CREATE person:1000 SET name = 'Yusuke';
|
|
SHOW CHANGES FOR TABLE person SINCE 0;
|
|
";
|
|
let dbs = new_ds().await?;
|
|
let ses = Session::owner().with_ns("test-tb-cf").with_db("test-tb-cf");
|
|
let start_ts = 0u64;
|
|
let end_ts = start_ts + 1;
|
|
dbs.tick_at(start_ts).await?;
|
|
let res = &mut dbs.execute(sql, &ses, None).await?;
|
|
dbs.tick_at(end_ts).await?;
|
|
assert_eq!(res.len(), 10);
|
|
// DEFINE TABLE
|
|
let tmp = res.remove(0).result;
|
|
assert!(tmp.is_ok());
|
|
// DEFINE FIELD
|
|
let tmp = res.remove(0).result;
|
|
assert!(tmp.is_ok());
|
|
// UPDATE CONTENT
|
|
let tmp = res.remove(0).result?;
|
|
let val = Value::parse(
|
|
"[
|
|
{
|
|
id: person:test,
|
|
name: 'Name: Tobie',
|
|
}
|
|
]",
|
|
);
|
|
assert_eq!(tmp, val);
|
|
// UPDATE REPLACE
|
|
let tmp = res.remove(0).result;
|
|
assert!(matches!(
|
|
tmp.err(),
|
|
Some(e) if e.to_string() == r#"Found 'Name: jaime' for field `name`, with record `person:test`, but field must conform to: IF $input THEN $input = /^[A-Z]{1}[a-z]+$/ ELSE true END"#
|
|
));
|
|
// UPDATE MERGE
|
|
let tmp = res.remove(0).result?;
|
|
let val = Value::parse(
|
|
"[
|
|
{
|
|
id: person:test,
|
|
name: 'Name: Jaime',
|
|
}
|
|
]",
|
|
);
|
|
assert_eq!(tmp, val);
|
|
// UPDATE SET
|
|
let tmp = res.remove(0).result;
|
|
assert!(matches!(
|
|
tmp.err(),
|
|
Some(e) if e.to_string() == r#"Found 'Name: tobie' for field `name`, with record `person:test`, but field must conform to: IF $input THEN $input = /^[A-Z]{1}[a-z]+$/ ELSE true END"#
|
|
));
|
|
// UPDATE SET
|
|
let tmp = res.remove(0).result?;
|
|
let val = Value::parse(
|
|
"[
|
|
{
|
|
id: person:test,
|
|
name: 'Name: Tobie',
|
|
}
|
|
]",
|
|
);
|
|
assert_eq!(tmp, val);
|
|
// DELETE
|
|
let tmp = res.remove(0).result?;
|
|
let val = Value::parse("[]");
|
|
assert_eq!(tmp, val);
|
|
// CREATE
|
|
let _tmp = res.remove(0).result?;
|
|
// SHOW CHANGES
|
|
let tmp = res.remove(0).result?;
|
|
let val = match FFLAGS.change_feed_live_queries.enabled() {
|
|
true => Value::parse(
|
|
"[
|
|
{
|
|
versionstamp: 1,
|
|
changes: [
|
|
{
|
|
define_table: {
|
|
name: 'person'
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
versionstamp: 2,
|
|
changes: [
|
|
{
|
|
create: {
|
|
id: person:test,
|
|
name: 'Name: Tobie'
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
versionstamp: 3,
|
|
changes: [
|
|
{
|
|
update: {
|
|
id: person:test,
|
|
name: 'Name: Jaime'
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
versionstamp: 4,
|
|
changes: [
|
|
{
|
|
update: {
|
|
id: person:test,
|
|
name: 'Name: Tobie'
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
versionstamp: 5,
|
|
changes: [
|
|
{
|
|
delete: {
|
|
id: person:test
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
versionstamp: 6,
|
|
changes: [
|
|
{
|
|
create: {
|
|
id: person:1000,
|
|
name: 'Name: Yusuke'
|
|
}
|
|
}
|
|
]
|
|
}
|
|
]",
|
|
),
|
|
false => Value::parse(
|
|
"[
|
|
{
|
|
versionstamp: 1,
|
|
changes: [
|
|
{
|
|
define_table: {
|
|
name: 'person'
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
versionstamp: 2,
|
|
changes: [
|
|
{
|
|
update: {
|
|
id: person:test,
|
|
name: 'Name: Tobie'
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
versionstamp: 3,
|
|
changes: [
|
|
{
|
|
update: {
|
|
id: person:test,
|
|
name: 'Name: Jaime'
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
versionstamp: 4,
|
|
changes: [
|
|
{
|
|
update: {
|
|
id: person:test,
|
|
name: 'Name: Tobie'
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
versionstamp: 5,
|
|
changes: [
|
|
{
|
|
delete: {
|
|
id: person:test
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
versionstamp: 6,
|
|
changes: [
|
|
{
|
|
update: {
|
|
id: person:1000,
|
|
name: 'Name: Yusuke'
|
|
}
|
|
}
|
|
]
|
|
}
|
|
]",
|
|
),
|
|
};
|
|
assert_eq!(tmp, val);
|
|
// Retain for 1h
|
|
let sql = "
|
|
SHOW CHANGES FOR TABLE person SINCE 0;
|
|
";
|
|
dbs.tick_at(end_ts + 3599).await?;
|
|
let res = &mut dbs.execute(sql, &ses, None).await?;
|
|
let tmp = res.remove(0).result?;
|
|
assert_eq!(tmp, val);
|
|
// GC after 1hs
|
|
dbs.tick_at(end_ts + 3600).await?;
|
|
let res = &mut dbs.execute(sql, &ses, None).await?;
|
|
let tmp = res.remove(0).result?;
|
|
let val = Value::parse("[]");
|
|
assert_eq!(tmp, val);
|
|
//
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn changefeed_with_ts() -> Result<(), Error> {
|
|
let db = new_ds().await?;
|
|
let ses = Session::owner().with_ns("test-cf-ts").with_db("test-cf-ts");
|
|
// Enable change feeds
|
|
let sql = "
|
|
DEFINE TABLE user CHANGEFEED 1h;
|
|
";
|
|
db.execute(sql, &ses, None).await?.remove(0).result?;
|
|
// Save timestamp 1
|
|
let ts1_dt = "2023-08-01T00:00:00Z";
|
|
let ts1 = DateTime::parse_from_rfc3339(ts1_dt).unwrap();
|
|
db.tick_at(ts1.timestamp().try_into().unwrap()).await.unwrap();
|
|
// Create and update users
|
|
let sql = "
|
|
CREATE user:amos SET name = 'Amos';
|
|
CREATE user:jane SET name = 'Jane';
|
|
UPDATE user:amos SET name = 'AMOS';
|
|
";
|
|
let table = "user";
|
|
let res = db.execute(sql, &ses, None).await?;
|
|
for res in res {
|
|
res.result?;
|
|
}
|
|
let sql = format!("UPDATE {table} SET name = 'Doe'");
|
|
let users = db.execute(&sql, &ses, None).await?.remove(0).result?;
|
|
let expected = Value::parse(
|
|
"[
|
|
{
|
|
id: user:amos,
|
|
name: 'Doe',
|
|
},
|
|
{
|
|
id: user:jane,
|
|
name: 'Doe',
|
|
},
|
|
]",
|
|
);
|
|
assert_eq!(users, expected);
|
|
let sql = format!("SELECT * FROM {table}");
|
|
let users = db.execute(&sql, &ses, None).await?.remove(0).result?;
|
|
assert_eq!(users, expected);
|
|
let sql = "
|
|
SHOW CHANGES FOR TABLE user SINCE 0 LIMIT 10;
|
|
";
|
|
let value: Value = db.execute(sql, &ses, None).await?.remove(0).result?;
|
|
let Value::Array(array) = value.clone() else {
|
|
unreachable!()
|
|
};
|
|
assert_eq!(array.len(), 5);
|
|
// DEFINE TABLE
|
|
let a = array.first().unwrap();
|
|
let Value::Object(a) = a else {
|
|
unreachable!()
|
|
};
|
|
let Value::Number(_versionstamp1) = a.get("versionstamp").unwrap() else {
|
|
unreachable!()
|
|
};
|
|
let changes = a.get("changes").unwrap().to_owned();
|
|
assert_eq!(
|
|
changes,
|
|
surrealdb::sql::value(
|
|
"[
|
|
{
|
|
define_table: {
|
|
name: 'user'
|
|
}
|
|
}
|
|
]"
|
|
)
|
|
.unwrap()
|
|
);
|
|
// UPDATE user:amos
|
|
let a = array.get(1).unwrap();
|
|
let Value::Object(a) = a else {
|
|
unreachable!()
|
|
};
|
|
let Value::Number(versionstamp2) = a.get("versionstamp").unwrap() else {
|
|
unreachable!()
|
|
};
|
|
let changes = a.get("changes").unwrap().to_owned();
|
|
match FFLAGS.change_feed_live_queries.enabled() {
|
|
true => {
|
|
assert_eq!(
|
|
changes,
|
|
surrealdb::sql::value(
|
|
"[
|
|
{
|
|
create: {
|
|
id: user:amos,
|
|
name: 'Amos'
|
|
}
|
|
}
|
|
]"
|
|
)
|
|
.unwrap()
|
|
);
|
|
}
|
|
false => {
|
|
assert_eq!(
|
|
changes,
|
|
surrealdb::sql::value(
|
|
"[
|
|
{
|
|
update: {
|
|
id: user:amos,
|
|
name: 'Amos'
|
|
}
|
|
}
|
|
]"
|
|
)
|
|
.unwrap()
|
|
);
|
|
}
|
|
}
|
|
// UPDATE user:jane
|
|
let a = array.get(2).unwrap();
|
|
let Value::Object(a) = a else {
|
|
unreachable!()
|
|
};
|
|
let Value::Number(versionstamp3) = a.get("versionstamp").unwrap() else {
|
|
unreachable!()
|
|
};
|
|
assert!(versionstamp2 < versionstamp3);
|
|
let changes = a.get("changes").unwrap().to_owned();
|
|
match FFLAGS.change_feed_live_queries.enabled() {
|
|
true => {
|
|
assert_eq!(
|
|
changes,
|
|
surrealdb::sql::value(
|
|
"[
|
|
{
|
|
create: {
|
|
id: user:jane,
|
|
name: 'Jane'
|
|
}
|
|
}
|
|
]"
|
|
)
|
|
.unwrap()
|
|
);
|
|
}
|
|
false => {
|
|
assert_eq!(
|
|
changes,
|
|
surrealdb::sql::value(
|
|
"[
|
|
{
|
|
update: {
|
|
id: user:jane,
|
|
name: 'Jane'
|
|
}
|
|
}
|
|
]"
|
|
)
|
|
.unwrap()
|
|
);
|
|
}
|
|
}
|
|
// UPDATE user:amos
|
|
let a = array.get(3).unwrap();
|
|
let Value::Object(a) = a else {
|
|
unreachable!()
|
|
};
|
|
let Value::Number(versionstamp4) = a.get("versionstamp").unwrap() else {
|
|
unreachable!()
|
|
};
|
|
assert!(versionstamp3 < versionstamp4);
|
|
let changes = a.get("changes").unwrap().to_owned();
|
|
assert_eq!(
|
|
changes,
|
|
surrealdb::sql::value(
|
|
"[
|
|
{
|
|
update: {
|
|
id: user:amos,
|
|
name: 'AMOS'
|
|
}
|
|
}
|
|
]"
|
|
)
|
|
.unwrap()
|
|
);
|
|
// UPDATE table
|
|
let a = array.get(4).unwrap();
|
|
let Value::Object(a) = a else {
|
|
unreachable!()
|
|
};
|
|
let Value::Number(versionstamp5) = a.get("versionstamp").unwrap() else {
|
|
unreachable!()
|
|
};
|
|
assert!(versionstamp4 < versionstamp5);
|
|
let changes = a.get("changes").unwrap().to_owned();
|
|
assert_eq!(
|
|
changes,
|
|
surrealdb::sql::value(
|
|
"[
|
|
{
|
|
update: {
|
|
id: user:amos,
|
|
name: 'Doe'
|
|
}
|
|
},
|
|
{
|
|
update: {
|
|
id: user:jane,
|
|
name: 'Doe'
|
|
}
|
|
}
|
|
]"
|
|
)
|
|
.unwrap()
|
|
);
|
|
// Save timestamp 2
|
|
let ts2_dt = "2023-08-01T00:00:05Z";
|
|
let ts2 = DateTime::parse_from_rfc3339(ts2_dt).unwrap();
|
|
db.tick_at(ts2.timestamp().try_into().unwrap()).await.unwrap();
|
|
//
|
|
// Show changes using timestamp 1
|
|
//
|
|
let sql = format!("SHOW CHANGES FOR TABLE user SINCE d'{ts1_dt}' LIMIT 10; ");
|
|
let value: Value = db.execute(&sql, &ses, None).await?.remove(0).result?;
|
|
let Value::Array(array) = value.clone() else {
|
|
unreachable!()
|
|
};
|
|
assert_eq!(array.len(), 4);
|
|
// UPDATE user:amos
|
|
let a = array.first().unwrap();
|
|
let Value::Object(a) = a else {
|
|
unreachable!()
|
|
};
|
|
let Value::Number(versionstamp1b) = a.get("versionstamp").unwrap() else {
|
|
unreachable!()
|
|
};
|
|
assert!(versionstamp2 == versionstamp1b);
|
|
let changes = a.get("changes").unwrap().to_owned();
|
|
match FFLAGS.change_feed_live_queries.enabled() {
|
|
true => {
|
|
assert_eq!(
|
|
changes,
|
|
surrealdb::sql::value(
|
|
"[
|
|
{
|
|
create: {
|
|
id: user:amos,
|
|
name: 'Amos'
|
|
}
|
|
}
|
|
]"
|
|
)
|
|
.unwrap()
|
|
);
|
|
}
|
|
false => {
|
|
assert_eq!(
|
|
changes,
|
|
surrealdb::sql::value(
|
|
"[
|
|
{
|
|
update: {
|
|
id: user:amos,
|
|
name: 'Amos'
|
|
}
|
|
}
|
|
]"
|
|
)
|
|
.unwrap()
|
|
);
|
|
}
|
|
}
|
|
// Save timestamp 3
|
|
let ts3_dt = "2023-08-01T00:00:10Z";
|
|
let ts3 = DateTime::parse_from_rfc3339(ts3_dt).unwrap();
|
|
db.tick_at(ts3.timestamp().try_into().unwrap()).await.unwrap();
|
|
//
|
|
// Show changes using timestamp 3
|
|
//
|
|
let sql = format!("SHOW CHANGES FOR TABLE user SINCE d'{ts3_dt}' LIMIT 10; ");
|
|
let value: Value = db.execute(&sql, &ses, None).await?.remove(0).result?;
|
|
let Value::Array(array) = value.clone() else {
|
|
unreachable!()
|
|
};
|
|
assert_eq!(array.len(), 0);
|
|
Ok(())
|
|
}
|