Add Engine Config for configuring how many live queries can be created per transaction (#3559)

This commit is contained in:
Przemyslaw Hugh Kaznowski 2024-03-01 17:12:53 +00:00 committed by GitHub
parent 9fc5af2660
commit e06cd111cf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 67 additions and 30 deletions

View file

@ -168,10 +168,10 @@ pub struct Capabilities {
impl std::fmt::Display for Capabilities { impl std::fmt::Display for Capabilities {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!( write!(
f, f,
"scripting={}, guest_access={}, live_query_notifications={}, allow_funcs={}, deny_funcs={}, allow_net={}, deny_net={}", "scripting={}, guest_access={}, live_query_notifications={}, allow_funcs={}, deny_funcs={}, allow_net={}, deny_net={}",
self.scripting, self.guest_access, self.live_query_notifications, self.allow_funcs, self.deny_funcs, self.allow_net, self.deny_net self.scripting, self.guest_access, self.live_query_notifications, self.allow_funcs, self.deny_funcs, self.allow_net, self.deny_net
) )
} }
} }

View file

@ -36,6 +36,7 @@ use crate::kvs::lq_structs::{
LqEntry, LqIndexKey, LqIndexValue, LqSelector, LqValue, TrackedResult, UnreachableLqType, LqEntry, LqIndexKey, LqIndexValue, LqSelector, LqValue, TrackedResult, UnreachableLqType,
}; };
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*}; use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
use crate::options::EngineOptions;
use crate::sql::statements::show::ShowSince; use crate::sql::statements::show::ShowSince;
use crate::sql::{self, statements::DefineUserStatement, Base, Query, Uuid, Value}; use crate::sql::{self, statements::DefineUserStatement, Base, Query, Uuid, Value};
use crate::syn; use crate::syn;
@ -49,8 +50,6 @@ const LQ_CHANNEL_SIZE: usize = 100;
// The batch size used for non-paged operations (i.e. if there are more results, they are ignored) // The batch size used for non-paged operations (i.e. if there are more results, they are ignored)
const NON_PAGED_BATCH_SIZE: u32 = 100_000; const NON_PAGED_BATCH_SIZE: u32 = 100_000;
// In the future we will have proper pagination
const TEMPORARY_LQ_CF_BATCH_SIZE_TILL_WE_HAVE_PAGINATION: u32 = 1000;
/// The underlying datastore instance which stores the dataset. /// The underlying datastore instance which stores the dataset.
#[allow(dead_code)] #[allow(dead_code)]
@ -72,6 +71,7 @@ pub struct Datastore {
transaction_timeout: Option<Duration>, transaction_timeout: Option<Duration>,
// Capabilities for this datastore // Capabilities for this datastore
capabilities: Capabilities, capabilities: Capabilities,
engine_options: EngineOptions,
// The versionstamp oracle for this datastore. // The versionstamp oracle for this datastore.
// Used only in some datastores, such as tikv. // Used only in some datastores, such as tikv.
versionstamp_oracle: Arc<Mutex<Oracle>>, versionstamp_oracle: Arc<Mutex<Oracle>>,
@ -353,6 +353,7 @@ impl Datastore {
transaction_timeout: None, transaction_timeout: None,
notification_channel: None, notification_channel: None,
capabilities: Capabilities::default(), capabilities: Capabilities::default(),
engine_options: EngineOptions::default(),
versionstamp_oracle: Arc::new(Mutex::new(Oracle::systime_counter())), versionstamp_oracle: Arc::new(Mutex::new(Oracle::systime_counter())),
clock, clock,
index_stores: IndexStores::default(), index_stores: IndexStores::default(),
@ -410,6 +411,12 @@ impl Datastore {
self self
} }
/// Set the engine options for the datastore
pub fn with_engine_options(mut self, engine_options: EngineOptions) -> Self {
self.engine_options = engine_options;
self
}
pub fn index_store(&self) -> &IndexStores { pub fn index_store(&self) -> &IndexStores {
&self.index_stores &self.index_stores
} }
@ -830,18 +837,18 @@ impl Datastore {
) -> Result<Option<Versionstamp>, Error> { ) -> Result<Option<Versionstamp>, Error> {
let mut tx = self.transaction(Write, Optimistic).await?; let mut tx = self.transaction(Write, Optimistic).await?;
match self.save_timestamp_for_versionstamp_impl(ts, &mut tx).await { match self.save_timestamp_for_versionstamp_impl(ts, &mut tx).await {
Ok(vs) => Ok(vs), Ok(vs) => Ok(vs),
Err(e) => { Err(e) => {
match tx.cancel().await { match tx.cancel().await {
Ok(_) => { Ok(_) => {
Err(e) Err(e)
} }
Err(txe) => { Err(txe) => {
Err(Error::Tx(format!("Error saving timestamp for versionstamp: {:?} and error cancelling transaction: {:?}", e, txe))) Err(Error::Tx(format!("Error saving timestamp for versionstamp: {:?} and error cancelling transaction: {:?}", e, txe)))
} }
} }
} }
} }
} }
/// Poll change feeds for live query notifications /// Poll change feeds for live query notifications
@ -875,7 +882,7 @@ impl Datastore {
// That is an improvement though // That is an improvement though
Some(&selector.tb), Some(&selector.tb),
ShowSince::versionstamp(vs), ShowSince::versionstamp(vs),
Some(TEMPORARY_LQ_CF_BATCH_SIZE_TILL_WE_HAVE_PAGINATION), Some(self.engine_options.live_query_catchup_size),
) )
.await?; .await?;
// Confirm we do need to change watermark - this is technically already handled by the cf range scan // Confirm we do need to change watermark - this is technically already handled by the cf range scan
@ -1079,13 +1086,13 @@ impl Datastore {
let mut tx = self.transaction(Write, Optimistic).await?; let mut tx = self.transaction(Write, Optimistic).await?;
if let Err(e) = self.garbage_collect_stale_change_feeds_impl(ts, &mut tx).await { if let Err(e) = self.garbage_collect_stale_change_feeds_impl(ts, &mut tx).await {
return match tx.cancel().await { return match tx.cancel().await {
Ok(_) => { Ok(_) => {
Err(e) Err(e)
} }
Err(txe) => { Err(txe) => {
Err(Error::Tx(format!("Error garbage collecting stale change feeds: {:?} and error cancelling transaction: {:?}", e, txe))) Err(Error::Tx(format!("Error garbage collecting stale change feeds: {:?} and error cancelling transaction: {:?}", e, txe)))
} }
}; };
} }
Ok(()) Ok(())
} }
@ -1208,6 +1215,7 @@ impl Datastore {
vso: self.versionstamp_oracle.clone(), vso: self.versionstamp_oracle.clone(),
clock: self.clock.clone(), clock: self.clock.clone(),
prepared_live_queries: (Arc::new(send), Arc::new(recv)), prepared_live_queries: (Arc::new(send), Arc::new(recv)),
engine_options: self.engine_options,
}) })
} }

View file

@ -36,6 +36,7 @@ use crate::kvs::cache::Entry;
use crate::kvs::clock::SizedClock; use crate::kvs::clock::SizedClock;
use crate::kvs::lq_structs::{LqEntry, LqValue, TrackedResult}; use crate::kvs::lq_structs::{LqEntry, LqValue, TrackedResult};
use crate::kvs::Check; use crate::kvs::Check;
use crate::options::EngineOptions;
use crate::sql; use crate::sql;
use crate::sql::paths::EDGE; use crate::sql::paths::EDGE;
use crate::sql::paths::IN; use crate::sql::paths::IN;
@ -51,8 +52,6 @@ use super::kv::Convert;
use super::Key; use super::Key;
use super::Val; use super::Val;
const LQ_CAPACITY: usize = 100;
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
pub enum Limit { pub enum Limit {
Unlimited, Unlimited,
@ -93,6 +92,7 @@ pub struct Transaction {
pub(super) vso: Arc<Mutex<Oracle>>, pub(super) vso: Arc<Mutex<Oracle>>,
pub(super) clock: Arc<SizedClock>, pub(super) clock: Arc<SizedClock>,
pub(super) prepared_live_queries: (Arc<Sender<LqEntry>>, Arc<Receiver<LqEntry>>), pub(super) prepared_live_queries: (Arc<Sender<LqEntry>>, Arc<Receiver<LqEntry>>),
pub(super) engine_options: EngineOptions,
} }
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
@ -112,6 +112,7 @@ pub(super) enum Inner {
#[cfg(feature = "kv-surrealkv")] #[cfg(feature = "kv-surrealkv")]
SurrealKV(super::surrealkv::Transaction), SurrealKV(super::surrealkv::Transaction),
} }
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub enum TransactionType { pub enum TransactionType {
Read, Read,
@ -330,7 +331,8 @@ impl Transaction {
/// From the existing transaction, consume all the remaining live query registration events and return them synchronously /// From the existing transaction, consume all the remaining live query registration events and return them synchronously
pub(crate) fn consume_pending_live_queries(&self) -> Vec<TrackedResult> { pub(crate) fn consume_pending_live_queries(&self) -> Vec<TrackedResult> {
let mut lq: Vec<TrackedResult> = Vec::with_capacity(LQ_CAPACITY); let mut lq: Vec<TrackedResult> =
Vec::with_capacity(self.engine_options.new_live_queries_per_transaction as usize);
while let Ok(l) = self.prepared_live_queries.1.try_recv() { while let Ok(l) = self.prepared_live_queries.1.try_recv() {
lq.push(TrackedResult::LiveQuery(l)); lq.push(TrackedResult::LiveQuery(l));
} }

View file

@ -37,6 +37,7 @@ pub mod kvs;
#[cfg(any(feature = "ml", feature = "ml2", feature = "jwks"))] #[cfg(any(feature = "ml", feature = "ml2", feature = "jwks"))]
#[doc(hidden)] #[doc(hidden)]
pub mod obs; pub mod obs;
pub mod options;
#[doc(hidden)] #[doc(hidden)]
pub mod syn; pub mod syn;

18
core/src/options.rs Normal file
View file

@ -0,0 +1,18 @@
/// Configuration for the engine behaviour
/// The defaults are optimal so please only modify these if you know deliberately why you are modifying them.
#[derive(Clone, Copy, Debug)]
pub struct EngineOptions {
/// The maximum number of live queries that can be created in a single transaction
pub new_live_queries_per_transaction: u32,
/// The size of batches being requested per update in order to catch up a live query
pub live_query_catchup_size: u32,
}
impl Default for EngineOptions {
fn default() -> Self {
Self {
new_live_queries_per_transaction: 100,
live_query_catchup_size: 1000,
}
}
}

View file

@ -5,6 +5,7 @@ use std::{net::SocketAddr, path::PathBuf};
pub static CF: OnceLock<Config> = OnceLock::new(); pub static CF: OnceLock<Config> = OnceLock::new();
use std::time::Duration; use std::time::Duration;
use surrealdb::options::EngineOptions;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Config { pub struct Config {
@ -16,4 +17,5 @@ pub struct Config {
pub crt: Option<PathBuf>, pub crt: Option<PathBuf>,
pub key: Option<PathBuf>, pub key: Option<PathBuf>,
pub tick_interval: Duration, pub tick_interval: Duration,
pub engine: Option<EngineOptions>,
} }

View file

@ -173,6 +173,7 @@ pub async fn init(
tick_interval, tick_interval,
crt: web.as_ref().and_then(|x| x.web_crt.clone()), crt: web.as_ref().and_then(|x| x.web_crt.clone()),
key: web.as_ref().and_then(|x| x.web_key.clone()), key: web.as_ref().and_then(|x| x.web_key.clone()),
engine: None,
}); });
// This is the cancellation token propagated down to // This is the cancellation token propagated down to
// all the async functions that needs to be stopped gracefully. // all the async functions that needs to be stopped gracefully.

View file

@ -244,7 +244,7 @@ pub async fn init(
debug!("Server capabilities: {caps}"); debug!("Server capabilities: {caps}");
// Parse and setup the desired kv datastore // Parse and setup the desired kv datastore
let dbs = Datastore::new(&opt.path) let mut dbs = Datastore::new(&opt.path)
.await? .await?
.with_notifications() .with_notifications()
.with_strict_mode(strict_mode) .with_strict_mode(strict_mode)
@ -253,6 +253,11 @@ pub async fn init(
.with_auth_enabled(auth_enabled) .with_auth_enabled(auth_enabled)
.with_auth_level_enabled(auth_level_enabled) .with_auth_level_enabled(auth_level_enabled)
.with_capabilities(caps); .with_capabilities(caps);
if let Some(engine_options) = opt.engine {
dbs = dbs.with_engine_options(engine_options);
}
// Make immutable
let dbs = dbs;
dbs.bootstrap().await?; dbs.bootstrap().await?;