From e06cd111cf7ac005f80f9ee85c2fd6f742f689ea Mon Sep 17 00:00:00 2001 From: Przemyslaw Hugh Kaznowski Date: Fri, 1 Mar 2024 17:12:53 +0000 Subject: [PATCH] Add Engine Config for configuring how many live queries can be created per transaction (#3559) --- core/src/dbs/capabilities.rs | 8 +++--- core/src/kvs/ds.rs | 52 +++++++++++++++++++++--------------- core/src/kvs/tx.rs | 8 +++--- core/src/lib.rs | 1 + core/src/options.rs | 18 +++++++++++++ src/cli/config.rs | 2 ++ src/cli/start.rs | 1 + src/dbs/mod.rs | 7 ++++- 8 files changed, 67 insertions(+), 30 deletions(-) create mode 100644 core/src/options.rs diff --git a/core/src/dbs/capabilities.rs b/core/src/dbs/capabilities.rs index e3288cb7..9c935f8a 100644 --- a/core/src/dbs/capabilities.rs +++ b/core/src/dbs/capabilities.rs @@ -168,10 +168,10 @@ pub struct Capabilities { impl std::fmt::Display for Capabilities { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( - f, - "scripting={}, guest_access={}, live_query_notifications={}, allow_funcs={}, deny_funcs={}, allow_net={}, deny_net={}", - self.scripting, self.guest_access, self.live_query_notifications, self.allow_funcs, self.deny_funcs, self.allow_net, self.deny_net - ) + f, + "scripting={}, guest_access={}, live_query_notifications={}, allow_funcs={}, deny_funcs={}, allow_net={}, deny_net={}", + self.scripting, self.guest_access, self.live_query_notifications, self.allow_funcs, self.deny_funcs, self.allow_net, self.deny_net + ) } } diff --git a/core/src/kvs/ds.rs b/core/src/kvs/ds.rs index 0f8bc6f4..73ae7b16 100644 --- a/core/src/kvs/ds.rs +++ b/core/src/kvs/ds.rs @@ -36,6 +36,7 @@ use crate::kvs::lq_structs::{ LqEntry, LqIndexKey, LqIndexValue, LqSelector, LqValue, TrackedResult, UnreachableLqType, }; use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*}; +use crate::options::EngineOptions; use crate::sql::statements::show::ShowSince; use crate::sql::{self, statements::DefineUserStatement, Base, Query, Uuid, 
Value}; use crate::syn; @@ -49,8 +50,6 @@ const LQ_CHANNEL_SIZE: usize = 100; // The batch size used for non-paged operations (i.e. if there are more results, they are ignored) const NON_PAGED_BATCH_SIZE: u32 = 100_000; -// In the future we will have proper pagination -const TEMPORARY_LQ_CF_BATCH_SIZE_TILL_WE_HAVE_PAGINATION: u32 = 1000; /// The underlying datastore instance which stores the dataset. #[allow(dead_code)] @@ -72,6 +71,7 @@ pub struct Datastore { transaction_timeout: Option<Duration>, // Capabilities for this datastore capabilities: Capabilities, + engine_options: EngineOptions, // The versionstamp oracle for this datastore. // Used only in some datastores, such as tikv. versionstamp_oracle: Arc<Mutex<Oracle>>, @@ -353,6 +353,7 @@ impl Datastore { transaction_timeout: None, notification_channel: None, capabilities: Capabilities::default(), + engine_options: EngineOptions::default(), versionstamp_oracle: Arc::new(Mutex::new(Oracle::systime_counter())), clock, index_stores: IndexStores::default(), @@ -410,6 +411,12 @@ impl Datastore { self } + /// Set the engine options for the datastore + pub fn with_engine_options(mut self, engine_options: EngineOptions) -> Self { + self.engine_options = engine_options; + self + } + pub fn index_store(&self) -> &IndexStores { &self.index_stores } @@ -830,18 +837,18 @@ impl Datastore { ) -> Result<Option<Versionstamp>, Error> { let mut tx = self.transaction(Write, Optimistic).await?; match self.save_timestamp_for_versionstamp_impl(ts, &mut tx).await { - Ok(vs) => Ok(vs), - Err(e) => { - match tx.cancel().await { - Ok(_) => { - Err(e) - } - Err(txe) => { - Err(Error::Tx(format!("Error saving timestamp for versionstamp: {:?} and error cancelling transaction: {:?}", e, txe))) - } - } - } - } + Ok(vs) => Ok(vs), + Err(e) => { + match tx.cancel().await { + Ok(_) => { + Err(e) + } + Err(txe) => { + Err(Error::Tx(format!("Error saving timestamp for versionstamp: {:?} and error cancelling transaction: {:?}", e, txe))) + } + } + } + } } /// Poll change feeds for live 
query notifications @@ -875,7 +882,7 @@ impl Datastore { // That is an improvement though Some(&selector.tb), ShowSince::versionstamp(vs), - Some(TEMPORARY_LQ_CF_BATCH_SIZE_TILL_WE_HAVE_PAGINATION), + Some(self.engine_options.live_query_catchup_size), ) .await?; // Confirm we do need to change watermark - this is technically already handled by the cf range scan @@ -1079,13 +1086,13 @@ impl Datastore { let mut tx = self.transaction(Write, Optimistic).await?; if let Err(e) = self.garbage_collect_stale_change_feeds_impl(ts, &mut tx).await { return match tx.cancel().await { - Ok(_) => { - Err(e) - } - Err(txe) => { - Err(Error::Tx(format!("Error garbage collecting stale change feeds: {:?} and error cancelling transaction: {:?}", e, txe))) - } - }; + Ok(_) => { + Err(e) + } + Err(txe) => { + Err(Error::Tx(format!("Error garbage collecting stale change feeds: {:?} and error cancelling transaction: {:?}", e, txe))) + } + }; } Ok(()) } @@ -1208,6 +1215,7 @@ impl Datastore { vso: self.versionstamp_oracle.clone(), clock: self.clock.clone(), prepared_live_queries: (Arc::new(send), Arc::new(recv)), + engine_options: self.engine_options, }) } diff --git a/core/src/kvs/tx.rs b/core/src/kvs/tx.rs index 822d8a18..b54ce0f1 100644 --- a/core/src/kvs/tx.rs +++ b/core/src/kvs/tx.rs @@ -36,6 +36,7 @@ use crate::kvs::cache::Entry; use crate::kvs::clock::SizedClock; use crate::kvs::lq_structs::{LqEntry, LqValue, TrackedResult}; use crate::kvs::Check; +use crate::options::EngineOptions; use crate::sql; use crate::sql::paths::EDGE; use crate::sql::paths::IN; @@ -51,8 +52,6 @@ use super::kv::Convert; use super::Key; use super::Val; -const LQ_CAPACITY: usize = 100; - #[derive(Copy, Clone, Debug)] pub enum Limit { Unlimited, @@ -93,6 +92,7 @@ pub struct Transaction { pub(super) vso: Arc<Mutex<Oracle>>, pub(super) clock: Arc<SizedClock>, pub(super) prepared_live_queries: (Arc<Sender<LqEntry>>, Arc<Receiver<LqEntry>>), + pub(super) engine_options: EngineOptions, } #[allow(clippy::large_enum_variant)] @@ -112,6 +112,7 @@ pub(super) enum Inner { 
#[cfg(feature = "kv-surrealkv")] SurrealKV(super::surrealkv::Transaction), } + #[derive(Copy, Clone)] pub enum TransactionType { Read, @@ -330,7 +331,8 @@ impl Transaction { /// From the existing transaction, consume all the remaining live query registration events and return them synchronously pub(crate) fn consume_pending_live_queries(&self) -> Vec<TrackedResult> { - let mut lq: Vec<TrackedResult> = Vec::with_capacity(LQ_CAPACITY); + let mut lq: Vec<TrackedResult> = + Vec::with_capacity(self.engine_options.new_live_queries_per_transaction as usize); while let Ok(l) = self.prepared_live_queries.1.try_recv() { lq.push(TrackedResult::LiveQuery(l)); } diff --git a/core/src/lib.rs b/core/src/lib.rs index 7c4c36fe..95bd5fe2 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -37,6 +37,7 @@ pub mod kvs; #[cfg(any(feature = "ml", feature = "ml2", feature = "jwks"))] #[doc(hidden)] pub mod obs; +pub mod options; #[doc(hidden)] pub mod syn; diff --git a/core/src/options.rs b/core/src/options.rs new file mode 100644 index 00000000..ea21d051 --- /dev/null +++ b/core/src/options.rs @@ -0,0 +1,18 @@ +/// Configuration for the engine behaviour +/// The defaults are optimal so please only modify these if you know deliberately why you are modifying them. 
+#[derive(Clone, Copy, Debug)] +pub struct EngineOptions { + /// The maximum number of live queries that can be created in a single transaction + pub new_live_queries_per_transaction: u32, + /// The size of batches being requested per update in order to catch up a live query + pub live_query_catchup_size: u32, +} + +impl Default for EngineOptions { + fn default() -> Self { + Self { + new_live_queries_per_transaction: 100, + live_query_catchup_size: 1000, + } + } +} diff --git a/src/cli/config.rs b/src/cli/config.rs index ffbb19bd..5ed0c6a0 100644 --- a/src/cli/config.rs +++ b/src/cli/config.rs @@ -5,6 +5,7 @@ use std::{net::SocketAddr, path::PathBuf}; pub static CF: OnceLock<Config> = OnceLock::new(); use std::time::Duration; +use surrealdb::options::EngineOptions; #[derive(Clone, Debug)] pub struct Config { @@ -16,4 +17,5 @@ pub struct Config { pub crt: Option<PathBuf>, pub key: Option<PathBuf>, pub tick_interval: Duration, + pub engine: Option<EngineOptions>, } diff --git a/src/cli/start.rs b/src/cli/start.rs index dd48dc55..3cef8696 100644 --- a/src/cli/start.rs +++ b/src/cli/start.rs @@ -173,6 +173,7 @@ pub async fn init( tick_interval, crt: web.as_ref().and_then(|x| x.web_crt.clone()), key: web.as_ref().and_then(|x| x.web_key.clone()), + engine: None, }); // This is the cancellation token propagated down to // all the async functions that needs to be stopped gracefully. diff --git a/src/dbs/mod.rs b/src/dbs/mod.rs index 30321add..02cd6eaa 100644 --- a/src/dbs/mod.rs +++ b/src/dbs/mod.rs @@ -244,7 +244,7 @@ pub async fn init( debug!("Server capabilities: {caps}"); // Parse and setup the desired kv datastore - let dbs = Datastore::new(&opt.path) + let mut dbs = Datastore::new(&opt.path) .await? 
.with_notifications() .with_strict_mode(strict_mode) @@ -253,6 +253,11 @@ pub async fn init( .with_auth_enabled(auth_enabled) .with_auth_level_enabled(auth_level_enabled) .with_capabilities(caps); + if let Some(engine_options) = opt.engine { + dbs = dbs.with_engine_options(engine_options); + } + // Make immutable + let dbs = dbs; dbs.bootstrap().await?;