Fix live queries to check predicates against previous document instead of current (#2452)
This commit is contained in:
parent
ce6cfb7634
commit
1dfc58a8da
3 changed files with 46 additions and 17 deletions
|
@ -30,9 +30,17 @@ impl<'a> Document<'a> {
|
|||
// Create a new statement
|
||||
let lq = Statement::from(lv);
|
||||
// Check LIVE SELECT where condition
|
||||
if self.check(ctx, opt, txn, &lq).await.is_err() {
|
||||
if let Some(cond) = lq.conds() {
|
||||
// Check if this is a delete statement
|
||||
let doc = match stm.is_delete() {
|
||||
true => &self.initial,
|
||||
false => &self.current,
|
||||
};
|
||||
// Check if the expression is truthy
|
||||
if !cond.compute(ctx, opt, txn, Some(doc)).await?.is_truthy() {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// Check what type of data change this is
|
||||
if stm.is_delete() {
|
||||
// Send a DELETE notification
|
||||
|
|
|
@ -2636,6 +2636,7 @@ impl Transaction {
|
|||
|
||||
#[allow(unused_variables)]
|
||||
fn check_level(&mut self, check: Check) {
|
||||
#![allow(unused_variables)]
|
||||
match self {
|
||||
#[cfg(feature = "kv-mem")]
|
||||
Transaction {
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
mod parse;
|
||||
|
||||
use channel::{Receiver, TryRecvError};
|
||||
use parse::Parse;
|
||||
use surrealdb::dbs::{Action, Notification, Session};
|
||||
use surrealdb::err::Error;
|
||||
|
@ -373,43 +375,46 @@ async fn check_permissions_auth_disabled() {
|
|||
|
||||
#[tokio::test]
|
||||
async fn delete_filtered_live_notification() -> Result<(), Error> {
|
||||
let sql = "
|
||||
CREATE person:test SET value = 50;
|
||||
LIVE SELECT * FROM person WHERE value<100;
|
||||
DELETE person:test;
|
||||
";
|
||||
let dbs = Datastore::new("memory").await?.with_notifications();
|
||||
let ses = Session::owner().with_ns("test").with_db("test").with_rt(true);
|
||||
let res = &mut dbs.execute(sql, &ses, None).await?;
|
||||
assert_eq!(res.len(), 3);
|
||||
//
|
||||
let res = &mut dbs.execute("CREATE person:test_true SET condition = true", &ses, None).await?;
|
||||
assert_eq!(res.len(), 1);
|
||||
// validate create response
|
||||
let tmp = res.remove(0).result?;
|
||||
let expected_record = Value::parse(
|
||||
"[
|
||||
{
|
||||
id: person:test,
|
||||
value: 50
|
||||
id: person:test_true,
|
||||
condition: true,
|
||||
}
|
||||
]",
|
||||
);
|
||||
assert_eq!(tmp, expected_record);
|
||||
//
|
||||
|
||||
// Validate live query response
|
||||
let res =
|
||||
&mut dbs.execute("LIVE SELECT * FROM person WHERE condition = true", &ses, None).await?;
|
||||
assert_eq!(res.len(), 1);
|
||||
let live_id = res.remove(0).result?;
|
||||
let live_id = match live_id {
|
||||
Value::Uuid(id) => id,
|
||||
_ => panic!("expected uuid"),
|
||||
};
|
||||
//
|
||||
|
||||
// Validate delete response
|
||||
let res = &mut dbs.execute("DELETE person:test_true", &ses, None).await?;
|
||||
assert_eq!(res.len(), 1);
|
||||
let tmp = res.remove(0).result?;
|
||||
let val = Value::parse("[]");
|
||||
assert_eq!(tmp, val);
|
||||
//
|
||||
|
||||
// Validate notification
|
||||
let notifications = dbs.notifications();
|
||||
let notifications = match notifications {
|
||||
Some(notifications) => notifications,
|
||||
None => panic!("expected notifications"),
|
||||
};
|
||||
let not = notifications.recv_blocking().unwrap();
|
||||
let not = recv_notification(¬ifications, 10, std::time::Duration::from_millis(100)).unwrap();
|
||||
assert_eq!(
|
||||
not,
|
||||
Notification {
|
||||
|
@ -417,9 +422,24 @@ async fn delete_filtered_live_notification() -> Result<(), Error> {
|
|||
action: Action::Delete,
|
||||
result: Value::Thing(Thing {
|
||||
tb: "person".to_string(),
|
||||
id: Id::String("test".to_string()),
|
||||
id: Id::String("test_true".to_string()),
|
||||
}),
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn recv_notification(
|
||||
notifications: &Receiver<Notification>,
|
||||
tries: u8,
|
||||
poll_rate: std::time::Duration,
|
||||
) -> Result<Notification, TryRecvError> {
|
||||
for _ in 0..tries {
|
||||
match notifications.try_recv() {
|
||||
Ok(not) => return Ok(not),
|
||||
Err(_) => {}
|
||||
}
|
||||
std::thread::sleep(poll_rate);
|
||||
}
|
||||
notifications.try_recv()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue