From fd364e56da26bfed76b3ae41361307ec0572a144 Mon Sep 17 00:00:00 2001 From: Rushmore Mushambi Date: Fri, 25 Aug 2023 09:55:22 +0200 Subject: [PATCH] Make tick a configuration option (#2495) --- lib/src/api/conn.rs | 2 - lib/src/api/engine/local/mod.rs | 15 +- lib/src/api/engine/local/native.rs | 37 +++++ lib/src/api/engine/local/wasm.rs | 38 ++++- lib/src/api/engine/mod.rs | 32 ++++ lib/src/api/engine/remote/http/mod.rs | 3 - lib/src/api/engine/remote/ws/mod.rs | 32 ---- lib/src/api/engine/remote/ws/native.rs | 6 +- lib/src/api/engine/remote/ws/wasm.rs | 2 +- lib/src/api/method/mod.rs | 24 --- lib/src/api/method/tests/server.rs | 4 - lib/src/api/method/tick.rs | 31 ---- lib/src/api/opt/config.rs | 9 +- lib/src/kvs/ds.rs | 4 +- lib/tests/api.rs | 20 +-- lib/tests/api/local.rs | 210 ------------------------- lib/tests/changefeeds.rs | 209 ++++++++++++++++++++++++ 17 files changed, 336 insertions(+), 342 deletions(-) delete mode 100644 lib/src/api/method/tick.rs delete mode 100644 lib/tests/api/local.rs diff --git a/lib/src/api/conn.rs b/lib/src/api/conn.rs index 8dbd3923..9c807214 100644 --- a/lib/src/api/conn.rs +++ b/lib/src/api/conn.rs @@ -93,8 +93,6 @@ pub enum Method { Signin, /// Signs up on the server Signup, - /// Runs a series of node maintenance operations - Tick, /// Removes a parameter from a connection Unset, /// Perfoms an update operation diff --git a/lib/src/api/engine/local/mod.rs b/lib/src/api/engine/local/mod.rs index f3ec9dff..e5c41cd0 100644 --- a/lib/src/api/engine/local/mod.rs +++ b/lib/src/api/engine/local/mod.rs @@ -48,7 +48,6 @@ use crate::dbs::Session; use crate::kvs::Datastore; use crate::opt::IntoEndpoint; use crate::sql::Array; -use crate::sql::Number; use crate::sql::Query; use crate::sql::Statement; use crate::sql::Statements; @@ -60,6 +59,7 @@ use std::marker::PhantomData; use std::mem; #[cfg(not(target_arch = "wasm32"))] use std::path::PathBuf; +use std::time::Duration; #[cfg(not(target_arch = "wasm32"))] use tokio::fs::OpenOptions; #[cfg(not(target_arch = "wasm32"))] @@ -71,6 +71,8 @@ use tokio::io::AsyncWrite; #[cfg(not(target_arch = "wasm32"))] use tokio::io::AsyncWriteExt; +const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(10); + /// In-memory database /// /// # Examples @@ -641,17 +643,6 @@ async fn router( vars.insert(key, value); Ok(DbResponse::Other(Value::None)) } - Method::Tick => { - let ts = match &mut params[..1] { - [Value::Number(Number::Int(ts))] => { - let ts = ts.to_owned(); - ts as u64 - } - _ => unreachable!(), - }; - kvs.tick_at(ts).await?; - Ok(DbResponse::Other(Value::None)) - } Method::Unset => { if let [Value::Strand(Strand(key))] = ¶ms[..1] { vars.remove(key); diff --git a/lib/src/api/engine/local/native.rs b/lib/src/api/engine/local/native.rs index 9af4ed09..01306f06 100644 --- a/lib/src/api/engine/local/native.rs +++ b/lib/src/api/engine/local/native.rs @@ -5,6 +5,7 @@ use crate::api::conn::Param; use crate::api::conn::Route; use crate::api::conn::Router; use crate::api::engine::local::Db; +use crate::api::engine::local::DEFAULT_TICK_INTERVAL; use crate::api::err::Error; use crate::api::opt::Endpoint; use crate::api::ExtraFeatures; @@ -12,12 +13,14 @@ use crate::api::OnceLockExt; use crate::api::Result; use crate::api::Surreal; use crate::dbs::Session; +use crate::engine::IntervalStream; use crate::iam::Level; use crate::kvs::Datastore; use crate::opt::auth::Root; use flume::Receiver; use flume::Sender; use futures::StreamExt; +use futures_concurrency::stream::Merge as _; use std::collections::BTreeMap; use std::collections::HashSet; use std::future::Future; @@ -26,6 +29,9 @@ use std::pin::Pin; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::sync::OnceLock; +use std::time::Duration; +use tokio::time; +use tokio::time::MissedTickBehavior; impl crate::api::Connection for Db {} @@ -141,10 +147,15 @@ pub(crate) fn router( false => kvs, }; + let kvs = Arc::new(kvs); let mut vars = BTreeMap::new(); let mut stream = route_rx.into_stream(); let mut session = Session::default(); + let (maintenance_tx, maintenance_rx) = flume::bounded::<()>(1); + let tick_interval = address.config.tick_interval.unwrap_or(DEFAULT_TICK_INTERVAL); + run_maintenance(kvs.clone(), tick_interval, maintenance_rx); + while let Some(Some(route)) = stream.next().await { match super::router(route.request, &kvs, &mut session, &mut vars).await { Ok(value) => { @@ -155,5 +166,31 @@ pub(crate) fn router( } } } + + // Stop maintenance tasks + let _ = maintenance_tx.into_send_async(()).await; + }); +} + +fn run_maintenance(kvs: Arc, tick_interval: Duration, stop_signal: Receiver<()>) { + tokio::spawn(async move { + let mut interval = time::interval(tick_interval); + // Don't bombard the database if we miss some ticks + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + // Delay sending the first tick + interval.tick().await; + + let ticker = IntervalStream::new(interval); + + let streams = (ticker.map(Some), stop_signal.into_stream().map(|_| None)); + + let mut stream = streams.merge(); + + while let Some(Some(_)) = stream.next().await { + match kvs.tick().await { + Ok(()) => trace!("Node agent tick ran successfully"), + Err(error) => error!("Error running node agent tick: {error}"), + } + } }); } diff --git a/lib/src/api/engine/local/wasm.rs b/lib/src/api/engine/local/wasm.rs index a6f66ba4..a9e76b9b 100644 --- a/lib/src/api/engine/local/wasm.rs +++ b/lib/src/api/engine/local/wasm.rs @@ -5,17 +5,20 @@ use crate::api::conn::Param; use crate::api::conn::Route; use crate::api::conn::Router; use crate::api::engine::local::Db; +use crate::api::engine::local::DEFAULT_TICK_INTERVAL; use crate::api::opt::Endpoint; use crate::api::OnceLockExt; use crate::api::Result; use crate::api::Surreal; use crate::dbs::Session; +use crate::engine::IntervalStream; use crate::iam::Level; use crate::kvs::Datastore; use crate::opt::auth::Root; use flume::Receiver; use flume::Sender; use futures::StreamExt; +use futures_concurrency::stream::Merge as _; use std::collections::BTreeMap; use std::collections::HashSet; use std::future::Future; @@ -24,7 +27,10 @@ use std::pin::Pin; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::sync::OnceLock; +use std::time::Duration; use wasm_bindgen_futures::spawn_local; +use wasmtimer::tokio as time; +use wasmtimer::tokio::MissedTickBehavior; impl crate::api::Connection for Db {} @@ -108,7 +114,6 @@ pub(crate) fn router( return; } } - let _ = conn_tx.into_send_async(Ok(())).await; kvs.with_auth_enabled(configured_root.is_some()) } @@ -128,10 +133,15 @@ pub(crate) fn router( false => kvs, }; + let kvs = Arc::new(kvs); let mut vars = BTreeMap::new(); let mut stream = route_rx.into_stream(); let mut session = Session::default(); + let (maintenance_tx, maintenance_rx) = flume::bounded::<()>(1); + let tick_interval = address.config.tick_interval.unwrap_or(DEFAULT_TICK_INTERVAL); + run_maintenance(kvs.clone(), tick_interval, maintenance_rx); + while let Some(Some(route)) = stream.next().await { match super::router(route.request, &kvs, &mut session, &mut vars).await { Ok(value) => { @@ -142,5 +152,31 @@ pub(crate) fn router( } } } + + // Stop maintenance tasks + let _ = maintenance_tx.into_send_async(()).await; + }); +} + +fn run_maintenance(kvs: Arc, tick_interval: Duration, stop_signal: Receiver<()>) { + spawn_local(async move { + let mut interval = time::interval(tick_interval); + // Don't bombard the database if we miss some ticks + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + // Delay sending the first tick + interval.tick().await; + + let ticker = IntervalStream::new(interval); + + let streams = (ticker.map(Some), stop_signal.into_stream().map(|_| None)); + + let mut stream = streams.merge(); + + while let Some(Some(_)) = stream.next().await { + match kvs.tick().await { + Ok(()) => trace!("Node agent tick ran successfully"), + Err(error) => error!("Error running node agent tick: {error}"), + } + } }); } diff --git a/lib/src/api/engine/mod.rs b/lib/src/api/engine/mod.rs index 445a95cd..b04c3f8c 100644 --- a/lib/src/api/engine/mod.rs +++ b/lib/src/api/engine/mod.rs @@ -24,7 +24,19 @@ use crate::sql::Fields; use crate::sql::Output; use crate::sql::Value; use crate::sql::Values; +use futures::Stream; use std::mem; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +#[cfg(not(target_arch = "wasm32"))] +use tokio::time::Instant; +#[cfg(not(target_arch = "wasm32"))] +use tokio::time::Interval; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::Instant; +#[cfg(target_arch = "wasm32")] +use wasmtimer::tokio::Interval; #[allow(dead_code)] // used by the the embedded database and `http` fn split_params(params: &mut [Value]) -> (bool, Values, Value) { @@ -135,3 +147,23 @@ fn delete_statement(params: &mut [Value]) -> (bool, DeleteStatement) { }, ) } + +struct IntervalStream { + inner: Interval, +} + +impl IntervalStream { + fn new(interval: Interval) -> Self { + Self { + inner: interval, + } + } +} + +impl Stream for IntervalStream { + type Item = Instant; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_tick(cx).map(Some) + } +} diff --git a/lib/src/api/engine/remote/http/mod.rs b/lib/src/api/engine/remote/http/mod.rs index 58a5c473..5306b67c 100644 --- a/lib/src/api/engine/remote/http/mod.rs +++ b/lib/src/api/engine/remote/http/mod.rs @@ -517,9 +517,6 @@ async fn router( let value = health(request).await?; Ok(DbResponse::Other(value)) } - Method::Tick => Err(crate::Error::Api(crate::error::Api::InvalidRequest( - "Tick is not implemented for remote backends".to_string(), - ))), Method::Version => { let path = base_url.join(method.as_str())?; let request = client.get(path); diff --git a/lib/src/api/engine/remote/ws/mod.rs b/lib/src/api/engine/remote/ws/mod.rs index a132a938..810e97b1 100644 --- a/lib/src/api/engine/remote/ws/mod.rs +++ b/lib/src/api/engine/remote/ws/mod.rs @@ -17,21 +17,9 @@ use crate::opt::IntoEndpoint; use crate::sql::Array; use crate::sql::Strand; use crate::sql::Value; -use futures::Stream; use serde::Deserialize; use std::marker::PhantomData; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; use std::time::Duration; -#[cfg(not(target_arch = "wasm32"))] -use tokio::time::Instant; -#[cfg(not(target_arch = "wasm32"))] -use tokio::time::Interval; -#[cfg(target_arch = "wasm32")] -use wasmtimer::std::Instant; -#[cfg(target_arch = "wasm32")] -use wasmtimer::tokio::Interval; pub(crate) const PATH: &str = "rpc"; const PING_INTERVAL: Duration = Duration::from_secs(5); @@ -149,23 +137,3 @@ pub(crate) struct Response { id: Option, pub(crate) result: ServerResult, } - -struct IntervalStream { - inner: Interval, -} - -impl IntervalStream { - fn new(interval: Interval) -> Self { - Self { - inner: interval, - } - } -} - -impl Stream for IntervalStream { - type Item = Instant; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_tick(cx).map(Some) - } -} diff --git a/lib/src/api/engine/remote/ws/native.rs b/lib/src/api/engine/remote/ws/native.rs index cea7edf6..0d797317 100644 --- a/lib/src/api/engine/remote/ws/native.rs +++ b/lib/src/api/engine/remote/ws/native.rs @@ -16,7 +16,7 @@ use crate::api::opt::Tls; use crate::api::OnceLockExt; use crate::api::Result; use crate::api::Surreal; -use crate::engine::remote::ws::IntervalStream; +use crate::engine::IntervalStream; use crate::sql::serde::{deserialize, serialize}; use crate::sql::Strand; use crate::sql::Value; @@ -230,10 +230,6 @@ pub(crate) fn router( vars.remove(key); } } - Method::Tick => { - // Remote backend doesn't support tick - return; - } _ => {} } let method_str = match method { diff --git a/lib/src/api/engine/remote/ws/wasm.rs b/lib/src/api/engine/remote/ws/wasm.rs index 4c2d4434..83a88fac 100644 --- a/lib/src/api/engine/remote/ws/wasm.rs +++ b/lib/src/api/engine/remote/ws/wasm.rs @@ -14,7 +14,7 @@ use crate::api::opt::Endpoint; use crate::api::OnceLockExt; use crate::api::Result; use crate::api::Surreal; -use crate::engine::remote::ws::IntervalStream; +use crate::engine::IntervalStream; use crate::sql::serde::{deserialize, serialize}; use crate::sql::Strand; use crate::sql::Value; diff --git a/lib/src/api/method/mod.rs b/lib/src/api/method/mod.rs index 4c47a205..4e959c89 100644 --- a/lib/src/api/method/mod.rs +++ b/lib/src/api/method/mod.rs @@ -21,7 +21,6 @@ mod select; mod set; mod signin; mod signup; -mod tick; mod unset; mod update; mod use_db; @@ -58,7 +57,6 @@ pub use select::Select; pub use set::Set; pub use signin::Signin; pub use signup::Signup; -pub use tick::Tick; pub use unset::Unset; pub use update::Update; pub use use_db::UseDb; @@ -105,7 +103,6 @@ impl Method { Method::Set => "set", Method::Signin => "signin", Method::Signup => "signup", - Method::Tick => "tick", Method::Unset => "unset", Method::Update => "update", Method::Use => "use", @@ -907,27 +904,6 @@ where } } - /// Runs the embedded datastore's "tick" operation at the specified timestamp. - /// This is used by the client to periodically run node maintenance tasks - /// which are usually run by the server itself in a client-server setup. - /// - /// # Examples - /// - /// ```no_run - /// # #[tokio::main] - /// # async fn main() -> surrealdb::Result<()> { - /// # let db = surrealdb::engine::any::connect("mem://").await?; - /// db.tick(123).await?; - /// # Ok(()) - /// # } - /// ``` - pub fn tick(&self, ts: u64) -> Tick { - Tick { - router: self.router.extract(), - ts, - } - } - /// Returns the version of the server /// /// # Examples diff --git a/lib/src/api/method/tests/server.rs b/lib/src/api/method/tests/server.rs index 33bcd434..c3cea484 100644 --- a/lib/src/api/method/tests/server.rs +++ b/lib/src/api/method/tests/server.rs @@ -68,10 +68,6 @@ pub(super) fn mock(route_rx: Receiver>) { } _ => unreachable!(), }, - Method::Tick => match ¶ms[..] { - [Value::Number(..)] => Ok(DbResponse::Other(Value::None)), - _ => unreachable!(), - }, Method::Update | Method::Merge | Method::Patch => match ¶ms[..] { [Value::Thing(..)] | [Value::Thing(..), _] => { Ok(DbResponse::Other(to_value(User::default()).unwrap())) diff --git a/lib/src/api/method/tick.rs b/lib/src/api/method/tick.rs deleted file mode 100644 index 22e2bbda..00000000 --- a/lib/src/api/method/tick.rs +++ /dev/null @@ -1,31 +0,0 @@ -use crate::api::conn::Method; -use crate::api::conn::Param; -use crate::api::conn::Router; -use crate::api::Connection; -use crate::api::Result; -use std::future::Future; -use std::future::IntoFuture; -use std::pin::Pin; - -/// A tick-at-specified-timestamp future -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Tick<'r, C: Connection> { - pub(super) router: Result<&'r Router>, - pub(super) ts: u64, -} - -impl<'r, Client> IntoFuture for Tick<'r, Client> -where - Client: Connection, -{ - type Output = Result<()>; - type IntoFuture = Pin + Send + Sync + 'r>>; - - fn into_future(self) -> Self::IntoFuture { - Box::pin(async move { - let mut conn = Client::new(Method::Tick); - conn.execute_unit(self.router?, Param::new(vec![self.ts.into()])).await - }) - } -} diff --git a/lib/src/api/opt/config.rs b/lib/src/api/opt/config.rs index 131cec4e..ab9a2945 100644 --- a/lib/src/api/opt/config.rs +++ b/lib/src/api/opt/config.rs @@ -15,6 +15,7 @@ pub struct Config { pub(crate) auth: Level, pub(crate) username: String, pub(crate) password: String, + pub(crate) tick_interval: Option, } impl Config { @@ -24,13 +25,11 @@ impl Config { } /// Set the strict value of the config to the supplied value - /// Enables `strict` server mode pub fn set_strict(mut self, strict: bool) -> Self { self.strict = strict; self } - /// Set the config to use strict mode /// Enables `strict` server mode pub fn strict(mut self) -> Self { self.strict = true; @@ -84,4 +83,10 @@ impl Config { self.tls_config = Some(super::Tls::Native(config)); self } + + /// Set the interval at which the database should run node maintenance tasks + pub fn tick_interval(mut self, interval: impl Into>) -> Self { + self.tick_interval = interval.into().filter(|x| !x.is_zero()); + self + } } diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index a8c14aa1..297c9c11 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -550,9 +550,7 @@ impl Datastore { } // tick is called periodically to perform maintenance tasks. - // On a linux/windows/macos system, this is called every TICK_INTERVAL. - // On embedded scenarios like WASM where there is no background thread, - // this should be called by the application. + // This is called every TICK_INTERVAL. pub async fn tick(&self) -> Result<(), Error> { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) diff --git a/lib/tests/api.rs b/lib/tests/api.rs index 1f3ce68d..b13e3594 100644 --- a/lib/tests/api.rs +++ b/lib/tests/api.rs @@ -9,6 +9,7 @@ mod api_integration { use std::ops::Bound; use std::sync::Arc; use std::sync::Mutex; + use std::time::Duration; use surrealdb::error::Api as ApiError; use surrealdb::error::Db as DbError; use surrealdb::opt::auth::Database; @@ -35,6 +36,7 @@ mod api_integration { const NS: &str = "test-ns"; const ROOT_USER: &str = "root"; const ROOT_PASS: &str = "root"; + const TICK_INTERVAL: Duration = Duration::from_secs(1); // Used to ensure that only one test at a time is setting up the underlaying datastore. // When auth is enabled, multiple tests may try to create the same root user at the same time. static SETUP_MUTEX: Lazy>> = Lazy::new(|| Arc::new(Mutex::new(()))); @@ -132,7 +134,7 @@ mod api_integration { username: ROOT_USER, password: ROOT_PASS, }; - let config = Config::new().user(root); + let config = Config::new().user(root).tick_interval(TICK_INTERVAL); let db = Surreal::new::(config).await.unwrap(); db.signin(root).await.unwrap(); db @@ -205,7 +207,6 @@ mod api_integration { } include!("api/mod.rs"); - include!("api/local.rs"); include!("api/backup.rs"); } @@ -223,14 +224,13 @@ mod api_integration { username: ROOT_USER, password: ROOT_PASS, }; - let config = Config::new().user(root); + let config = Config::new().user(root).tick_interval(TICK_INTERVAL); let db = Surreal::new::((path, config)).await.unwrap(); db.signin(root).await.unwrap(); db } include!("api/mod.rs"); - include!("api/local.rs"); include!("api/backup.rs"); } @@ -248,14 +248,13 @@ mod api_integration { username: ROOT_USER, password: ROOT_PASS, }; - let config = Config::new().user(root); + let config = Config::new().user(root).tick_interval(TICK_INTERVAL); let db = Surreal::new::((path, config)).await.unwrap(); db.signin(root).await.unwrap(); db } include!("api/mod.rs"); - include!("api/local.rs"); include!("api/backup.rs"); } @@ -273,14 +272,13 @@ mod api_integration { username: ROOT_USER, password: ROOT_PASS, }; - let config = Config::new().user(root); + let config = Config::new().user(root).tick_interval(TICK_INTERVAL); let db = Surreal::new::((path, config)).await.unwrap(); db.signin(root).await.unwrap(); db } include!("api/mod.rs"); - include!("api/local.rs"); include!("api/backup.rs"); } @@ -297,14 +295,13 @@ mod api_integration { username: ROOT_USER, password: ROOT_PASS, }; - let config = Config::new().user(root); + let config = Config::new().user(root).tick_interval(TICK_INTERVAL); let db = Surreal::new::(("127.0.0.1:2379", config)).await.unwrap(); db.signin(root).await.unwrap(); db } include!("api/mod.rs"); - include!("api/local.rs"); include!("api/backup.rs"); } @@ -321,14 +318,13 @@ mod api_integration { username: ROOT_USER, password: ROOT_PASS, }; - let config = Config::new().user(root); + let config = Config::new().user(root).tick_interval(TICK_INTERVAL); let db = Surreal::new::(("/etc/foundationdb/fdb.cluster", config)).await.unwrap(); db.signin(root).await.unwrap(); db } include!("api/mod.rs"); - include!("api/local.rs"); include!("api/backup.rs"); } diff --git a/lib/tests/api/local.rs b/lib/tests/api/local.rs deleted file mode 100644 index a9a9780e..00000000 --- a/lib/tests/api/local.rs +++ /dev/null @@ -1,210 +0,0 @@ -#[tokio::test] -async fn changefeed_with_ts() { - let db = new_db().await; - db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap(); - // Enable change feeds - let sql = " - DEFINE TABLE user CHANGEFEED 1h; - "; - let response = db.query(sql).await.unwrap(); - response.check().unwrap(); - // Save timestamp 1 - let ts1_dt = "2023-08-01T00:00:00Z"; - let ts1 = DateTime::parse_from_rfc3339(ts1_dt.clone()).unwrap(); - db.tick(ts1.timestamp().try_into().unwrap()).await.unwrap(); - // Create and update users - let sql = " - CREATE user:amos SET name = 'Amos'; - CREATE user:jane SET name = 'Jane'; - UPDATE user:amos SET name = 'AMOS'; - "; - let table = "user"; - let response = db.query(sql).await.unwrap(); - response.check().unwrap(); - let users: Vec = db - .update(table) - .content(Record { - name: "Doe", - }) - .await - .unwrap(); - let expected = &[ - RecordBuf { - id: thing("user:amos").unwrap(), - name: "Doe".to_owned(), - }, - RecordBuf { - id: thing("user:jane").unwrap(), - name: "Doe".to_owned(), - }, - ]; - assert_eq!(users, expected); - let users: Vec = db.select(table).await.unwrap(); - assert_eq!(users, expected); - let sql = " - SHOW CHANGES FOR TABLE user SINCE 0 LIMIT 10; - "; - let mut response = db.query(sql).await.unwrap(); - let value: Value = response.take(0).unwrap(); - let Value::Array(array) = value.clone() else { - unreachable!() - }; - assert_eq!(array.len(), 4); - // UPDATE user:amos - let a = array.get(0).unwrap(); - let Value::Object(a) = a else { - unreachable!() - }; - let Value::Number(versionstamp1) = a.get("versionstamp").unwrap() else { - unreachable!() - }; - let changes = a.get("changes").unwrap().to_owned(); - assert_eq!( - changes, - surrealdb::sql::value( - "[ - { - update: { - id: user:amos, - name: 'Amos' - } - } - ]" - ) - .unwrap() - ); - // UPDATE user:jane - let a = array.get(1).unwrap(); - let Value::Object(a) = a else { - unreachable!() - }; - let Value::Number(versionstamp2) = a.get("versionstamp").unwrap() else { - unreachable!() - }; - assert!(versionstamp1 < versionstamp2); - let changes = a.get("changes").unwrap().to_owned(); - assert_eq!( - changes, - surrealdb::sql::value( - "[ - { - update: { - id: user:jane, - name: 'Jane' - } - } - ]" - ) - .unwrap() - ); - // UPDATE user:amos - let a = array.get(2).unwrap(); - let Value::Object(a) = a else { - unreachable!() - }; - let Value::Number(versionstamp3) = a.get("versionstamp").unwrap() else { - unreachable!() - }; - assert!(versionstamp2 < versionstamp3); - let changes = a.get("changes").unwrap().to_owned(); - assert_eq!( - changes, - surrealdb::sql::value( - "[ - { - update: { - id: user:amos, - name: 'AMOS' - } - } - ]" - ) - .unwrap() - ); - // UPDATE table - let a = array.get(3).unwrap(); - let Value::Object(a) = a else { - unreachable!() - }; - let Value::Number(versionstamp4) = a.get("versionstamp").unwrap() else { - unreachable!() - }; - assert!(versionstamp3 < versionstamp4); - let changes = a.get("changes").unwrap().to_owned(); - assert_eq!( - changes, - surrealdb::sql::value( - "[ - { - update: { - id: user:amos, - name: 'Doe' - } - }, - { - update: { - id: user:jane, - name: 'Doe' - } - } - ]" - ) - .unwrap() - ); - // Save timestamp 2 - let ts2_dt = "2023-08-01T00:00:05Z"; - let ts2 = DateTime::parse_from_rfc3339(ts2_dt.clone()).unwrap(); - db.tick(ts2.timestamp().try_into().unwrap()).await.unwrap(); - // - // Show changes using timestamp 1 - // - let sql = format!( - " - SHOW CHANGES FOR TABLE user SINCE '{ts1_dt}' LIMIT 10; - " - ); - let mut response = db.query(sql).await.unwrap(); - let value: Value = response.take(0).unwrap(); - let Value::Array(array) = value.clone() else { - unreachable!() - }; - assert_eq!(array.len(), 4); - // UPDATE user:amos - let a = array.get(0).unwrap(); - let Value::Object(a) = a else { - unreachable!() - }; - let Value::Number(versionstamp1b) = a.get("versionstamp").unwrap() else { - unreachable!() - }; - assert!(versionstamp1 == versionstamp1b); - let changes = a.get("changes").unwrap().to_owned(); - assert_eq!( - changes, - surrealdb::sql::value( - "[ - { - update: { - id: user:amos, - name: 'Amos' - } - } - ]" - ) - .unwrap() - ); - // Save timestamp 3 - let ts3_dt = "2023-08-01T00:00:10Z"; - let ts3 = DateTime::parse_from_rfc3339(ts3_dt.clone()).unwrap(); - db.tick(ts3.timestamp().try_into().unwrap()).await.unwrap(); - // - // Show changes using timestamp 3 - // - let sql = format!("SHOW CHANGES FOR TABLE user SINCE '{ts3_dt}' LIMIT 10;"); - let mut response = db.query(sql).await.unwrap(); - let value: Value = response.take(0).unwrap(); - let Value::Array(array) = value.clone() else { - unreachable!() - }; - assert_eq!(array.len(), 0); -} diff --git a/lib/tests/changefeeds.rs b/lib/tests/changefeeds.rs index 9bed5704..7099ff08 100644 --- a/lib/tests/changefeeds.rs +++ b/lib/tests/changefeeds.rs @@ -1,4 +1,5 @@ mod parse; +use chrono::DateTime; use parse::Parse; use surrealdb::dbs::Session; use surrealdb::err::Error; @@ -175,3 +176,211 @@ async fn table_change_feeds() -> Result<(), Error> { // Ok(()) } + +#[tokio::test] +async fn changefeed_with_ts() -> Result<(), Error> { + let db = Datastore::new("memory").await?; + let ses = Session::owner().with_ns("test").with_db("test"); + // Enable change feeds + let sql = " + DEFINE TABLE user CHANGEFEED 1h; + "; + db.execute(sql, &ses, None).await?.remove(0).result?; + // Save timestamp 1 + let ts1_dt = "2023-08-01T00:00:00Z"; + let ts1 = DateTime::parse_from_rfc3339(ts1_dt.clone()).unwrap(); + db.tick_at(ts1.timestamp().try_into().unwrap()).await.unwrap(); + // Create and update users + let sql = " + CREATE user:amos SET name = 'Amos'; + CREATE user:jane SET name = 'Jane'; + UPDATE user:amos SET name = 'AMOS'; + "; + let table = "user"; + let res = db.execute(sql, &ses, None).await?; + for res in res { + res.result?; + } + let sql = format!("UPDATE {table} SET name = 'Doe'"); + let users = db.execute(&sql, &ses, None).await?.remove(0).result?; + let expected = Value::parse( + "[ + { + id: user:amos, + name: 'Doe', + }, + { + id: user:jane, + name: 'Doe', + }, + ]", + ); + assert_eq!(users, expected); + let sql = format!("SELECT * FROM {table}"); + let users = db.execute(&sql, &ses, None).await?.remove(0).result?; + assert_eq!(users, expected); + let sql = " + SHOW CHANGES FOR TABLE user SINCE 0 LIMIT 10; + "; + let value: Value = db.execute(sql, &ses, None).await?.remove(0).result?; + let Value::Array(array) = value.clone() else { + unreachable!() + }; + assert_eq!(array.len(), 4); + // UPDATE user:amos + let a = array.get(0).unwrap(); + let Value::Object(a) = a else { + unreachable!() + }; + let Value::Number(versionstamp1) = a.get("versionstamp").unwrap() else { + unreachable!() + }; + let changes = a.get("changes").unwrap().to_owned(); + assert_eq!( + changes, + surrealdb::sql::value( + "[ + { + update: { + id: user:amos, + name: 'Amos' + } + } + ]" + ) + .unwrap() + ); + // UPDATE user:jane + let a = array.get(1).unwrap(); + let Value::Object(a) = a else { + unreachable!() + }; + let Value::Number(versionstamp2) = a.get("versionstamp").unwrap() else { + unreachable!() + }; + assert!(versionstamp1 < versionstamp2); + let changes = a.get("changes").unwrap().to_owned(); + assert_eq!( + changes, + surrealdb::sql::value( + "[ + { + update: { + id: user:jane, + name: 'Jane' + } + } + ]" + ) + .unwrap() + ); + // UPDATE user:amos + let a = array.get(2).unwrap(); + let Value::Object(a) = a else { + unreachable!() + }; + let Value::Number(versionstamp3) = a.get("versionstamp").unwrap() else { + unreachable!() + }; + assert!(versionstamp2 < versionstamp3); + let changes = a.get("changes").unwrap().to_owned(); + assert_eq!( + changes, + surrealdb::sql::value( + "[ + { + update: { + id: user:amos, + name: 'AMOS' + } + } + ]" + ) + .unwrap() + ); + // UPDATE table + let a = array.get(3).unwrap(); + let Value::Object(a) = a else { + unreachable!() + }; + let Value::Number(versionstamp4) = a.get("versionstamp").unwrap() else { + unreachable!() + }; + assert!(versionstamp3 < versionstamp4); + let changes = a.get("changes").unwrap().to_owned(); + assert_eq!( + changes, + surrealdb::sql::value( + "[ + { + update: { + id: user:amos, + name: 'Doe' + } + }, + { + update: { + id: user:jane, + name: 'Doe' + } + } + ]" + ) + .unwrap() + ); + // Save timestamp 2 + let ts2_dt = "2023-08-01T00:00:05Z"; + let ts2 = DateTime::parse_from_rfc3339(ts2_dt.clone()).unwrap(); + db.tick_at(ts2.timestamp().try_into().unwrap()).await.unwrap(); + // + // Show changes using timestamp 1 + // + let sql = format!( + " + SHOW CHANGES FOR TABLE user SINCE '{ts1_dt}' LIMIT 10; + " + ); + let value: Value = db.execute(&sql, &ses, None).await?.remove(0).result?; + let Value::Array(array) = value.clone() else { + unreachable!() + }; + assert_eq!(array.len(), 4); + // UPDATE user:amos + let a = array.get(0).unwrap(); + let Value::Object(a) = a else { + unreachable!() + }; + let Value::Number(versionstamp1b) = a.get("versionstamp").unwrap() else { + unreachable!() + }; + assert!(versionstamp1 == versionstamp1b); + let changes = a.get("changes").unwrap().to_owned(); + assert_eq!( + changes, + surrealdb::sql::value( + "[ + { + update: { + id: user:amos, + name: 'Amos' + } + } + ]" + ) + .unwrap() + ); + // Save timestamp 3 + let ts3_dt = "2023-08-01T00:00:10Z"; + let ts3 = DateTime::parse_from_rfc3339(ts3_dt.clone()).unwrap(); + db.tick_at(ts3.timestamp().try_into().unwrap()).await.unwrap(); + // + // Show changes using timestamp 3 + // + let sql = format!("SHOW CHANGES FOR TABLE user SINCE '{ts3_dt}' LIMIT 10;"); + let value: Value = db.execute(&sql, &ses, None).await?.remove(0).result?; + let Value::Array(array) = value.clone() else { + unreachable!() + }; + assert_eq!(array.len(), 0); + Ok(()) +}