Remaining LQ v2 fixes ()

This commit is contained in:
Przemyslaw Hugh Kaznowski 2024-04-23 11:00:23 +01:00 committed by GitHub
parent a45fd5c197
commit 8f6af53de6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 561 additions and 140 deletions

View file

@ -1,11 +1,12 @@
use std::borrow::Cow;
use std::collections::HashMap;
use crate::cf::{TableMutation, TableMutations};
use crate::kvs::Key;
use crate::sql::statements::DefineTableStatement;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use crate::sql::Idiom;
use std::borrow::Cow;
use std::collections::HashMap;
// PreparedWrite is a tuple of (versionstamp key, key prefix, key suffix, serialized table mutations).
// The versionstamp key is the key that contains the current versionstamp and might be used by the
@ -64,7 +65,7 @@ impl Writer {
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn update(
pub(crate) fn record_cf_change(
&mut self,
ns: &str,
db: &str,
@ -139,7 +140,7 @@ mod tests {
use crate::cf::{ChangeSet, DatabaseMutation, TableMutation, TableMutations};
use crate::fflags::FFLAGS;
use crate::key::key_req::KeyRequirements;
use crate::kvs::{Datastore, LockType::*, TransactionType::*};
use crate::kvs::{Datastore, LockType::*, Transaction, TransactionType::*};
use crate::sql::changefeed::ChangeFeed;
use crate::sql::id::Id;
use crate::sql::statements::show::ShowSince;
@ -148,7 +149,9 @@ mod tests {
};
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use crate::sql::Datetime;
use crate::vs;
use crate::vs::{conv, Versionstamp};
const DONT_STORE_PREVIOUS: bool = false;
@ -158,43 +161,8 @@ mod tests {
#[tokio::test]
async fn test_changefeed_read_write() {
let ts = crate::sql::Datetime::default();
let dns = DefineNamespaceStatement {
name: crate::sql::Ident(NS.to_string()),
..Default::default()
};
let ddb = DefineDatabaseStatement {
name: crate::sql::Ident(DB.to_string()),
changefeed: Some(ChangeFeed {
expiry: Duration::from_secs(10),
store_original: false,
}),
..Default::default()
};
let dtb = DefineTableStatement {
name: TB.into(),
changefeed: Some(ChangeFeed {
expiry: Duration::from_secs(10),
store_original: false,
}),
..Default::default()
};
let ds = Datastore::new("memory").await.unwrap();
//
// Create the ns, db, and tb to let the GC and the timestamp-to-versionstamp conversion
// work.
//
let mut tx0 = ds.transaction(Write, Optimistic).await.unwrap();
let ns_root = crate::key::root::ns::new(NS);
tx0.put(ns_root.key_category(), &ns_root, dns).await.unwrap();
let db_root = crate::key::namespace::db::new(NS, DB);
tx0.put(db_root.key_category(), &db_root, ddb).await.unwrap();
let tb_root = crate::key::database::tb::new(NS, DB, TB);
tx0.put(tb_root.key_category(), &tb_root, dtb.clone()).await.unwrap();
tx0.commit().await.unwrap();
let ts = Datetime::default();
let ds = init().await;
// Let the db remember the timestamp for the current versionstamp
// so that we can replay change feeds from the timestamp later.
@ -412,4 +380,113 @@ mod tests {
tx7.commit().await.unwrap();
assert_eq!(r, want);
}
#[test_log::test(tokio::test)]
async fn test_scan_picks_up_from_offset() {
// Given we have 2 entries in change feeds
let ds = init().await;
ds.tick_at(5).await.unwrap();
let _id1 = record_change_feed_entry(
ds.transaction(Write, Optimistic).await.unwrap(),
"First".to_string(),
)
.await;
ds.tick_at(10).await.unwrap();
let mut tx = ds.transaction(Write, Optimistic).await.unwrap();
let vs1 = tx.get_versionstamp_from_timestamp(5, NS, DB, false).await.unwrap().unwrap();
let vs2 = tx.get_versionstamp_from_timestamp(10, NS, DB, false).await.unwrap().unwrap();
tx.cancel().await.unwrap();
let _id2 = record_change_feed_entry(
ds.transaction(Write, Optimistic).await.unwrap(),
"Second".to_string(),
)
.await;
// When we scan from the versionstamp between the changes
let r = change_feed(ds.transaction(Write, Optimistic).await.unwrap(), &vs2).await;
// Then there is only 1 change
assert_eq!(r.len(), 1);
assert!(r[0].0 >= vs2, "{:?}", r);
// And scanning with previous offset includes both values (without table definitions)
let r = change_feed(ds.transaction(Write, Optimistic).await.unwrap(), &vs1).await;
assert_eq!(r.len(), 2);
}
async fn change_feed(mut tx: Transaction, vs: &Versionstamp) -> Vec<ChangeSet> {
let r = crate::cf::read(
&mut tx,
NS,
DB,
Some(TB),
ShowSince::Versionstamp(conv::versionstamp_to_u64(vs)),
Some(10),
)
.await
.unwrap();
tx.cancel().await.unwrap();
r
}
async fn record_change_feed_entry(mut tx: Transaction, id: String) -> Thing {
let thing = Thing {
tb: TB.to_owned(),
id: Id::String(id),
};
let value_a: Value = "a".into();
let previous = Cow::from(Value::None);
tx.record_change(
NS,
DB,
TB,
&thing,
previous.clone(),
Cow::Borrowed(&value_a),
DONT_STORE_PREVIOUS,
);
tx.complete_changes(true).await.unwrap();
tx.commit().await.unwrap();
thing
}
async fn init() -> Datastore {
let dns = DefineNamespaceStatement {
name: crate::sql::Ident(NS.to_string()),
..Default::default()
};
let ddb = DefineDatabaseStatement {
name: crate::sql::Ident(DB.to_string()),
changefeed: Some(ChangeFeed {
expiry: Duration::from_secs(10),
store_original: false,
}),
..Default::default()
};
let dtb = DefineTableStatement {
name: TB.into(),
changefeed: Some(ChangeFeed {
expiry: Duration::from_secs(10),
store_original: false,
}),
..Default::default()
};
let ds = Datastore::new("memory").await.unwrap();
//
// Create the ns, db, and tb to let the GC and the timestamp-to-versionstamp conversion
// work.
//
let mut tx0 = ds.transaction(Write, Optimistic).await.unwrap();
let ns_root = crate::key::root::ns::new(NS);
tx0.put(ns_root.key_category(), &ns_root, dns).await.unwrap();
let db_root = crate::key::namespace::db::new(NS, DB);
tx0.put(db_root.key_category(), &db_root, ddb).await.unwrap();
let tb_root = crate::key::database::tb::new(NS, DB, TB);
tx0.put(tb_root.key_category(), &tb_root, dtb.clone()).await.unwrap();
tx0.commit().await.unwrap();
ds
}
}

View file

@ -26,6 +26,7 @@ pub(crate) struct Document<'a> {
}
#[non_exhaustive]
#[cfg_attr(debug_assertions, derive(Debug))]
pub struct CursorDoc<'a> {
pub(crate) ir: Option<IteratorRef>,
pub(crate) rid: Option<&'a Thing>,
@ -140,7 +141,6 @@ impl<'a> Document<'a> {
}
/// Check if document is being deleted
#[allow(dead_code)]
pub fn is_delete(&self) -> bool {
self.current.doc.is_none()
}

View file

@ -71,6 +71,7 @@ impl<'a> Document<'a> {
// Carry on
Ok(())
}
/// Check any PERRMISSIONS for a LIVE query
async fn lq_allow(
&self,
@ -117,11 +118,17 @@ impl<'a> Document<'a> {
"Called check_lqs_and_send_notifications with {} live statements",
live_statements.len()
);
// Technically this isnt the condition - the `lives` function is passing in the currently evaluated statement
// but the ds.rs invocation of this function is reconstructing this statement
let is_delete = match FFLAGS.change_feed_live_queries.enabled() {
true => self.is_delete(),
false => stm.is_delete(),
};
for lv in live_statements {
// Create a new statement
let lq = Statement::from(*lv);
// Get the event action
let met = if stm.is_delete() {
let met = if is_delete {
Value::from("DELETE")
} else if self.is_new() {
Value::from("CREATE")
@ -129,7 +136,7 @@ impl<'a> Document<'a> {
Value::from("UPDATE")
};
// Check if this is a delete statement
let doc = match stm.is_delete() {
let doc = match is_delete {
true => &self.initial,
false => &self.current,
};
@ -209,10 +216,9 @@ impl<'a> Document<'a> {
node_id,
lv.node.0
);
if stm.is_delete() {
if is_delete {
// Send a DELETE notification
if node_matches_live_query {
trace!("Sending lq delete notification");
sender
.send(Notification {
id: lv.id,
@ -223,6 +229,11 @@ impl<'a> Document<'a> {
// Output the full document before any changes were applied
let mut value =
doc.doc.compute(stk, &lqctx, lqopt, txn, Some(doc)).await?;
// TODO(SUR-349): We need an empty object instead of Value::None for serialisation
if value.is_none() {
value = Value::Object(Default::default());
}
// Remove metadata fields on output
value.del(stk, &lqctx, lqopt, txn, &*META).await?;
// Output result

View file

@ -12,6 +12,9 @@ use crate::sql::value::Value;
use reblessive::tree::Stk;
impl<'a> Document<'a> {
/// Evaluates a doc that has been modified so that it can be further computed into a result Value
/// This includes some permissions handling, output format handling (as specified in statement),
/// field handling (like params, links etc).
pub async fn pluck(
&self,
stk: &mut Stk,

View file

@ -1,4 +1,3 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
#[cfg(any(
feature = "kv-surrealkv",
@ -36,15 +35,13 @@ use wasmtimer::std::{SystemTime, UNIX_EPOCH};
use super::tx::Transaction;
use crate::cf;
use crate::cf::TableMutation;
use crate::ctx::Context;
#[cfg(feature = "jwks")]
use crate::dbs::capabilities::NetTarget;
use crate::dbs::{
node::Timestamp, Attach, Capabilities, Executor, Notification, Options, Response, Session,
Variables, Workable,
Variables,
};
use crate::doc::Document;
use crate::err::Error;
#[cfg(feature = "jwks")]
use crate::iam::jwks::JwksCache;
@ -59,7 +56,7 @@ use crate::kvs::lq_structs::{LqValue, TrackedResult, UnreachableLqType};
use crate::kvs::lq_v2_fut::process_lq_notifications;
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
use crate::options::EngineOptions;
use crate::sql::{self, statements::DefineUserStatement, Base, Object, Query, Strand, Uuid, Value};
use crate::sql::{self, statements::DefineUserStatement, Base, Query, Uuid, Value};
use crate::syn;
use crate::vs::{conv, Oracle, Versionstamp};
@ -70,8 +67,6 @@ const LQ_CHANNEL_SIZE: usize = 100;
// The batch size used for non-paged operations (i.e. if there are more results, they are ignored)
const NON_PAGED_BATCH_SIZE: u32 = 100_000;
const EMPTY_DOC: Value = Value::None;
/// The underlying datastore instance which stores the dataset.
#[allow(dead_code)]
#[non_exhaustive]
@ -1469,61 +1464,6 @@ impl Datastore {
}
}
/// Construct a document from a Change Feed mutation
/// This is required to perform document operations such as live query notifications
pub(crate) fn construct_document(mutation: &TableMutation) -> Option<Document> {
match mutation {
TableMutation::Set(id, current_value) => {
let doc = Document::new_artificial(
None,
Some(id),
None,
Cow::Borrowed(current_value),
Cow::Owned(EMPTY_DOC),
Workable::Normal,
);
Some(doc)
}
TableMutation::Del(id) => {
let fake_previous_value_because_we_need_the_id_and_del_doesnt_store_value =
Value::Object(Object::from(map! {
"id" => Value::Thing(id.clone()),
}));
let doc = Document::new_artificial(
None,
Some(id),
None,
Cow::Owned(Value::None),
Cow::Owned(fake_previous_value_because_we_need_the_id_and_del_doesnt_store_value),
Workable::Normal,
);
Some(doc)
}
TableMutation::Def(_) => None,
TableMutation::SetWithDiff(id, current_value, _operations) => {
// We need a previous value otherwise the Value::compute function won't work correctly
// This is also how IDs are carried into notifications, not via doc.rid
let todo_original_after_reverse_applying_patches = Value::Object(Object::from(map! {
"id" => Value::Thing(id.clone()),
// This value is included so that we know for sure it is placeholder
"fake_value" => Value::Strand(
Strand::from( "placeholder until we can derive diffs from reversing patch operations" ))
}));
let doc = Document::new_artificial(
None,
Some(id),
None,
Cow::Borrowed(current_value),
Cow::Owned(todo_original_after_reverse_applying_patches),
Workable::Normal,
);
trace!("Constructed artificial document: {:?}, is_new={}", doc, doc.is_new());
// TODO(SUR-328): reverse diff and apply to doc to retrieve original version of doc
Some(doc)
}
}
}
#[cfg(test)]
mod test {
use super::*;

View file

@ -5,7 +5,6 @@ use crate::kvs::lq_structs::{KillEntry, LqEntry, LqIndexKey, LqIndexValue, LqSel
use crate::vs::{conv, Versionstamp};
/// We often want to increment by 1, but the 2 least significant bytes are unused
#[allow(dead_code)]
const ONE_SHIFTED: u128 = 1 << 16;
/// The datastore needs to track live queries that it owns as an engine. The db API and drivers
@ -14,7 +13,6 @@ const ONE_SHIFTED: u128 = 1 << 16;
/// This struct tracks live queries against change feeds so that the correct watermarks are used
/// across differently versioned live queries. It provides convenience, correctness and separation
/// of concerns.
#[allow(dead_code)]
pub(crate) struct LiveQueryTracker {
// Map of Live Query identifier (ns+db+tb) for change feed tracking
// the mapping is to a list of affected live queries
@ -27,7 +25,6 @@ pub(crate) struct LiveQueryTracker {
}
impl LiveQueryTracker {
#[allow(dead_code)]
pub(crate) const fn new() -> Self {
Self {
local_live_queries: BTreeMap::new(),
@ -36,7 +33,6 @@ impl LiveQueryTracker {
}
/// Add another Live Query to track, given the Versionstamp to stream from
#[allow(dead_code)]
pub(crate) fn register_live_query(
&mut self,
lq_index_key: &LqEntry,
@ -71,7 +67,6 @@ impl LiveQueryTracker {
Ok(())
}
#[allow(dead_code)]
pub(crate) fn unregister_live_query(&mut self, kill_entry: &KillEntry) {
// Because the information available from a kill statement is limited, we need to find a relevant kill query
let found: Option<(LqIndexKey, LqIndexValue)> = self
@ -105,7 +100,6 @@ impl LiveQueryTracker {
}
/// This will update the watermark of all live queries, regardless of their individual state
#[allow(dead_code)]
pub(crate) fn update_watermark_live_query(
&mut self,
live_query: &LqIndexKey,
@ -141,7 +135,6 @@ impl LiveQueryTracker {
Ok(())
}
#[allow(dead_code)]
pub(crate) fn get_watermarks(&self) -> &BTreeMap<LqSelector, Versionstamp> {
&self.cf_watermarks
}
@ -151,7 +144,6 @@ impl LiveQueryTracker {
/// to iterate over it normally
/// This will break if values are added or removed, so keep the write lock while iterating
/// This can be improved by having droppable trackers/iterators returned
#[allow(dead_code)]
pub(crate) fn get_watermark_by_enum_index(
&self,
index: usize,
@ -159,13 +151,11 @@ impl LiveQueryTracker {
self.cf_watermarks.iter().nth(index)
}
#[allow(dead_code)]
pub(crate) fn is_empty(&self) -> bool {
self.local_live_queries.is_empty()
}
/// Find the necessary Live Query information for a given selector
#[allow(dead_code)]
pub(crate) fn live_queries_for_selector(
&self,
selector: &LqSelector,

367
core/src/kvs/lq_v2_doc.rs Normal file
View file

@ -0,0 +1,367 @@
use std::borrow::Cow;
use crate::cf::TableMutation;
use crate::dbs::Workable;
use crate::doc::Document;
use crate::err::Error;
use crate::sql::{Array, Object, Value};
const EMPTY_DOC: Value = Value::None;
/// Construct a document from a Change Feed mutation
/// This is required to perform document operations such as live query notifications
pub(in crate::kvs) fn construct_document(
mutation: &TableMutation,
) -> Result<Option<Document>, Error> {
match mutation {
TableMutation::Set(id, current_value) => {
let doc = Document::new_artificial(
None,
Some(id),
None,
Cow::Borrowed(current_value),
Cow::Owned(EMPTY_DOC),
Workable::Normal,
);
Ok(Some(doc))
}
TableMutation::Del(id) => {
let fake_previous_value_because_we_need_the_id_and_del_doesnt_store_value =
Value::Object(Object::from(map! {
"id" => Value::Thing(id.clone()),
}));
let doc = Document::new_artificial(
None,
Some(id),
None,
Cow::Owned(Value::None),
Cow::Owned(fake_previous_value_because_we_need_the_id_and_del_doesnt_store_value),
Workable::Normal,
);
Ok(Some(doc))
}
TableMutation::Def(_) => Ok(None),
TableMutation::SetWithDiff(id, current_value, operations) => {
// We need a previous value otherwise the Value::compute function won't work correctly
// This is also how IDs are carried into notifications, not via doc.rid
let mut copy = current_value.clone();
copy.patch(Value::Array(Array(
operations.iter().map(|op| Value::Object(Object::from(op.clone()))).collect(),
)))?;
let doc = Document::new_artificial(
None,
Some(id),
None,
Cow::Borrowed(current_value),
Cow::Owned(copy),
Workable::Normal,
);
trace!("Constructed artificial document: {:?}, is_new={}", doc, doc.is_new());
// TODO(SUR-328): reverse diff and apply to doc to retrieve original version of doc
Ok(Some(doc))
}
}
}
#[cfg(test)]
mod test {
use crate::cf::TableMutation;
use crate::kvs::lq_v2_doc::construct_document;
use crate::sql::statements::DefineTableStatement;
use crate::sql::{Strand, Thing, Value};
#[test]
fn test_construct_document_create() {
let thing = Thing::from(("table", "id"));
let value = Value::Strand(Strand::from("value"));
let tb_mutation = TableMutation::Set(thing.clone(), value);
let doc = construct_document(&tb_mutation).unwrap();
let doc = doc.unwrap();
assert!(doc.is_new());
assert!(doc.initial_doc().is_none());
assert!(doc.current_doc().is_some());
}
#[test]
fn test_construct_document_empty_value_is_valid() {
let thing = Thing::from(("table", "id"));
let value = Value::None;
let tb_mutation = TableMutation::Set(thing.clone(), value);
let doc = construct_document(&tb_mutation).unwrap();
let doc = doc.unwrap();
assert!(!doc.is_new());
// This is actually invalid data - we are going to treat it as delete though
assert!(doc.is_delete());
assert!(doc.initial_doc().is_none());
assert!(doc.current_doc().is_none());
}
#[test]
fn test_construct_document_update() {
let thing = Thing::from(("table", "id"));
let value = Value::Strand(Strand::from("value"));
let operations = vec![];
let tb_mutation = TableMutation::SetWithDiff(thing.clone(), value, operations);
let doc = construct_document(&tb_mutation).unwrap();
let doc = doc.unwrap();
assert!(!doc.is_new());
assert!(doc.initial_doc().is_strand(), "{:?}", doc.initial_doc());
assert!(doc.current_doc().is_strand(), "{:?}", doc.current_doc());
}
#[test]
fn test_construct_document_delete() {
let thing = Thing::from(("table", "id"));
let tb_mutation = TableMutation::Del(thing.clone());
let doc = construct_document(&tb_mutation).unwrap();
let doc = doc.unwrap();
// The previous and current doc values are "None", so technically this is a new doc as per
// current == None
assert!(!doc.is_new(), "{:?}", doc);
assert!(doc.is_delete(), "{:?}", doc);
assert!(doc.current_doc().is_none());
assert!(doc.initial_doc().is_some());
match doc.initial_doc() {
Value::Object(o) => {
assert!(o.contains_key("id"));
assert_eq!(o.get("id").unwrap(), &Value::Thing(thing));
}
_ => panic!("Initial doc should be an object"),
}
}
#[test]
fn test_construct_document_none_for_schema() {
let tb_mutation = TableMutation::Def(DefineTableStatement::default());
let doc = construct_document(&tb_mutation).unwrap();
assert!(doc.is_none());
}
}
#[cfg(feature = "kv-mem")]
#[cfg(test)]
mod test_check_lqs_and_send_notifications {
use std::collections::BTreeMap;
use std::sync::Arc;
use channel::Sender;
use futures::executor::block_on;
use once_cell::sync::Lazy;
use reblessive::TreeStack;
use crate::cf::TableMutation;
use crate::ctx::Context;
use crate::dbs::fuzzy_eq::FuzzyEq;
use crate::dbs::{Action, Notification, Options, Session, Statement};
use crate::fflags::FFLAGS;
use crate::iam::{Auth, Role};
use crate::kvs::lq_v2_doc::construct_document;
use crate::kvs::{Datastore, LockType, TransactionType};
use crate::sql::paths::{OBJ_PATH_AUTH, OBJ_PATH_SCOPE, OBJ_PATH_TOKEN};
use crate::sql::statements::{CreateStatement, DeleteStatement, LiveStatement};
use crate::sql::{Fields, Object, Strand, Table, Thing, Uuid, Value, Values};
const SETUP: Lazy<Arc<TestSuite>> = Lazy::new(|| Arc::new(block_on(setup_test_suite_init())));
struct TestSuite {
ds: Datastore,
ns: String,
db: String,
tb: String,
rid: Value,
}
async fn setup_test_suite_init() -> TestSuite {
let ds = Datastore::new("memory").await.unwrap();
let ns = "the_namespace";
let db = "the_database";
let tb = "the_table";
// First we define levels of permissions and schemas and required CF
let vars = Some(BTreeMap::new());
ds.execute(
&format!(
"
USE NAMESPACE {ns};
USE DATABASE {db};
DEFINE TABLE {tb} CHANGEFEED 1m INCLUDE ORIGINAL PERMISSIONS FULL;
"
),
&Session::owner(),
vars,
)
.await
.unwrap()
.into_iter()
.map(|r| r.result.unwrap())
.for_each(drop);
TestSuite {
ds,
ns: ns.to_string(),
db: db.to_string(),
tb: tb.to_string(),
rid: Value::Thing(Thing::from(("user", "test"))),
}
}
#[test_log::test(tokio::test)]
async fn test_create() {
if !FFLAGS.change_feed_live_queries.enabled_test {
return;
}
// Setup channels used for listening to LQs
let (sender, receiver) = channel::unbounded();
let opt = a_usable_options(&sender);
let tx = SETUP
.ds
.transaction(TransactionType::Write, LockType::Optimistic)
.await
.unwrap()
.enclose();
// WHEN:
// Construct document we are validating
let record_id = Thing::from((SETUP.tb.as_str(), "id"));
let value = Value::Strand(Strand::from("value"));
let tb_mutation = TableMutation::Set(record_id.clone(), value);
let doc = construct_document(&tb_mutation).unwrap().unwrap();
// AND:
// Perform "live query" on the constructed doc that we are checking
let live_statement = a_live_query_statement();
let executed_statement = a_create_statement();
let mut stack = TreeStack::new();
stack.enter(|stk| async {
doc.check_lqs_and_send_notifications(
stk,
&opt,
&Statement::Create(&executed_statement),
&tx,
&[&live_statement],
&sender,
)
.await
.unwrap();
});
// THEN:
let notification = receiver.try_recv().expect("There should be a notification");
assert!(
notification.fuzzy_eq(&Notification::new(
Uuid::default(),
Action::Create,
Value::Strand(Strand::from("value"))
)),
"{:?}",
notification
);
assert!(receiver.try_recv().is_err());
tx.lock().await.cancel().await.unwrap();
}
#[test_log::test(tokio::test)]
async fn test_delete() {
if !FFLAGS.change_feed_live_queries.enabled_test {
return;
}
// Setup channels used for listening to LQs
let (sender, receiver) = channel::unbounded();
let opt = a_usable_options(&sender);
let tx = SETUP
.ds
.transaction(TransactionType::Write, LockType::Optimistic)
.await
.unwrap()
.enclose();
// WHEN:
// Construct document we are validating
let record_id = Thing::from((SETUP.tb.as_str(), "id"));
let value = Value::Strand(Strand::from("value"));
let tb_mutation = TableMutation::Set(record_id.clone(), value);
let doc = construct_document(&tb_mutation).unwrap().unwrap();
// AND:
// Perform "live query" on the constructed doc that we are checking
let live_statement = a_live_query_statement();
let executed_statement = a_delete_statement();
let mut stack = TreeStack::new();
stack.enter(|stk| async {
doc.check_lqs_and_send_notifications(
stk,
&opt,
&Statement::Delete(&executed_statement),
&tx,
&[&live_statement],
&sender,
)
.await
.unwrap();
});
// THEN:
let notification = receiver.try_recv().expect("There should be a notification");
// TODO(SUR-349): Delete value should be the object that was just deleted
let expected_value = Value::Object(Object::default());
assert!(
notification.fuzzy_eq(&Notification::new(
Uuid::default(),
Action::Delete,
expected_value
)),
"{:?}",
notification
);
assert!(receiver.try_recv().is_err());
tx.lock().await.cancel().await.unwrap();
}
// Live queries will have authentication info associated with them
// This is a way to fake that
fn a_live_query_statement() -> LiveStatement {
let mut stm = LiveStatement::new(Fields::all());
let mut session: BTreeMap<String, Value> = BTreeMap::new();
session.insert(OBJ_PATH_AUTH.to_string(), Value::Strand(Strand::from("auth")));
session.insert(OBJ_PATH_SCOPE.to_string(), Value::Strand(Strand::from("scope")));
session.insert(OBJ_PATH_TOKEN.to_string(), Value::Strand(Strand::from("token")));
let session = Value::Object(Object::from(session));
stm.session = Some(session);
stm.auth = Some(Auth::for_db(Role::Owner, "namespace", "database"));
stm
}
// Fake a create statement that does not involve parsing the query
fn a_create_statement() -> CreateStatement {
CreateStatement {
only: false,
what: Values(vec![Value::Table(Table::from(SETUP.tb.clone()))]),
data: None,
output: None,
timeout: None,
parallel: false,
}
}
fn a_delete_statement() -> DeleteStatement {
DeleteStatement {
only: false,
what: Values(vec![Value::Table(Table::from(SETUP.tb.clone()))]),
cond: None,
output: None,
timeout: None,
parallel: false,
}
}
fn a_usable_options(sender: &Sender<Notification>) -> Options {
let mut ctx = Context::default();
ctx.add_notifications(Some(sender));
let opt = Options::default()
.with_ns(Some(SETUP.ns.clone().into()))
.with_db(Some(SETUP.db.clone().into()));
opt
}
}

View file

@ -5,9 +5,10 @@ use crate::err::Error;
use crate::fflags::FFLAGS;
use crate::kvs::lq_cf::LiveQueryTracker;
use crate::kvs::lq_structs::{LqIndexKey, LqIndexValue, LqSelector};
use crate::kvs::lq_v2_doc::construct_document;
use crate::kvs::LockType::Optimistic;
use crate::kvs::TransactionType::Read;
use crate::kvs::{construct_document, Datastore, Transaction};
use crate::kvs::{Datastore, Transaction};
use crate::sql::statements::show::ShowSince;
use crate::vs::conv;
use futures::lock::Mutex;
@ -159,7 +160,7 @@ async fn process_change_set_for_notifications(
for (i, mutation) in table_mutations.1.iter().enumerate() {
#[cfg(debug_assertions)]
trace!("[{} @ {:?}] Processing table mutation: {:?} Constructing document from mutation", i, change_vs, mutation);
if let Some(doc) = construct_document(mutation) {
if let Some(doc) = construct_document(mutation)? {
// We know we are only processing a single LQ at a time, so we can limit notifications to 1
let notification_capacity = 1;
// We track notifications as a separate channel in case we want to process

View file

@ -28,6 +28,7 @@ mod tx;
pub(crate) mod lq_structs;
mod lq_cf;
mod lq_v2_doc;
mod lq_v2_fut;
#[cfg(test)]
mod tests;

View file

@ -2721,7 +2721,7 @@ impl Transaction {
current: Cow<'_, Value>,
store_difference: bool,
) {
self.cf.update(ns, db, tb, id.clone(), previous, current, store_difference)
self.cf.record_cf_change(ns, db, tb, id.clone(), previous, current, store_difference)
}
// Records the table (re)definition in the changefeed if enabled.

View file

@ -1,6 +1,9 @@
use crate::sql::part::Part;
use once_cell::sync::Lazy;
pub const OBJ_PATH_AUTH: &str = "sd";
pub const OBJ_PATH_SCOPE: &str = "sc";
pub const OBJ_PATH_TOKEN: &str = "tk";
pub static ID: Lazy<[Part; 1]> = Lazy::new(|| [Part::from("id")]);
pub static IP: Lazy<[Part; 1]> = Lazy::new(|| [Part::from("ip")]);
@ -9,13 +12,13 @@ pub static NS: Lazy<[Part; 1]> = Lazy::new(|| [Part::from("ns")]);
pub static DB: Lazy<[Part; 1]> = Lazy::new(|| [Part::from("db")]);
pub static SC: Lazy<[Part; 1]> = Lazy::new(|| [Part::from("sc")]);
pub static SC: Lazy<[Part; 1]> = Lazy::new(|| [Part::from(OBJ_PATH_SCOPE)]);
pub static SD: Lazy<[Part; 1]> = Lazy::new(|| [Part::from("sd")]);
pub static SD: Lazy<[Part; 1]> = Lazy::new(|| [Part::from(OBJ_PATH_AUTH)]);
pub static OR: Lazy<[Part; 1]> = Lazy::new(|| [Part::from("or")]);
pub static TK: Lazy<[Part; 1]> = Lazy::new(|| [Part::from("tk")]);
pub static TK: Lazy<[Part; 1]> = Lazy::new(|| [Part::from(OBJ_PATH_TOKEN)]);
pub static IN: Lazy<[Part; 1]> = Lazy::new(|| [Part::from("in")]);

View file

@ -14,12 +14,18 @@ use std::fmt;
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[non_exhaustive]
pub struct CreateStatement {
// A keyword modifier indicating if we are expecting a single result or several
#[revision(start = 2)]
pub only: bool,
// Where we are creating (i.e. table, or record ID)
pub what: Values,
// The data associated with the record being created
pub data: Option<Data>,
// What the result of the statement should resemble (i.e. Diff or no result etc).
pub output: Option<Output>,
// The timeout for the statement
pub timeout: Option<Timeout>,
// If the statement should be run in parallel
pub parallel: bool,
}

View file

@ -28,18 +28,20 @@ pub struct LiveStatement {
// When a live query is marked for archiving, this will
// be set to the node ID that archived the query. This
// is an internal property, set by the database runtime.
// This is optional, and os only set when archived.
// This is optional, and is only set when archived.
//
// This is deprecated from 2.0
pub(crate) archived: Option<Uuid>,
// When a live query is created, we must also store the
// authenticated session of the user who made the query,
// so we can chack it later when sending notifications.
// so we can check it later when sending notifications.
// This is optional as it is only set by the database
// runtime when storing the live query to storage.
#[revision(start = 2)]
pub(crate) session: Option<Value>,
// When a live query is created, we must also store the
// authenticated session of the user who made the query,
// so we can chack it later when sending notifications.
// so we can check it later when sending notifications.
// This is optional as it is only set by the database
// runtime when storing the live query to storage.
pub(crate) auth: Option<Auth>,

View file

@ -1237,20 +1237,40 @@ async fn changefeed() {
};
assert!(versionstamp2 < versionstamp3);
let changes = a.get("changes").unwrap().to_owned();
assert_eq!(
changes,
surrealdb::sql::value(
"[
{
update: {
id: user:amos,
name: 'AMOS'
}
match FFLAGS.change_feed_live_queries.enabled() {
true => {
assert_eq!(
changes,
surrealdb::sql::value(
"[
{
create: {
id: user:amos,
name: 'AMOS'
}
}
]"
)
.unwrap()
);
}
]"
)
.unwrap()
);
false => {
assert_eq!(
changes,
surrealdb::sql::value(
"[
{
update: {
id: user:amos,
name: 'AMOS'
}
}
]"
)
.unwrap()
);
}
};
// UPDATE table
let a = array.get(4).unwrap();
let Value::Object(a) = a else {