feat-4416: Ensure table definition exists on live queries (#4426)

Co-authored-by: Tobie Morgan Hitchcock <tobie@surrealdb.com>
This commit is contained in:
Aman Kumar 2024-08-22 19:54:59 +05:30 committed by GitHub
parent 5bf3119558
commit c5e8324653
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -110,6 +110,8 @@ impl LiveStatement {
};
// Get the transaction
let txn = ctx.tx();
// Ensure that the table definition exists
txn.ensure_ns_db_tb(ns, db, &tb, opt.strict).await?;
// Lock the transaction
let mut txn = txn.lock().await;
// Insert the node live query
@ -153,3 +155,125 @@ impl InfoStructure for LiveStatement {
})
}
}
#[cfg(test)]
mod tests {
use crate::dbs::{Action, Capabilities, Notification, Session};
use crate::kvs::Datastore;
use crate::kvs::LockType::Optimistic;
use crate::kvs::TransactionType::Write;
use crate::sql::Value;
use crate::syn::Parse;
pub async fn new_ds() -> Result<Datastore, crate::err::Error> {
Ok(Datastore::new("memory")
.await?
.with_capabilities(Capabilities::all())
.with_notifications())
}
#[tokio::test]
async fn test_table_definition_is_created_for_live_query() {
let dbs = new_ds().await.unwrap().with_notifications();
let (ns, db, tb) = ("test", "test", "person");
let ses = Session::owner().with_ns(ns).with_db(db).with_rt(true);
// Create a new transaction and verify that there are no tables defined.
let tx = dbs.transaction(Write, Optimistic).await.unwrap();
let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap());
assert!(table_occurrences.is_empty());
tx.cancel().await.unwrap();
// Initiate a live query statement
let lq_stmt = format!("LIVE SELECT * FROM {}", tb);
let live_query_response = &mut dbs.execute(&lq_stmt, &ses, None).await.unwrap();
let live_id = live_query_response.remove(0).result.unwrap();
let live_id = match live_id {
Value::Uuid(id) => id,
_ => panic!("expected uuid"),
};
// Verify that the table definition has been created.
let tx = dbs.transaction(Write, Optimistic).await.unwrap();
let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap());
assert_eq!(table_occurrences.len(), 1);
assert_eq!(table_occurrences[0].name.0, tb);
tx.cancel().await.unwrap();
// Initiate a Create record
let create_statement = format!("CREATE {}:test_true SET condition = true", tb);
let create_response = &mut dbs.execute(&create_statement, &ses, None).await.unwrap();
assert_eq!(create_response.len(), 1);
let expected_record = Value::parse(&format!(
"[{{
id: {}:test_true,
condition: true,
}}]",
tb
));
let tmp = create_response.remove(0).result.unwrap();
assert_eq!(tmp, expected_record);
// Create a new transaction to verify that the same table was used.
let tx = dbs.transaction(Write, Optimistic).await.unwrap();
let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap());
assert_eq!(table_occurrences.len(), 1);
assert_eq!(table_occurrences[0].name.0, tb);
tx.cancel().await.unwrap();
// Validate notification
let notifications = dbs.notifications().expect("expected notifications");
let notification = notifications.recv().await.unwrap();
assert_eq!(
notification,
Notification::new(
live_id,
Action::Create,
Value::parse(&format!(
"{{
id: {}:test_true,
condition: true,
}}",
tb
),),
)
);
}
#[tokio::test]
async fn test_table_exists_for_live_query() {
let dbs = new_ds().await.unwrap().with_notifications();
let (ns, db, tb) = ("test", "test", "person");
let ses = Session::owner().with_ns(ns).with_db(db).with_rt(true);
// Create a new transaction and verify that there are no tables defined.
let tx = dbs.transaction(Write, Optimistic).await.unwrap();
let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap());
assert!(table_occurrences.is_empty());
tx.cancel().await.unwrap();
// Initiate a Create record
let create_statement = format!("CREATE {}:test_true SET condition = true", tb);
dbs.execute(&create_statement, &ses, None).await.unwrap();
// Create a new transaction and confirm that a new table is created.
let tx = dbs.transaction(Write, Optimistic).await.unwrap();
let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap());
assert_eq!(table_occurrences.len(), 1);
assert_eq!(table_occurrences[0].name.0, tb);
tx.cancel().await.unwrap();
// Initiate a live query statement
let lq_stmt = format!("LIVE SELECT * FROM {}", tb);
dbs.execute(&lq_stmt, &ses, None).await.unwrap();
// Verify that the old table definition was used.
let tx = dbs.transaction(Write, Optimistic).await.unwrap();
let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap());
assert_eq!(table_occurrences.len(), 1);
assert_eq!(table_occurrences[0].name.0, tb);
tx.cancel().await.unwrap();
}
}