Store evaluated live query params ()

This commit is contained in:
Przemyslaw Hugh Kaznowski 2024-04-26 10:09:33 +01:00 committed by GitHub
parent 8ae2908d6e
commit 6783f5ee11
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 48 additions and 0 deletions
core/src
kvs/tests
sql/statements

View file

@ -1,3 +1,4 @@
use crate::kvs::lq_structs::{LqIndexKey, LqIndexValue, LqSelector};
use uuid::Uuid;
#[tokio::test]
@ -38,3 +39,45 @@ async fn scan_node_lq() {
tx.commit().await.unwrap();
}
#[test_log::test(tokio::test)]
async fn live_params_are_evaluated() {
if !FFLAGS.change_feed_live_queries.enabled() {
return;
}
let node_id = Uuid::parse_str("9cb22db9-1851-4781-8847-d781a3f373ae").unwrap();
let clock = Arc::new(SizedClock::Fake(FakeClock::new(Timestamp::default())));
let test = init(node_id, clock).await.unwrap();
let sess = Session::owner().with_ns("test_namespace").with_db("test_database");
let params = map! {
"expected_table".to_string() => Value::Table(sql::Table("test_table".to_string())),
};
test.db.execute("DEFINE TABLE expected_table CHANGEFEED 10m INCLUDE ORIGINAL; LIVE SELECT * FROM $expected_table", &sess, Some(params)).await.unwrap();
let mut res = test.db.lq_cf_store.read().await.live_queries_for_selector(&LqSelector {
ns: "test_namespace".to_string(),
db: "test_database".to_string(),
tb: "test_table".to_string(),
});
assert_eq!(res.len(), 1);
// We remove the unknown value
res[0].0.lq = Default::default();
assert_eq!(
res,
vec![(
LqIndexKey {
selector: LqSelector {
ns: "test_namespace".to_string(),
db: "test_database".to_string(),
tb: "test_table".to_string(),
},
lq: Default::default(),
},
LqIndexValue {
stm: Default::default(),
vs: [0; 10],
ts: Default::default(),
}
)]
)
}

View file

@ -34,6 +34,7 @@ type ClockType = Arc<SizedClock>;
#[cfg(feature = "kv-mem")]
mod mem {
use crate::fflags::FFLAGS;
use crate::kvs::tests::{ClockType, Kvs};
use crate::kvs::Datastore;
use crate::kvs::LockType;

View file

@ -109,6 +109,10 @@ impl LiveStatement {
let mut run = txn.lock().await;
match stm.what.compute(stk, ctx, opt, txn, doc).await? {
Value::Table(tb) => {
// We modify the table as it can be a $PARAM and the compute evaluates that
let mut stm = stm;
stm.what = Value::Table(tb.clone());
let ns = opt.ns().to_string();
let db = opt.db().to_string();
self.validate_change_feed_valid(&mut run, &ns, &db, &tb).await?;