From c514c39e9d4192a7aecc6e37d4793dcd78a91bb6 Mon Sep 17 00:00:00 2001 From: Salvador Girones Gil Date: Fri, 18 Aug 2023 15:16:02 +0200 Subject: [PATCH] [ws-tests/lq] Fix the LQ tests and account for unordered WS messages (#2456) --- tests/common/mod.rs | 53 ++++++++++++- tests/ws_integration.rs | 166 ++++++++++++++++++---------------------- 2 files changed, 126 insertions(+), 93 deletions(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a60ec06c..c9826b4c 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -11,6 +11,7 @@ use std::error::Error; use std::fs::File; use std::path::Path; use std::process::{Command, Stdio}; +use std::time::Duration; use std::{env, fs}; use tokio::net::TcpStream; use tokio::time; @@ -91,7 +92,7 @@ pub fn run_internal>(args: &str, current_dir: Option

) -> Child // Use local files instead of pipes to avoid deadlocks. See https://github.com/rust-lang/rust/issues/45572 let stdout_path = tmp_file(format!("server-stdout-{}.log", rand::random::()).as_str()); let stderr_path = tmp_file(format!("server-stderr-{}.log", rand::random::()).as_str()); - debug!("Logging server output to: ({}, {})", stdout_path, stderr_path); + debug!("Redirecting output. args=`{args}` stdout={stdout_path} stderr={stderr_path})"); let stdout = Stdio::from(File::create(&stdout_path).unwrap()); let stderr = Stdio::from(File::create(&stderr_path).unwrap()); @@ -246,6 +247,32 @@ pub async fn ws_recv_msg(socket: &mut WsStream) -> Result Result, Box> { + let mut res = Vec::new(); + loop { + tokio::select! { + _ = time::sleep(timeout) => { + debug!("Waited for {:?} and received {} messages", timeout, res.len()); + if res.len() != expected { + return Err(format!("Expected {} messages but got {} after {:?}", expected, res.len(), timeout).into()); + } + } + msg = ws_recv_msg(socket) => { + res.push(msg?); + } + } + if res.len() == expected { + return Ok(res); + } + } +} + pub async fn ws_send_msg_and_wait_response( socket: &mut WsStream, msg_req: String, @@ -324,6 +351,8 @@ pub async fn ws_signin( ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?; let msg = ws_recv_msg(socket).await?; + debug!("ws_query result json={json:?} msg={msg:?}"); + match msg.as_object() { Some(obj) if obj.keys().all(|k| ["id", "error"].contains(&k.as_str())) => { Err(format!("unexpected error from query request: {:?}", obj.get("error")).into()) @@ -357,6 +386,7 @@ pub async fn ws_query( ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?; let msg = ws_recv_msg(socket).await?; + debug!("ws_query result json={json:?} msg={msg:?}"); match msg.as_object() { Some(obj) if obj.keys().all(|k| ["id", "error"].contains(&k.as_str())) => { @@ -394,6 +424,7 @@ pub async fn ws_use( ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?; let msg = ws_recv_msg(socket).await?; + debug!("ws_query result json={json:?} msg={msg:?}"); match msg.as_object() { Some(obj) if obj.keys().all(|k| ["id", "error"].contains(&k.as_str())) => { @@ -414,3 +445,23 @@ pub async fn ws_use( } } } + +/// Check if the given message is a successful notification from LQ. +pub fn ws_msg_is_notification(msg: &serde_json::Value) -> bool { + // Example of LQ notification: + // + // Object {"result": Object {"action": String("CREATE"), "id": String("04460f07-b0e1-4339-92db-049a94aeec10"), "result": Object {"id": String("table_FD40A9A361884C56B5908A934164884A:⟨an-id-goes-here⟩"), "name": String("ok")}}} + msg.is_object() + && msg["result"].is_object() + && msg["result"] + .as_object() + .unwrap() + .keys() + .all(|k| ["id", "action", "result"].contains(&k.as_str())) +} + +/// Check if the given message is a notification from LQ and comes from the given LQ ID. +pub fn ws_msg_is_notification_from_lq(msg: &serde_json::Value, id: &str) -> bool { + ws_msg_is_notification(msg) + && msg["result"].as_object().unwrap().get("id").unwrap().as_str() == Some(id) +} diff --git a/tests/ws_integration.rs b/tests/ws_integration.rs index 4da2ad61..51b8ce22 100644 --- a/tests/ws_integration.rs +++ b/tests/ws_integration.rs @@ -2,10 +2,11 @@ mod common; +use std::time::Duration; + use serde_json::json; use test_log::test; -use crate::common::error::TestError; use crate::common::{PASS, USER}; #[test(tokio::test)] @@ -329,7 +330,7 @@ async fn live_live_endpoint() -> Result<(), Box> { let _ = common::ws_use(socket, Some(ns), Some(db)).await?; // LIVE query via live endpoint - let live_id = common::ws_send_msg_and_wait_response( + let live_res = common::ws_send_msg_and_wait_response( socket, serde_json::to_string(&json!({ "id": "1", @@ -341,61 +342,54 @@ async fn live_live_endpoint() -> Result<(), Box> { .unwrap(), ) .await?; + let live_id = live_res["result"].as_str().unwrap(); // Create some data for notification + // Manually send the query and wait for multiple messages. Ordering of the messages is not guaranteed, so we could receive the notification before the query result. let id = "an-id-goes-here"; let query = format!(r#"INSERT INTO {} {{"id": "{}", "name": "ok"}};"#, table_name, id); - let created = common::ws_query(socket, query.as_str()).await.unwrap(); - assert_eq!(created.len(), 1); + let json = json!({ + "id": "1", + "method": "query", + "params": [query], + }); - // Receive notification - let res = common::ws_recv_msg(socket).await.unwrap(); + common::ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?; - // Verify response contains no error + // Wait some time for all messages to arrive, and then search for the notification message + let msgs = common::ws_recv_all_msgs(socket, 2, Duration::from_millis(500)).await; + assert!(msgs.is_ok(), "Error waiting for messages: {:?}", msgs.err()); + let msgs = msgs.unwrap(); + assert!(msgs.iter().all(|v| v["error"].is_null()), "Unexpected error received: {:#?}", msgs); + + let lq_notif = msgs.iter().find(|v| common::ws_msg_is_notification_from_lq(v, live_id)); assert!( - res.as_object() - .ok_or(TestError::AssertionError { - message: format!("Unable to retrieve object from result: {}", res) - }) - .unwrap() - .keys() - .eq(["result"]), - "result: {}", - res + lq_notif.is_some(), + "Expected to find a notification for LQ id {}: {:#?}", + live_id, + msgs ); - - // Unwrap - let notification = &res - .as_object() - .ok_or(TestError::NetworkError { - message: format!("missing json object, res: {:?}", res).to_string(), - }) - .unwrap()["result"]; - assert_eq!( - ¬ification["id"], - live_id["result"].as_str().unwrap(), - "expected a notification id to match the live query id: {} but was {}", - ¬ification, - live_id - ); - let action = notification["action"].as_str().unwrap(); - let result = notification["result"].as_object().unwrap(); + // Extract the notification object + let lq_notif = lq_notif.unwrap(); + let lq_notif = lq_notif["result"].as_object().unwrap(); // Verify message on individual keys since the notification ID is random - assert_eq!(action, &serde_json::to_value("CREATE").unwrap(), "result: {:?}", res); + let action = lq_notif["action"].as_str().unwrap(); + let result = lq_notif["result"].as_object().unwrap(); + assert_eq!(action, "CREATE", "expected notification to be `CREATE`: {:?}", lq_notif); + let expected_id = format!("{}:⟨{}⟩", table_name, id); assert_eq!( - result["id"].as_str().ok_or(TestError::AssertionError { - message: format!("missing id, res: {:?}", res).to_string(), - })?, - format!("{}:⟨{}⟩", table_name, id), - "result: {:?}", - res + result["id"].as_str(), + Some(expected_id.as_str()), + "expected notification to have id {:?}: {:?}", + expected_id, + lq_notif ); assert_eq!( - result["name"].as_str().unwrap(), - serde_json::to_value("ok").unwrap(), - "result: {:?}", - res + result["name"].as_str(), + Some("ok"), + "expected notification to have name `ok`: {:?}", + lq_notif ); Ok(()) @@ -416,68 +410,56 @@ async fn live_query_endpoint() -> Result<(), Box> { // LIVE query via query endpoint let lq_res = common::ws_query(socket, format!("LIVE SELECT * FROM {};", table_name).as_str()).await?; - assert_eq!(lq_res.len(), 1); - let live_id = lq_res - .get(0) - .ok_or(TestError::AssertionError { - message: "Expected 1 result after len check".to_string(), - }) - .unwrap(); + assert_eq!(lq_res.len(), 1, "Expected 1 result got: {:?}", lq_res); + let live_id = lq_res[0]["result"].as_str().unwrap(); // Create some data for notification + // Manually send the query and wait for multiple messages. Ordering of the messages is not guaranteed, so we could receive the notification before the query result. let id = "an-id-goes-here"; let query = format!(r#"INSERT INTO {} {{"id": "{}", "name": "ok"}};"#, table_name, id); - let created = common::ws_query(socket, query.as_str()).await.unwrap(); - assert_eq!(created.len(), 1); + let json = json!({ + "id": "1", + "method": "query", + "params": [query], + }); - // Receive notification - let res = common::ws_recv_msg(socket).await.unwrap(); + common::ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?; - // Verify response contains no error + // Wait some time for all messages to arrive, and then search for the notification message + let msgs = common::ws_recv_all_msgs(socket, 2, Duration::from_millis(500)).await; + assert!(msgs.is_ok(), "Error waiting for messages: {:?}", msgs.err()); + let msgs = msgs.unwrap(); + assert!(msgs.iter().all(|v| v["error"].is_null()), "Unexpected error received: {:#?}", msgs); + + let lq_notif = msgs.iter().find(|v| common::ws_msg_is_notification_from_lq(v, live_id)); assert!( - res.as_object() - .ok_or(TestError::AssertionError { - message: format!("Unable to retrieve object from result: {}", res) - }) - .unwrap() - .keys() - .eq(["result"]), - "result: {}", - res + lq_notif.is_some(), + "Expected to find a notification for LQ id {}: {:#?}", + live_id, + msgs ); - // Unwrap - let notification = &res - .as_object() - .ok_or(TestError::NetworkError { - message: format!("missing json object, res: {:?}", res).to_string(), - }) - .unwrap()["result"]; - assert_eq!( - ¬ification["id"], - live_id["result"].as_str().unwrap(), - "expected a notification id to match the live query id: {} but was {}", - ¬ification, - live_id - ); - let action = notification["action"].as_str().unwrap(); - let result = notification["result"].as_object().unwrap(); + // Extract the notification object + let lq_notif = lq_notif.unwrap(); + let lq_notif = lq_notif["result"].as_object().unwrap(); // Verify message on individual keys since the notification ID is random - assert_eq!(action, &serde_json::to_value("CREATE").unwrap(), "result: {:?}", res); + let action = lq_notif["action"].as_str().unwrap(); + let result = lq_notif["result"].as_object().unwrap(); + assert_eq!(action, "CREATE", "expected notification to be `CREATE`: {:?}", lq_notif); + let expected_id = format!("{}:⟨{}⟩", table_name, id); assert_eq!( - result["id"].as_str().ok_or(TestError::AssertionError { - message: format!("missing id, res: {:?}", res).to_string(), - })?, - format!("{}:⟨{}⟩", table_name, id), - "result: {:?}", - res + result["id"].as_str(), + Some(expected_id.as_str()), + "expected notification to have id {:?}: {:?}", + expected_id, + lq_notif ); assert_eq!( - result["name"].as_str().unwrap(), - serde_json::to_value("ok").unwrap(), - "result: {:?}", - res + result["name"].as_str(), + Some("ok"), + "expected notification to have name `ok`: {:?}", + lq_notif ); Ok(())