Make tick a configuration option (#2495)

This commit is contained in:
Rushmore Mushambi 2023-08-25 09:55:22 +02:00 committed by GitHub
parent 6cfc270d32
commit fd364e56da
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 336 additions and 342 deletions

View file

@ -93,8 +93,6 @@ pub enum Method {
Signin, Signin,
/// Signs up on the server /// Signs up on the server
Signup, Signup,
/// Runs a series of node maintenance operations
Tick,
/// Removes a parameter from a connection /// Removes a parameter from a connection
Unset, Unset,
/// Perfoms an update operation /// Perfoms an update operation

View file

@ -48,7 +48,6 @@ use crate::dbs::Session;
use crate::kvs::Datastore; use crate::kvs::Datastore;
use crate::opt::IntoEndpoint; use crate::opt::IntoEndpoint;
use crate::sql::Array; use crate::sql::Array;
use crate::sql::Number;
use crate::sql::Query; use crate::sql::Query;
use crate::sql::Statement; use crate::sql::Statement;
use crate::sql::Statements; use crate::sql::Statements;
@ -60,6 +59,7 @@ use std::marker::PhantomData;
use std::mem; use std::mem;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use std::path::PathBuf; use std::path::PathBuf;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use tokio::fs::OpenOptions; use tokio::fs::OpenOptions;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@ -71,6 +71,8 @@ use tokio::io::AsyncWrite;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(10);
/// In-memory database /// In-memory database
/// ///
/// # Examples /// # Examples
@ -641,17 +643,6 @@ async fn router(
vars.insert(key, value); vars.insert(key, value);
Ok(DbResponse::Other(Value::None)) Ok(DbResponse::Other(Value::None))
} }
Method::Tick => {
let ts = match &mut params[..1] {
[Value::Number(Number::Int(ts))] => {
let ts = ts.to_owned();
ts as u64
}
_ => unreachable!(),
};
kvs.tick_at(ts).await?;
Ok(DbResponse::Other(Value::None))
}
Method::Unset => { Method::Unset => {
if let [Value::Strand(Strand(key))] = &params[..1] { if let [Value::Strand(Strand(key))] = &params[..1] {
vars.remove(key); vars.remove(key);

View file

@ -5,6 +5,7 @@ use crate::api::conn::Param;
use crate::api::conn::Route; use crate::api::conn::Route;
use crate::api::conn::Router; use crate::api::conn::Router;
use crate::api::engine::local::Db; use crate::api::engine::local::Db;
use crate::api::engine::local::DEFAULT_TICK_INTERVAL;
use crate::api::err::Error; use crate::api::err::Error;
use crate::api::opt::Endpoint; use crate::api::opt::Endpoint;
use crate::api::ExtraFeatures; use crate::api::ExtraFeatures;
@ -12,12 +13,14 @@ use crate::api::OnceLockExt;
use crate::api::Result; use crate::api::Result;
use crate::api::Surreal; use crate::api::Surreal;
use crate::dbs::Session; use crate::dbs::Session;
use crate::engine::IntervalStream;
use crate::iam::Level; use crate::iam::Level;
use crate::kvs::Datastore; use crate::kvs::Datastore;
use crate::opt::auth::Root; use crate::opt::auth::Root;
use flume::Receiver; use flume::Receiver;
use flume::Sender; use flume::Sender;
use futures::StreamExt; use futures::StreamExt;
use futures_concurrency::stream::Merge as _;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::future::Future; use std::future::Future;
@ -26,6 +29,9 @@ use std::pin::Pin;
use std::sync::atomic::AtomicI64; use std::sync::atomic::AtomicI64;
use std::sync::Arc; use std::sync::Arc;
use std::sync::OnceLock; use std::sync::OnceLock;
use std::time::Duration;
use tokio::time;
use tokio::time::MissedTickBehavior;
impl crate::api::Connection for Db {} impl crate::api::Connection for Db {}
@ -141,10 +147,15 @@ pub(crate) fn router(
false => kvs, false => kvs,
}; };
let kvs = Arc::new(kvs);
let mut vars = BTreeMap::new(); let mut vars = BTreeMap::new();
let mut stream = route_rx.into_stream(); let mut stream = route_rx.into_stream();
let mut session = Session::default(); let mut session = Session::default();
let (maintenance_tx, maintenance_rx) = flume::bounded::<()>(1);
let tick_interval = address.config.tick_interval.unwrap_or(DEFAULT_TICK_INTERVAL);
run_maintenance(kvs.clone(), tick_interval, maintenance_rx);
while let Some(Some(route)) = stream.next().await { while let Some(Some(route)) = stream.next().await {
match super::router(route.request, &kvs, &mut session, &mut vars).await { match super::router(route.request, &kvs, &mut session, &mut vars).await {
Ok(value) => { Ok(value) => {
@ -155,5 +166,31 @@ pub(crate) fn router(
} }
} }
} }
// Stop maintenance tasks
let _ = maintenance_tx.into_send_async(()).await;
});
}
fn run_maintenance(kvs: Arc<Datastore>, tick_interval: Duration, stop_signal: Receiver<()>) {
tokio::spawn(async move {
let mut interval = time::interval(tick_interval);
// Don't bombard the database if we miss some ticks
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
// Delay sending the first tick
interval.tick().await;
let ticker = IntervalStream::new(interval);
let streams = (ticker.map(Some), stop_signal.into_stream().map(|_| None));
let mut stream = streams.merge();
while let Some(Some(_)) = stream.next().await {
match kvs.tick().await {
Ok(()) => trace!("Node agent tick ran successfully"),
Err(error) => error!("Error running node agent tick: {error}"),
}
}
}); });
} }

View file

@ -5,17 +5,20 @@ use crate::api::conn::Param;
use crate::api::conn::Route; use crate::api::conn::Route;
use crate::api::conn::Router; use crate::api::conn::Router;
use crate::api::engine::local::Db; use crate::api::engine::local::Db;
use crate::api::engine::local::DEFAULT_TICK_INTERVAL;
use crate::api::opt::Endpoint; use crate::api::opt::Endpoint;
use crate::api::OnceLockExt; use crate::api::OnceLockExt;
use crate::api::Result; use crate::api::Result;
use crate::api::Surreal; use crate::api::Surreal;
use crate::dbs::Session; use crate::dbs::Session;
use crate::engine::IntervalStream;
use crate::iam::Level; use crate::iam::Level;
use crate::kvs::Datastore; use crate::kvs::Datastore;
use crate::opt::auth::Root; use crate::opt::auth::Root;
use flume::Receiver; use flume::Receiver;
use flume::Sender; use flume::Sender;
use futures::StreamExt; use futures::StreamExt;
use futures_concurrency::stream::Merge as _;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::future::Future; use std::future::Future;
@ -24,7 +27,10 @@ use std::pin::Pin;
use std::sync::atomic::AtomicI64; use std::sync::atomic::AtomicI64;
use std::sync::Arc; use std::sync::Arc;
use std::sync::OnceLock; use std::sync::OnceLock;
use std::time::Duration;
use wasm_bindgen_futures::spawn_local; use wasm_bindgen_futures::spawn_local;
use wasmtimer::tokio as time;
use wasmtimer::tokio::MissedTickBehavior;
impl crate::api::Connection for Db {} impl crate::api::Connection for Db {}
@ -108,7 +114,6 @@ pub(crate) fn router(
return; return;
} }
} }
let _ = conn_tx.into_send_async(Ok(())).await; let _ = conn_tx.into_send_async(Ok(())).await;
kvs.with_auth_enabled(configured_root.is_some()) kvs.with_auth_enabled(configured_root.is_some())
} }
@ -128,10 +133,15 @@ pub(crate) fn router(
false => kvs, false => kvs,
}; };
let kvs = Arc::new(kvs);
let mut vars = BTreeMap::new(); let mut vars = BTreeMap::new();
let mut stream = route_rx.into_stream(); let mut stream = route_rx.into_stream();
let mut session = Session::default(); let mut session = Session::default();
let (maintenance_tx, maintenance_rx) = flume::bounded::<()>(1);
let tick_interval = address.config.tick_interval.unwrap_or(DEFAULT_TICK_INTERVAL);
run_maintenance(kvs.clone(), tick_interval, maintenance_rx);
while let Some(Some(route)) = stream.next().await { while let Some(Some(route)) = stream.next().await {
match super::router(route.request, &kvs, &mut session, &mut vars).await { match super::router(route.request, &kvs, &mut session, &mut vars).await {
Ok(value) => { Ok(value) => {
@ -142,5 +152,31 @@ pub(crate) fn router(
} }
} }
} }
// Stop maintenance tasks
let _ = maintenance_tx.into_send_async(()).await;
});
}
fn run_maintenance(kvs: Arc<Datastore>, tick_interval: Duration, stop_signal: Receiver<()>) {
spawn_local(async move {
let mut interval = time::interval(tick_interval);
// Don't bombard the database if we miss some ticks
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
// Delay sending the first tick
interval.tick().await;
let ticker = IntervalStream::new(interval);
let streams = (ticker.map(Some), stop_signal.into_stream().map(|_| None));
let mut stream = streams.merge();
while let Some(Some(_)) = stream.next().await {
match kvs.tick().await {
Ok(()) => trace!("Node agent tick ran successfully"),
Err(error) => error!("Error running node agent tick: {error}"),
}
}
}); });
} }

View file

@ -24,7 +24,19 @@ use crate::sql::Fields;
use crate::sql::Output; use crate::sql::Output;
use crate::sql::Value; use crate::sql::Value;
use crate::sql::Values; use crate::sql::Values;
use futures::Stream;
use std::mem; use std::mem;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Interval;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::Instant;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::Interval;
#[allow(dead_code)] // used by the the embedded database and `http` #[allow(dead_code)] // used by the the embedded database and `http`
fn split_params(params: &mut [Value]) -> (bool, Values, Value) { fn split_params(params: &mut [Value]) -> (bool, Values, Value) {
@ -135,3 +147,23 @@ fn delete_statement(params: &mut [Value]) -> (bool, DeleteStatement) {
}, },
) )
} }
struct IntervalStream {
inner: Interval,
}
impl IntervalStream {
fn new(interval: Interval) -> Self {
Self {
inner: interval,
}
}
}
impl Stream for IntervalStream {
type Item = Instant;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
self.inner.poll_tick(cx).map(Some)
}
}

View file

@ -517,9 +517,6 @@ async fn router(
let value = health(request).await?; let value = health(request).await?;
Ok(DbResponse::Other(value)) Ok(DbResponse::Other(value))
} }
Method::Tick => Err(crate::Error::Api(crate::error::Api::InvalidRequest(
"Tick is not implemented for remote backends".to_string(),
))),
Method::Version => { Method::Version => {
let path = base_url.join(method.as_str())?; let path = base_url.join(method.as_str())?;
let request = client.get(path); let request = client.get(path);

View file

@ -17,21 +17,9 @@ use crate::opt::IntoEndpoint;
use crate::sql::Array; use crate::sql::Array;
use crate::sql::Strand; use crate::sql::Strand;
use crate::sql::Value; use crate::sql::Value;
use futures::Stream;
use serde::Deserialize; use serde::Deserialize;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration; use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Interval;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::Instant;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::Interval;
pub(crate) const PATH: &str = "rpc"; pub(crate) const PATH: &str = "rpc";
const PING_INTERVAL: Duration = Duration::from_secs(5); const PING_INTERVAL: Duration = Duration::from_secs(5);
@ -149,23 +137,3 @@ pub(crate) struct Response {
id: Option<Value>, id: Option<Value>,
pub(crate) result: ServerResult, pub(crate) result: ServerResult,
} }
struct IntervalStream {
inner: Interval,
}
impl IntervalStream {
fn new(interval: Interval) -> Self {
Self {
inner: interval,
}
}
}
impl Stream for IntervalStream {
type Item = Instant;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
self.inner.poll_tick(cx).map(Some)
}
}

View file

@ -16,7 +16,7 @@ use crate::api::opt::Tls;
use crate::api::OnceLockExt; use crate::api::OnceLockExt;
use crate::api::Result; use crate::api::Result;
use crate::api::Surreal; use crate::api::Surreal;
use crate::engine::remote::ws::IntervalStream; use crate::engine::IntervalStream;
use crate::sql::serde::{deserialize, serialize}; use crate::sql::serde::{deserialize, serialize};
use crate::sql::Strand; use crate::sql::Strand;
use crate::sql::Value; use crate::sql::Value;
@ -230,10 +230,6 @@ pub(crate) fn router(
vars.remove(key); vars.remove(key);
} }
} }
Method::Tick => {
// Remote backend doesn't support tick
return;
}
_ => {} _ => {}
} }
let method_str = match method { let method_str = match method {

View file

@ -14,7 +14,7 @@ use crate::api::opt::Endpoint;
use crate::api::OnceLockExt; use crate::api::OnceLockExt;
use crate::api::Result; use crate::api::Result;
use crate::api::Surreal; use crate::api::Surreal;
use crate::engine::remote::ws::IntervalStream; use crate::engine::IntervalStream;
use crate::sql::serde::{deserialize, serialize}; use crate::sql::serde::{deserialize, serialize};
use crate::sql::Strand; use crate::sql::Strand;
use crate::sql::Value; use crate::sql::Value;

View file

@ -21,7 +21,6 @@ mod select;
mod set; mod set;
mod signin; mod signin;
mod signup; mod signup;
mod tick;
mod unset; mod unset;
mod update; mod update;
mod use_db; mod use_db;
@ -58,7 +57,6 @@ pub use select::Select;
pub use set::Set; pub use set::Set;
pub use signin::Signin; pub use signin::Signin;
pub use signup::Signup; pub use signup::Signup;
pub use tick::Tick;
pub use unset::Unset; pub use unset::Unset;
pub use update::Update; pub use update::Update;
pub use use_db::UseDb; pub use use_db::UseDb;
@ -105,7 +103,6 @@ impl Method {
Method::Set => "set", Method::Set => "set",
Method::Signin => "signin", Method::Signin => "signin",
Method::Signup => "signup", Method::Signup => "signup",
Method::Tick => "tick",
Method::Unset => "unset", Method::Unset => "unset",
Method::Update => "update", Method::Update => "update",
Method::Use => "use", Method::Use => "use",
@ -907,27 +904,6 @@ where
} }
} }
/// Runs the embedded datastore's "tick" operation at the specified timestamp.
/// This is used by the client to periodically run node maintenance tasks
/// which are usually run by the server itself in a client-server setup.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> surrealdb::Result<()> {
/// # let db = surrealdb::engine::any::connect("mem://").await?;
/// db.tick(123).await?;
/// # Ok(())
/// # }
/// ```
pub fn tick(&self, ts: u64) -> Tick<C> {
Tick {
router: self.router.extract(),
ts,
}
}
/// Returns the version of the server /// Returns the version of the server
/// ///
/// # Examples /// # Examples

View file

@ -68,10 +68,6 @@ pub(super) fn mock(route_rx: Receiver<Option<Route>>) {
} }
_ => unreachable!(), _ => unreachable!(),
}, },
Method::Tick => match &params[..] {
[Value::Number(..)] => Ok(DbResponse::Other(Value::None)),
_ => unreachable!(),
},
Method::Update | Method::Merge | Method::Patch => match &params[..] { Method::Update | Method::Merge | Method::Patch => match &params[..] {
[Value::Thing(..)] | [Value::Thing(..), _] => { [Value::Thing(..)] | [Value::Thing(..), _] => {
Ok(DbResponse::Other(to_value(User::default()).unwrap())) Ok(DbResponse::Other(to_value(User::default()).unwrap()))

View file

@ -1,31 +0,0 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::conn::Router;
use crate::api::Connection;
use crate::api::Result;
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
/// A tick-at-specified-timestamp future
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Tick<'r, C: Connection> {
pub(super) router: Result<&'r Router<C>>,
pub(super) ts: u64,
}
impl<'r, Client> IntoFuture for Tick<'r, Client>
where
Client: Connection,
{
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let mut conn = Client::new(Method::Tick);
conn.execute_unit(self.router?, Param::new(vec![self.ts.into()])).await
})
}
}

View file

@ -15,6 +15,7 @@ pub struct Config {
pub(crate) auth: Level, pub(crate) auth: Level,
pub(crate) username: String, pub(crate) username: String,
pub(crate) password: String, pub(crate) password: String,
pub(crate) tick_interval: Option<Duration>,
} }
impl Config { impl Config {
@ -24,13 +25,11 @@ impl Config {
} }
/// Set the strict value of the config to the supplied value /// Set the strict value of the config to the supplied value
/// Enables `strict` server mode
pub fn set_strict(mut self, strict: bool) -> Self { pub fn set_strict(mut self, strict: bool) -> Self {
self.strict = strict; self.strict = strict;
self self
} }
/// Set the config to use strict mode
/// Enables `strict` server mode /// Enables `strict` server mode
pub fn strict(mut self) -> Self { pub fn strict(mut self) -> Self {
self.strict = true; self.strict = true;
@ -84,4 +83,10 @@ impl Config {
self.tls_config = Some(super::Tls::Native(config)); self.tls_config = Some(super::Tls::Native(config));
self self
} }
/// Set the interval at which the database should run node maintenance tasks
pub fn tick_interval(mut self, interval: impl Into<Option<Duration>>) -> Self {
self.tick_interval = interval.into().filter(|x| !x.is_zero());
self
}
} }

View file

@ -550,9 +550,7 @@ impl Datastore {
} }
// tick is called periodically to perform maintenance tasks. // tick is called periodically to perform maintenance tasks.
// On a linux/windows/macos system, this is called every TICK_INTERVAL. // This is called every TICK_INTERVAL.
// On embedded scenarios like WASM where there is no background thread,
// this should be called by the application.
pub async fn tick(&self) -> Result<(), Error> { pub async fn tick(&self) -> Result<(), Error> {
let now = std::time::SystemTime::now() let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)

View file

@ -9,6 +9,7 @@ mod api_integration {
use std::ops::Bound; use std::ops::Bound;
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration;
use surrealdb::error::Api as ApiError; use surrealdb::error::Api as ApiError;
use surrealdb::error::Db as DbError; use surrealdb::error::Db as DbError;
use surrealdb::opt::auth::Database; use surrealdb::opt::auth::Database;
@ -35,6 +36,7 @@ mod api_integration {
const NS: &str = "test-ns"; const NS: &str = "test-ns";
const ROOT_USER: &str = "root"; const ROOT_USER: &str = "root";
const ROOT_PASS: &str = "root"; const ROOT_PASS: &str = "root";
const TICK_INTERVAL: Duration = Duration::from_secs(1);
// Used to ensure that only one test at a time is setting up the underlaying datastore. // Used to ensure that only one test at a time is setting up the underlaying datastore.
// When auth is enabled, multiple tests may try to create the same root user at the same time. // When auth is enabled, multiple tests may try to create the same root user at the same time.
static SETUP_MUTEX: Lazy<Arc<Mutex<()>>> = Lazy::new(|| Arc::new(Mutex::new(()))); static SETUP_MUTEX: Lazy<Arc<Mutex<()>>> = Lazy::new(|| Arc::new(Mutex::new(())));
@ -132,7 +134,7 @@ mod api_integration {
username: ROOT_USER, username: ROOT_USER,
password: ROOT_PASS, password: ROOT_PASS,
}; };
let config = Config::new().user(root); let config = Config::new().user(root).tick_interval(TICK_INTERVAL);
let db = Surreal::new::<Mem>(config).await.unwrap(); let db = Surreal::new::<Mem>(config).await.unwrap();
db.signin(root).await.unwrap(); db.signin(root).await.unwrap();
db db
@ -205,7 +207,6 @@ mod api_integration {
} }
include!("api/mod.rs"); include!("api/mod.rs");
include!("api/local.rs");
include!("api/backup.rs"); include!("api/backup.rs");
} }
@ -223,14 +224,13 @@ mod api_integration {
username: ROOT_USER, username: ROOT_USER,
password: ROOT_PASS, password: ROOT_PASS,
}; };
let config = Config::new().user(root); let config = Config::new().user(root).tick_interval(TICK_INTERVAL);
let db = Surreal::new::<File>((path, config)).await.unwrap(); let db = Surreal::new::<File>((path, config)).await.unwrap();
db.signin(root).await.unwrap(); db.signin(root).await.unwrap();
db db
} }
include!("api/mod.rs"); include!("api/mod.rs");
include!("api/local.rs");
include!("api/backup.rs"); include!("api/backup.rs");
} }
@ -248,14 +248,13 @@ mod api_integration {
username: ROOT_USER, username: ROOT_USER,
password: ROOT_PASS, password: ROOT_PASS,
}; };
let config = Config::new().user(root); let config = Config::new().user(root).tick_interval(TICK_INTERVAL);
let db = Surreal::new::<RocksDb>((path, config)).await.unwrap(); let db = Surreal::new::<RocksDb>((path, config)).await.unwrap();
db.signin(root).await.unwrap(); db.signin(root).await.unwrap();
db db
} }
include!("api/mod.rs"); include!("api/mod.rs");
include!("api/local.rs");
include!("api/backup.rs"); include!("api/backup.rs");
} }
@ -273,14 +272,13 @@ mod api_integration {
username: ROOT_USER, username: ROOT_USER,
password: ROOT_PASS, password: ROOT_PASS,
}; };
let config = Config::new().user(root); let config = Config::new().user(root).tick_interval(TICK_INTERVAL);
let db = Surreal::new::<SpeeDb>((path, config)).await.unwrap(); let db = Surreal::new::<SpeeDb>((path, config)).await.unwrap();
db.signin(root).await.unwrap(); db.signin(root).await.unwrap();
db db
} }
include!("api/mod.rs"); include!("api/mod.rs");
include!("api/local.rs");
include!("api/backup.rs"); include!("api/backup.rs");
} }
@ -297,14 +295,13 @@ mod api_integration {
username: ROOT_USER, username: ROOT_USER,
password: ROOT_PASS, password: ROOT_PASS,
}; };
let config = Config::new().user(root); let config = Config::new().user(root).tick_interval(TICK_INTERVAL);
let db = Surreal::new::<TiKv>(("127.0.0.1:2379", config)).await.unwrap(); let db = Surreal::new::<TiKv>(("127.0.0.1:2379", config)).await.unwrap();
db.signin(root).await.unwrap(); db.signin(root).await.unwrap();
db db
} }
include!("api/mod.rs"); include!("api/mod.rs");
include!("api/local.rs");
include!("api/backup.rs"); include!("api/backup.rs");
} }
@ -321,14 +318,13 @@ mod api_integration {
username: ROOT_USER, username: ROOT_USER,
password: ROOT_PASS, password: ROOT_PASS,
}; };
let config = Config::new().user(root); let config = Config::new().user(root).tick_interval(TICK_INTERVAL);
let db = Surreal::new::<FDb>(("/etc/foundationdb/fdb.cluster", config)).await.unwrap(); let db = Surreal::new::<FDb>(("/etc/foundationdb/fdb.cluster", config)).await.unwrap();
db.signin(root).await.unwrap(); db.signin(root).await.unwrap();
db db
} }
include!("api/mod.rs"); include!("api/mod.rs");
include!("api/local.rs");
include!("api/backup.rs"); include!("api/backup.rs");
} }

View file

@ -1,210 +0,0 @@
#[tokio::test]
async fn changefeed_with_ts() {
let db = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
// Enable change feeds
let sql = "
DEFINE TABLE user CHANGEFEED 1h;
";
let response = db.query(sql).await.unwrap();
response.check().unwrap();
// Save timestamp 1
let ts1_dt = "2023-08-01T00:00:00Z";
let ts1 = DateTime::parse_from_rfc3339(ts1_dt.clone()).unwrap();
db.tick(ts1.timestamp().try_into().unwrap()).await.unwrap();
// Create and update users
let sql = "
CREATE user:amos SET name = 'Amos';
CREATE user:jane SET name = 'Jane';
UPDATE user:amos SET name = 'AMOS';
";
let table = "user";
let response = db.query(sql).await.unwrap();
response.check().unwrap();
let users: Vec<RecordBuf> = db
.update(table)
.content(Record {
name: "Doe",
})
.await
.unwrap();
let expected = &[
RecordBuf {
id: thing("user:amos").unwrap(),
name: "Doe".to_owned(),
},
RecordBuf {
id: thing("user:jane").unwrap(),
name: "Doe".to_owned(),
},
];
assert_eq!(users, expected);
let users: Vec<RecordBuf> = db.select(table).await.unwrap();
assert_eq!(users, expected);
let sql = "
SHOW CHANGES FOR TABLE user SINCE 0 LIMIT 10;
";
let mut response = db.query(sql).await.unwrap();
let value: Value = response.take(0).unwrap();
let Value::Array(array) = value.clone() else {
unreachable!()
};
assert_eq!(array.len(), 4);
// UPDATE user:amos
let a = array.get(0).unwrap();
let Value::Object(a) = a else {
unreachable!()
};
let Value::Number(versionstamp1) = a.get("versionstamp").unwrap() else {
unreachable!()
};
let changes = a.get("changes").unwrap().to_owned();
assert_eq!(
changes,
surrealdb::sql::value(
"[
{
update: {
id: user:amos,
name: 'Amos'
}
}
]"
)
.unwrap()
);
// UPDATE user:jane
let a = array.get(1).unwrap();
let Value::Object(a) = a else {
unreachable!()
};
let Value::Number(versionstamp2) = a.get("versionstamp").unwrap() else {
unreachable!()
};
assert!(versionstamp1 < versionstamp2);
let changes = a.get("changes").unwrap().to_owned();
assert_eq!(
changes,
surrealdb::sql::value(
"[
{
update: {
id: user:jane,
name: 'Jane'
}
}
]"
)
.unwrap()
);
// UPDATE user:amos
let a = array.get(2).unwrap();
let Value::Object(a) = a else {
unreachable!()
};
let Value::Number(versionstamp3) = a.get("versionstamp").unwrap() else {
unreachable!()
};
assert!(versionstamp2 < versionstamp3);
let changes = a.get("changes").unwrap().to_owned();
assert_eq!(
changes,
surrealdb::sql::value(
"[
{
update: {
id: user:amos,
name: 'AMOS'
}
}
]"
)
.unwrap()
);
// UPDATE table
let a = array.get(3).unwrap();
let Value::Object(a) = a else {
unreachable!()
};
let Value::Number(versionstamp4) = a.get("versionstamp").unwrap() else {
unreachable!()
};
assert!(versionstamp3 < versionstamp4);
let changes = a.get("changes").unwrap().to_owned();
assert_eq!(
changes,
surrealdb::sql::value(
"[
{
update: {
id: user:amos,
name: 'Doe'
}
},
{
update: {
id: user:jane,
name: 'Doe'
}
}
]"
)
.unwrap()
);
// Save timestamp 2
let ts2_dt = "2023-08-01T00:00:05Z";
let ts2 = DateTime::parse_from_rfc3339(ts2_dt.clone()).unwrap();
db.tick(ts2.timestamp().try_into().unwrap()).await.unwrap();
//
// Show changes using timestamp 1
//
let sql = format!(
"
SHOW CHANGES FOR TABLE user SINCE '{ts1_dt}' LIMIT 10;
"
);
let mut response = db.query(sql).await.unwrap();
let value: Value = response.take(0).unwrap();
let Value::Array(array) = value.clone() else {
unreachable!()
};
assert_eq!(array.len(), 4);
// UPDATE user:amos
let a = array.get(0).unwrap();
let Value::Object(a) = a else {
unreachable!()
};
let Value::Number(versionstamp1b) = a.get("versionstamp").unwrap() else {
unreachable!()
};
assert!(versionstamp1 == versionstamp1b);
let changes = a.get("changes").unwrap().to_owned();
assert_eq!(
changes,
surrealdb::sql::value(
"[
{
update: {
id: user:amos,
name: 'Amos'
}
}
]"
)
.unwrap()
);
// Save timestamp 3
let ts3_dt = "2023-08-01T00:00:10Z";
let ts3 = DateTime::parse_from_rfc3339(ts3_dt.clone()).unwrap();
db.tick(ts3.timestamp().try_into().unwrap()).await.unwrap();
//
// Show changes using timestamp 3
//
let sql = format!("SHOW CHANGES FOR TABLE user SINCE '{ts3_dt}' LIMIT 10;");
let mut response = db.query(sql).await.unwrap();
let value: Value = response.take(0).unwrap();
let Value::Array(array) = value.clone() else {
unreachable!()
};
assert_eq!(array.len(), 0);
}

View file

@ -1,4 +1,5 @@
mod parse; mod parse;
use chrono::DateTime;
use parse::Parse; use parse::Parse;
use surrealdb::dbs::Session; use surrealdb::dbs::Session;
use surrealdb::err::Error; use surrealdb::err::Error;
@ -175,3 +176,211 @@ async fn table_change_feeds() -> Result<(), Error> {
// //
Ok(()) Ok(())
} }
#[tokio::test]
async fn changefeed_with_ts() -> Result<(), Error> {
let db = Datastore::new("memory").await?;
let ses = Session::owner().with_ns("test").with_db("test");
// Enable change feeds
let sql = "
DEFINE TABLE user CHANGEFEED 1h;
";
db.execute(sql, &ses, None).await?.remove(0).result?;
// Save timestamp 1
let ts1_dt = "2023-08-01T00:00:00Z";
let ts1 = DateTime::parse_from_rfc3339(ts1_dt.clone()).unwrap();
db.tick_at(ts1.timestamp().try_into().unwrap()).await.unwrap();
// Create and update users
let sql = "
CREATE user:amos SET name = 'Amos';
CREATE user:jane SET name = 'Jane';
UPDATE user:amos SET name = 'AMOS';
";
let table = "user";
let res = db.execute(sql, &ses, None).await?;
for res in res {
res.result?;
}
let sql = format!("UPDATE {table} SET name = 'Doe'");
let users = db.execute(&sql, &ses, None).await?.remove(0).result?;
let expected = Value::parse(
"[
{
id: user:amos,
name: 'Doe',
},
{
id: user:jane,
name: 'Doe',
},
]",
);
assert_eq!(users, expected);
let sql = format!("SELECT * FROM {table}");
let users = db.execute(&sql, &ses, None).await?.remove(0).result?;
assert_eq!(users, expected);
let sql = "
SHOW CHANGES FOR TABLE user SINCE 0 LIMIT 10;
";
let value: Value = db.execute(sql, &ses, None).await?.remove(0).result?;
let Value::Array(array) = value.clone() else {
unreachable!()
};
assert_eq!(array.len(), 4);
// UPDATE user:amos
let a = array.get(0).unwrap();
let Value::Object(a) = a else {
unreachable!()
};
let Value::Number(versionstamp1) = a.get("versionstamp").unwrap() else {
unreachable!()
};
let changes = a.get("changes").unwrap().to_owned();
assert_eq!(
changes,
surrealdb::sql::value(
"[
{
update: {
id: user:amos,
name: 'Amos'
}
}
]"
)
.unwrap()
);
// UPDATE user:jane
let a = array.get(1).unwrap();
let Value::Object(a) = a else {
unreachable!()
};
let Value::Number(versionstamp2) = a.get("versionstamp").unwrap() else {
unreachable!()
};
assert!(versionstamp1 < versionstamp2);
let changes = a.get("changes").unwrap().to_owned();
assert_eq!(
changes,
surrealdb::sql::value(
"[
{
update: {
id: user:jane,
name: 'Jane'
}
}
]"
)
.unwrap()
);
// UPDATE user:amos
let a = array.get(2).unwrap();
let Value::Object(a) = a else {
unreachable!()
};
let Value::Number(versionstamp3) = a.get("versionstamp").unwrap() else {
unreachable!()
};
assert!(versionstamp2 < versionstamp3);
let changes = a.get("changes").unwrap().to_owned();
assert_eq!(
changes,
surrealdb::sql::value(
"[
{
update: {
id: user:amos,
name: 'AMOS'
}
}
]"
)
.unwrap()
);
// UPDATE table
let a = array.get(3).unwrap();
let Value::Object(a) = a else {
unreachable!()
};
let Value::Number(versionstamp4) = a.get("versionstamp").unwrap() else {
unreachable!()
};
assert!(versionstamp3 < versionstamp4);
let changes = a.get("changes").unwrap().to_owned();
assert_eq!(
changes,
surrealdb::sql::value(
"[
{
update: {
id: user:amos,
name: 'Doe'
}
},
{
update: {
id: user:jane,
name: 'Doe'
}
}
]"
)
.unwrap()
);
// Save timestamp 2
let ts2_dt = "2023-08-01T00:00:05Z";
let ts2 = DateTime::parse_from_rfc3339(ts2_dt.clone()).unwrap();
db.tick_at(ts2.timestamp().try_into().unwrap()).await.unwrap();
//
// Show changes using timestamp 1
//
let sql = format!(
"
SHOW CHANGES FOR TABLE user SINCE '{ts1_dt}' LIMIT 10;
"
);
let value: Value = db.execute(&sql, &ses, None).await?.remove(0).result?;
let Value::Array(array) = value.clone() else {
unreachable!()
};
assert_eq!(array.len(), 4);
// UPDATE user:amos
let a = array.get(0).unwrap();
let Value::Object(a) = a else {
unreachable!()
};
let Value::Number(versionstamp1b) = a.get("versionstamp").unwrap() else {
unreachable!()
};
assert!(versionstamp1 == versionstamp1b);
let changes = a.get("changes").unwrap().to_owned();
assert_eq!(
changes,
surrealdb::sql::value(
"[
{
update: {
id: user:amos,
name: 'Amos'
}
}
]"
)
.unwrap()
);
// Save timestamp 3
let ts3_dt = "2023-08-01T00:00:10Z";
let ts3 = DateTime::parse_from_rfc3339(ts3_dt.clone()).unwrap();
db.tick_at(ts3.timestamp().try_into().unwrap()).await.unwrap();
//
// Show changes using timestamp 3
//
let sql = format!("SHOW CHANGES FOR TABLE user SINCE '{ts3_dt}' LIMIT 10;");
let value: Value = db.execute(&sql, &ses, None).await?.remove(0).result?;
let Value::Array(array) = value.clone() else {
unreachable!()
};
assert_eq!(array.len(), 0);
Ok(())
}