Fix change feeds enabled at the database-level (#2688)

Co-authored-by: Yusuke Kuoka <yusuke.kuoka@surrealdb.com>
This commit is contained in:
Yusuke Kuoka 2023-09-12 23:57:27 +09:00 committed by GitHub
parent 3c48558ca2
commit 3ab03b00a8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 110 additions and 9 deletions

View file

@ -17,21 +17,21 @@ impl<'a> Document<'a> {
if !self.changed() { if !self.changed() {
return Ok(()); return Ok(());
} }
// Get the table for the record //
let tb = self.tb(opt, txn).await?; let tb = self.tb(opt, txn).await?;
// Check if changefeeds are enabled
if tb.changefeed.is_some() {
// Clone transaction // Clone transaction
let run = txn.clone(); let run = txn.clone();
// Claim transaction // Claim transaction
let mut run = run.lock().await; let mut run = run.lock().await;
// Get the database and the table for the record
let db = run.add_and_cache_db(opt.ns(), opt.db(), opt.strict).await?;
// Check if changefeeds are enabled
if db.changefeed.is_some() || tb.changefeed.is_some() {
// Get the arguments // Get the arguments
let ns = opt.ns();
let db = opt.db();
let tb = tb.name.as_str(); let tb = tb.name.as_str();
let id = self.id.as_ref().unwrap(); let id = self.id.as_ref().unwrap();
// Create the changefeed entry // Create the changefeed entry
run.record_change(ns, db, tb, id, self.current.doc.clone()); run.record_change(opt.ns(), opt.db(), tb, id, self.current.doc.clone());
} }
// Carry on // Carry on
Ok(()) Ok(())

View file

@ -7,6 +7,107 @@ use surrealdb::dbs::Session;
use surrealdb::err::Error; use surrealdb::err::Error;
use surrealdb::sql::Value; use surrealdb::sql::Value;
#[tokio::test]
async fn database_change_feeds() -> Result<(), Error> {
let sql = "
DEFINE DATABASE test CHANGEFEED 1h;
DEFINE TABLE person;
DEFINE FIELD name ON TABLE person
ASSERT
IF $input THEN
$input = /^[A-Z]{1}[a-z]+$/
ELSE
true
END
VALUE
IF $input THEN
'Name: ' + $input
ELSE
$value
END
;
UPDATE person:test CONTENT { name: 'Tobie' };
DELETE person:test;
SHOW CHANGES FOR TABLE person SINCE 0;
";
let dbs = new_ds().await?;
let ses = Session::owner().with_ns("test").with_db("test");
let start_ts = 0u64;
let end_ts = start_ts + 1;
dbs.tick_at(start_ts).await?;
let res = &mut dbs.execute(&sql, &ses, None).await?;
dbs.tick_at(end_ts).await?;
assert_eq!(res.len(), 6);
// DEFINE DATABASE
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
// DEFINE TABLE
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
// DEFINE FIELD
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
// UPDATE CONTENT
let tmp = res.remove(0).result?;
let val = Value::parse(
"[
{
id: person:test,
name: 'Name: Tobie',
}
]",
);
assert_eq!(tmp, val);
// DELETE
let tmp = res.remove(0).result?;
let val = Value::parse("[]");
assert_eq!(tmp, val);
// SHOW CHANGES
let tmp = res.remove(0).result?;
let val = Value::parse(
"[
{
versionstamp: 65536,
changes: [
{
update: {
id: person:test,
name: 'Name: Tobie'
}
}
]
},
{
versionstamp: 131072,
changes: [
{
delete: {
id: person:test
}
}
]
}
]",
);
assert_eq!(tmp, val);
// Retain for 1h
let sql = "
SHOW CHANGES FOR TABLE person SINCE 0;
";
dbs.tick_at(end_ts + 3599).await?;
let res = &mut dbs.execute(&sql, &ses, None).await?;
let tmp = res.remove(0).result?;
assert_eq!(tmp, val);
// GC after 1hs
dbs.tick_at(end_ts + 3600).await?;
let res = &mut dbs.execute(&sql, &ses, None).await?;
let tmp = res.remove(0).result?;
let val = Value::parse("[]");
assert_eq!(tmp, val);
//
Ok(())
}
#[tokio::test] #[tokio::test]
async fn table_change_feeds() -> Result<(), Error> { async fn table_change_feeds() -> Result<(), Error> {
let sql = " let sql = "