Refactor LQ v2 (#3779)
parent 3dc00c8229
commit 09553baae0
4 changed files with 227 additions and 122 deletions
@@ -1,3 +1,4 @@
+use crate::fflags::FFLAGS;
 use crate::sql::array::Array;
 use crate::sql::object::Object;
 use crate::sql::statements::DefineTableStatement;

@@ -21,8 +22,9 @@ pub enum TableMutation {
     Del(Thing),
     Def(DefineTableStatement),
     #[revision(start = 2)]
-    /// Includes the ID, current value, and changes that were applied to achieve this value
-    /// Example, ("mytb:tobie", {{"note": "surreal"}}, [{"op": "add", "path": "/note", "value": "surreal"}])
+    /// Includes the ID, current value (after change), changes that were applied to achieve this
+    /// value, and if this is a new record (i.e. create = true vs update = false)
+    /// Example, ("mytb:tobie", {{"note": "surreal"}}, [{"op": "add", "path": "/note", "value": "surreal"}], false)
     /// Means that we have already applied the add "/note" operation to achieve the recorded result
     SetWithDiff(Thing, Value, Vec<Operation>),
 }

@@ -77,7 +79,11 @@ impl TableMutation {
         let mut h = BTreeMap::<String, Value>::new();
         let h = match self {
             TableMutation::Set(_thing, v) => {
-                h.insert("update".to_string(), v);
+                if FFLAGS.change_feed_live_queries.enabled() {
+                    h.insert("create".to_string(), v);
+                } else {
+                    h.insert("update".to_string(), v);
+                }
                 h
             }
             TableMutation::SetWithDiff(_thing, current, operations) => {

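Note (reviewer sketch, not part of the diff): the Set arm above now keys the mutation on whether the live-query change feed flag is enabled. Below is a minimal, standalone Rust illustration of that gate; FeatureFlag and set_to_map are hypothetical stand-ins for FFLAGS.change_feed_live_queries and TableMutation::into_value, and the value is just a String here.

use std::collections::BTreeMap;

/// Hypothetical stand-in for a runtime feature flag such as FFLAGS.change_feed_live_queries.
struct FeatureFlag {
    enabled: bool,
}

impl FeatureFlag {
    fn enabled(&self) -> bool {
        self.enabled
    }
}

/// Sketch of the gate in the Set arm above: the flag decides whether the mutation is
/// surfaced under a "create" key or the legacy "update" key.
fn set_to_map(flag: &FeatureFlag, value: String) -> BTreeMap<String, String> {
    let mut h = BTreeMap::new();
    if flag.enabled() {
        h.insert("create".to_string(), value);
    } else {
        h.insert("update".to_string(), value);
    }
    h
}

fn main() {
    let flag = FeatureFlag {
        enabled: true,
    };
    let m = set_to_map(&flag, r#"{"note": "surreal"}"#.to_string());
    assert!(m.contains_key("create"));
    println!("{m:?}");
}
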
@@ -79,7 +79,13 @@ impl Writer {
            match store_difference {
                true => {
                    let patches = current.diff(&previous, Idiom(Vec::new()));
-                   TableMutation::SetWithDiff(id, current.into_owned(), patches)
+                   let new_record = !previous.is_some();
+                   trace!("The record is new_record={new_record} because previous is {previous:?}");
+                   if previous.is_none() {
+                       TableMutation::Set(id, current.into_owned())
+                   } else {
+                       TableMutation::SetWithDiff(id, current.into_owned(), patches)
+                   }
                }
                false => TableMutation::Set(id, current.into_owned()),
            },

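Note (reviewer sketch, not part of the diff): the Writer hunk above starts distinguishing creates from updates by checking whether a previous value exists. A simplified, self-contained version of that branch follows, with String stand-ins for Thing, Value and Operation and a hypothetical record_change helper.

/// Simplified stand-ins for SurrealDB's record id, value and patch operation types.
type Thing = String;
type Value = String;
type Operation = String;

/// Mirrors the two mutation variants involved in the branch above (shapes simplified).
#[derive(Debug)]
enum TableMutation {
    Set(Thing, Value),
    SetWithDiff(Thing, Value, Vec<Operation>),
}

/// Hypothetical helper: no previous value means the record is new, so emit a plain Set;
/// otherwise keep the diff alongside the new value.
fn record_change(
    id: Thing,
    previous: Option<Value>,
    current: Value,
    patches: Vec<Operation>,
) -> TableMutation {
    if previous.is_none() {
        TableMutation::Set(id, current)
    } else {
        TableMutation::SetWithDiff(id, current, patches)
    }
}

fn main() {
    let created = record_change("mytb:tobie".into(), None, r#"{"note":"surreal"}"#.into(), vec![]);
    let updated = record_change(
        "mytb:tobie".into(),
        Some("{}".into()),
        r#"{"note":"surreal"}"#.into(),
        vec!["add /note".into()],
    );
    println!("{created:?}\n{updated:?}");
}
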
@@ -37,12 +37,12 @@ impl<'a> CursorDoc<'a> {
        ir: Option<IteratorRef>,
        rid: Option<&'a Thing>,
        doc_id: Option<DocId>,
-       doc: &'a Value,
+       doc: Cow<'a, Value>,
    ) -> Self {
        Self {
            ir,
            rid,
-           doc: Cow::Borrowed(doc),
+           doc,
            doc_id,
        }
    }

@@ -89,12 +89,32 @@ impl<'a> Document<'a> {
        doc_id: Option<DocId>,
        val: &'a Value,
        extras: Workable,
+   ) -> Self {
+       Document {
+           id,
+           extras,
+           current: CursorDoc::new(ir, id, doc_id, Cow::Borrowed(val)),
+           initial: CursorDoc::new(ir, id, doc_id, Cow::Borrowed(val)),
+       }
+   }
+
+   /// Create a new document that is not going through the standard lifecycle of documents
+   ///
+   /// This allows for it to be crafted without needing statements to operate on it
+   #[doc(hidden)]
+   pub fn new_artificial(
+       ir: Option<IteratorRef>,
+       id: Option<&'a Thing>,
+       doc_id: Option<DocId>,
+       val: Cow<'a, Value>,
+       initial: Cow<'a, Value>,
+       extras: Workable,
    ) -> Self {
        Document {
            id,
            extras,
            current: CursorDoc::new(ir, id, doc_id, val),
-           initial: CursorDoc::new(ir, id, doc_id, val),
+           initial: CursorDoc::new(ir, id, doc_id, initial),
        }
    }
 }

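Note (reviewer sketch, not part of the diff): CursorDoc now holds a Cow<'a, Value>, so the normal path can keep borrowing the caller's value while the new new_artificial path can hand in an owned, reconstructed initial value. A minimal illustration of the borrowed-versus-owned split, using a String in place of Value and a hypothetical Doc type:

use std::borrow::Cow;

/// Minimal stand-in for a document cursor over a value (String in place of sql::Value).
#[derive(Debug)]
struct Doc<'a> {
    current: Cow<'a, String>,
    initial: Cow<'a, String>,
}

impl<'a> Doc<'a> {
    /// Standard path: both cursors borrow the same caller-owned value, no clone needed.
    fn new(val: &'a String) -> Self {
        Doc {
            current: Cow::Borrowed(val),
            initial: Cow::Borrowed(val),
        }
    }

    /// "Artificial" path: current may still be borrowed, while initial can be an owned,
    /// reconstructed value (e.g. the state before a diff was applied).
    fn new_artificial(val: Cow<'a, String>, initial: Cow<'a, String>) -> Self {
        Doc {
            current: val,
            initial,
        }
    }
}

fn main() {
    let stored = String::from(r#"{"note":"surreal"}"#);
    let normal = Doc::new(&stored);
    let artificial = Doc::new_artificial(Cow::Borrowed(&stored), Cow::Owned(String::from("{}")));
    println!("{normal:?}\n{artificial:?}");
}
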
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
 use std::collections::{BTreeMap, BTreeSet};
 #[cfg(any(
     feature = "kv-surrealkv",

@@ -102,7 +103,7 @@ pub struct Datastore {
    local_live_queries: Arc<RwLock<BTreeMap<LqIndexKey, Vec<LqIndexValue>>>>,
    // Set of tracked change feeds with associated watermarks
    // This is updated with new/removed live queries and improves cf request performance
-   cf_watermarks: Arc<RwLock<BTreeMap<LqSelector, Versionstamp>>>,
+   cf_watermarks: Arc<Mutex<BTreeMap<LqSelector, Versionstamp>>>,
    // Clock for tracking time. It is read only and accessible to all transactions. It is behind a mutex as tests may write to it.
    clock: Arc<SizedClock>,
    // The index store cache

@@ -391,7 +392,7 @@ impl Datastore {
            clock,
            index_stores: IndexStores::default(),
            local_live_queries: Arc::new(RwLock::new(BTreeMap::new())),
-           cf_watermarks: Arc::new(RwLock::new(BTreeMap::new())),
+           cf_watermarks: Arc::new(Mutex::new(BTreeMap::new())),
            #[cfg(feature = "jwks")]
            jwks_cache: Arc::new(RwLock::new(JwksCache::new())),
            #[cfg(any(

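Note (reviewer sketch, not part of the diff): cf_watermarks moves from an async RwLock to an async Mutex, which keeps each read-then-advance of a watermark under one exclusive lock. The sketch below only shows the shared Arc<Mutex<BTreeMap>> shape; it assumes the tokio crate (rt, macros and sync features) purely for illustration, and the selector and versionstamp types are simplified to String and u64.

use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::Mutex;

type Selector = String;
type Versionstamp = u64;

#[tokio::main]
async fn main() {
    // Shared watermark map, analogous in shape to Datastore::cf_watermarks after this change.
    let watermarks: Arc<Mutex<BTreeMap<Selector, Versionstamp>>> =
        Arc::new(Mutex::new(BTreeMap::new()));

    let writer = {
        let watermarks = Arc::clone(&watermarks);
        tokio::spawn(async move {
            // Hold the lock for the whole read-then-advance step so no other task
            // can observe or race a half-updated watermark.
            let mut map = watermarks.lock().await;
            let entry = map.entry("ns/db/tb".to_string()).or_insert(0);
            *entry += 1;
        })
    };

    writer.await.unwrap();
    println!("{:?}", watermarks.lock().await);
}
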
@@ -949,49 +950,21 @@ impl Datastore {
 
        // Change map includes a mapping of selector to changesets, ordered by versionstamp
        let mut change_map: BTreeMap<LqSelector, Vec<ChangeSet>> = BTreeMap::new();
-       let mut tx = self.transaction(Read, Optimistic).await?;
-       let mut tracked_cfs = self.cf_watermarks.write().await;
-       let mut tracked_cfs_updates = Vec::with_capacity(tracked_cfs.len());
-       for (selector, vs) in tracked_cfs.iter() {
-           // Read the change feed for the selector
-           let res = cf::read(
-               &mut tx,
-               &selector.ns,
-               &selector.db,
-               // Technically, we can not fetch by table and do the per-table filtering this side.
-               // That is an improvement though
-               Some(&selector.tb),
-               ShowSince::versionstamp(vs),
-               Some(self.engine_options.live_query_catchup_size),
+       {
+           let tx = self.transaction(Read, Optimistic).await?;
+           let tracked_cfs_updates = find_required_cfs_to_catch_up(
+               tx,
+               self.cf_watermarks.clone(),
+               self.engine_options.live_query_catchup_size,
+               &mut change_map,
            )
            .await?;
-           // Confirm we do need to change watermark - this is technically already handled by the cf range scan
-           if res.is_empty() {
-               trace!(
-                   "There were no changes in the change feed for {:?} from versionstamp {:?}",
-                   selector,
-                   conv::versionstamp_to_u64(vs)
-               )
+           // Now we update since we are no longer iterating immutably
+           let mut tracked_cfs = self.cf_watermarks.lock().await;
+           for (selector, vs) in tracked_cfs_updates {
+               tracked_cfs.insert(selector, vs);
            }
-           if let Some(change_set) = res.last() {
-               if conv::versionstamp_to_u64(&change_set.0) > conv::versionstamp_to_u64(vs) {
-                   trace!("Adding a change set for lq notification processing");
-                   // Update the cf watermark so we can progress scans
-                   // If the notifications fail from here-on, they are lost
-                   // this is a separate vec that we later insert to because we are iterating immutably
-                   // We shouldn't use a read lock because of consistency between watermark scans
-                   tracked_cfs_updates.push((selector.clone(), change_set.0));
-                   // This does not guarantee a notification, as a changeset an include many tables and many changes
-                   change_map.insert(selector.clone(), res);
-               }
-           }
-       }
-       tx.cancel().await?;
-
-       // Now we update since we are no longer iterating immutably
-       for (selector, vs) in tracked_cfs_updates {
-           tracked_cfs.insert(selector, vs);
-       }
+       };
 
        for (selector, change_sets) in change_map {
            // find matching live queries

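Note (reviewer sketch, not part of the diff): the refactor above splits "work out which change feeds need catching up" from "apply the watermark updates", so the map is no longer mutated while it is being iterated. A simplified, synchronous sketch of that split, with String selectors, u64 versionstamps and a hypothetical find_updates helper:

use std::collections::BTreeMap;

/// Hypothetical change-set placeholder.
type ChangeSet = String;

/// Scan the tracked selectors and return both the changes to process and the watermark
/// advances to apply, instead of mutating the tracked map during iteration.
fn find_updates(
    tracked: &BTreeMap<String, u64>,
    feed: &BTreeMap<String, (u64, ChangeSet)>,
) -> (Vec<(String, u64)>, BTreeMap<String, ChangeSet>) {
    let mut watermark_updates = Vec::with_capacity(tracked.len());
    let mut change_map = BTreeMap::new();
    for (selector, vs) in tracked {
        if let Some((new_vs, change)) = feed.get(selector) {
            if new_vs > vs {
                watermark_updates.push((selector.clone(), *new_vs));
                change_map.insert(selector.clone(), change.clone());
            }
        }
    }
    (watermark_updates, change_map)
}

fn main() {
    let tracked = BTreeMap::from([("tb".to_string(), 1u64)]);
    let feed = BTreeMap::from([("tb".to_string(), (2u64, "set note".to_string()))]);
    let (updates, changes) = find_updates(&tracked, &feed);
    // The caller re-locks the tracked map and applies the updates afterwards.
    assert_eq!(updates, vec![("tb".to_string(), 2)]);
    assert_eq!(changes.len(), 1);
    println!("{updates:?} {changes:?}");
}
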
@@ -1009,78 +982,96 @@ impl Datastore {
 
        // Find relevant changes
        let tx = Arc::new(Mutex::new(self.transaction(Read, Optimistic).await?));
+       trace!("There are {} change sets", change_sets.len());
+       trace!(
+           "\n{}",
+           change_sets
+               .iter()
+               .enumerate()
+               .map(|(i, x)| format!("[{i}] {:?}", x))
+               .collect::<Vec<String>>()
+               .join("\n")
+       );
        for change_set in change_sets {
-           // TODO(phughk): this loop can be on the inside so we are only checking lqs relavant to cf change
-           for (lq_key, lq_value) in lq_pairs.iter() {
+           self.process_change_set_for_notifications(tx.clone(), opt, change_set, &lq_pairs)
+               .await?;
+       }
+   }
+   trace!("Finished process lq successfully");
+   Ok(())
+}
+
+async fn process_change_set_for_notifications(
+   &self,
+   tx: Arc<Mutex<Transaction>>,
+   opt: &Options,
+   change_set: ChangeSet,
+   lq_pairs: &[(LqIndexKey, LqIndexValue)],
+) -> Result<(), Error> {
+   // TODO(phughk): this loop can be on the inside so we are only checking lqs relavant to cf change
+   trace!("Moving to next change set, {:?}", change_set);
+   for (lq_key, lq_value) in lq_pairs.iter() {
+       trace!(
+           "Processing live query for notification key={:?} and value={:?}",
+           lq_key,
+           lq_value
+       );
+       let change_vs = change_set.0;
+       let database_mutation = &change_set.1;
+       for table_mutations in database_mutation.0.iter() {
+           if table_mutations.0 == lq_key.selector.tb {
+               // Create a doc of the table value
+               // Run the 'lives' logic on the doc, while providing live queries instead of reading from storage
+               // This will generate and send notifications
                trace!(
-                   "Processing live query for notification key={:?} and value={:?}",
-                   lq_key,
-                   lq_value
+                   "There are {} table mutations being prepared for notifications",
+                   table_mutations.1.len()
                );
-               let change_vs = change_set.0;
-               let database_mutation = &change_set.1;
-               for table_mutations in database_mutation.0.iter() {
-                   if table_mutations.0 == lq_key.selector.tb {
-                       // Create a doc of the table value
-                       // Run the 'lives' logic on the doc, while providing live queries instead of reading from storage
-                       // This will generate and send notifications
-                       for mutation in table_mutations.1.iter() {
+               for (i, mutation) in table_mutations.1.iter().enumerate() {
+                   trace!(
+                       "[{} @ {:?}] Processing table mutation: {:?}",
+                       i,
+                       change_vs,
+                       mutation
+                   );
+                   trace!("Constructing document from mutation");
                    if let Some(doc) = Self::construct_document(mutation) {
                        // We know we are only processing a single LQ at a time, so we can limit notifications to 1
                        let notification_capacity = 1;
                        // We track notifications as a separate channel in case we want to process
                        // for the current state we only forward
-                       let (sender, receiver) =
-                           channel::bounded(notification_capacity);
-                       doc.check_lqs_and_send_notifications(
-                           opt,
-                           &Statement::Live(&lq_value.stm),
-                           &tx,
-                           [&lq_value.stm].as_slice(),
-                           &sender,
-                       )
+                       let (sender, receiver) = channel::bounded(notification_capacity);
+                       doc.check_lqs_and_send_notifications(
+                           opt,
+                           &Statement::Live(&lq_value.stm),
+                           &tx,
+                           [&lq_value.stm].as_slice(),
+                           &sender,
+                       )
+                       .await
+                       .map_err(|e| {
+                           Error::Internal(format!(
+                               "Error checking lqs for notifications: {:?}",
+                               e
+                           ))
+                       })?;
+
+                       // Send the notifications to driver or api
+                       // TODO: evaluate if we want channel directly instead of proxy
+                       while let Ok(notification) = receiver.try_recv() {
+                           trace!("Sending notification to client");
+                           self.notification_channel
+                               .as_ref()
+                               .unwrap()
+                               .0
+                               .send(notification)
                                .await
-                           .map_err(|e| {
-                               Error::Internal(format!(
-                                   "Error checking lqs for notifications: {:?}",
-                                   e
-                               ))
-                           })?;
-
-                           // Send the notifications to driver or api
-                           // TODO: evaluate if we want channel directly instead of proxy
-                           while let Ok(notification) = receiver.try_recv() {
-                               trace!("Sending notification to client");
-                               self.notification_channel
-                                   .as_ref()
-                                   .unwrap()
-                                   .0
-                                   .send(notification)
-                                   .await
-                                   .unwrap();
-                           }
-                           trace!("Ended notification sending")
-                       }
-
-                       // Update watermarks
-                       trace!(
-                           "Updating watermark to {:?} for index key {:?}",
-                           change_vs,
-                           lq_key
-                       );
-
-                       // For each live query we have processed we update the watermarks
-                       self.local_live_queries.write().await.insert(
-                           (*lq_key).clone(),
-                           vec![LqIndexValue {
-                               vs: change_vs,
-                               ..lq_value.clone()
-                           }],
-                       );
-
-                       // We also update the tracked_cfs with a minimum watermark
+                               .unwrap();
                        }
+                       trace!("Ended notification sending")
                    }
+
+                   self.update_versionstamp(&change_vs, lq_key, lq_value).await;
                }
            }
        }

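Note (reviewer sketch, not part of the diff): notifications are forwarded through a tiny bounded channel used as a proxy; capacity 1 is enough because a single live query is processed at a time, and the queue is drained with try_recv. The sketch assumes the async-channel crate, which the channel::bounded call in the diff appears to correspond to.

/// Assumes the async-channel crate; `channel::bounded` in the diff looks like the same API.
fn main() {
    // Capacity 1: a single live query is processed at a time, so at most one
    // notification sits in this proxy channel before being forwarded.
    let (sender, receiver) = async_channel::bounded::<String>(1);

    // Stand-in for the producer side (check_lqs_and_send_notifications in the diff).
    sender.try_send("notification for lq".to_string()).unwrap();

    // Proxy side: drain whatever is queued without blocking and forward it on.
    while let Ok(notification) = receiver.try_recv() {
        println!("forwarding: {notification}");
    }
}
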
@@ -1088,21 +1079,57 @@ impl Datastore {
        Ok(())
    }
 
+   async fn update_versionstamp(
+       &self,
+       change_vs: &Versionstamp,
+       lq_key: &LqIndexKey,
+       lq_value: &LqIndexValue,
+   ) {
+       // We increase the watermark because scans are inclusive of first result
+       // And we have already processed the input watermark - it is derived from the event
+       // let change_vs = conv::try_u128_to_versionstamp(conv::to_u128_be(*change_vs) + 1).unwrap();
+
+       // Update watermarks
+       trace!("Updating watermark to {:?} for index key {:?}", change_vs, lq_key);
+       // For each live query we have processed we update the watermarks
+       self.local_live_queries.write().await.insert(
+           lq_key.clone(),
+           vec![LqIndexValue {
+               vs: *change_vs,
+               ..lq_value.clone()
+           }],
+       );
+
+       // TODO(phugk) We also update the tracked_cfs with a minimum watermark
+       let mut tracked_cfs = self.cf_watermarks.lock().await;
+       // TODO we may be able to re-use the key without cloning...
+       tracked_cfs.insert(lq_key.selector.clone(), *change_vs).unwrap();
+   }
+
    /// Construct a document from a Change Feed mutation
    /// This is required to perform document operations such as live query notifications
    fn construct_document(mutation: &TableMutation) -> Option<Document> {
        match mutation {
-           TableMutation::Set(a, b) => {
-               let doc = Document::new(None, Some(a), None, b, Workable::Normal);
+           TableMutation::Set(id, current_value) => {
+               let doc = Document::new(None, Some(id), None, current_value, Workable::Normal);
                Some(doc)
            }
-           TableMutation::Del(a) => {
-               let doc = Document::new(None, Some(a), None, &Value::None, Workable::Normal);
+           TableMutation::Del(id) => {
+               let doc = Document::new(None, Some(id), None, &Value::None, Workable::Normal);
                Some(doc)
            }
            TableMutation::Def(_) => None,
-           TableMutation::SetWithDiff(id, new, _operations) => {
-               let doc = Document::new(None, Some(id), None, new, Workable::Normal);
+           TableMutation::SetWithDiff(id, current_value, _operations) => {
+               let todo_original_after_reverse_applying_patches = Value::None;
+               let doc = Document::new_artificial(
+                   None,
+                   Some(id),
+                   None,
+                   Cow::Borrowed(current_value),
+                   Cow::Owned(todo_original_after_reverse_applying_patches),
+                   Workable::Normal,
+               );
+               trace!("Constructed artificial document: {:?}, is_new={}", doc, doc.is_new());
                // TODO(SUR-328): reverse diff and apply to doc to retrieve original version of doc
                Some(doc)
            }

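Note (reviewer sketch, not part of the diff): construct_document now builds an artificial document for SetWithDiff, with an owned placeholder for the initial value until the reverse-diff work (SUR-328) lands. A simplified, stand-alone version of that match, with hypothetical Mutation and Doc types and Strings in place of SurrealDB values:

use std::borrow::Cow;

/// Hypothetical, simplified mutation and document types; the shapes mirror the match above.
#[derive(Debug)]
enum Mutation {
    Set(String, String),
    SetWithDiff(String, String, Vec<String>),
    Del(String),
    Def(String),
}

#[derive(Debug)]
struct Doc<'a> {
    id: String,
    current: Cow<'a, str>,
    initial: Cow<'a, str>,
}

/// Sketch: Def produces no document, Del produces an empty one, and Set/SetWithDiff carry the
/// current value. The initial value is a placeholder until the reverse-diff (SUR-328) exists.
fn construct_document(mutation: &Mutation) -> Option<Doc<'_>> {
    match mutation {
        Mutation::Set(id, current) | Mutation::SetWithDiff(id, current, _) => Some(Doc {
            id: id.clone(),
            current: Cow::Borrowed(current.as_str()),
            // Placeholder: the real initial state would come from reversing the patches.
            initial: Cow::Owned(String::new()),
        }),
        Mutation::Del(id) => Some(Doc {
            id: id.clone(),
            current: Cow::Borrowed(""),
            initial: Cow::Owned(String::new()),
        }),
        Mutation::Def(_) => None,
    }
}

fn main() {
    let m = Mutation::SetWithDiff("mytb:tobie".into(), r#"{"note":"surreal"}"#.into(), vec![]);
    println!("{:?}", construct_document(&m));
}
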
@@ -1117,7 +1144,7 @@ impl Datastore {
    ) -> Result<(), Error> {
        // Lock the local live queries
        let mut lq_map = self.local_live_queries.write().await;
-       let mut cf_watermarks = self.cf_watermarks.write().await;
+       let mut cf_watermarks = self.cf_watermarks.lock().await;
        let mut watermarks_to_check: Vec<LqIndexKey> = vec![];
        for lq in lqs {
            match lq {

@@ -1705,3 +1732,49 @@ impl Datastore {
        Ok(())
    }
 }
+
+async fn find_required_cfs_to_catch_up(
+   mut tx: Transaction,
+   tracked_cfs: Arc<Mutex<BTreeMap<LqSelector, Versionstamp>>>,
+   catchup_size: u32,
+   change_map: &mut BTreeMap<LqSelector, Vec<ChangeSet>>,
+) -> Result<Vec<(LqSelector, Versionstamp)>, Error> {
+   let tracked_cfs = tracked_cfs.lock().await;
+   let mut tracked_cfs_updates = Vec::with_capacity(tracked_cfs.len());
+   for (selector, vs) in tracked_cfs.iter() {
+       // Read the change feed for the selector
+       let res = cf::read(
+           &mut tx,
+           &selector.ns,
+           &selector.db,
+           // Technically, we can not fetch by table and do the per-table filtering this side.
+           // That is an improvement though
+           Some(&selector.tb),
+           ShowSince::versionstamp(vs),
+           Some(catchup_size),
+       )
+       .await?;
+       // Confirm we do need to change watermark - this is technically already handled by the cf range scan
+       if res.is_empty() {
+           trace!(
+               "There were no changes in the change feed for {:?} from versionstamp {:?}",
+               selector,
+               conv::versionstamp_to_u64(vs)
+           )
+       }
+       if let Some(change_set) = res.last() {
+           if conv::versionstamp_to_u64(&change_set.0) > conv::versionstamp_to_u64(vs) {
+               trace!("Adding a change set for lq notification processing");
+               // Update the cf watermark so we can progress scans
+               // If the notifications fail from here-on, they are lost
+               // this is a separate vec that we later insert to because we are iterating immutably
+               // We shouldn't use a read lock because of consistency between watermark scans
+               tracked_cfs_updates.push((selector.clone(), change_set.0));
+               // This does not guarantee a notification, as a changeset an include many tables and many changes
+               change_map.insert(selector.clone(), res);
+           }
+       }
+   }
+   tx.cancel().await?;
+   Ok(tracked_cfs_updates)
+}

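Note (reviewer sketch, not part of the diff): find_required_cfs_to_catch_up only records a watermark advance when the newest versionstamp in the batch is strictly greater than the tracked one. The guard in isolation, with a u64 standing in for the real versionstamp type:

/// Hypothetical numeric stand-in for the real versionstamp type.
type Versionstamp = u64;

/// Only record a watermark update when the last change in the batch is strictly newer
/// than the currently tracked versionstamp (mirrors the res.last() guard above).
fn next_watermark(tracked: Versionstamp, batch: &[Versionstamp]) -> Option<Versionstamp> {
    match batch.last() {
        Some(&latest) if latest > tracked => Some(latest),
        _ => None,
    }
}

fn main() {
    assert_eq!(next_watermark(5, &[3, 4, 7]), Some(7));
    assert_eq!(next_watermark(5, &[1, 2, 5]), None); // nothing newer, watermark stays put
    assert_eq!(next_watermark(5, &[]), None); // empty change feed read
    println!("ok");
}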