From 1dfc58a8da35fc1c3700259571cc6f96d435cc86 Mon Sep 17 00:00:00 2001 From: Przemyslaw Hugh Kaznowski Date: Sat, 19 Aug 2023 13:31:51 +0100 Subject: [PATCH] Fix live queries to check predicates against previous document instead of current (#2452) --- lib/src/doc/lives.rs | 12 +++++++++-- lib/src/kvs/tx.rs | 1 + lib/tests/delete.rs | 50 +++++++++++++++++++++++++++++++------------- 3 files changed, 46 insertions(+), 17 deletions(-) diff --git a/lib/src/doc/lives.rs b/lib/src/doc/lives.rs index 7b0677ba..6cc454a7 100644 --- a/lib/src/doc/lives.rs +++ b/lib/src/doc/lives.rs @@ -30,8 +30,16 @@ impl<'a> Document<'a> { // Create a new statement let lq = Statement::from(lv); // Check LIVE SELECT where condition - if self.check(ctx, opt, txn, &lq).await.is_err() { - continue; + if let Some(cond) = lq.conds() { + // Check if this is a delete statement + let doc = match stm.is_delete() { + true => &self.initial, + false => &self.current, + }; + // Check if the expression is truthy + if !cond.compute(ctx, opt, txn, Some(doc)).await?.is_truthy() { + continue; + } } // Check what type of data change this is if stm.is_delete() { diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index c9b07433..476cc013 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -2636,6 +2636,7 @@ impl Transaction { #[allow(unused_variables)] fn check_level(&mut self, check: Check) { + #![allow(unused_variables)] match self { #[cfg(feature = "kv-mem")] Transaction { diff --git a/lib/tests/delete.rs b/lib/tests/delete.rs index 2ee885c9..5a9270a9 100644 --- a/lib/tests/delete.rs +++ b/lib/tests/delete.rs @@ -1,4 +1,6 @@ mod parse; + +use channel::{Receiver, TryRecvError}; use parse::Parse; use surrealdb::dbs::{Action, Notification, Session}; use surrealdb::err::Error; @@ -373,43 +375,46 @@ async fn check_permissions_auth_disabled() { #[tokio::test] async fn delete_filtered_live_notification() -> Result<(), Error> { - let sql = " - CREATE person:test SET value = 50; - LIVE SELECT * FROM person WHERE value<100; - DELETE person:test; - "; let dbs = Datastore::new("memory").await?.with_notifications(); let ses = Session::owner().with_ns("test").with_db("test").with_rt(true); - let res = &mut dbs.execute(sql, &ses, None).await?; - assert_eq!(res.len(), 3); - // + let res = &mut dbs.execute("CREATE person:test_true SET condition = true", &ses, None).await?; + assert_eq!(res.len(), 1); + // validate create response let tmp = res.remove(0).result?; let expected_record = Value::parse( "[ { - id: person:test, - value: 50 + id: person:test_true, + condition: true, } ]", ); assert_eq!(tmp, expected_record); - // + + // Validate live query response + let res = + &mut dbs.execute("LIVE SELECT * FROM person WHERE condition = true", &ses, None).await?; + assert_eq!(res.len(), 1); let live_id = res.remove(0).result?; let live_id = match live_id { Value::Uuid(id) => id, _ => panic!("expected uuid"), }; - // + + // Validate delete response + let res = &mut dbs.execute("DELETE person:test_true", &ses, None).await?; + assert_eq!(res.len(), 1); let tmp = res.remove(0).result?; let val = Value::parse("[]"); assert_eq!(tmp, val); - // + + // Validate notification let notifications = dbs.notifications(); let notifications = match notifications { Some(notifications) => notifications, None => panic!("expected notifications"), }; - let not = notifications.recv_blocking().unwrap(); + let not = recv_notification(¬ifications, 10, std::time::Duration::from_millis(100)).unwrap(); assert_eq!( not, Notification { @@ -417,9 +422,24 @@ async fn delete_filtered_live_notification() -> Result<(), Error> { action: Action::Delete, result: Value::Thing(Thing { tb: "person".to_string(), - id: Id::String("test".to_string()), + id: Id::String("test_true".to_string()), }), } ); Ok(()) } + +fn recv_notification( + notifications: &Receiver, + tries: u8, + poll_rate: std::time::Duration, +) -> Result { + for _ in 0..tries { + match notifications.try_recv() { + Ok(not) => return Ok(not), + Err(_) => {} + } + std::thread::sleep(poll_rate); + } + notifications.try_recv() +}