diff --git a/core/src/dbs/executor.rs b/core/src/dbs/executor.rs index 540e4453..aeda6222 100644 --- a/core/src/dbs/executor.rs +++ b/core/src/dbs/executor.rs @@ -94,7 +94,7 @@ impl<'a> Executor<'a> { let lqs: Vec = txn.consume_pending_live_queries(); // Track the live queries in the data store - self.kvs.track_live_queries(&lqs).await?; + self.kvs.adapt_tracked_live_queries(&lqs).await?; Ok(()) } Err(e) => Err(e), @@ -471,27 +471,27 @@ mod tests { #[tokio::test] async fn check_execute_option_permissions() { let tests = vec![ - // Root level - (Session::for_level(().into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at root level should be able to set options"), - (Session::for_level(().into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at root level should be able to set options"), - (Session::for_level(().into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at root level should not be able to set options"), + // Root level + (Session::for_level(().into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at root level should be able to set options"), + (Session::for_level(().into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at root level should be able to set options"), + (Session::for_level(().into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at root level should not be able to set options"), - // Namespace level - (Session::for_level(("NS",).into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at namespace level should be able to set options on its namespace"), - (Session::for_level(("NS",).into(), Role::Owner).with_ns("OTHER_NS").with_db("DB"), false, "owner at namespace level should not be able to set options on another namespace"), - (Session::for_level(("NS",).into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at namespace level should be able to set options on its namespace"), - (Session::for_level(("NS",).into(), Role::Editor).with_ns("OTHER_NS").with_db("DB"), false, "editor at namespace level should not be able to set options on another namespace"), - (Session::for_level(("NS",).into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at namespace level should not be able to set options on its namespace"), + // Namespace level + (Session::for_level(("NS", ).into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at namespace level should be able to set options on its namespace"), + (Session::for_level(("NS", ).into(), Role::Owner).with_ns("OTHER_NS").with_db("DB"), false, "owner at namespace level should not be able to set options on another namespace"), + (Session::for_level(("NS", ).into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at namespace level should be able to set options on its namespace"), + (Session::for_level(("NS", ).into(), Role::Editor).with_ns("OTHER_NS").with_db("DB"), false, "editor at namespace level should not be able to set options on another namespace"), + (Session::for_level(("NS", ).into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at namespace level should not be able to set options on its namespace"), - // Database level - (Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at database level should be able to set options on its database"), - (Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("NS").with_db("OTHER_DB"), false, "owner at database level should not be able to set options on another database"), - (Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("OTHER_NS").with_db("DB"), false, "owner at database level should not be able to set options on another namespace even if the database name matches"), - (Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at database level should be able to set options on its database"), - (Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("NS").with_db("OTHER_DB"), false, "editor at database level should not be able to set options on another database"), - (Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("OTHER_NS").with_db("DB"), false, "editor at database level should not be able to set options on another namespace even if the database name matches"), - (Session::for_level(("NS", "DB").into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at database level should not be able to set options on its database"), - ]; + // Database level + (Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at database level should be able to set options on its database"), + (Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("NS").with_db("OTHER_DB"), false, "owner at database level should not be able to set options on another database"), + (Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("OTHER_NS").with_db("DB"), false, "owner at database level should not be able to set options on another namespace even if the database name matches"), + (Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at database level should be able to set options on its database"), + (Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("NS").with_db("OTHER_DB"), false, "editor at database level should not be able to set options on another database"), + (Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("OTHER_NS").with_db("DB"), false, "editor at database level should not be able to set options on another namespace even if the database name matches"), + (Session::for_level(("NS", "DB").into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at database level should not be able to set options on its database"), + ]; let statement = "OPTION FIELDS = false"; for test in tests.iter() { diff --git a/core/src/kvs/ds.rs b/core/src/kvs/ds.rs index d4c748eb..de6a85e9 100644 --- a/core/src/kvs/ds.rs +++ b/core/src/kvs/ds.rs @@ -35,7 +35,7 @@ use crate::kvs::clock::SizedClock; #[allow(unused_imports)] use crate::kvs::clock::SystemClock; use crate::kvs::lq_structs::{ - LqEntry, LqIndexKey, LqIndexValue, LqSelector, LqValue, TrackedResult, UnreachableLqType, + LqIndexKey, LqIndexValue, LqSelector, LqValue, TrackedResult, UnreachableLqType, }; use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*}; use crate::options::EngineOptions; @@ -1041,12 +1041,16 @@ impl Datastore { } } - /// Add live queries to track on the datastore + /// Add and kill live queries being track on the datastore /// These get polled by the change feed tick - pub(crate) async fn track_live_queries(&self, lqs: &Vec) -> Result<(), Error> { + pub(crate) async fn adapt_tracked_live_queries( + &self, + lqs: &Vec, + ) -> Result<(), Error> { // Lock the local live queries let mut lq_map = self.local_live_queries.write().await; let mut cf_watermarks = self.cf_watermarks.write().await; + let mut watermarks_to_check: Vec = vec![]; for lq in lqs { match lq { TrackedResult::LiveQuery(lq) => { @@ -1064,8 +1068,63 @@ impl Datastore { // We insert the current watermark. cf_watermarks.entry(selector).or_insert_with(Versionstamp::default); } - TrackedResult::KillQuery(_lq) => { - unimplemented!("Cannot kill queries yet") + TrackedResult::KillQuery(kill_entry) => { + let found: Option<(LqIndexKey, LqIndexValue)> = lq_map + .iter_mut() + .filter(|(k, _)| { + // Get all the live queries in the ns/db pair. We don't know table + k.selector.ns == kill_entry.ns && k.selector.db == kill_entry.db + }) + .filter_map(|(k, v)| { + let index = v.iter().position(|a| a.stm.id == kill_entry.live_id); + match index { + Some(i) => { + let v = v.remove(i); + // Sadly we do need to clone out of mutable reference, because of Strings + Some((k.clone(), v)) + } + None => None, + } + }) + .next(); + match found { + None => { + // TODO(SUR-336): Make Live Query ID validation available at statement level, perhaps via transaction + trace!( + "Could not find live query {:?} to kill in ns/db pair {:?}", + &kill_entry, + &kill_entry.ns + ); + } + Some(found) => { + trace!( + "Killed live query {:?} with found key {:?} and found value {:?}", + &kill_entry, + &found.0, + &found.1 + ); + // Check if we need to remove the LQ key from tracking + let empty = match lq_map.get(&found.0) { + None => false, + Some(v) => v.is_empty(), + }; + if empty { + trace!("Removing live query index key {:?}", &found.0); + lq_map.remove(&found.0); + } + // Now add the LQ to tracked watermarks + watermarks_to_check.push(found.0.clone()); + } + }; + } + } + } + // Now check if we can stop tracking watermarks + for watermark in watermarks_to_check { + if let Some(lq) = lq_map.get(&watermark) { + if lq.is_empty() { + trace!("Removing watermark for {:?}", watermark); + cf_watermarks.remove(&watermark.selector); } } } @@ -1217,7 +1276,8 @@ impl Datastore { _ => unreachable!(), }; - let (send, recv): (Sender, Receiver) = channel::bounded(LQ_CHANNEL_SIZE); + let (send, recv): (Sender, Receiver) = + channel::bounded(LQ_CHANNEL_SIZE); #[allow(unreachable_code)] Ok(Transaction { @@ -1226,7 +1286,7 @@ impl Datastore { cf: cf::Writer::new(), vso: self.versionstamp_oracle.clone(), clock: self.clock.clone(), - prepared_live_queries: (Arc::new(send), Arc::new(recv)), + prepared_async_events: (Arc::new(send), Arc::new(recv)), engine_options: self.engine_options, }) } @@ -1328,7 +1388,7 @@ impl Datastore { match res { Ok((responses, lives)) => { // Register live queries - self.track_live_queries(&lives).await?; + self.adapt_tracked_live_queries(&lives).await?; Ok(responses) } Err(e) => Err(e), diff --git a/core/src/kvs/lq_structs.rs b/core/src/kvs/lq_structs.rs index 9a62e6c6..c706cdce 100644 --- a/core/src/kvs/lq_structs.rs +++ b/core/src/kvs/lq_structs.rs @@ -96,7 +96,17 @@ pub(crate) struct LqEntry { pub(crate) enum TrackedResult { LiveQuery(LqEntry), #[allow(dead_code)] - KillQuery(LqEntry), + KillQuery(KillEntry), +} + +/// KillEntry is a type that is used to hold the data necessary to kill a live query +/// It is not used for any indexing +#[derive(Debug)] +#[cfg_attr(test, derive(PartialEq, Clone))] +pub(crate) struct KillEntry { + pub(crate) live_id: Uuid, + pub(crate) ns: String, + pub(crate) db: String, } impl LqEntry { diff --git a/core/src/kvs/mod.rs b/core/src/kvs/mod.rs index 7d837f21..3a14cda8 100644 --- a/core/src/kvs/mod.rs +++ b/core/src/kvs/mod.rs @@ -17,7 +17,8 @@ mod ds; mod fdb; mod indxdb; mod kv; -mod mem; +// pub(crate) for tests +pub(crate) mod mem; mod rocksdb; mod speedb; mod surrealkv; diff --git a/core/src/kvs/tests/tx_test.rs b/core/src/kvs/tests/tx_test.rs index 2c6da4a4..e04bd082 100644 --- a/core/src/kvs/tests/tx_test.rs +++ b/core/src/kvs/tests/tx_test.rs @@ -1,4 +1,4 @@ -use crate::kvs::lq_structs::{LqEntry, TrackedResult}; +use crate::kvs::lq_structs::{KillEntry, LqEntry, TrackedResult}; #[tokio::test] #[serial] @@ -25,7 +25,7 @@ async fn live_queries_sent_to_tx_are_received() { auth: None, }, }; - tx.pre_commit_register_live_query(lq_entry.clone()).unwrap(); + tx.pre_commit_register_async_event(TrackedResult::LiveQuery(lq_entry.clone())).unwrap(); tx.commit().await.unwrap(); @@ -34,3 +34,27 @@ async fn live_queries_sent_to_tx_are_received() { assert_eq!(live_queries.len(), 1); assert_eq!(live_queries[0], TrackedResult::LiveQuery(lq_entry)); } +#[tokio::test] +#[serial] +async fn kill_queries_sent_to_tx_are_received() { + let node_id = uuid::uuid!("1cae3d33-64e6-4867-bf17-d095c1b842d7"); + let clock = Arc::new(SizedClock::Fake(FakeClock::new(Timestamp::default()))); + let test = init(node_id, clock).await.unwrap(); + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + + let kill_entry = KillEntry { + live_id: uuid::uuid!("f396c0cb-01ca-4213-a72d-b0240f6d00b2").into(), + ns: "some_ns".to_string(), + db: "some_db".to_string(), + }; + + // Create live query data + tx.pre_commit_register_async_event(TrackedResult::KillQuery(kill_entry.clone())).unwrap(); + + tx.commit().await.unwrap(); + + // Verify data + let live_queries = tx.consume_pending_live_queries(); + assert_eq!(live_queries.len(), 1); + assert_eq!(live_queries[0], TrackedResult::KillQuery(kill_entry)); +} diff --git a/core/src/kvs/tx.rs b/core/src/kvs/tx.rs index 2852de01..16c46b5e 100644 --- a/core/src/kvs/tx.rs +++ b/core/src/kvs/tx.rs @@ -35,7 +35,7 @@ use crate::key::key_req::KeyRequirements; use crate::kvs::cache::Cache; use crate::kvs::cache::Entry; use crate::kvs::clock::SizedClock; -use crate::kvs::lq_structs::{LqEntry, LqValue, TrackedResult}; +use crate::kvs::lq_structs::{LqValue, TrackedResult}; use crate::kvs::Check; use crate::options::EngineOptions; use crate::sql; @@ -92,7 +92,7 @@ pub struct Transaction { pub(super) cf: cf::Writer, pub(super) vso: Arc>, pub(super) clock: Arc, - pub(super) prepared_live_queries: (Arc>, Arc>), + pub(super) prepared_async_events: (Arc>, Arc>), pub(super) engine_options: EngineOptions, } @@ -331,24 +331,26 @@ impl Transaction { } /// From the existing transaction, consume all the remaining live query registration events and return them synchronously + /// This function does not check that a transaction was committed, but the intention is to consume from this + /// only once the transaction is committed pub(crate) fn consume_pending_live_queries(&self) -> Vec { - let mut lq: Vec = + let mut tracked_results: Vec = Vec::with_capacity(self.engine_options.new_live_queries_per_transaction as usize); - while let Ok(l) = self.prepared_live_queries.1.try_recv() { - lq.push(TrackedResult::LiveQuery(l)); + while let Ok(tracked_result) = self.prepared_async_events.1.try_recv() { + tracked_results.push(tracked_result); } - lq + tracked_results } - /// Sends a live query to the transaction which is forwarded only once committed - /// And removed once a transaction is aborted + /// Sends an async operation, such as a new live query, to the transaction which is forwarded + /// only once committed and removed once a transaction is aborted // allow(dead_code) because this is used in v2, but not v1 #[allow(dead_code)] - pub(crate) fn pre_commit_register_live_query( + pub(crate) fn pre_commit_register_async_event( &mut self, - lq_entry: LqEntry, + lq_entry: TrackedResult, ) -> Result<(), Error> { - self.prepared_live_queries.0.try_send(lq_entry).map_err(|_send_err| { + self.prepared_async_events.0.try_send(lq_entry).map_err(|_send_err| { Error::Internal("Prepared lq failed to add lq to channel".to_string()) }) } @@ -3269,7 +3271,7 @@ mod tx_test { auth: None, }, }; - tx.pre_commit_register_live_query(lq_entry.clone()).unwrap(); + tx.pre_commit_register_async_event(TrackedResult::LiveQuery(lq_entry.clone())).unwrap(); tx.commit().await.unwrap(); diff --git a/core/src/sql/v1/statements/kill.rs b/core/src/sql/v1/statements/kill.rs index 3b98a0d8..be028ccc 100644 --- a/core/src/sql/v1/statements/kill.rs +++ b/core/src/sql/v1/statements/kill.rs @@ -2,6 +2,8 @@ use crate::ctx::Context; use crate::dbs::{Options, Transaction}; use crate::doc::CursorDoc; use crate::err::Error; +use crate::fflags::FFLAGS; +use crate::kvs::lq_structs::{KillEntry, TrackedResult}; use crate::sql::Uuid; use crate::sql::Value; use derive::Store; @@ -58,33 +60,46 @@ impl KillStatement { }; // Claim transaction let mut run = txn.lock().await; - // Fetch the live query key - let key = crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns(), opt.db()); - // Fetch the live query key if it exists - match run.get(key).await? { - Some(val) => match std::str::from_utf8(&val) { - Ok(tb) => { - // Delete the node live query - let key = - crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns(), opt.db()); - run.del(key).await?; - // Delete the table live query - let key = crate::key::table::lq::new(opt.ns(), opt.db(), tb, live_query_id.0); - run.del(key).await?; - } - _ => { + if FFLAGS.change_feed_live_queries.enabled() { + run.pre_commit_register_async_event(TrackedResult::KillQuery(KillEntry { + live_id: live_query_id, + ns: opt.ns().to_string(), + db: opt.db().to_string(), + }))?; + } else { + // Fetch the live query key + let key = crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns(), opt.db()); + // Fetch the live query key if it exists + match run.get(key).await? { + Some(val) => match std::str::from_utf8(&val) { + Ok(tb) => { + // Delete the node live query + let key = crate::key::node::lq::new( + opt.id()?, + live_query_id.0, + opt.ns(), + opt.db(), + ); + run.del(key).await?; + // Delete the table live query + let key = + crate::key::table::lq::new(opt.ns(), opt.db(), tb, live_query_id.0); + run.del(key).await?; + } + _ => { + return Err(Error::KillStatement { + value: self.id.to_string(), + }) + } + }, + None => { return Err(Error::KillStatement { value: self.id.to_string(), }) } - }, - None => { - return Err(Error::KillStatement { - value: self.id.to_string(), - }) } + // Return the query id } - // Return the query id Ok(Value::None) } } @@ -94,3 +109,49 @@ impl fmt::Display for KillStatement { write!(f, "KILL {}", self.id) } } + +#[cfg(test)] +mod test { + use std::str::FromStr; + + use crate::ctx::Context; + use crate::dbs::Options; + use crate::fflags::FFLAGS; + use crate::kvs::lq_structs::{KillEntry, TrackedResult}; + use crate::kvs::{Datastore, LockType, TransactionType}; + use crate::sql::v1::statements::KillStatement; + use crate::sql::Uuid; + + #[test_log::test(tokio::test)] + async fn kill_handles_uuid_event_registration() { + if !FFLAGS.change_feed_live_queries.enabled() { + return; + } + let res = KillStatement { + id: Uuid::from_str("889757b3-2040-4da3-9ad6-47fe65bd2fb6").unwrap().into(), + }; + let ctx = Context::default(); + let opt = Options::new() + .with_id(uuid::Uuid::from_str("55a85e9c-7cd1-49cb-a8f7-41124d8fdaf8").unwrap()) + .with_live(true) + .with_db(Some("database".into())) + .with_ns(Some("namespace".into())); + let ds = Datastore::new("memory").await.unwrap(); + let mut tx = + ds.transaction(TransactionType::Write, LockType::Optimistic).await.unwrap().enclose(); + res.compute(&ctx, &opt, &tx, None).await.unwrap(); + + let mut tx = tx.lock().await; + tx.commit().await.unwrap(); + + // Validate sent + assert_eq!( + tx.consume_pending_live_queries(), + vec![TrackedResult::KillQuery(KillEntry { + live_id: Uuid::from_str("889757b3-2040-4da3-9ad6-47fe65bd2fb6").unwrap(), + ns: "namespace".to_string(), + db: "database".to_string(), + })] + ); + } +} diff --git a/core/src/sql/v2/statements/kill.rs b/core/src/sql/v2/statements/kill.rs index 3b98a0d8..2ad9bf93 100644 --- a/core/src/sql/v2/statements/kill.rs +++ b/core/src/sql/v2/statements/kill.rs @@ -1,13 +1,17 @@ +use std::fmt; + +use derive::Store; +use revision::revisioned; +use serde::{Deserialize, Serialize}; + use crate::ctx::Context; use crate::dbs::{Options, Transaction}; use crate::doc::CursorDoc; use crate::err::Error; +use crate::fflags::FFLAGS; +use crate::kvs::lq_structs::{KillEntry, TrackedResult}; use crate::sql::Uuid; use crate::sql::Value; -use derive::Store; -use revision::revisioned; -use serde::{Deserialize, Serialize}; -use std::fmt; #[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] @@ -40,48 +44,72 @@ impl KillStatement { Ok(id) => Uuid(id), _ => { return Err(Error::KillStatement { - value: self.id.to_string(), - }) + value: + "KILL received a parameter that could not be converted to a UUID" + .to_string(), + }); } }, _ => { return Err(Error::KillStatement { - value: self.id.to_string(), - }) + value: "KILL received a parameter that was not expected".to_string(), + }); + } + }, + Value::Strand(maybe_id) => match uuid::Uuid::try_parse(maybe_id) { + Ok(id) => Uuid(id), + _ => { + return Err(Error::KillStatement { + value: "KILL received a Strand that could not be converted to a UUID" + .to_string(), + }); } }, _ => { return Err(Error::KillStatement { - value: self.id.to_string(), - }) + value: "Unhandled type for KILL statement".to_string(), + }); } }; // Claim transaction let mut run = txn.lock().await; - // Fetch the live query key - let key = crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns(), opt.db()); - // Fetch the live query key if it exists - match run.get(key).await? { - Some(val) => match std::str::from_utf8(&val) { - Ok(tb) => { - // Delete the node live query - let key = - crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns(), opt.db()); - run.del(key).await?; - // Delete the table live query - let key = crate::key::table::lq::new(opt.ns(), opt.db(), tb, live_query_id.0); - run.del(key).await?; - } - _ => { + if FFLAGS.change_feed_live_queries.enabled() { + run.pre_commit_register_async_event(TrackedResult::KillQuery(KillEntry { + live_id: live_query_id, + ns: opt.ns().to_string(), + db: opt.db().to_string(), + }))?; + } else { + // Fetch the live query key + let key = crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns(), opt.db()); + // Fetch the live query key if it exists + match run.get(key).await? { + Some(val) => match std::str::from_utf8(&val) { + Ok(tb) => { + // Delete the node live query + let key = crate::key::node::lq::new( + opt.id()?, + live_query_id.0, + opt.ns(), + opt.db(), + ); + run.del(key).await?; + // Delete the table live query + let key = + crate::key::table::lq::new(opt.ns(), opt.db(), tb, live_query_id.0); + run.del(key).await?; + } + _ => { + return Err(Error::KillStatement { + value: self.id.to_string(), + }); + } + }, + None => { return Err(Error::KillStatement { - value: self.id.to_string(), - }) + value: "KILL statement uuid did not exist".to_string(), + }); } - }, - None => { - return Err(Error::KillStatement { - value: self.id.to_string(), - }) } } // Return the query id @@ -94,3 +122,50 @@ impl fmt::Display for KillStatement { write!(f, "KILL {}", self.id) } } + +#[cfg(test)] +#[cfg(feature = "sql2")] +mod test { + use std::str::FromStr; + + use crate::ctx::Context; + use crate::dbs::Options; + use crate::fflags::FFLAGS; + use crate::kvs::lq_structs::{KillEntry, TrackedResult}; + use crate::kvs::{Datastore, LockType, TransactionType}; + use crate::sql::statements::KillStatement; + use crate::sql::v2::uuid::Uuid; + + #[test_log::test(tokio::test)] + async fn kill_handles_uuid_event_registration() { + if !FFLAGS.change_feed_live_queries.enabled() { + return; + } + let res = KillStatement { + id: Uuid::from_str("8f92f057-c739-4bf2-9d0c-a74d01299efc").unwrap().into(), + }; + let ctx = Context::default(); + let opt = Options::new() + .with_id(uuid::Uuid::from_str("8c41d9f7-a627-40f7-86f5-59d56cd765c6").unwrap()) + .with_live(true) + .with_db(Some("database".into())) + .with_ns(Some("namespace".into())); + let ds = Datastore::new("memory").await.unwrap(); + let mut tx = + ds.transaction(TransactionType::Write, LockType::Optimistic).await.unwrap().enclose(); + res.compute(&ctx, &opt, &tx, None).await.unwrap(); + + let mut tx = tx.lock().await; + tx.commit().await.unwrap(); + + // Validate sent + assert_eq!( + tx.consume_pending_live_queries(), + vec![TrackedResult::KillQuery(KillEntry { + live_id: Uuid::from_str("8f92f057-c739-4bf2-9d0c-a74d01299efc").unwrap(), + ns: "namespace".to_string(), + db: "database".to_string(), + })] + ); + } +} diff --git a/core/src/sql/v2/statements/live.rs b/core/src/sql/v2/statements/live.rs index 1d3ee60e..05f17f60 100644 --- a/core/src/sql/v2/statements/live.rs +++ b/core/src/sql/v2/statements/live.rs @@ -4,7 +4,7 @@ use crate::doc::CursorDoc; use crate::err::Error; use crate::fflags::FFLAGS; use crate::iam::Auth; -use crate::kvs::lq_structs::LqEntry; +use crate::kvs::lq_structs::{LqEntry, TrackedResult}; use crate::sql::{Cond, Fetchs, Fields, Uuid, Value}; use derive::Store; use revision::revisioned; @@ -103,12 +103,12 @@ impl LiveStatement { match stm.what.compute(ctx, opt, txn, doc).await? { Value::Table(_tb) => { // Send the live query registration hook to the transaction pre-commit channel - run.pre_commit_register_live_query(LqEntry { + run.pre_commit_register_async_event(TrackedResult::LiveQuery(LqEntry { live_id: stm.id, ns: opt.ns().to_string(), db: opt.db().to_string(), stm, - })?; + }))?; } v => { return Err(Error::LiveStatement { @@ -134,7 +134,7 @@ impl LiveStatement { v => { return Err(Error::LiveStatement { value: v.to_string(), - }) + }); } }; // Return the query id diff --git a/core/src/sql/v2/value/serde/ser/statement/kill.rs b/core/src/sql/v2/value/serde/ser/statement/kill.rs index ce7e33a6..be75f547 100644 --- a/core/src/sql/v2/value/serde/ser/statement/kill.rs +++ b/core/src/sql/v2/value/serde/ser/statement/kill.rs @@ -70,6 +70,7 @@ impl serde::ser::SerializeStruct for SerializeKillStatement { #[cfg(test)] mod tests { use super::*; + use crate::sql::v2::statements::kill::KillStatement; #[test] fn default() { diff --git a/core/src/syn/v2/parser/mod.rs b/core/src/syn/v2/parser/mod.rs index 7bcc5df3..0c1d4679 100644 --- a/core/src/syn/v2/parser/mod.rs +++ b/core/src/syn/v2/parser/mod.rs @@ -33,7 +33,7 @@ mod function; mod idiom; mod json; mod kind; -mod mac; +pub(crate) mod mac; mod object; mod prime; mod stmt; diff --git a/core/src/syn/v2/parser/test/streaming.rs b/core/src/syn/v2/parser/test/streaming.rs index ca5e366e..74ee3f88 100644 --- a/core/src/syn/v2/parser/test/streaming.rs +++ b/core/src/syn/v2/parser/test/streaming.rs @@ -322,6 +322,7 @@ fn statements() -> Vec { cols: Idioms(vec![Idiom(vec![Part::Field(Ident("a".to_owned()))])]), index: Index::MTree(MTreeParams { dimension: 4, + _distance: Default::default(), distance: Distance::Minkowski(Number::Int(5)), _distance: Default::default(), capacity: 6,