diff --git a/core/src/kvs/ds.rs b/core/src/kvs/ds.rs index e35eb198..a2060aa5 100644 --- a/core/src/kvs/ds.rs +++ b/core/src/kvs/ds.rs @@ -31,7 +31,9 @@ use crate::key::root::hb::Hb; use crate::kvs::clock::SizedClock; #[allow(unused_imports)] use crate::kvs::clock::SystemClock; -use crate::kvs::lq_structs::{LqIndexKey, LqIndexValue, LqSelector, LqValue, UnreachableLqType}; +use crate::kvs::lq_structs::{ + LqEntry, LqIndexKey, LqIndexValue, LqSelector, LqValue, UnreachableLqType, +}; use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*}; use crate::sql::statements::show::ShowSince; use crate::sql::{self, statements::DefineUserStatement, Base, Query, Uuid, Value}; @@ -1063,6 +1065,8 @@ impl Datastore { _ => unreachable!(), }; + let (send, recv): (Sender, Receiver) = channel::bounded(LQ_CHANNEL_SIZE); + #[allow(unreachable_code)] Ok(Transaction { inner, @@ -1070,6 +1074,7 @@ impl Datastore { cf: cf::Writer::new(), vso: self.versionstamp_oracle.clone(), clock: self.clock.clone(), + prepared_live_queries: (Arc::new(send), Arc::new(recv)), }) } diff --git a/core/src/kvs/lq_structs.rs b/core/src/kvs/lq_structs.rs index 0f07ee96..4dae6276 100644 --- a/core/src/kvs/lq_structs.rs +++ b/core/src/kvs/lq_structs.rs @@ -81,6 +81,7 @@ pub(crate) struct LqIndexValue { /// Stores all data required for tracking a live query /// Can be used to derive various in-memory map indexes and values #[derive(Debug)] +#[cfg_attr(test, derive(PartialEq, Clone))] pub(crate) struct LqEntry { #[allow(dead_code)] pub(crate) live_id: Uuid, diff --git a/core/src/kvs/tx.rs b/core/src/kvs/tx.rs index 19ccccfa..b497aa2d 100644 --- a/core/src/kvs/tx.rs +++ b/core/src/kvs/tx.rs @@ -12,7 +12,7 @@ use crate::key::key_req::KeyRequirements; use crate::kvs::cache::Cache; use crate::kvs::cache::Entry; use crate::kvs::clock::SizedClock; -use crate::kvs::lq_structs::LqValue; +use crate::kvs::lq_structs::{LqEntry, LqValue}; use crate::kvs::Check; use crate::sql; use crate::sql::paths::EDGE; @@ -23,7 +23,7 @@ use crate::sql::Strand; use crate::sql::Value; use crate::vs::Oracle; use crate::vs::Versionstamp; -use channel::Sender; +use channel::{Receiver, Sender}; use futures::lock::Mutex; use sql::permission::Permissions; use sql::statements::DefineAnalyzerStatement; @@ -47,6 +47,8 @@ use std::ops::Range; use std::sync::Arc; use uuid::Uuid; +const LQ_CAPACITY: usize = 100; + #[derive(Copy, Clone, Debug)] pub enum Limit { Unlimited, @@ -86,6 +88,7 @@ pub struct Transaction { pub(super) cf: cf::Writer, pub(super) vso: Arc>, pub(super) clock: Arc, + pub(super) prepared_live_queries: (Arc>, Arc>), } #[allow(clippy::large_enum_variant)] @@ -321,6 +324,26 @@ impl Transaction { } } + #[allow(unused)] + pub(crate) fn consume_pending_live_queries(&self) -> Vec { + let mut lq: Vec = Vec::with_capacity(LQ_CAPACITY); + while let Ok(l) = self.prepared_live_queries.1.try_recv() { + lq.push(l); + } + lq + } + + /// Sends a live query to the transaction which is forwarded only once committed + /// And removed once a transaction is aborted + pub(crate) fn pre_commit_register_live_query( + &mut self, + lq_entry: LqEntry, + ) -> Result<(), Error> { + self.prepared_live_queries.0.try_send(lq_entry).map_err(|_send_err| { + Error::Internal("Prepared lq failed to add lq to channel".to_string()) + }) + } + /// Delete a key from the datastore. #[allow(unused_variables)] pub async fn del(&mut self, key: K) -> Result<(), Error> @@ -3106,3 +3129,47 @@ mod tests { } } } + +#[cfg(all(test, feature = "kv-mem"))] +mod tx_test { + use crate::kvs::lq_structs::LqEntry; + use crate::kvs::Datastore; + use crate::kvs::LockType::Optimistic; + use crate::kvs::TransactionType::Write; + use crate::sql; + use crate::sql::statements::LiveStatement; + use crate::sql::Value; + + #[tokio::test] + pub async fn lqs_can_be_submitted_and_read() { + let ds = Datastore::new("memory").await.unwrap(); + let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); + + // Create live query data + let node_id = uuid::uuid!("d2715187-9d1a-49a5-9b0a-b496035b6c21"); + let lq_entry = LqEntry { + live_id: sql::Uuid::new_v4(), + ns: "namespace".to_string(), + db: "database".to_string(), + stm: LiveStatement { + id: sql::Uuid::new_v4(), + node: sql::uuid::Uuid(node_id), + expr: Default::default(), + what: Default::default(), + cond: None, + fetch: None, + archived: None, + session: Some(Value::None), + auth: None, + }, + }; + tx.pre_commit_register_live_query(lq_entry.clone()).unwrap(); + + tx.commit().await.unwrap(); + + // Verify data + let live_queries = tx.consume_pending_live_queries(); + assert_eq!(live_queries.len(), 1); + assert_eq!(live_queries[0], lq_entry); + } +} diff --git a/core/src/sql/v2/statements/live.rs b/core/src/sql/v2/statements/live.rs index 2850f57b..1d3ee60e 100644 --- a/core/src/sql/v2/statements/live.rs +++ b/core/src/sql/v2/statements/live.rs @@ -2,7 +2,9 @@ use crate::ctx::Context; use crate::dbs::{Options, Transaction}; use crate::doc::CursorDoc; use crate::err::Error; +use crate::fflags::FFLAGS; use crate::iam::Auth; +use crate::kvs::lq_structs::LqEntry; use crate::sql::{Cond, Fetchs, Fields, Uuid, Value}; use derive::Store; use revision::revisioned; @@ -95,26 +97,50 @@ impl LiveStatement { ..self.clone() }; let id = stm.id.0; - // Claim transaction - let mut run = txn.lock().await; - // Process the live query table - match stm.what.compute(ctx, opt, txn, doc).await? { - Value::Table(tb) => { - // Store the current Node ID - stm.node = nid.into(); - // Insert the node live query - run.putc_ndlq(nid, id, opt.ns(), opt.db(), tb.as_str(), None).await?; - // Insert the table live query - run.putc_tblq(opt.ns(), opt.db(), &tb, stm, None).await?; + match FFLAGS.change_feed_live_queries.enabled() { + true => { + let mut run = txn.lock().await; + match stm.what.compute(ctx, opt, txn, doc).await? { + Value::Table(_tb) => { + // Send the live query registration hook to the transaction pre-commit channel + run.pre_commit_register_live_query(LqEntry { + live_id: stm.id, + ns: opt.ns().to_string(), + db: opt.db().to_string(), + stm, + })?; + } + v => { + return Err(Error::LiveStatement { + value: v.to_string(), + }); + } + } + Ok(id.into()) } - v => { - return Err(Error::LiveStatement { - value: v.to_string(), - }) + false => { + // Claim transaction + let mut run = txn.lock().await; + // Process the live query table + match stm.what.compute(ctx, opt, txn, doc).await? { + Value::Table(tb) => { + // Store the current Node ID + stm.node = nid.into(); + // Insert the node live query + run.putc_ndlq(nid, id, opt.ns(), opt.db(), tb.as_str(), None).await?; + // Insert the table live query + run.putc_tblq(opt.ns(), opt.db(), &tb, stm, None).await?; + } + v => { + return Err(Error::LiveStatement { + value: v.to_string(), + }) + } + }; + // Return the query id + Ok(id.into()) } - }; - // Return the query id - Ok(id.into()) + } } pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement { diff --git a/lib/tests/live.rs b/lib/tests/live.rs new file mode 100644 index 00000000..a39c434f --- /dev/null +++ b/lib/tests/live.rs @@ -0,0 +1,45 @@ +mod parse; +use parse::Parse; +mod helpers; +use helpers::new_ds; +use surrealdb::dbs::Session; +use surrealdb::err::Error; +use surrealdb::sql::Value; +use surrealdb_core::fflags::FFLAGS; +use surrealdb_core::kvs::LockType::Optimistic; +use surrealdb_core::kvs::TransactionType::Write; + +#[tokio::test] +async fn live_query_sends_registered_lq_details() -> Result<(), Error> { + if !FFLAGS.change_feed_live_queries.enabled() { + return Ok(()); + } + let sql = " + DEFINE TABLE lq_test_123 CHANGEFEED 10m; + LIVE SELECT * FROM lq_test_123; + "; + let dbs = new_ds().await?; + let ses = Session::owner().with_ns("test").with_db("test"); + let res = &mut dbs.execute(sql, &ses, None).await?; + assert_eq!(res.len(), 2); + // + let tmp = res.remove(0).result; + assert!(tmp.is_ok()); + // + let actual = res.remove(0).result?; + let expected = Value::parse("{}"); + assert_eq!(actual, expected); + // + let tmp = res.remove(0).result?; + let val = Value::parse("[12345]"); + assert_eq!(tmp, val); + // + let tmp = res.remove(0).result; + assert!(tmp.is_ok()); + // + let tmp = res.remove(0).result?; + let val = Value::parse("[56789]"); + assert_eq!(tmp, val); + // + Ok(()) +}