Baseline for KILL, realising other PRs need to land (#3600)
This commit is contained in:
parent
cb3ca6dd39
commit
2fe398f5b4
12 changed files with 337 additions and 102 deletions
|
@ -94,7 +94,7 @@ impl<'a> Executor<'a> {
|
|||
let lqs: Vec<TrackedResult> =
|
||||
txn.consume_pending_live_queries();
|
||||
// Track the live queries in the data store
|
||||
self.kvs.track_live_queries(&lqs).await?;
|
||||
self.kvs.adapt_tracked_live_queries(&lqs).await?;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
|
@ -471,27 +471,27 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn check_execute_option_permissions() {
|
||||
let tests = vec![
|
||||
// Root level
|
||||
(Session::for_level(().into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at root level should be able to set options"),
|
||||
(Session::for_level(().into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at root level should be able to set options"),
|
||||
(Session::for_level(().into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at root level should not be able to set options"),
|
||||
// Root level
|
||||
(Session::for_level(().into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at root level should be able to set options"),
|
||||
(Session::for_level(().into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at root level should be able to set options"),
|
||||
(Session::for_level(().into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at root level should not be able to set options"),
|
||||
|
||||
// Namespace level
|
||||
(Session::for_level(("NS",).into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at namespace level should be able to set options on its namespace"),
|
||||
(Session::for_level(("NS",).into(), Role::Owner).with_ns("OTHER_NS").with_db("DB"), false, "owner at namespace level should not be able to set options on another namespace"),
|
||||
(Session::for_level(("NS",).into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at namespace level should be able to set options on its namespace"),
|
||||
(Session::for_level(("NS",).into(), Role::Editor).with_ns("OTHER_NS").with_db("DB"), false, "editor at namespace level should not be able to set options on another namespace"),
|
||||
(Session::for_level(("NS",).into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at namespace level should not be able to set options on its namespace"),
|
||||
// Namespace level
|
||||
(Session::for_level(("NS", ).into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at namespace level should be able to set options on its namespace"),
|
||||
(Session::for_level(("NS", ).into(), Role::Owner).with_ns("OTHER_NS").with_db("DB"), false, "owner at namespace level should not be able to set options on another namespace"),
|
||||
(Session::for_level(("NS", ).into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at namespace level should be able to set options on its namespace"),
|
||||
(Session::for_level(("NS", ).into(), Role::Editor).with_ns("OTHER_NS").with_db("DB"), false, "editor at namespace level should not be able to set options on another namespace"),
|
||||
(Session::for_level(("NS", ).into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at namespace level should not be able to set options on its namespace"),
|
||||
|
||||
// Database level
|
||||
(Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at database level should be able to set options on its database"),
|
||||
(Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("NS").with_db("OTHER_DB"), false, "owner at database level should not be able to set options on another database"),
|
||||
(Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("OTHER_NS").with_db("DB"), false, "owner at database level should not be able to set options on another namespace even if the database name matches"),
|
||||
(Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at database level should be able to set options on its database"),
|
||||
(Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("NS").with_db("OTHER_DB"), false, "editor at database level should not be able to set options on another database"),
|
||||
(Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("OTHER_NS").with_db("DB"), false, "editor at database level should not be able to set options on another namespace even if the database name matches"),
|
||||
(Session::for_level(("NS", "DB").into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at database level should not be able to set options on its database"),
|
||||
];
|
||||
// Database level
|
||||
(Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at database level should be able to set options on its database"),
|
||||
(Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("NS").with_db("OTHER_DB"), false, "owner at database level should not be able to set options on another database"),
|
||||
(Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("OTHER_NS").with_db("DB"), false, "owner at database level should not be able to set options on another namespace even if the database name matches"),
|
||||
(Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at database level should be able to set options on its database"),
|
||||
(Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("NS").with_db("OTHER_DB"), false, "editor at database level should not be able to set options on another database"),
|
||||
(Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("OTHER_NS").with_db("DB"), false, "editor at database level should not be able to set options on another namespace even if the database name matches"),
|
||||
(Session::for_level(("NS", "DB").into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at database level should not be able to set options on its database"),
|
||||
];
|
||||
let statement = "OPTION FIELDS = false";
|
||||
|
||||
for test in tests.iter() {
|
||||
|
|
|
@ -35,7 +35,7 @@ use crate::kvs::clock::SizedClock;
|
|||
#[allow(unused_imports)]
|
||||
use crate::kvs::clock::SystemClock;
|
||||
use crate::kvs::lq_structs::{
|
||||
LqEntry, LqIndexKey, LqIndexValue, LqSelector, LqValue, TrackedResult, UnreachableLqType,
|
||||
LqIndexKey, LqIndexValue, LqSelector, LqValue, TrackedResult, UnreachableLqType,
|
||||
};
|
||||
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
|
||||
use crate::options::EngineOptions;
|
||||
|
@ -1041,12 +1041,16 @@ impl Datastore {
|
|||
}
|
||||
}
|
||||
|
||||
/// Add live queries to track on the datastore
|
||||
/// Add and kill live queries being track on the datastore
|
||||
/// These get polled by the change feed tick
|
||||
pub(crate) async fn track_live_queries(&self, lqs: &Vec<TrackedResult>) -> Result<(), Error> {
|
||||
pub(crate) async fn adapt_tracked_live_queries(
|
||||
&self,
|
||||
lqs: &Vec<TrackedResult>,
|
||||
) -> Result<(), Error> {
|
||||
// Lock the local live queries
|
||||
let mut lq_map = self.local_live_queries.write().await;
|
||||
let mut cf_watermarks = self.cf_watermarks.write().await;
|
||||
let mut watermarks_to_check: Vec<LqIndexKey> = vec![];
|
||||
for lq in lqs {
|
||||
match lq {
|
||||
TrackedResult::LiveQuery(lq) => {
|
||||
|
@ -1064,8 +1068,63 @@ impl Datastore {
|
|||
// We insert the current watermark.
|
||||
cf_watermarks.entry(selector).or_insert_with(Versionstamp::default);
|
||||
}
|
||||
TrackedResult::KillQuery(_lq) => {
|
||||
unimplemented!("Cannot kill queries yet")
|
||||
TrackedResult::KillQuery(kill_entry) => {
|
||||
let found: Option<(LqIndexKey, LqIndexValue)> = lq_map
|
||||
.iter_mut()
|
||||
.filter(|(k, _)| {
|
||||
// Get all the live queries in the ns/db pair. We don't know table
|
||||
k.selector.ns == kill_entry.ns && k.selector.db == kill_entry.db
|
||||
})
|
||||
.filter_map(|(k, v)| {
|
||||
let index = v.iter().position(|a| a.stm.id == kill_entry.live_id);
|
||||
match index {
|
||||
Some(i) => {
|
||||
let v = v.remove(i);
|
||||
// Sadly we do need to clone out of mutable reference, because of Strings
|
||||
Some((k.clone(), v))
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
})
|
||||
.next();
|
||||
match found {
|
||||
None => {
|
||||
// TODO(SUR-336): Make Live Query ID validation available at statement level, perhaps via transaction
|
||||
trace!(
|
||||
"Could not find live query {:?} to kill in ns/db pair {:?}",
|
||||
&kill_entry,
|
||||
&kill_entry.ns
|
||||
);
|
||||
}
|
||||
Some(found) => {
|
||||
trace!(
|
||||
"Killed live query {:?} with found key {:?} and found value {:?}",
|
||||
&kill_entry,
|
||||
&found.0,
|
||||
&found.1
|
||||
);
|
||||
// Check if we need to remove the LQ key from tracking
|
||||
let empty = match lq_map.get(&found.0) {
|
||||
None => false,
|
||||
Some(v) => v.is_empty(),
|
||||
};
|
||||
if empty {
|
||||
trace!("Removing live query index key {:?}", &found.0);
|
||||
lq_map.remove(&found.0);
|
||||
}
|
||||
// Now add the LQ to tracked watermarks
|
||||
watermarks_to_check.push(found.0.clone());
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
// Now check if we can stop tracking watermarks
|
||||
for watermark in watermarks_to_check {
|
||||
if let Some(lq) = lq_map.get(&watermark) {
|
||||
if lq.is_empty() {
|
||||
trace!("Removing watermark for {:?}", watermark);
|
||||
cf_watermarks.remove(&watermark.selector);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1217,7 +1276,8 @@ impl Datastore {
|
|||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let (send, recv): (Sender<LqEntry>, Receiver<LqEntry>) = channel::bounded(LQ_CHANNEL_SIZE);
|
||||
let (send, recv): (Sender<TrackedResult>, Receiver<TrackedResult>) =
|
||||
channel::bounded(LQ_CHANNEL_SIZE);
|
||||
|
||||
#[allow(unreachable_code)]
|
||||
Ok(Transaction {
|
||||
|
@ -1226,7 +1286,7 @@ impl Datastore {
|
|||
cf: cf::Writer::new(),
|
||||
vso: self.versionstamp_oracle.clone(),
|
||||
clock: self.clock.clone(),
|
||||
prepared_live_queries: (Arc::new(send), Arc::new(recv)),
|
||||
prepared_async_events: (Arc::new(send), Arc::new(recv)),
|
||||
engine_options: self.engine_options,
|
||||
})
|
||||
}
|
||||
|
@ -1328,7 +1388,7 @@ impl Datastore {
|
|||
match res {
|
||||
Ok((responses, lives)) => {
|
||||
// Register live queries
|
||||
self.track_live_queries(&lives).await?;
|
||||
self.adapt_tracked_live_queries(&lives).await?;
|
||||
Ok(responses)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
|
|
|
@ -96,7 +96,17 @@ pub(crate) struct LqEntry {
|
|||
pub(crate) enum TrackedResult {
|
||||
LiveQuery(LqEntry),
|
||||
#[allow(dead_code)]
|
||||
KillQuery(LqEntry),
|
||||
KillQuery(KillEntry),
|
||||
}
|
||||
|
||||
/// KillEntry is a type that is used to hold the data necessary to kill a live query
|
||||
/// It is not used for any indexing
|
||||
#[derive(Debug)]
|
||||
#[cfg_attr(test, derive(PartialEq, Clone))]
|
||||
pub(crate) struct KillEntry {
|
||||
pub(crate) live_id: Uuid,
|
||||
pub(crate) ns: String,
|
||||
pub(crate) db: String,
|
||||
}
|
||||
|
||||
impl LqEntry {
|
||||
|
|
|
@ -17,7 +17,8 @@ mod ds;
|
|||
mod fdb;
|
||||
mod indxdb;
|
||||
mod kv;
|
||||
mod mem;
|
||||
// pub(crate) for tests
|
||||
pub(crate) mod mem;
|
||||
mod rocksdb;
|
||||
mod speedb;
|
||||
mod surrealkv;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::kvs::lq_structs::{LqEntry, TrackedResult};
|
||||
use crate::kvs::lq_structs::{KillEntry, LqEntry, TrackedResult};
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
|
@ -25,7 +25,7 @@ async fn live_queries_sent_to_tx_are_received() {
|
|||
auth: None,
|
||||
},
|
||||
};
|
||||
tx.pre_commit_register_live_query(lq_entry.clone()).unwrap();
|
||||
tx.pre_commit_register_async_event(TrackedResult::LiveQuery(lq_entry.clone())).unwrap();
|
||||
|
||||
tx.commit().await.unwrap();
|
||||
|
||||
|
@ -34,3 +34,27 @@ async fn live_queries_sent_to_tx_are_received() {
|
|||
assert_eq!(live_queries.len(), 1);
|
||||
assert_eq!(live_queries[0], TrackedResult::LiveQuery(lq_entry));
|
||||
}
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn kill_queries_sent_to_tx_are_received() {
|
||||
let node_id = uuid::uuid!("1cae3d33-64e6-4867-bf17-d095c1b842d7");
|
||||
let clock = Arc::new(SizedClock::Fake(FakeClock::new(Timestamp::default())));
|
||||
let test = init(node_id, clock).await.unwrap();
|
||||
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
|
||||
|
||||
let kill_entry = KillEntry {
|
||||
live_id: uuid::uuid!("f396c0cb-01ca-4213-a72d-b0240f6d00b2").into(),
|
||||
ns: "some_ns".to_string(),
|
||||
db: "some_db".to_string(),
|
||||
};
|
||||
|
||||
// Create live query data
|
||||
tx.pre_commit_register_async_event(TrackedResult::KillQuery(kill_entry.clone())).unwrap();
|
||||
|
||||
tx.commit().await.unwrap();
|
||||
|
||||
// Verify data
|
||||
let live_queries = tx.consume_pending_live_queries();
|
||||
assert_eq!(live_queries.len(), 1);
|
||||
assert_eq!(live_queries[0], TrackedResult::KillQuery(kill_entry));
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ use crate::key::key_req::KeyRequirements;
|
|||
use crate::kvs::cache::Cache;
|
||||
use crate::kvs::cache::Entry;
|
||||
use crate::kvs::clock::SizedClock;
|
||||
use crate::kvs::lq_structs::{LqEntry, LqValue, TrackedResult};
|
||||
use crate::kvs::lq_structs::{LqValue, TrackedResult};
|
||||
use crate::kvs::Check;
|
||||
use crate::options::EngineOptions;
|
||||
use crate::sql;
|
||||
|
@ -92,7 +92,7 @@ pub struct Transaction {
|
|||
pub(super) cf: cf::Writer,
|
||||
pub(super) vso: Arc<Mutex<Oracle>>,
|
||||
pub(super) clock: Arc<SizedClock>,
|
||||
pub(super) prepared_live_queries: (Arc<Sender<LqEntry>>, Arc<Receiver<LqEntry>>),
|
||||
pub(super) prepared_async_events: (Arc<Sender<TrackedResult>>, Arc<Receiver<TrackedResult>>),
|
||||
pub(super) engine_options: EngineOptions,
|
||||
}
|
||||
|
||||
|
@ -331,24 +331,26 @@ impl Transaction {
|
|||
}
|
||||
|
||||
/// From the existing transaction, consume all the remaining live query registration events and return them synchronously
|
||||
/// This function does not check that a transaction was committed, but the intention is to consume from this
|
||||
/// only once the transaction is committed
|
||||
pub(crate) fn consume_pending_live_queries(&self) -> Vec<TrackedResult> {
|
||||
let mut lq: Vec<TrackedResult> =
|
||||
let mut tracked_results: Vec<TrackedResult> =
|
||||
Vec::with_capacity(self.engine_options.new_live_queries_per_transaction as usize);
|
||||
while let Ok(l) = self.prepared_live_queries.1.try_recv() {
|
||||
lq.push(TrackedResult::LiveQuery(l));
|
||||
while let Ok(tracked_result) = self.prepared_async_events.1.try_recv() {
|
||||
tracked_results.push(tracked_result);
|
||||
}
|
||||
lq
|
||||
tracked_results
|
||||
}
|
||||
|
||||
/// Sends a live query to the transaction which is forwarded only once committed
|
||||
/// And removed once a transaction is aborted
|
||||
/// Sends an async operation, such as a new live query, to the transaction which is forwarded
|
||||
/// only once committed and removed once a transaction is aborted
|
||||
// allow(dead_code) because this is used in v2, but not v1
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn pre_commit_register_live_query(
|
||||
pub(crate) fn pre_commit_register_async_event(
|
||||
&mut self,
|
||||
lq_entry: LqEntry,
|
||||
lq_entry: TrackedResult,
|
||||
) -> Result<(), Error> {
|
||||
self.prepared_live_queries.0.try_send(lq_entry).map_err(|_send_err| {
|
||||
self.prepared_async_events.0.try_send(lq_entry).map_err(|_send_err| {
|
||||
Error::Internal("Prepared lq failed to add lq to channel".to_string())
|
||||
})
|
||||
}
|
||||
|
@ -3269,7 +3271,7 @@ mod tx_test {
|
|||
auth: None,
|
||||
},
|
||||
};
|
||||
tx.pre_commit_register_live_query(lq_entry.clone()).unwrap();
|
||||
tx.pre_commit_register_async_event(TrackedResult::LiveQuery(lq_entry.clone())).unwrap();
|
||||
|
||||
tx.commit().await.unwrap();
|
||||
|
||||
|
|
|
@ -2,6 +2,8 @@ use crate::ctx::Context;
|
|||
use crate::dbs::{Options, Transaction};
|
||||
use crate::doc::CursorDoc;
|
||||
use crate::err::Error;
|
||||
use crate::fflags::FFLAGS;
|
||||
use crate::kvs::lq_structs::{KillEntry, TrackedResult};
|
||||
use crate::sql::Uuid;
|
||||
use crate::sql::Value;
|
||||
use derive::Store;
|
||||
|
@ -58,33 +60,46 @@ impl KillStatement {
|
|||
};
|
||||
// Claim transaction
|
||||
let mut run = txn.lock().await;
|
||||
// Fetch the live query key
|
||||
let key = crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns(), opt.db());
|
||||
// Fetch the live query key if it exists
|
||||
match run.get(key).await? {
|
||||
Some(val) => match std::str::from_utf8(&val) {
|
||||
Ok(tb) => {
|
||||
// Delete the node live query
|
||||
let key =
|
||||
crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns(), opt.db());
|
||||
run.del(key).await?;
|
||||
// Delete the table live query
|
||||
let key = crate::key::table::lq::new(opt.ns(), opt.db(), tb, live_query_id.0);
|
||||
run.del(key).await?;
|
||||
}
|
||||
_ => {
|
||||
if FFLAGS.change_feed_live_queries.enabled() {
|
||||
run.pre_commit_register_async_event(TrackedResult::KillQuery(KillEntry {
|
||||
live_id: live_query_id,
|
||||
ns: opt.ns().to_string(),
|
||||
db: opt.db().to_string(),
|
||||
}))?;
|
||||
} else {
|
||||
// Fetch the live query key
|
||||
let key = crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns(), opt.db());
|
||||
// Fetch the live query key if it exists
|
||||
match run.get(key).await? {
|
||||
Some(val) => match std::str::from_utf8(&val) {
|
||||
Ok(tb) => {
|
||||
// Delete the node live query
|
||||
let key = crate::key::node::lq::new(
|
||||
opt.id()?,
|
||||
live_query_id.0,
|
||||
opt.ns(),
|
||||
opt.db(),
|
||||
);
|
||||
run.del(key).await?;
|
||||
// Delete the table live query
|
||||
let key =
|
||||
crate::key::table::lq::new(opt.ns(), opt.db(), tb, live_query_id.0);
|
||||
run.del(key).await?;
|
||||
}
|
||||
_ => {
|
||||
return Err(Error::KillStatement {
|
||||
value: self.id.to_string(),
|
||||
})
|
||||
}
|
||||
},
|
||||
None => {
|
||||
return Err(Error::KillStatement {
|
||||
value: self.id.to_string(),
|
||||
})
|
||||
}
|
||||
},
|
||||
None => {
|
||||
return Err(Error::KillStatement {
|
||||
value: self.id.to_string(),
|
||||
})
|
||||
}
|
||||
// Return the query id
|
||||
}
|
||||
// Return the query id
|
||||
Ok(Value::None)
|
||||
}
|
||||
}
|
||||
|
@ -94,3 +109,49 @@ impl fmt::Display for KillStatement {
|
|||
write!(f, "KILL {}", self.id)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::ctx::Context;
|
||||
use crate::dbs::Options;
|
||||
use crate::fflags::FFLAGS;
|
||||
use crate::kvs::lq_structs::{KillEntry, TrackedResult};
|
||||
use crate::kvs::{Datastore, LockType, TransactionType};
|
||||
use crate::sql::v1::statements::KillStatement;
|
||||
use crate::sql::Uuid;
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn kill_handles_uuid_event_registration() {
|
||||
if !FFLAGS.change_feed_live_queries.enabled() {
|
||||
return;
|
||||
}
|
||||
let res = KillStatement {
|
||||
id: Uuid::from_str("889757b3-2040-4da3-9ad6-47fe65bd2fb6").unwrap().into(),
|
||||
};
|
||||
let ctx = Context::default();
|
||||
let opt = Options::new()
|
||||
.with_id(uuid::Uuid::from_str("55a85e9c-7cd1-49cb-a8f7-41124d8fdaf8").unwrap())
|
||||
.with_live(true)
|
||||
.with_db(Some("database".into()))
|
||||
.with_ns(Some("namespace".into()));
|
||||
let ds = Datastore::new("memory").await.unwrap();
|
||||
let mut tx =
|
||||
ds.transaction(TransactionType::Write, LockType::Optimistic).await.unwrap().enclose();
|
||||
res.compute(&ctx, &opt, &tx, None).await.unwrap();
|
||||
|
||||
let mut tx = tx.lock().await;
|
||||
tx.commit().await.unwrap();
|
||||
|
||||
// Validate sent
|
||||
assert_eq!(
|
||||
tx.consume_pending_live_queries(),
|
||||
vec![TrackedResult::KillQuery(KillEntry {
|
||||
live_id: Uuid::from_str("889757b3-2040-4da3-9ad6-47fe65bd2fb6").unwrap(),
|
||||
ns: "namespace".to_string(),
|
||||
db: "database".to_string(),
|
||||
})]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,13 +1,17 @@
|
|||
use std::fmt;
|
||||
|
||||
use derive::Store;
|
||||
use revision::revisioned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::ctx::Context;
|
||||
use crate::dbs::{Options, Transaction};
|
||||
use crate::doc::CursorDoc;
|
||||
use crate::err::Error;
|
||||
use crate::fflags::FFLAGS;
|
||||
use crate::kvs::lq_structs::{KillEntry, TrackedResult};
|
||||
use crate::sql::Uuid;
|
||||
use crate::sql::Value;
|
||||
use derive::Store;
|
||||
use revision::revisioned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
|
||||
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
|
||||
|
@ -40,48 +44,72 @@ impl KillStatement {
|
|||
Ok(id) => Uuid(id),
|
||||
_ => {
|
||||
return Err(Error::KillStatement {
|
||||
value: self.id.to_string(),
|
||||
})
|
||||
value:
|
||||
"KILL received a parameter that could not be converted to a UUID"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
return Err(Error::KillStatement {
|
||||
value: self.id.to_string(),
|
||||
})
|
||||
value: "KILL received a parameter that was not expected".to_string(),
|
||||
});
|
||||
}
|
||||
},
|
||||
Value::Strand(maybe_id) => match uuid::Uuid::try_parse(maybe_id) {
|
||||
Ok(id) => Uuid(id),
|
||||
_ => {
|
||||
return Err(Error::KillStatement {
|
||||
value: "KILL received a Strand that could not be converted to a UUID"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
return Err(Error::KillStatement {
|
||||
value: self.id.to_string(),
|
||||
})
|
||||
value: "Unhandled type for KILL statement".to_string(),
|
||||
});
|
||||
}
|
||||
};
|
||||
// Claim transaction
|
||||
let mut run = txn.lock().await;
|
||||
// Fetch the live query key
|
||||
let key = crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns(), opt.db());
|
||||
// Fetch the live query key if it exists
|
||||
match run.get(key).await? {
|
||||
Some(val) => match std::str::from_utf8(&val) {
|
||||
Ok(tb) => {
|
||||
// Delete the node live query
|
||||
let key =
|
||||
crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns(), opt.db());
|
||||
run.del(key).await?;
|
||||
// Delete the table live query
|
||||
let key = crate::key::table::lq::new(opt.ns(), opt.db(), tb, live_query_id.0);
|
||||
run.del(key).await?;
|
||||
}
|
||||
_ => {
|
||||
if FFLAGS.change_feed_live_queries.enabled() {
|
||||
run.pre_commit_register_async_event(TrackedResult::KillQuery(KillEntry {
|
||||
live_id: live_query_id,
|
||||
ns: opt.ns().to_string(),
|
||||
db: opt.db().to_string(),
|
||||
}))?;
|
||||
} else {
|
||||
// Fetch the live query key
|
||||
let key = crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns(), opt.db());
|
||||
// Fetch the live query key if it exists
|
||||
match run.get(key).await? {
|
||||
Some(val) => match std::str::from_utf8(&val) {
|
||||
Ok(tb) => {
|
||||
// Delete the node live query
|
||||
let key = crate::key::node::lq::new(
|
||||
opt.id()?,
|
||||
live_query_id.0,
|
||||
opt.ns(),
|
||||
opt.db(),
|
||||
);
|
||||
run.del(key).await?;
|
||||
// Delete the table live query
|
||||
let key =
|
||||
crate::key::table::lq::new(opt.ns(), opt.db(), tb, live_query_id.0);
|
||||
run.del(key).await?;
|
||||
}
|
||||
_ => {
|
||||
return Err(Error::KillStatement {
|
||||
value: self.id.to_string(),
|
||||
});
|
||||
}
|
||||
},
|
||||
None => {
|
||||
return Err(Error::KillStatement {
|
||||
value: self.id.to_string(),
|
||||
})
|
||||
value: "KILL statement uuid did not exist".to_string(),
|
||||
});
|
||||
}
|
||||
},
|
||||
None => {
|
||||
return Err(Error::KillStatement {
|
||||
value: self.id.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
// Return the query id
|
||||
|
@ -94,3 +122,50 @@ impl fmt::Display for KillStatement {
|
|||
write!(f, "KILL {}", self.id)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(feature = "sql2")]
|
||||
mod test {
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::ctx::Context;
|
||||
use crate::dbs::Options;
|
||||
use crate::fflags::FFLAGS;
|
||||
use crate::kvs::lq_structs::{KillEntry, TrackedResult};
|
||||
use crate::kvs::{Datastore, LockType, TransactionType};
|
||||
use crate::sql::statements::KillStatement;
|
||||
use crate::sql::v2::uuid::Uuid;
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn kill_handles_uuid_event_registration() {
|
||||
if !FFLAGS.change_feed_live_queries.enabled() {
|
||||
return;
|
||||
}
|
||||
let res = KillStatement {
|
||||
id: Uuid::from_str("8f92f057-c739-4bf2-9d0c-a74d01299efc").unwrap().into(),
|
||||
};
|
||||
let ctx = Context::default();
|
||||
let opt = Options::new()
|
||||
.with_id(uuid::Uuid::from_str("8c41d9f7-a627-40f7-86f5-59d56cd765c6").unwrap())
|
||||
.with_live(true)
|
||||
.with_db(Some("database".into()))
|
||||
.with_ns(Some("namespace".into()));
|
||||
let ds = Datastore::new("memory").await.unwrap();
|
||||
let mut tx =
|
||||
ds.transaction(TransactionType::Write, LockType::Optimistic).await.unwrap().enclose();
|
||||
res.compute(&ctx, &opt, &tx, None).await.unwrap();
|
||||
|
||||
let mut tx = tx.lock().await;
|
||||
tx.commit().await.unwrap();
|
||||
|
||||
// Validate sent
|
||||
assert_eq!(
|
||||
tx.consume_pending_live_queries(),
|
||||
vec![TrackedResult::KillQuery(KillEntry {
|
||||
live_id: Uuid::from_str("8f92f057-c739-4bf2-9d0c-a74d01299efc").unwrap(),
|
||||
ns: "namespace".to_string(),
|
||||
db: "database".to_string(),
|
||||
})]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ use crate::doc::CursorDoc;
|
|||
use crate::err::Error;
|
||||
use crate::fflags::FFLAGS;
|
||||
use crate::iam::Auth;
|
||||
use crate::kvs::lq_structs::LqEntry;
|
||||
use crate::kvs::lq_structs::{LqEntry, TrackedResult};
|
||||
use crate::sql::{Cond, Fetchs, Fields, Uuid, Value};
|
||||
use derive::Store;
|
||||
use revision::revisioned;
|
||||
|
@ -103,12 +103,12 @@ impl LiveStatement {
|
|||
match stm.what.compute(ctx, opt, txn, doc).await? {
|
||||
Value::Table(_tb) => {
|
||||
// Send the live query registration hook to the transaction pre-commit channel
|
||||
run.pre_commit_register_live_query(LqEntry {
|
||||
run.pre_commit_register_async_event(TrackedResult::LiveQuery(LqEntry {
|
||||
live_id: stm.id,
|
||||
ns: opt.ns().to_string(),
|
||||
db: opt.db().to_string(),
|
||||
stm,
|
||||
})?;
|
||||
}))?;
|
||||
}
|
||||
v => {
|
||||
return Err(Error::LiveStatement {
|
||||
|
@ -134,7 +134,7 @@ impl LiveStatement {
|
|||
v => {
|
||||
return Err(Error::LiveStatement {
|
||||
value: v.to_string(),
|
||||
})
|
||||
});
|
||||
}
|
||||
};
|
||||
// Return the query id
|
||||
|
|
|
@ -70,6 +70,7 @@ impl serde::ser::SerializeStruct for SerializeKillStatement {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::sql::v2::statements::kill::KillStatement;
|
||||
|
||||
#[test]
|
||||
fn default() {
|
||||
|
|
|
@ -33,7 +33,7 @@ mod function;
|
|||
mod idiom;
|
||||
mod json;
|
||||
mod kind;
|
||||
mod mac;
|
||||
pub(crate) mod mac;
|
||||
mod object;
|
||||
mod prime;
|
||||
mod stmt;
|
||||
|
|
|
@ -322,6 +322,7 @@ fn statements() -> Vec<Statement> {
|
|||
cols: Idioms(vec![Idiom(vec![Part::Field(Ident("a".to_owned()))])]),
|
||||
index: Index::MTree(MTreeParams {
|
||||
dimension: 4,
|
||||
_distance: Default::default(),
|
||||
distance: Distance::Minkowski(Number::Int(5)),
|
||||
_distance: Default::default(),
|
||||
capacity: 6,
|
||||
|
|
Loading…
Reference in a new issue