From 3ab03b00a8a864628d2c4906b77a42d3a259ecdf Mon Sep 17 00:00:00 2001 From: Yusuke Kuoka Date: Tue, 12 Sep 2023 23:57:27 +0900 Subject: [PATCH] Fix change feeds enabled at the database-level (#2688) Co-authored-by: Yusuke Kuoka --- lib/src/doc/changefeeds.rs | 18 +++---- lib/tests/changefeeds.rs | 101 +++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 9 deletions(-) diff --git a/lib/src/doc/changefeeds.rs b/lib/src/doc/changefeeds.rs index 8f645ed4..9fdeaf12 100644 --- a/lib/src/doc/changefeeds.rs +++ b/lib/src/doc/changefeeds.rs @@ -17,21 +17,21 @@ impl<'a> Document<'a> { if !self.changed() { return Ok(()); } - // Get the table for the record + // let tb = self.tb(opt, txn).await?; + // Clone transaction + let run = txn.clone(); + // Claim transaction + let mut run = run.lock().await; + // Get the database and the table for the record + let db = run.add_and_cache_db(opt.ns(), opt.db(), opt.strict).await?; // Check if changefeeds are enabled - if tb.changefeed.is_some() { - // Clone transaction - let run = txn.clone(); - // Claim transaction - let mut run = run.lock().await; + if db.changefeed.is_some() || tb.changefeed.is_some() { // Get the arguments - let ns = opt.ns(); - let db = opt.db(); let tb = tb.name.as_str(); let id = self.id.as_ref().unwrap(); // Create the changefeed entry - run.record_change(ns, db, tb, id, self.current.doc.clone()); + run.record_change(opt.ns(), opt.db(), tb, id, self.current.doc.clone()); } // Carry on Ok(()) diff --git a/lib/tests/changefeeds.rs b/lib/tests/changefeeds.rs index fbca559a..dfd4be2c 100644 --- a/lib/tests/changefeeds.rs +++ b/lib/tests/changefeeds.rs @@ -7,6 +7,107 @@ use surrealdb::dbs::Session; use surrealdb::err::Error; use surrealdb::sql::Value; +#[tokio::test] +async fn database_change_feeds() -> Result<(), Error> { + let sql = " + DEFINE DATABASE test CHANGEFEED 1h; + DEFINE TABLE person; + DEFINE FIELD name ON TABLE person + ASSERT + IF $input THEN + $input = /^[A-Z]{1}[a-z]+$/ + ELSE + true + END + VALUE + IF $input THEN + 'Name: ' + $input + ELSE + $value + END + ; + UPDATE person:test CONTENT { name: 'Tobie' }; + DELETE person:test; + SHOW CHANGES FOR TABLE person SINCE 0; + "; + let dbs = new_ds().await?; + let ses = Session::owner().with_ns("test").with_db("test"); + let start_ts = 0u64; + let end_ts = start_ts + 1; + dbs.tick_at(start_ts).await?; + let res = &mut dbs.execute(&sql, &ses, None).await?; + dbs.tick_at(end_ts).await?; + assert_eq!(res.len(), 6); + // DEFINE DATABASE + let tmp = res.remove(0).result; + assert!(tmp.is_ok()); + // DEFINE TABLE + let tmp = res.remove(0).result; + assert!(tmp.is_ok()); + // DEFINE FIELD + let tmp = res.remove(0).result; + assert!(tmp.is_ok()); + // UPDATE CONTENT + let tmp = res.remove(0).result?; + let val = Value::parse( + "[ + { + id: person:test, + name: 'Name: Tobie', + } + ]", + ); + assert_eq!(tmp, val); + // DELETE + let tmp = res.remove(0).result?; + let val = Value::parse("[]"); + assert_eq!(tmp, val); + // SHOW CHANGES + let tmp = res.remove(0).result?; + let val = Value::parse( + "[ + { + versionstamp: 65536, + changes: [ + { + update: { + id: person:test, + name: 'Name: Tobie' + } + } + ] + }, + { + versionstamp: 131072, + changes: [ + { + delete: { + id: person:test + } + } + ] + } + ]", + ); + assert_eq!(tmp, val); + // Retain for 1h + let sql = " + SHOW CHANGES FOR TABLE person SINCE 0; + "; + dbs.tick_at(end_ts + 3599).await?; + let res = &mut dbs.execute(&sql, &ses, None).await?; + let tmp = res.remove(0).result?; + assert_eq!(tmp, val); + // GC after 1hs + dbs.tick_at(end_ts + 3600).await?; + let res = &mut dbs.execute(&sql, &ses, None).await?; + let tmp = res.remove(0).result?; + let val = Value::parse("[]"); + assert_eq!(tmp, val); + // + Ok(()) +} + #[tokio::test] async fn table_change_feeds() -> Result<(), Error> { let sql = "