Move Live Query V2 code to separate location (#3911)
This commit is contained in:
parent f61ac90867
commit a45fd5c197
5 changed files with 305 additions and 351 deletions
core/src
@@ -95,7 +95,7 @@ impl<'a> Executor<'a> {
 				let lqs: Vec<TrackedResult> =
 					txn.consume_pending_live_queries();
 				// Track the live queries in the data store
-				self.kvs.adapt_tracked_live_queries(&lqs).await?;
+				self.kvs.handle_postprocessing_of_statements(&lqs).await?;
 				Ok(())
 			}
 			Err(e) => Err(e),
@@ -118,6 +118,14 @@ impl<'a> Document<'a> {
 			initial: CursorDoc::new(ir, id, doc_id, initial),
 		}
 	}
+
+	pub(crate) fn current_doc(&self) -> &Value {
+		self.current.doc.as_ref()
+	}
+
+	pub(crate) fn initial_doc(&self) -> &Value {
+		self.initial.doc.as_ref()
+	}
 }

 impl<'a> Document<'a> {
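The two accessors added here are what the new lq_v2_fut module later uses to sanity-check mutation documents. A minimal, self-contained sketch of the pattern, with simplified stand-ins for Value and CursorDoc (these are not the crate's real definitions):

use std::borrow::Cow;

#[derive(Debug, Clone, PartialEq, Default)]
enum Value {
    #[default]
    None,
    Strand(String),
}

struct CursorDoc<'a> {
    doc: Cow<'a, Value>,
}

struct Document<'a> {
    initial: CursorDoc<'a>,
    current: CursorDoc<'a>,
}

impl<'a> Document<'a> {
    // Mirrors the new current_doc()/initial_doc() accessors: expose the
    // underlying Value without handing out the whole CursorDoc.
    fn current_doc(&self) -> &Value {
        self.current.doc.as_ref()
    }
    fn initial_doc(&self) -> &Value {
        self.initial.doc.as_ref()
    }
}

fn main() {
    let doc = Document {
        initial: CursorDoc { doc: Cow::Owned(Value::None) },
        current: CursorDoc { doc: Cow::Owned(Value::Strand("hello".into())) },
    };
    // External callers can now read both sides of the cursor.
    println!("{:?} -> {:?}", doc.initial_doc(), doc.current_doc());
}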
@@ -125,10 +133,18 @@ impl<'a> Document<'a> {
 	/// Check if document has changed
 	pub fn changed(&self) -> bool {
 		self.initial.doc != self.current.doc
 	}

 	/// Check if document is being created
 	pub fn is_new(&self) -> bool {
-		self.initial.doc.is_none()
+		self.initial.doc.is_none() && self.current.doc.is_some()
 	}

 	/// Check if document is being deleted
 	#[allow(dead_code)]
 	pub fn is_delete(&self) -> bool {
 		self.current.doc.is_none()
 	}

 	/// Get the table for this document
 	pub async fn tb(
 		&self,
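The is_new change is load-bearing for this commit: the artificial documents built from change-feed mutations can have both sides absent, and the old check would have classified them as creations. A sketch of the predicate semantics over plain Options (illustrative only):

fn is_new(initial: Option<&str>, current: Option<&str>) -> bool {
    // The commit tightens this check: a document is only "new" if it had
    // no initial state AND it has a current state. Previously a
    // None -> None pair (e.g. a synthetic delete document) also counted as new.
    initial.is_none() && current.is_some()
}

fn is_delete(current: Option<&str>) -> bool {
    current.is_none()
}

fn main() {
    assert!(is_new(None, Some("record")));      // create
    assert!(!is_new(None, None));               // delete artifact: no longer "new"
    assert!(!is_new(Some("old"), Some("new"))); // update
    assert!(is_delete(None));                   // delete
}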
@@ -36,17 +36,16 @@ use wasmtimer::std::{SystemTime, UNIX_EPOCH};

 use super::tx::Transaction;
-use crate::cf;
-use crate::cf::{ChangeSet, TableMutation};
+use crate::cf::TableMutation;
 use crate::ctx::Context;
 #[cfg(feature = "jwks")]
 use crate::dbs::capabilities::NetTarget;
 use crate::dbs::{
 	node::Timestamp, Attach, Capabilities, Executor, Notification, Options, Response, Session,
-	Statement, Variables, Workable,
+	Variables, Workable,
 };
 use crate::doc::Document;
 use crate::err::Error;
 use crate::fflags::FFLAGS;
 #[cfg(feature = "jwks")]
 use crate::iam::jwks::JwksCache;
 use crate::iam::{Action, Auth, Error as IamError, Resource, Role};
@@ -55,13 +54,12 @@ use crate::key::root::hb::Hb;
 use crate::kvs::clock::SizedClock;
 #[allow(unused_imports)]
 use crate::kvs::clock::SystemClock;
-use crate::kvs::lq_structs::{
-	LqIndexKey, LqIndexValue, LqSelector, LqValue, TrackedResult, UnreachableLqType,
-};
 use crate::kvs::lq_cf::LiveQueryTracker;
+use crate::kvs::lq_structs::{LqValue, TrackedResult, UnreachableLqType};
+use crate::kvs::lq_v2_fut::process_lq_notifications;
 use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
 use crate::options::EngineOptions;
 use crate::sql::statements::show::ShowSince;
-use crate::sql::{self, statements::DefineUserStatement, Base, Query, Uuid, Value};
+use crate::sql::{self, statements::DefineUserStatement, Base, Object, Query, Strand, Uuid, Value};
 use crate::syn;
 use crate::vs::{conv, Oracle, Versionstamp};
@@ -72,6 +70,8 @@ const LQ_CHANNEL_SIZE: usize = 100;
 // The batch size used for non-paged operations (i.e. if there are more results, they are ignored)
 const NON_PAGED_BATCH_SIZE: u32 = 100_000;
+
+const EMPTY_DOC: Value = Value::None;

 /// The underlying datastore instance which stores the dataset.
 #[allow(dead_code)]
 #[non_exhaustive]
@@ -93,18 +93,12 @@ pub struct Datastore {
 	transaction_timeout: Option<Duration>,
 	// Capabilities for this datastore
 	capabilities: Capabilities,
-	engine_options: EngineOptions,
+	pub(super) engine_options: EngineOptions,
 	// The versionstamp oracle for this datastore.
 	// Used only in some datastores, such as tikv.
 	versionstamp_oracle: Arc<Mutex<Oracle>>,
 	// Whether this datastore enables live query notifications to subscribers
-	notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>,
-	// Map of Live Query identifier (ns+db+tb) for change feed tracking
-	// the mapping is to a list of affected live queries
-	local_live_queries: Arc<RwLock<BTreeMap<LqIndexKey, Vec<LqIndexValue>>>>,
-	// Set of tracked change feeds with associated watermarks
-	// This is updated with new/removed live queries and improves cf request performance
-	cf_watermarks: Arc<Mutex<BTreeMap<LqSelector, Versionstamp>>>,
+	pub(super) notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>,
 	// Clock for tracking time. It is read only and accessible to all transactions. It is behind a mutex as tests may write to it.
 	clock: Arc<SizedClock>,
 	// The index store cache
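The struct change above replaces two separately locked maps (local_live_queries and cf_watermarks) with the single LiveQueryTracker behind one RwLock. A hedged sketch of the shape such a tracker consolidates; field and method names here are assumptions, not the real lq_cf.rs API:

use std::collections::BTreeMap;

type Versionstamp = [u8; 10];

#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Debug)]
struct LqSelector {
    ns: String,
    db: String,
    tb: String,
}

// One struct owning both maps keeps live-query registration and
// watermark progression under a single lock instead of two.
#[derive(Default)]
struct TrackerSketch {
    // was Datastore::local_live_queries
    live_queries: BTreeMap<LqSelector, Vec<String>>,
    // was Datastore::cf_watermarks
    watermarks: BTreeMap<LqSelector, Versionstamp>,
}

impl TrackerSketch {
    fn register(&mut self, sel: LqSelector, lq_id: String) {
        self.live_queries.entry(sel.clone()).or_default().push(lq_id);
        // New selectors start at the zero watermark until catch-up advances them.
        self.watermarks.entry(sel).or_insert([0u8; 10]);
    }
    fn is_empty(&self) -> bool {
        self.live_queries.values().all(Vec::is_empty)
    }
}

fn main() {
    let mut t = TrackerSketch::default();
    let sel = LqSelector { ns: "ns".into(), db: "db".into(), tb: "person".into() };
    t.register(sel, "lq-uuid-1".into());
    assert!(!t.is_empty());
}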
@@ -122,6 +116,7 @@ pub struct Datastore {
 	))]
 	// The temporary directory
 	temporary_directory: Arc<PathBuf>,
+	pub(crate) lq_cf_store: Arc<RwLock<LiveQueryTracker>>,
 }

 /// We always want to be circulating the live query information
@@ -392,8 +387,6 @@ impl Datastore {
 			versionstamp_oracle: Arc::new(Mutex::new(Oracle::systime_counter())),
 			clock,
 			index_stores: IndexStores::default(),
-			local_live_queries: Arc::new(RwLock::new(BTreeMap::new())),
-			cf_watermarks: Arc::new(Mutex::new(BTreeMap::new())),
 			#[cfg(feature = "jwks")]
 			jwks_cache: Arc::new(RwLock::new(JwksCache::new())),
 			#[cfg(any(
@@ -405,6 +398,7 @@ impl Datastore {
 				feature = "kv-speedb"
 			))]
 			temporary_directory: Arc::new(env::temp_dir()),
+			lq_cf_store: Arc::new(RwLock::new(LiveQueryTracker::new())),
 		})
 	}

@@ -903,7 +897,7 @@ impl Datastore {
 	// It is handy for testing, because it allows you to specify the timestamp,
 	// without depending on a system clock.
 	pub async fn tick_at(&self, ts: u64) -> Result<(), Error> {
-		trace!("Ticking at timestamp {}", ts);
+		trace!("Ticking at timestamp {} ({:?})", ts, conv::u64_to_versionstamp(ts));
 		let _vs = self.save_timestamp_for_versionstamp(ts).await?;
 		self.garbage_collect_stale_change_feeds(ts).await?;
 		// TODO Add LQ GC
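The updated trace line prints the tick timestamp in versionstamp form as well. A sketch of the conversion it relies on, assuming the crate's 10-byte versionstamp layout (8-byte big-endian counter plus a 2-byte suffix), which is consistent with how the vs::conv helpers are used elsewhere in this diff:

type Versionstamp = [u8; 10];

fn u64_to_versionstamp(v: u64) -> Versionstamp {
    // Assumed layout: the u64 occupies the first 8 bytes, big-endian.
    let mut vs = [0u8; 10];
    vs[..8].copy_from_slice(&v.to_be_bytes());
    vs
}

fn versionstamp_to_u64(vs: &Versionstamp) -> u64 {
    u64::from_be_bytes(vs[..8].try_into().unwrap())
}

fn main() {
    let vs = u64_to_versionstamp(1234);
    // Round-trips, which is what lets watermarks be compared as u64s
    // in populate_relevant_changesets further down.
    assert_eq!(versionstamp_to_u64(&vs), 1234);
    println!("{:?}", vs);
}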
@@ -939,302 +933,24 @@ impl Datastore {
	stk: &mut Stk,
	opt: &Options,
) -> Result<(), Error> {
	// Runtime feature gate, as it is not production-ready
	if !FFLAGS.change_feed_live_queries.enabled() {
		return Ok(());
	}
	// Return if there are no live queries
	if self.notification_channel.is_none() {
		trace!("Channels is none, short-circuiting");
		return Ok(());
	}
	if self.local_live_queries.read().await.is_empty() {
		trace!("No live queries, short-circuiting");
		return Ok(());
	}

	// Change map includes a mapping of selector to changesets, ordered by versionstamp
	let mut change_map: BTreeMap<LqSelector, Vec<ChangeSet>> = BTreeMap::new();
	{
		let tx = self.transaction(Read, Optimistic).await?;
		let tracked_cfs_updates = find_required_cfs_to_catch_up(
			tx,
			self.cf_watermarks.clone(),
			self.engine_options.live_query_catchup_size,
			&mut change_map,
		)
		.await?;
		// Now we update since we are no longer iterating immutably
		let mut tracked_cfs = self.cf_watermarks.lock().await;
		for (selector, vs) in tracked_cfs_updates {
			tracked_cfs.insert(selector, vs);
		}
	};

	for (selector, change_sets) in change_map {
		// find matching live queries
		let lq_pairs: Vec<(LqIndexKey, LqIndexValue)> = {
			let lq_lock = self.local_live_queries.read().await;
			lq_lock
				.iter()
				.filter(|(k, _)| k.selector == selector)
				.flat_map(|(lq_index, lq_values)| {
					lq_values.iter().cloned().map(|x| (lq_index.clone(), x))
				})
				.to_owned()
				.collect()
		};

		// Find relevant changes
		let tx = Arc::new(Mutex::new(self.transaction(Read, Optimistic).await?));
		trace!("There are {} change sets", change_sets.len());
		trace!(
			"\n{}",
			change_sets
				.iter()
				.enumerate()
				.map(|(i, x)| format!("[{i}] {:?}", x))
				.collect::<Vec<String>>()
				.join("\n")
		);
		for change_set in change_sets {
			self.process_change_set_for_notifications(
				stk,
				tx.clone(),
				opt,
				change_set,
				&lq_pairs,
			)
			.await?;
		}
	}
	trace!("Finished process lq successfully");
	Ok(())
}

async fn process_change_set_for_notifications(
	&self,
	stk: &mut Stk,
	tx: Arc<Mutex<Transaction>>,
	opt: &Options,
	change_set: ChangeSet,
	lq_pairs: &[(LqIndexKey, LqIndexValue)],
) -> Result<(), Error> {
	// TODO(phughk): this loop can be on the inside so we are only checking lqs relevant to cf change
	trace!("Moving to next change set, {:?}", change_set);
	for (lq_key, lq_value) in lq_pairs.iter() {
		trace!(
			"Processing live query for notification key={:?} and value={:?}",
			lq_key,
			lq_value
		);
		let change_vs = change_set.0;
		let database_mutation = &change_set.1;
		for table_mutations in database_mutation.0.iter() {
			if table_mutations.0 == lq_key.selector.tb {
				// Create a doc of the table value
				// Run the 'lives' logic on the doc, while providing live queries instead of reading from storage
				// This will generate and send notifications
				trace!(
					"There are {} table mutations being prepared for notifications",
					table_mutations.1.len()
				);
				for (i, mutation) in table_mutations.1.iter().enumerate() {
					trace!(
						"[{} @ {:?}] Processing table mutation: {:?}",
						i,
						change_vs,
						mutation
					);
					trace!("Constructing document from mutation");
					if let Some(doc) = Self::construct_document(mutation) {
						// We know we are only processing a single LQ at a time, so we can limit notifications to 1
						let notification_capacity = 1;
						// We track notifications as a separate channel in case we want to process
						// for the current state we only forward
						let (sender, receiver) = channel::bounded(notification_capacity);
						doc.check_lqs_and_send_notifications(
							stk,
							opt,
							&Statement::Live(&lq_value.stm),
							&tx,
							[&lq_value.stm].as_slice(),
							&sender,
						)
						.await
						.map_err(|e| {
							Error::Internal(format!(
								"Error checking lqs for notifications: {:?}",
								e
							))
						})?;

						// Send the notifications to driver or api
						// TODO: evaluate if we want channel directly instead of proxy
						while let Ok(notification) = receiver.try_recv() {
							trace!("Sending notification to client");
							self.notification_channel
								.as_ref()
								.unwrap()
								.0
								.send(notification)
								.await
								.unwrap();
						}
						trace!("Ended notification sending")
					}

					self.update_versionstamp(&change_vs, lq_key, lq_value).await;
				}
			}
		}
	}
	Ok(())
}

async fn update_versionstamp(
	&self,
	change_vs: &Versionstamp,
	lq_key: &LqIndexKey,
	lq_value: &LqIndexValue,
) {
	// We increase the watermark because scans are inclusive of first result
	// And we have already processed the input watermark - it is derived from the event
	// let change_vs = conv::try_u128_to_versionstamp(conv::to_u128_be(*change_vs) + 1).unwrap();

	// Update watermarks
	trace!("Updating watermark to {:?} for index key {:?}", change_vs, lq_key);
	// For each live query we have processed we update the watermarks
	self.local_live_queries.write().await.insert(
		lq_key.clone(),
		vec![LqIndexValue {
			vs: *change_vs,
			..lq_value.clone()
		}],
	);

	// TODO(phughk) We also update the tracked_cfs with a minimum watermark
	let mut tracked_cfs = self.cf_watermarks.lock().await;
	// TODO we may be able to re-use the key without cloning...
	tracked_cfs.insert(lq_key.selector.clone(), *change_vs).unwrap();
}

/// Construct a document from a Change Feed mutation
/// This is required to perform document operations such as live query notifications
fn construct_document(mutation: &TableMutation) -> Option<Document> {
	match mutation {
		TableMutation::Set(id, current_value) => {
			let doc = Document::new(None, Some(id), None, current_value, Workable::Normal);
			Some(doc)
		}
		TableMutation::Del(id) => {
			let doc = Document::new(None, Some(id), None, &Value::None, Workable::Normal);
			Some(doc)
		}
		TableMutation::Def(_) => None,
		TableMutation::SetWithDiff(id, current_value, _operations) => {
			let todo_original_after_reverse_applying_patches = Value::None;
			let doc = Document::new_artificial(
				None,
				Some(id),
				None,
				Cow::Borrowed(current_value),
				Cow::Owned(todo_original_after_reverse_applying_patches),
				Workable::Normal,
			);
			trace!("Constructed artificial document: {:?}, is_new={}", doc, doc.is_new());
			// TODO(SUR-328): reverse diff and apply to doc to retrieve original version of doc
			Some(doc)
		}
	}
+	process_lq_notifications(self, stk, opt).await
}

/// Add and kill live queries being tracked on the datastore
/// These get polled by the change feed tick
-pub(crate) async fn adapt_tracked_live_queries(
+pub(crate) async fn handle_postprocessing_of_statements(
	&self,
	lqs: &Vec<TrackedResult>,
) -> Result<(), Error> {
	// Lock the local live queries
	let mut lq_map = self.local_live_queries.write().await;
	let mut cf_watermarks = self.cf_watermarks.lock().await;
	let mut watermarks_to_check: Vec<LqIndexKey> = vec![];
+	let mut lq_cf_store = self.lq_cf_store.write().await;
	for lq in lqs {
		match lq {
			TrackedResult::LiveQuery(lq) => {
				let lq_index_key: LqIndexKey = lq.as_key();
				let m = lq_map.get_mut(&lq_index_key);
				match m {
					Some(lq_index_value) => lq_index_value
						.push(lq.as_value(Versionstamp::default(), Timestamp::default())),
					None => {
						let lq_vec =
							vec![lq.as_value(Versionstamp::default(), Timestamp::default())];
						lq_map.insert(lq_index_key.clone(), lq_vec);
					}
				}
				let selector = lq_index_key.selector;
				// TODO(phughk): - read watermark for catchup
				// We insert the current watermark.
				cf_watermarks.entry(selector).or_insert_with(Versionstamp::default);
+				lq_cf_store.register_live_query(lq, Versionstamp::default()).unwrap();
			}
			TrackedResult::KillQuery(kill_entry) => {
				let found: Option<(LqIndexKey, LqIndexValue)> = lq_map
					.iter_mut()
					.filter(|(k, _)| {
						// Get all the live queries in the ns/db pair. We don't know table
						k.selector.ns == kill_entry.ns && k.selector.db == kill_entry.db
					})
					.filter_map(|(k, v)| {
						let index = v.iter().position(|a| a.stm.id == kill_entry.live_id);
						match index {
							Some(i) => {
								let v = v.remove(i);
								// Sadly we do need to clone out of mutable reference, because of Strings
								Some((k.clone(), v))
							}
							None => None,
						}
					})
					.next();
				match found {
					None => {
						// TODO(SUR-336): Make Live Query ID validation available at statement level, perhaps via transaction
						trace!(
							"Could not find live query {:?} to kill in ns/db pair {:?}",
							&kill_entry,
							&kill_entry.ns
						);
					}
					Some(found) => {
						trace!(
							"Killed live query {:?} with found key {:?} and found value {:?}",
							&kill_entry,
							&found.0,
							&found.1
						);
						// Check if we need to remove the LQ key from tracking
						let empty = match lq_map.get(&found.0) {
							None => false,
							Some(v) => v.is_empty(),
						};
						if empty {
							trace!("Removing live query index key {:?}", &found.0);
							lq_map.remove(&found.0);
						}
						// Now add the LQ to tracked watermarks
						watermarks_to_check.push(found.0.clone());
					}
				};
			}
		}
	}
	// Now check if we can stop tracking watermarks
	for watermark in watermarks_to_check {
		if let Some(lq) = lq_map.get(&watermark) {
			if lq.is_empty() {
				trace!("Removing watermark for {:?}", watermark);
				cf_watermarks.remove(&watermark.selector);
+				lq_cf_store.unregister_live_query(kill_entry);
			}
		}
	}
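Most of this hunk deletes the V1 notification pipeline from ds.rs (it reappears in lq_v2_fut.rs below); what survives is the renamed post-processing entry point that folds statement results into the tracker. A heavily simplified model of that dispatch, with stand-in types (not the crate's real signatures):

enum TrackedResult {
    LiveQuery(String),
    KillQuery(String),
}

#[derive(Default)]
struct Tracker {
    live: Vec<String>,
}

impl Tracker {
    fn handle_postprocessing_of_statements(&mut self, lqs: &[TrackedResult]) {
        for lq in lqs {
            match lq {
                // Registration starts tracking the query at a default
                // (zero) watermark; change-feed catch-up advances it later.
                TrackedResult::LiveQuery(id) => self.live.push(id.clone()),
                // Kills drop the query; once a selector has no queries
                // left, its change feed stops being tracked.
                TrackedResult::KillQuery(id) => self.live.retain(|x| x != id),
            }
        }
    }
}

fn main() {
    let mut t = Tracker::default();
    t.handle_postprocessing_of_statements(&[
        TrackedResult::LiveQuery("lq-1".into()),
        TrackedResult::KillQuery("lq-1".into()),
    ]);
    assert!(t.live.is_empty());
}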
@@ -1338,13 +1054,13 @@ impl Datastore {
 	) -> Result<Transaction, Error> {
 		#![allow(unused_variables)]
 		let write = match write {
-			TransactionType::Read => false,
-			TransactionType::Write => true,
+			Read => false,
+			Write => true,
 		};

 		let lock = match lock {
-			LockType::Pessimistic => true,
-			LockType::Optimistic => false,
+			Pessimistic => true,
+			Optimistic => false,
 		};

 		let inner = match &self.inner {
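The shortened match arms compile because both enums are glob-imported on the use crate::kvs::{LockType::*, TransactionType::*} line kept earlier in this diff. A minimal reproduction of the pattern:

#[derive(Clone, Copy)]
enum TransactionType { Read, Write }
#[derive(Clone, Copy)]
enum LockType { Optimistic, Pessimistic }

use LockType::*;
use TransactionType::*;

fn flags(write: TransactionType, lock: LockType) -> (bool, bool) {
    // Variant names resolve without the enum prefix thanks to the glob imports.
    let write = match write {
        Read => false,
        Write => true,
    };
    let lock = match lock {
        Pessimistic => true,
        Optimistic => false,
    };
    (write, lock)
}

fn main() {
    assert_eq!(flags(Write, Optimistic), (true, false));
}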
@@ -1517,7 +1233,7 @@ impl Datastore {
 		match res {
 			Ok((responses, lives)) => {
 				// Register live queries
-				self.adapt_tracked_live_queries(&lives).await?;
+				self.handle_postprocessing_of_statements(&lives).await?;
 				Ok(responses)
 			}
 			Err(e) => Err(e),
@@ -1753,50 +1469,59 @@ impl Datastore {
 	}
 }

-async fn find_required_cfs_to_catch_up(
-	mut tx: Transaction,
-	tracked_cfs: Arc<Mutex<BTreeMap<LqSelector, Versionstamp>>>,
-	catchup_size: u32,
-	change_map: &mut BTreeMap<LqSelector, Vec<ChangeSet>>,
-) -> Result<Vec<(LqSelector, Versionstamp)>, Error> {
-	let tracked_cfs = tracked_cfs.lock().await;
-	let mut tracked_cfs_updates = Vec::with_capacity(tracked_cfs.len());
-	for (selector, vs) in tracked_cfs.iter() {
-		// Read the change feed for the selector
-		let res = cf::read(
-			&mut tx,
-			&selector.ns,
-			&selector.db,
-			// Technically, we can not fetch by table and do the per-table filtering this side.
-			// That is an improvement though
-			Some(&selector.tb),
-			ShowSince::versionstamp(vs),
-			Some(catchup_size),
-		)
-		.await?;
-		// Confirm we do need to change watermark - this is technically already handled by the cf range scan
-		if res.is_empty() {
-			trace!(
-				"There were no changes in the change feed for {:?} from versionstamp {:?}",
-				selector,
-				conv::versionstamp_to_u64(vs)
-			)
-		}
-		if let Some(change_set) = res.last() {
-			if conv::versionstamp_to_u64(&change_set.0) > conv::versionstamp_to_u64(vs) {
-				trace!("Adding a change set for lq notification processing");
-				// Update the cf watermark so we can progress scans
-				// If the notifications fail from here-on, they are lost
-				// this is a separate vec that we later insert to because we are iterating immutably
-				// We shouldn't use a read lock because of consistency between watermark scans
-				tracked_cfs_updates.push((selector.clone(), change_set.0));
-				// This does not guarantee a notification, as a changeset can include many tables and many changes
-				change_map.insert(selector.clone(), res);
-			}
-		}
-	}
-	tx.cancel().await?;
-	Ok(tracked_cfs_updates)
-}
+/// Construct a document from a Change Feed mutation
+/// This is required to perform document operations such as live query notifications
+pub(crate) fn construct_document(mutation: &TableMutation) -> Option<Document> {
+	match mutation {
+		TableMutation::Set(id, current_value) => {
+			let doc = Document::new_artificial(
+				None,
+				Some(id),
+				None,
+				Cow::Borrowed(current_value),
+				Cow::Owned(EMPTY_DOC),
+				Workable::Normal,
+			);
+			Some(doc)
+		}
+		TableMutation::Del(id) => {
+			let fake_previous_value_because_we_need_the_id_and_del_doesnt_store_value =
+				Value::Object(Object::from(map! {
+					"id" => Value::Thing(id.clone()),
+				}));
+			let doc = Document::new_artificial(
+				None,
+				Some(id),
+				None,
+				Cow::Owned(Value::None),
+				Cow::Owned(fake_previous_value_because_we_need_the_id_and_del_doesnt_store_value),
+				Workable::Normal,
+			);
+			Some(doc)
+		}
+		TableMutation::Def(_) => None,
+		TableMutation::SetWithDiff(id, current_value, _operations) => {
+			// We need a previous value otherwise the Value::compute function won't work correctly
+			// This is also how IDs are carried into notifications, not via doc.rid
+			let todo_original_after_reverse_applying_patches = Value::Object(Object::from(map! {
+				"id" => Value::Thing(id.clone()),
+				// This value is included so that we know for sure it is placeholder
+				"fake_value" => Value::Strand(
+					Strand::from("placeholder until we can derive diffs from reversing patch operations"))
+			}));
+			let doc = Document::new_artificial(
+				None,
+				Some(id),
+				None,
+				Cow::Borrowed(current_value),
+				Cow::Owned(todo_original_after_reverse_applying_patches),
+				Workable::Normal,
+			);
+			trace!("Constructed artificial document: {:?}, is_new={}", doc, doc.is_new());
+			// TODO(SUR-328): reverse diff and apply to doc to retrieve original version of doc
+			Some(doc)
+		}
+	}
+}

 #[cfg(test)]
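The relocated construct_document now gives every mutation an explicit previous value, since Value::compute needs one and the record id travels inside it. A simplified model of the (initial, current) pair each variant produces, with Value reduced to Option<String> for illustration:

#[derive(Debug)]
enum TableMutation {
    Set(String),         // id; full value elided
    SetWithDiff(String), // id; value and patch operations elided
    Del(String),         // id only
    Def,                 // schema definition, no document
}

// Returns (initial, current); None models Value::None / an absent doc.
fn doc_pair(m: &TableMutation) -> Option<(Option<String>, Option<String>)> {
    match m {
        // Set: no known previous state (EMPTY_DOC); current = new value
        TableMutation::Set(id) => Some((None, Some(id.clone()))),
        // SetWithDiff: previous state is a placeholder object carrying
        // the id until SUR-328 reverses the patches
        TableMutation::SetWithDiff(id) => Some((Some(id.clone()), Some(id.clone()))),
        // Del: current is None; a fake previous value carries the id
        TableMutation::Del(id) => Some((Some(id.clone()), None)),
        // Def: not a document change, so no notification to build
        TableMutation::Def => None,
    }
}

fn main() {
    for m in [
        TableMutation::Set("person:1".into()),
        TableMutation::SetWithDiff("person:1".into()),
        TableMutation::Del("person:1".into()),
        TableMutation::Def,
    ] {
        println!("{:?} -> {:?}", m, doc_pair(&m));
    }
}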
core/src/kvs/lq_v2_fut.rs (new file, 212 lines)
@@ -0,0 +1,212 @@
use crate::cf;
use crate::cf::{ChangeSet, TableMutation};
use crate::dbs::{Options, Statement};
use crate::err::Error;
use crate::fflags::FFLAGS;
use crate::kvs::lq_cf::LiveQueryTracker;
use crate::kvs::lq_structs::{LqIndexKey, LqIndexValue, LqSelector};
use crate::kvs::LockType::Optimistic;
use crate::kvs::TransactionType::Read;
use crate::kvs::{construct_document, Datastore, Transaction};
use crate::sql::statements::show::ShowSince;
use crate::vs::conv;
use futures::lock::Mutex;
use reblessive::tree::Stk;
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Poll change feeds for live query notifications
pub async fn process_lq_notifications(
	ds: &Datastore,
	stk: &mut Stk,
	opt: &Options,
) -> Result<(), Error> {
	// Runtime feature gate, as it is not production-ready
	if !FFLAGS.change_feed_live_queries.enabled() {
		return Ok(());
	}
	// Return if there are no live queries
	if ds.notification_channel.is_none() {
		trace!("Channels is none, short-circuiting");
		return Ok(());
	}
	if ds.lq_cf_store.read().await.is_empty() {
		// This is safe - just a shortcut
		trace!("No live queries, short-circuiting");
		return Ok(());
	}

	// Change map includes a mapping of selector to changesets, ordered by versionstamp
	let mut relevant_changesets: BTreeMap<LqSelector, Vec<ChangeSet>> = BTreeMap::new();
	{
		let tx = ds.transaction(Read, Optimistic).await?;
		populate_relevant_changesets(
			tx,
			ds.lq_cf_store.clone(),
			ds.engine_options.live_query_catchup_size,
			&mut relevant_changesets,
		)
		.await?;
	};

	for (selector, change_sets) in relevant_changesets {
		// find matching live queries
		let lq_pairs = ds.lq_cf_store.read().await.live_queries_for_selector(&selector);

		// Find relevant changes
		let tx = ds.transaction(Read, Optimistic).await?.enclose();
		#[cfg(debug_assertions)]
		trace!("There are {} change sets", change_sets.len());
		#[cfg(debug_assertions)]
		trace!(
			"\n{}",
			change_sets
				.iter()
				.enumerate()
				.map(|(i, x)| format!("[{i}] {:?}", x))
				.collect::<Vec<String>>()
				.join("\n")
		);
		for change_set in change_sets {
			process_change_set_for_notifications(ds, stk, tx.clone(), opt, change_set, &lq_pairs)
				.await?;
		}
	}
	trace!("Finished process lq successfully");
	Ok(())
}

async fn populate_relevant_changesets(
	mut tx: Transaction,
	live_query_tracker: Arc<RwLock<LiveQueryTracker>>,
	catchup_size: u32,
	relevant_changesets: &mut BTreeMap<LqSelector, Vec<ChangeSet>>,
) -> Result<(), Error> {
	let live_query_tracker = live_query_tracker.write().await;
	let tracked_cfs = live_query_tracker.get_watermarks().len();
	// We are going to track the latest observed versionstamp here
	for current in 0..tracked_cfs {
		// The reason we iterate this way (len+index) is because we "know" that the list won't change, but we
		// want mutable access to it so we can update it while iterating
		let (selector, vs) = live_query_tracker.get_watermark_by_enum_index(current).unwrap();

		// Read the change feed for the selector
		#[cfg(debug_assertions)]
		trace!(
			"Checking for new changes for ns={} db={} tb={} vs={:?}",
			selector.ns,
			selector.db,
			selector.tb,
			vs
		);
		let res = cf::read(
			&mut tx,
			&selector.ns,
			&selector.db,
			// Technically, we can not fetch by table and do the per-table filtering this side.
			// That is an improvement though
			Some(&selector.tb),
			ShowSince::versionstamp(vs),
			Some(catchup_size),
		)
		.await?;
		// Confirm we do need to change watermark - this is technically already handled by the cf range scan
		if res.is_empty() {
			#[cfg(debug_assertions)]
			trace!(
				"There were no changes in the change feed for {:?} from versionstamp {:?}",
				selector,
				conv::versionstamp_to_u64(vs)
			)
		}
		if let Some(change_set) = res.last() {
			if conv::versionstamp_to_u64(&change_set.0) > conv::versionstamp_to_u64(vs) {
				#[cfg(debug_assertions)]
				trace!("Adding a change set for lq notification processing");
				// This does not guarantee a notification, as a changeset can include many tables and many changes
				relevant_changesets.insert(selector.clone(), res);
			}
		}
	}
	tx.cancel().await
}

async fn process_change_set_for_notifications(
	ds: &Datastore,
	stk: &mut Stk,
	tx: Arc<Mutex<Transaction>>,
	opt: &Options,
	change_set: ChangeSet,
	lq_pairs: &[(LqIndexKey, LqIndexValue)],
) -> Result<(), Error> {
	#[cfg(debug_assertions)]
	trace!("Moving to next change set, {:?}", change_set);
	for (lq_key, lq_value) in lq_pairs.iter() {
		#[cfg(debug_assertions)]
		trace!("Processing live query for notification key={:?} and value={:?}", lq_key, lq_value);
		let change_vs = change_set.0;
		let database_mutation = &change_set.1;
		for table_mutations in database_mutation.0.iter() {
			if table_mutations.0 == lq_key.selector.tb {
				// Create a doc of the table value
				// Run the 'lives' logic on the doc, while providing live queries instead of reading from storage
				// This will generate and send notifications
				#[cfg(debug_assertions)]
				trace!(
					"There are {} table mutations being prepared for notifications",
					table_mutations.1.len()
				);
				for (i, mutation) in table_mutations.1.iter().enumerate() {
					#[cfg(debug_assertions)]
					trace!("[{} @ {:?}] Processing table mutation: {:?} Constructing document from mutation", i, change_vs, mutation);
					if let Some(doc) = construct_document(mutation) {
						// We know we are only processing a single LQ at a time, so we can limit notifications to 1
						let notification_capacity = 1;
						// We track notifications as a separate channel in case we want to process
						// for the current state we only forward
						let (local_notification_channel_sender, local_notification_channel_recv) =
							channel::bounded(notification_capacity);
						if doc.initial_doc().is_none()
							&& doc.current_doc().is_none()
							&& !matches!(mutation, TableMutation::Del(_))
						{
							// If we have a None to None mutation, and it isn't delete, then it indicates a bad document
							panic!("Doc was wrong and the mutation was {:?}", mutation);
						}
						doc.check_lqs_and_send_notifications(
							stk,
							opt,
							&Statement::Live(&lq_value.stm),
							&tx,
							[&lq_value.stm].as_slice(),
							&local_notification_channel_sender,
						)
						.await
						.map_err(|e| {
							Error::Internal(format!(
								"Error checking lqs for notifications: {:?}",
								e
							))
						})?;

						// Send the notifications to driver or api
						while let Ok(notification) = local_notification_channel_recv.try_recv() {
							#[cfg(debug_assertions)]
							trace!("Sending notification to client: {:?}", notification);
							ds.notification_channel
								.as_ref()
								.unwrap()
								.0
								.send(notification)
								.await
								.unwrap();
						}
					}
					// Progress the live query watermark
				}
			}
		}
		ds.lq_cf_store.write().await.update_watermark_live_query(lq_key, &change_vs).unwrap();
	}
	Ok(())
}
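One detail worth pulling out of populate_relevant_changesets: a selector's changesets are only queued when the newest versionstamp in the scan has advanced past the stored watermark, so an idle feed contributes no work downstream. A self-contained sketch of that guard, with the 10-byte versionstamp simplified to a u64:

use std::collections::BTreeMap;

type Versionstamp = u64; // simplified from the 10-byte stamp
type ChangeSet = (Versionstamp, &'static str);

fn queue_if_advanced(
    selector: String,
    watermark: Versionstamp,
    res: Vec<ChangeSet>,
    relevant: &mut BTreeMap<String, Vec<ChangeSet>>,
) {
    // Mirrors: conv::versionstamp_to_u64(&change_set.0) > conv::versionstamp_to_u64(vs)
    let advanced = res.last().map(|c| c.0 > watermark).unwrap_or(false);
    if advanced {
        relevant.insert(selector, res);
    }
}

fn main() {
    let mut relevant = BTreeMap::new();
    queue_if_advanced("ns/db/tb".into(), 5, vec![(6, "set person:1")], &mut relevant);
    assert!(relevant.contains_key("ns/db/tb"));
    queue_if_advanced("idle".into(), 9, vec![(9, "already seen")], &mut relevant);
    assert!(!relevant.contains_key("idle")); // nothing newer than the watermark
}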
@@ -28,6 +28,7 @@ mod tx;
 pub(crate) mod lq_structs;

 mod lq_cf;
+mod lq_v2_fut;
 #[cfg(test)]
 mod tests;
