From 09553baae0133dc0e86b10a4c16caafbe3592b75 Mon Sep 17 00:00:00 2001
From: Przemyslaw Hugh Kaznowski
Date: Tue, 2 Apr 2024 10:15:36 +0100
Subject: [PATCH] Refactor LQ v2 (#3779)

---
 core/src/cf/mutations.rs |  12 +-
 core/src/cf/writer.rs    |   8 +-
 core/src/doc/document.rs |  26 +++-
 core/src/kvs/ds.rs       | 303 ++++++++++++++++++++++++---------------
 4 files changed, 227 insertions(+), 122 deletions(-)

diff --git a/core/src/cf/mutations.rs b/core/src/cf/mutations.rs
index c6ce5432..ea5bbe9b 100644
--- a/core/src/cf/mutations.rs
+++ b/core/src/cf/mutations.rs
@@ -1,3 +1,4 @@
+use crate::fflags::FFLAGS;
 use crate::sql::array::Array;
 use crate::sql::object::Object;
 use crate::sql::statements::DefineTableStatement;
@@ -21,8 +22,9 @@ pub enum TableMutation {
 	Del(Thing),
 	Def(DefineTableStatement),
 	#[revision(start = 2)]
-	/// Includes the ID, current value, and changes that were applied to achieve this value
-	/// Example, ("mytb:tobie", {{"note": "surreal"}}, [{"op": "add", "path": "/note", "value": "surreal"}])
+	/// Includes the ID, the current value (after the change), the changes that were applied to
+	/// achieve this value, and whether this is a new record (i.e. create = true vs update = false)
+	/// Example: ("mytb:tobie", {{"note": "surreal"}}, [{"op": "add", "path": "/note", "value": "surreal"}], false)
 	/// Means that we have already applied the add "/note" operation to achieve the recorded result
 	SetWithDiff(Thing, Value, Vec<Operation>),
 }
@@ -77,7 +79,11 @@ impl TableMutation {
 		let mut h = BTreeMap::<String, Value>::new();
 		let h = match self {
 			TableMutation::Set(_thing, v) => {
-				h.insert("update".to_string(), v);
+				if FFLAGS.change_feed_live_queries.enabled() {
+					h.insert("create".to_string(), v);
+				} else {
+					h.insert("update".to_string(), v);
+				}
 				h
 			}
 			TableMutation::SetWithDiff(_thing, current, operations) => {
diff --git a/core/src/cf/writer.rs b/core/src/cf/writer.rs
index 0224f0b5..af44348a 100644
--- a/core/src/cf/writer.rs
+++ b/core/src/cf/writer.rs
@@ -79,7 +79,13 @@ impl Writer {
 			match store_difference {
 				true => {
 					let patches = current.diff(&previous, Idiom(Vec::new()));
-					TableMutation::SetWithDiff(id, current.into_owned(), patches)
+					let new_record = previous.is_none();
+					trace!("The record is new_record={new_record} because previous is {previous:?}");
+					if new_record {
+						TableMutation::Set(id, current.into_owned())
+					} else {
+						TableMutation::SetWithDiff(id, current.into_owned(), patches)
+					}
 				}
 				false => TableMutation::Set(id, current.into_owned()),
 			},
diff --git a/core/src/doc/document.rs b/core/src/doc/document.rs
index 8ca4acc8..0ab7d797 100644
--- a/core/src/doc/document.rs
+++ b/core/src/doc/document.rs
@@ -37,12 +37,12 @@ impl<'a> CursorDoc<'a> {
 		ir: Option<IteratorRef>,
 		rid: Option<&'a Thing>,
 		doc_id: Option<DocId>,
-		doc: &'a Value,
+		doc: Cow<'a, Value>,
 	) -> Self {
 		Self {
 			ir,
 			rid,
-			doc: Cow::Borrowed(doc),
+			doc,
 			doc_id,
 		}
 	}
@@ -89,12 +89,32 @@ impl<'a> Document<'a> {
 		doc_id: Option<DocId>,
 		val: &'a Value,
 		extras: Workable,
+	) -> Self {
+		Document {
+			id,
+			extras,
+			current: CursorDoc::new(ir, id, doc_id, Cow::Borrowed(val)),
+			initial: CursorDoc::new(ir, id, doc_id, Cow::Borrowed(val)),
+		}
+	}
+
+	/// Create a new document that is not going through the standard lifecycle of documents
+	///
+	/// This allows for it to be crafted without needing statements to operate on it
+	#[doc(hidden)]
+	pub fn new_artificial(
+		ir: Option<IteratorRef>,
+		id: Option<&'a Thing>,
+		doc_id: Option<DocId>,
+		val: Cow<'a, Value>,
+		initial: Cow<'a, Value>,
+		extras: Workable,
 	) -> Self {
 		Document {
 			id,
 			extras,
 			current: CursorDoc::new(ir, id, doc_id, val),
-			initial: CursorDoc::new(ir, id, doc_id, val),
+			initial: CursorDoc::new(ir, id, doc_id, initial),
 		}
 	}
 }
diff --git a/core/src/kvs/ds.rs b/core/src/kvs/ds.rs
index 3aee9d1e..8da8ec50 100644
--- a/core/src/kvs/ds.rs
+++ b/core/src/kvs/ds.rs
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
 use std::collections::{BTreeMap, BTreeSet};
 #[cfg(any(
 	feature = "kv-surrealkv",
@@ -102,7 +103,7 @@ pub struct Datastore {
 	local_live_queries: Arc<RwLock<BTreeMap<LqIndexKey, Vec<LqIndexValue>>>>,
 	// Set of tracked change feeds with associated watermarks
 	// This is updated with new/removed live queries and improves cf request performance
-	cf_watermarks: Arc<RwLock<BTreeMap<LqSelector, Versionstamp>>>,
+	cf_watermarks: Arc<Mutex<BTreeMap<LqSelector, Versionstamp>>>,
 	// Clock for tracking time. It is read only and accessible to all transactions. It is behind a mutex as tests may write to it.
 	clock: Arc<SizedClock>,
 	// The index store cache
@@ -391,7 +392,7 @@ impl Datastore {
 			clock,
 			index_stores: IndexStores::default(),
 			local_live_queries: Arc::new(RwLock::new(BTreeMap::new())),
-			cf_watermarks: Arc::new(RwLock::new(BTreeMap::new())),
+			cf_watermarks: Arc::new(Mutex::new(BTreeMap::new())),
 			#[cfg(feature = "jwks")]
 			jwks_cache: Arc::new(RwLock::new(JwksCache::new())),
 			#[cfg(any(
@@ -949,49 +950,21 @@ impl Datastore {
 		// Change map includes a mapping of selector to changesets, ordered by versionstamp
 		let mut change_map: BTreeMap<LqSelector, Vec<ChangeSet>> = BTreeMap::new();
-		let mut tx = self.transaction(Read, Optimistic).await?;
-		let mut tracked_cfs = self.cf_watermarks.write().await;
-		let mut tracked_cfs_updates = Vec::with_capacity(tracked_cfs.len());
-		for (selector, vs) in tracked_cfs.iter() {
-			// Read the change feed for the selector
-			let res = cf::read(
-				&mut tx,
-				&selector.ns,
-				&selector.db,
-				// Technically, we can not fetch by table and do the per-table filtering this side.
-				// That is an improvement though
-				Some(&selector.tb),
-				ShowSince::versionstamp(vs),
-				Some(self.engine_options.live_query_catchup_size),
-			)
-			.await?;
-			// Confirm we do need to change watermark - this is technically already handled by the cf range scan
-			if res.is_empty() {
-				trace!(
-					"There were no changes in the change feed for {:?} from versionstamp {:?}",
-					selector,
-					conv::versionstamp_to_u64(vs)
-				)
-			}
-			if let Some(change_set) = res.last() {
-				if conv::versionstamp_to_u64(&change_set.0) > conv::versionstamp_to_u64(vs) {
-					trace!("Adding a change set for lq notification processing");
-					// Update the cf watermark so we can progress scans
-					// If the notifications fail from here-on, they are lost
-					// this is a separate vec that we later insert to because we are iterating immutably
-					// We shouldn't use a read lock because of consistency between watermark scans
-					tracked_cfs_updates.push((selector.clone(), change_set.0));
-					// This does not guarantee a notification, as a changeset an include many tables and many changes
-					change_map.insert(selector.clone(), res);
-				}
-			}
-		}
-		tx.cancel().await?;
-
-		// Now we update since we are no longer iterating immutably
-		for (selector, vs) in tracked_cfs_updates {
-			tracked_cfs.insert(selector, vs);
-		}
+		{
+			let tx = self.transaction(Read, Optimistic).await?;
+			let tracked_cfs_updates = find_required_cfs_to_catch_up(
+				tx,
+				self.cf_watermarks.clone(),
+				self.engine_options.live_query_catchup_size,
+				&mut change_map,
+			)
+			.await?;
+			// Now we update since we are no longer iterating immutably
+			let mut tracked_cfs = self.cf_watermarks.lock().await;
+			for (selector, vs) in tracked_cfs_updates {
+				tracked_cfs.insert(selector, vs);
+			}
+		};
 
 		for (selector, change_sets) in change_map {
 			// find matching live queries
@@ -1009,78 +982,96 @@
 			// Find relevant changes
 			let tx = Arc::new(Mutex::new(self.transaction(Read, Optimistic).await?));
+			trace!("There are {} change sets", change_sets.len());
+			trace!(
+				"\n{}",
+				change_sets
+					.iter()
+					.enumerate()
+					.map(|(i, x)| format!("[{i}] {:?}", x))
+					.collect::<Vec<String>>()
+					.join("\n")
+			);
 			for change_set in change_sets {
-				// TODO(phughk): this loop can be on the inside so we are only checking lqs relavant to cf change
-				for (lq_key, lq_value) in lq_pairs.iter() {
-					trace!(
-						"Processing live query for notification key={:?} and value={:?}",
-						lq_key,
-						lq_value
-					);
-					let change_vs = change_set.0;
-					let database_mutation = &change_set.1;
-					for table_mutations in database_mutation.0.iter() {
-						if table_mutations.0 == lq_key.selector.tb {
-							// Create a doc of the table value
-							// Run the 'lives' logic on the doc, while providing live queries instead of reading from storage
-							// This will generate and send notifications
-							for mutation in table_mutations.1.iter() {
-								if let Some(doc) = Self::construct_document(mutation) {
-									// We know we are only processing a single LQ at a time, so we can limit notifications to 1
-									let notification_capacity = 1;
-									// We track notifications as a separate channel in case we want to process
-									// for the current state we only forward
-									let (sender, receiver) =
-										channel::bounded(notification_capacity);
-									doc.check_lqs_and_send_notifications(
-										opt,
-										&Statement::Live(&lq_value.stm),
-										&tx,
-										[&lq_value.stm].as_slice(),
-										&sender,
-									)
-									.await
-									.map_err(|e| {
-										Error::Internal(format!(
-											"Error checking lqs for notifications: {:?}",
-											e
-										))
-									})?;
-
-									// Send the notifications to driver or api
-									// TODO: evaluate if we want channel directly instead of proxy
-									while let Ok(notification) = receiver.try_recv() {
-										trace!("Sending notification to client");
-										self.notification_channel
-											.as_ref()
-											.unwrap()
-											.0
-											.send(notification)
-											.await
-											.unwrap();
-									}
-									trace!("Ended notification sending")
-								}
-
-								// Update watermarks
-								trace!(
-									"Updating watermark to {:?} for index key {:?}",
-									change_vs,
-									lq_key
-								);
-
-								// For each live query we have processed we update the watermarks
-								self.local_live_queries.write().await.insert(
-									(*lq_key).clone(),
-									vec![LqIndexValue {
-										vs: change_vs,
-										..lq_value.clone()
-									}],
-								);
-
-								// We also update the tracked_cfs with a minimum watermark
-							}
-						}
-					}
-				}
+				self.process_change_set_for_notifications(tx.clone(), opt, change_set, &lq_pairs)
+					.await?;
 			}
 		}
+		trace!("Finished process lq successfully");
 		Ok(())
 	}
 
+	async fn process_change_set_for_notifications(
+		&self,
+		tx: Arc<Mutex<Transaction>>,
+		opt: &Options,
+		change_set: ChangeSet,
+		lq_pairs: &[(LqIndexKey, LqIndexValue)],
+	) -> Result<(), Error> {
+		// TODO(phughk): this loop can be on the inside so we are only checking lqs relevant to cf change
+		trace!("Moving to next change set, {:?}", change_set);
+		for (lq_key, lq_value) in lq_pairs.iter() {
+			trace!(
+				"Processing live query for notification key={:?} and value={:?}",
+				lq_key,
+				lq_value
+			);
+			let change_vs = change_set.0;
+			let database_mutation = &change_set.1;
+			for table_mutations in database_mutation.0.iter() {
+				if table_mutations.0 == lq_key.selector.tb {
+					// Create a doc of the table value
+					// Run the 'lives' logic on the doc, while providing live queries instead of reading from storage
+					// This will generate and send notifications
+					trace!(
+						"There are {} table mutations being prepared for notifications",
+						table_mutations.1.len()
+					);
+					for (i, mutation) in table_mutations.1.iter().enumerate() {
+						trace!(
+							"[{} @ {:?}] Processing table mutation: {:?}",
+							i,
+							change_vs,
+							mutation
+						);
+						trace!("Constructing document from mutation");
+						if let Some(doc) = Self::construct_document(mutation) {
+							// We know we are only processing a single LQ at a time, so we can limit notifications to 1
+							let notification_capacity = 1;
+							// We track notifications as a separate channel in case we want to process them;
+							// for the current state we only forward
+							let (sender, receiver) = channel::bounded(notification_capacity);
+							doc.check_lqs_and_send_notifications(
+								opt,
+								&Statement::Live(&lq_value.stm),
+								&tx,
+								[&lq_value.stm].as_slice(),
+								&sender,
+							)
+							.await
+							.map_err(|e| {
+								Error::Internal(format!(
+									"Error checking lqs for notifications: {:?}",
+									e
+								))
+							})?;
+
+							// Send the notifications to driver or api
+							// TODO: evaluate if we want channel directly instead of proxy
+							while let Ok(notification) = receiver.try_recv() {
+								trace!("Sending notification to client");
+								self.notification_channel
+									.as_ref()
+									.unwrap()
+									.0
+									.send(notification)
+									.await
+									.unwrap();
+							}
+							trace!("Ended notification sending")
+						}
+
+						self.update_versionstamp(&change_vs, lq_key, lq_value).await;
+					}
+				}
+			}
+		}
+		Ok(())
+	}
@@ -1088,21 +1079,57 @@
 		Ok(())
 	}
 
+	async fn update_versionstamp(
+		&self,
+		change_vs: &Versionstamp,
+		lq_key: &LqIndexKey,
+		lq_value: &LqIndexValue,
+	) {
+		// We increase the watermark because scans are inclusive of the first result,
+		// and we have already processed the input watermark - it is derived from the event
+		// let change_vs = conv::try_u128_to_versionstamp(conv::to_u128_be(*change_vs) + 1).unwrap();
+
+		// Update watermarks
+		trace!("Updating watermark to {:?} for index key {:?}", change_vs, lq_key);
+		// For each live query we have processed we update the watermarks
+		self.local_live_queries.write().await.insert(
+			lq_key.clone(),
+			vec![LqIndexValue {
+				vs: *change_vs,
+				..lq_value.clone()
+			}],
+		);
+
+		// TODO(phughk): we also update the tracked_cfs with a minimum watermark
+		let mut tracked_cfs = self.cf_watermarks.lock().await;
+		// TODO: we may be able to re-use the key without cloning...
+		tracked_cfs.insert(lq_key.selector.clone(), *change_vs);
+	}
+
 	/// Construct a document from a Change Feed mutation
 	/// This is required to perform document operations such as live query notifications
 	fn construct_document(mutation: &TableMutation) -> Option<Document> {
 		match mutation {
-			TableMutation::Set(a, b) => {
-				let doc = Document::new(None, Some(a), None, b, Workable::Normal);
+			TableMutation::Set(id, current_value) => {
+				let doc = Document::new(None, Some(id), None, current_value, Workable::Normal);
 				Some(doc)
 			}
-			TableMutation::Del(a) => {
-				let doc = Document::new(None, Some(a), None, &Value::None, Workable::Normal);
+			TableMutation::Del(id) => {
+				let doc = Document::new(None, Some(id), None, &Value::None, Workable::Normal);
 				Some(doc)
 			}
 			TableMutation::Def(_) => None,
-			TableMutation::SetWithDiff(id, new, _operations) => {
-				let doc = Document::new(None, Some(id), None, new, Workable::Normal);
+			TableMutation::SetWithDiff(id, current_value, _operations) => {
+				let todo_original_after_reverse_applying_patches = Value::None;
+				let doc = Document::new_artificial(
+					None,
+					Some(id),
+					None,
+					Cow::Borrowed(current_value),
+					Cow::Owned(todo_original_after_reverse_applying_patches),
+					Workable::Normal,
+				);
+				trace!("Constructed artificial document: {:?}, is_new={}", doc, doc.is_new());
+				// TODO(SUR-328): reverse diff and apply to doc to retrieve original version of doc
 				Some(doc)
 			}
@@ -1117,7 +1144,7 @@
 	) -> Result<(), Error> {
 		// Lock the local live queries
 		let mut lq_map = self.local_live_queries.write().await;
-		let mut cf_watermarks = self.cf_watermarks.write().await;
+		let mut cf_watermarks = self.cf_watermarks.lock().await;
 		let mut watermarks_to_check: Vec<LqIndexKey> = vec![];
 		for lq in lqs {
 			match lq {
@@ -1705,3 +1732,49 @@ impl Datastore {
 		Ok(())
 	}
 }
+
+async fn find_required_cfs_to_catch_up(
+	mut tx: Transaction,
+	tracked_cfs: Arc<Mutex<BTreeMap<LqSelector, Versionstamp>>>,
+	catchup_size: u32,
+	change_map: &mut BTreeMap<LqSelector, Vec<ChangeSet>>,
+) -> Result<Vec<(LqSelector, Versionstamp)>, Error> {
+	let tracked_cfs = tracked_cfs.lock().await;
+	let mut tracked_cfs_updates = Vec::with_capacity(tracked_cfs.len());
+	for (selector, vs) in tracked_cfs.iter() {
+		// Read the change feed for the selector
+		let res = cf::read(
+			&mut tx,
+			&selector.ns,
+			&selector.db,
+			// Technically, we could instead not fetch by table and do the per-table filtering on this side;
+			// that would be an improvement
+			Some(&selector.tb),
+			ShowSince::versionstamp(vs),
+			Some(catchup_size),
+		)
+		.await?;
+		// Confirm we do need to change the watermark - this is technically already handled by the cf range scan
+		if res.is_empty() {
+			trace!(
+				"There were no changes in the change feed for {:?} from versionstamp {:?}",
+				selector,
+				conv::versionstamp_to_u64(vs)
+			)
+		}
+		if let Some(change_set) = res.last() {
+			if conv::versionstamp_to_u64(&change_set.0) > conv::versionstamp_to_u64(vs) {
+				trace!("Adding a change set for lq notification processing");
+				// Update the cf watermark so we can progress scans;
+				// if the notifications fail from here on, they are lost.
+				// This is a separate vec that we later insert into, because we are iterating immutably.
+				// We shouldn't use a read lock because of consistency between watermark scans
+				tracked_cfs_updates.push((selector.clone(), change_set.0));
+				// This does not guarantee a notification, as a changeset can include many tables and many changes
+				change_map.insert(selector.clone(), res);
+			}
+		}
+	}
+	tx.cancel().await?;
+	Ok(tracked_cfs_updates)
+}
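
Note for reviewers on TODO(SUR-328): until the reverse diff lands, the artificial document above uses Value::None as its initial state. One possible shape for the reverse application is sketched below - illustrative only, with serde_json standing in for the internal Value type and a hypothetical reverse_apply helper, not part of this patch:

use serde_json::{json, Value};

// Mirrors the {"op", "path", "value"} shape of the SetWithDiff example above.
// Remove/Replace must carry the old value for the reversal to be possible.
#[allow(dead_code)]
enum Op {
    Add { path: String, value: Value },
    Replace { path: String, old: Value },
    Remove { path: String, old: Value },
}

// Undo a single operation on a top-level field (nested paths omitted for brevity).
fn reverse_apply(doc: &mut Value, op: &Op) {
    match op {
        // An "add" is undone by removing the added field
        Op::Add { path, .. } => {
            doc.as_object_mut().map(|o| o.remove(path.trim_start_matches('/')));
        }
        // A "replace" or "remove" is undone by restoring the old value
        Op::Replace { path, old } | Op::Remove { path, old } => {
            doc.as_object_mut()
                .map(|o| o.insert(path.trim_start_matches('/').to_string(), old.clone()));
        }
    }
}

fn main() {
    // Current value from the change feed: {"note": "surreal"}
    let mut doc = json!({"note": "surreal"});
    // The recorded diff said the add "/note" op produced this value,
    // so reversing it yields the document that existed before.
    let ops = vec![Op::Add { path: "/note".into(), value: json!("surreal") }];
    // Operations must be undone in reverse order of application.
    for op in ops.iter().rev() {
        reverse_apply(&mut doc, op);
    }
    assert_eq!(doc, json!({}));
}

The key property is that operations are undone in reverse order, which is why the writer records the full patch list alongside the current value.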
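Similarly, the watermark bookkeeping in find_required_cfs_to_catch_up reduces to: read each tracked feed since its stored versionstamp, and only queue a watermark update - applied after iteration - when a strictly newer change set exists. A standalone sketch of that control flow under those assumptions, with plain types standing in for LqSelector, Versionstamp, and ChangeSet (not SurrealDB APIs):

use std::collections::BTreeMap;

// Stand-ins for LqSelector, Versionstamp and ChangeSet (illustration only).
type Selector = String;
type Versionstamp = u64;
type ChangeSet = (Versionstamp, String);

// Returns the watermark updates to apply after the scan,
// instead of mutating the map while iterating it.
fn catch_up(
    tracked: &BTreeMap<Selector, Versionstamp>,
    feed: &dyn Fn(&Selector, Versionstamp) -> Vec<ChangeSet>,
    change_map: &mut BTreeMap<Selector, Vec<ChangeSet>>,
) -> Vec<(Selector, Versionstamp)> {
    let mut updates = Vec::with_capacity(tracked.len());
    for (selector, vs) in tracked {
        let res = feed(selector, *vs);
        if let Some(last) = res.last() {
            // Only advance the watermark when something strictly newer arrived
            if last.0 > *vs {
                updates.push((selector.clone(), last.0));
                change_map.insert(selector.clone(), res);
            }
        }
    }
    updates
}

fn main() {
    let mut tracked = BTreeMap::from([("ns/db/tb".to_string(), 1u64)]);
    let feed = |_: &Selector, since: Versionstamp| {
        vec![(since + 1, "CREATE mytb:tobie".to_string())]
    };
    let mut change_map = BTreeMap::new();
    // The pass finds a change at versionstamp 2 and queues a watermark update
    for (sel, vs) in catch_up(&tracked, &feed, &mut change_map) {
        tracked.insert(sel, vs); // applied after iteration, like the patch does
    }
    assert_eq!(tracked["ns/db/tb"], 2);
}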