Adjust live statement to avoid storage on live query (#3523)

This commit is contained in:
Przemyslaw Hugh Kaznowski 2024-02-20 14:32:37 +00:00 committed by GitHub
parent 7b0771acb7
commit b52d630c4b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 165 additions and 21 deletions

View file

@ -31,7 +31,9 @@ use crate::key::root::hb::Hb;
use crate::kvs::clock::SizedClock; use crate::kvs::clock::SizedClock;
#[allow(unused_imports)] #[allow(unused_imports)]
use crate::kvs::clock::SystemClock; use crate::kvs::clock::SystemClock;
use crate::kvs::lq_structs::{LqIndexKey, LqIndexValue, LqSelector, LqValue, UnreachableLqType}; use crate::kvs::lq_structs::{
LqEntry, LqIndexKey, LqIndexValue, LqSelector, LqValue, UnreachableLqType,
};
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*}; use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
use crate::sql::statements::show::ShowSince; use crate::sql::statements::show::ShowSince;
use crate::sql::{self, statements::DefineUserStatement, Base, Query, Uuid, Value}; use crate::sql::{self, statements::DefineUserStatement, Base, Query, Uuid, Value};
@ -1063,6 +1065,8 @@ impl Datastore {
_ => unreachable!(), _ => unreachable!(),
}; };
let (send, recv): (Sender<LqEntry>, Receiver<LqEntry>) = channel::bounded(LQ_CHANNEL_SIZE);
#[allow(unreachable_code)] #[allow(unreachable_code)]
Ok(Transaction { Ok(Transaction {
inner, inner,
@ -1070,6 +1074,7 @@ impl Datastore {
cf: cf::Writer::new(), cf: cf::Writer::new(),
vso: self.versionstamp_oracle.clone(), vso: self.versionstamp_oracle.clone(),
clock: self.clock.clone(), clock: self.clock.clone(),
prepared_live_queries: (Arc::new(send), Arc::new(recv)),
}) })
} }

View file

@ -81,6 +81,7 @@ pub(crate) struct LqIndexValue {
/// Stores all data required for tracking a live query /// Stores all data required for tracking a live query
/// Can be used to derive various in-memory map indexes and values /// Can be used to derive various in-memory map indexes and values
#[derive(Debug)] #[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Clone))]
pub(crate) struct LqEntry { pub(crate) struct LqEntry {
#[allow(dead_code)] #[allow(dead_code)]
pub(crate) live_id: Uuid, pub(crate) live_id: Uuid,

View file

@ -12,7 +12,7 @@ use crate::key::key_req::KeyRequirements;
use crate::kvs::cache::Cache; use crate::kvs::cache::Cache;
use crate::kvs::cache::Entry; use crate::kvs::cache::Entry;
use crate::kvs::clock::SizedClock; use crate::kvs::clock::SizedClock;
use crate::kvs::lq_structs::LqValue; use crate::kvs::lq_structs::{LqEntry, LqValue};
use crate::kvs::Check; use crate::kvs::Check;
use crate::sql; use crate::sql;
use crate::sql::paths::EDGE; use crate::sql::paths::EDGE;
@ -23,7 +23,7 @@ use crate::sql::Strand;
use crate::sql::Value; use crate::sql::Value;
use crate::vs::Oracle; use crate::vs::Oracle;
use crate::vs::Versionstamp; use crate::vs::Versionstamp;
use channel::Sender; use channel::{Receiver, Sender};
use futures::lock::Mutex; use futures::lock::Mutex;
use sql::permission::Permissions; use sql::permission::Permissions;
use sql::statements::DefineAnalyzerStatement; use sql::statements::DefineAnalyzerStatement;
@ -47,6 +47,8 @@ use std::ops::Range;
use std::sync::Arc; use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
const LQ_CAPACITY: usize = 100;
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
pub enum Limit { pub enum Limit {
Unlimited, Unlimited,
@ -86,6 +88,7 @@ pub struct Transaction {
pub(super) cf: cf::Writer, pub(super) cf: cf::Writer,
pub(super) vso: Arc<Mutex<Oracle>>, pub(super) vso: Arc<Mutex<Oracle>>,
pub(super) clock: Arc<SizedClock>, pub(super) clock: Arc<SizedClock>,
pub(super) prepared_live_queries: (Arc<Sender<LqEntry>>, Arc<Receiver<LqEntry>>),
} }
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
@ -321,6 +324,26 @@ impl Transaction {
} }
} }
#[allow(unused)]
pub(crate) fn consume_pending_live_queries(&self) -> Vec<LqEntry> {
let mut lq: Vec<LqEntry> = Vec::with_capacity(LQ_CAPACITY);
while let Ok(l) = self.prepared_live_queries.1.try_recv() {
lq.push(l);
}
lq
}
/// Sends a live query to the transaction which is forwarded only once committed
/// And removed once a transaction is aborted
pub(crate) fn pre_commit_register_live_query(
&mut self,
lq_entry: LqEntry,
) -> Result<(), Error> {
self.prepared_live_queries.0.try_send(lq_entry).map_err(|_send_err| {
Error::Internal("Prepared lq failed to add lq to channel".to_string())
})
}
/// Delete a key from the datastore. /// Delete a key from the datastore.
#[allow(unused_variables)] #[allow(unused_variables)]
pub async fn del<K>(&mut self, key: K) -> Result<(), Error> pub async fn del<K>(&mut self, key: K) -> Result<(), Error>
@ -3106,3 +3129,47 @@ mod tests {
} }
} }
} }
#[cfg(all(test, feature = "kv-mem"))]
mod tx_test {
use crate::kvs::lq_structs::LqEntry;
use crate::kvs::Datastore;
use crate::kvs::LockType::Optimistic;
use crate::kvs::TransactionType::Write;
use crate::sql;
use crate::sql::statements::LiveStatement;
use crate::sql::Value;
#[tokio::test]
pub async fn lqs_can_be_submitted_and_read() {
let ds = Datastore::new("memory").await.unwrap();
let mut tx = ds.transaction(Write, Optimistic).await.unwrap();
// Create live query data
let node_id = uuid::uuid!("d2715187-9d1a-49a5-9b0a-b496035b6c21");
let lq_entry = LqEntry {
live_id: sql::Uuid::new_v4(),
ns: "namespace".to_string(),
db: "database".to_string(),
stm: LiveStatement {
id: sql::Uuid::new_v4(),
node: sql::uuid::Uuid(node_id),
expr: Default::default(),
what: Default::default(),
cond: None,
fetch: None,
archived: None,
session: Some(Value::None),
auth: None,
},
};
tx.pre_commit_register_live_query(lq_entry.clone()).unwrap();
tx.commit().await.unwrap();
// Verify data
let live_queries = tx.consume_pending_live_queries();
assert_eq!(live_queries.len(), 1);
assert_eq!(live_queries[0], lq_entry);
}
}

View file

@ -2,7 +2,9 @@ use crate::ctx::Context;
use crate::dbs::{Options, Transaction}; use crate::dbs::{Options, Transaction};
use crate::doc::CursorDoc; use crate::doc::CursorDoc;
use crate::err::Error; use crate::err::Error;
use crate::fflags::FFLAGS;
use crate::iam::Auth; use crate::iam::Auth;
use crate::kvs::lq_structs::LqEntry;
use crate::sql::{Cond, Fetchs, Fields, Uuid, Value}; use crate::sql::{Cond, Fetchs, Fields, Uuid, Value};
use derive::Store; use derive::Store;
use revision::revisioned; use revision::revisioned;
@ -95,26 +97,50 @@ impl LiveStatement {
..self.clone() ..self.clone()
}; };
let id = stm.id.0; let id = stm.id.0;
// Claim transaction match FFLAGS.change_feed_live_queries.enabled() {
let mut run = txn.lock().await; true => {
// Process the live query table let mut run = txn.lock().await;
match stm.what.compute(ctx, opt, txn, doc).await? { match stm.what.compute(ctx, opt, txn, doc).await? {
Value::Table(tb) => { Value::Table(_tb) => {
// Store the current Node ID // Send the live query registration hook to the transaction pre-commit channel
stm.node = nid.into(); run.pre_commit_register_live_query(LqEntry {
// Insert the node live query live_id: stm.id,
run.putc_ndlq(nid, id, opt.ns(), opt.db(), tb.as_str(), None).await?; ns: opt.ns().to_string(),
// Insert the table live query db: opt.db().to_string(),
run.putc_tblq(opt.ns(), opt.db(), &tb, stm, None).await?; stm,
})?;
}
v => {
return Err(Error::LiveStatement {
value: v.to_string(),
});
}
}
Ok(id.into())
} }
v => { false => {
return Err(Error::LiveStatement { // Claim transaction
value: v.to_string(), let mut run = txn.lock().await;
}) // Process the live query table
match stm.what.compute(ctx, opt, txn, doc).await? {
Value::Table(tb) => {
// Store the current Node ID
stm.node = nid.into();
// Insert the node live query
run.putc_ndlq(nid, id, opt.ns(), opt.db(), tb.as_str(), None).await?;
// Insert the table live query
run.putc_tblq(opt.ns(), opt.db(), &tb, stm, None).await?;
}
v => {
return Err(Error::LiveStatement {
value: v.to_string(),
})
}
};
// Return the query id
Ok(id.into())
} }
}; }
// Return the query id
Ok(id.into())
} }
pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement { pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement {

45
lib/tests/live.rs Normal file
View file

@ -0,0 +1,45 @@
mod parse;
use parse::Parse;
mod helpers;
use helpers::new_ds;
use surrealdb::dbs::Session;
use surrealdb::err::Error;
use surrealdb::sql::Value;
use surrealdb_core::fflags::FFLAGS;
use surrealdb_core::kvs::LockType::Optimistic;
use surrealdb_core::kvs::TransactionType::Write;
#[tokio::test]
async fn live_query_sends_registered_lq_details() -> Result<(), Error> {
if !FFLAGS.change_feed_live_queries.enabled() {
return Ok(());
}
let sql = "
DEFINE TABLE lq_test_123 CHANGEFEED 10m;
LIVE SELECT * FROM lq_test_123;
";
let dbs = new_ds().await?;
let ses = Session::owner().with_ns("test").with_db("test");
let res = &mut dbs.execute(sql, &ses, None).await?;
assert_eq!(res.len(), 2);
//
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
//
let actual = res.remove(0).result?;
let expected = Value::parse("{}");
assert_eq!(actual, expected);
//
let tmp = res.remove(0).result?;
let val = Value::parse("[12345]");
assert_eq!(tmp, val);
//
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
//
let tmp = res.remove(0).result?;
let val = Value::parse("[56789]");
assert_eq!(tmp, val);
//
Ok(())
}