diff --git a/core/src/sql/statements/live.rs b/core/src/sql/statements/live.rs index 90b3481d..bb256224 100644 --- a/core/src/sql/statements/live.rs +++ b/core/src/sql/statements/live.rs @@ -110,6 +110,8 @@ impl LiveStatement { }; // Get the transaction let txn = ctx.tx(); + // Ensure that the table definition exists + txn.ensure_ns_db_tb(ns, db, &tb, opt.strict).await?; // Lock the transaction let mut txn = txn.lock().await; // Insert the node live query @@ -153,3 +155,125 @@ impl InfoStructure for LiveStatement { }) } } + +#[cfg(test)] +mod tests { + use crate::dbs::{Action, Capabilities, Notification, Session}; + use crate::kvs::Datastore; + use crate::kvs::LockType::Optimistic; + use crate::kvs::TransactionType::Write; + use crate::sql::Value; + use crate::syn::Parse; + + pub async fn new_ds() -> Result { + Ok(Datastore::new("memory") + .await? + .with_capabilities(Capabilities::all()) + .with_notifications()) + } + + #[tokio::test] + async fn test_table_definition_is_created_for_live_query() { + let dbs = new_ds().await.unwrap().with_notifications(); + let (ns, db, tb) = ("test", "test", "person"); + let ses = Session::owner().with_ns(ns).with_db(db).with_rt(true); + + // Create a new transaction and verify that there are no tables defined. + let tx = dbs.transaction(Write, Optimistic).await.unwrap(); + let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap()); + assert!(table_occurrences.is_empty()); + tx.cancel().await.unwrap(); + + // Initiate a live query statement + let lq_stmt = format!("LIVE SELECT * FROM {}", tb); + let live_query_response = &mut dbs.execute(&lq_stmt, &ses, None).await.unwrap(); + + let live_id = live_query_response.remove(0).result.unwrap(); + let live_id = match live_id { + Value::Uuid(id) => id, + _ => panic!("expected uuid"), + }; + + // Verify that the table definition has been created. + let tx = dbs.transaction(Write, Optimistic).await.unwrap(); + let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap()); + assert_eq!(table_occurrences.len(), 1); + assert_eq!(table_occurrences[0].name.0, tb); + tx.cancel().await.unwrap(); + + // Initiate a Create record + let create_statement = format!("CREATE {}:test_true SET condition = true", tb); + let create_response = &mut dbs.execute(&create_statement, &ses, None).await.unwrap(); + assert_eq!(create_response.len(), 1); + let expected_record = Value::parse(&format!( + "[{{ + id: {}:test_true, + condition: true, + }}]", + tb + )); + + let tmp = create_response.remove(0).result.unwrap(); + assert_eq!(tmp, expected_record); + + // Create a new transaction to verify that the same table was used. + let tx = dbs.transaction(Write, Optimistic).await.unwrap(); + let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap()); + assert_eq!(table_occurrences.len(), 1); + assert_eq!(table_occurrences[0].name.0, tb); + tx.cancel().await.unwrap(); + + // Validate notification + let notifications = dbs.notifications().expect("expected notifications"); + let notification = notifications.recv().await.unwrap(); + assert_eq!( + notification, + Notification::new( + live_id, + Action::Create, + Value::parse(&format!( + "{{ + id: {}:test_true, + condition: true, + }}", + tb + ),), + ) + ); + } + + #[tokio::test] + async fn test_table_exists_for_live_query() { + let dbs = new_ds().await.unwrap().with_notifications(); + let (ns, db, tb) = ("test", "test", "person"); + let ses = Session::owner().with_ns(ns).with_db(db).with_rt(true); + + // Create a new transaction and verify that there are no tables defined. + let tx = dbs.transaction(Write, Optimistic).await.unwrap(); + let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap()); + assert!(table_occurrences.is_empty()); + tx.cancel().await.unwrap(); + + // Initiate a Create record + let create_statement = format!("CREATE {}:test_true SET condition = true", tb); + dbs.execute(&create_statement, &ses, None).await.unwrap(); + + // Create a new transaction and confirm that a new table is created. + let tx = dbs.transaction(Write, Optimistic).await.unwrap(); + let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap()); + assert_eq!(table_occurrences.len(), 1); + assert_eq!(table_occurrences[0].name.0, tb); + tx.cancel().await.unwrap(); + + // Initiate a live query statement + let lq_stmt = format!("LIVE SELECT * FROM {}", tb); + dbs.execute(&lq_stmt, &ses, None).await.unwrap(); + + // Verify that the old table definition was used. + let tx = dbs.transaction(Write, Optimistic).await.unwrap(); + let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap()); + assert_eq!(table_occurrences.len(), 1); + assert_eq!(table_occurrences[0].name.0, tb); + tx.cancel().await.unwrap(); + } +}