[ws-tests/lq] Fix the LQ tests and account for unordered WS messages (#2456)

This commit is contained in:
Salvador Girones Gil 2023-08-18 15:16:02 +02:00 committed by GitHub
parent 41c9fd701e
commit c514c39e9d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 126 additions and 93 deletions

View file

@ -11,6 +11,7 @@ use std::error::Error;
use std::fs::File; use std::fs::File;
use std::path::Path; use std::path::Path;
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
use std::time::Duration;
use std::{env, fs}; use std::{env, fs};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::time; use tokio::time;
@ -91,7 +92,7 @@ pub fn run_internal<P: AsRef<Path>>(args: &str, current_dir: Option<P>) -> Child
// Use local files instead of pipes to avoid deadlocks. See https://github.com/rust-lang/rust/issues/45572 // Use local files instead of pipes to avoid deadlocks. See https://github.com/rust-lang/rust/issues/45572
let stdout_path = tmp_file(format!("server-stdout-{}.log", rand::random::<u32>()).as_str()); let stdout_path = tmp_file(format!("server-stdout-{}.log", rand::random::<u32>()).as_str());
let stderr_path = tmp_file(format!("server-stderr-{}.log", rand::random::<u32>()).as_str()); let stderr_path = tmp_file(format!("server-stderr-{}.log", rand::random::<u32>()).as_str());
debug!("Logging server output to: ({}, {})", stdout_path, stderr_path); debug!("Redirecting output. args=`{args}` stdout={stdout_path} stderr={stderr_path})");
let stdout = Stdio::from(File::create(&stdout_path).unwrap()); let stdout = Stdio::from(File::create(&stdout_path).unwrap());
let stderr = Stdio::from(File::create(&stderr_path).unwrap()); let stderr = Stdio::from(File::create(&stderr_path).unwrap());
@ -246,6 +247,32 @@ pub async fn ws_recv_msg(socket: &mut WsStream) -> Result<serde_json::Value, Box
ws_recv_msg_with_fmt(socket, Format::Json).await ws_recv_msg_with_fmt(socket, Format::Json).await
} }
/// When testing Live Queries, we may receive multiple messages unordered.
/// This method captures all the expected messages before the given timeout. The result can be inspected later on to find the desired message.
pub async fn ws_recv_all_msgs(
socket: &mut WsStream,
expected: usize,
timeout: Duration,
) -> Result<Vec<serde_json::Value>, Box<dyn Error>> {
let mut res = Vec::new();
loop {
tokio::select! {
_ = time::sleep(timeout) => {
debug!("Waited for {:?} and received {} messages", timeout, res.len());
if res.len() != expected {
return Err(format!("Expected {} messages but got {} after {:?}", expected, res.len(), timeout).into());
}
}
msg = ws_recv_msg(socket) => {
res.push(msg?);
}
}
if res.len() == expected {
return Ok(res);
}
}
}
pub async fn ws_send_msg_and_wait_response( pub async fn ws_send_msg_and_wait_response(
socket: &mut WsStream, socket: &mut WsStream,
msg_req: String, msg_req: String,
@ -324,6 +351,8 @@ pub async fn ws_signin(
ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?; ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?;
let msg = ws_recv_msg(socket).await?; let msg = ws_recv_msg(socket).await?;
debug!("ws_query result json={json:?} msg={msg:?}");
match msg.as_object() { match msg.as_object() {
Some(obj) if obj.keys().all(|k| ["id", "error"].contains(&k.as_str())) => { Some(obj) if obj.keys().all(|k| ["id", "error"].contains(&k.as_str())) => {
Err(format!("unexpected error from query request: {:?}", obj.get("error")).into()) Err(format!("unexpected error from query request: {:?}", obj.get("error")).into())
@ -357,6 +386,7 @@ pub async fn ws_query(
ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?; ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?;
let msg = ws_recv_msg(socket).await?; let msg = ws_recv_msg(socket).await?;
debug!("ws_query result json={json:?} msg={msg:?}");
match msg.as_object() { match msg.as_object() {
Some(obj) if obj.keys().all(|k| ["id", "error"].contains(&k.as_str())) => { Some(obj) if obj.keys().all(|k| ["id", "error"].contains(&k.as_str())) => {
@ -394,6 +424,7 @@ pub async fn ws_use(
ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?; ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?;
let msg = ws_recv_msg(socket).await?; let msg = ws_recv_msg(socket).await?;
debug!("ws_query result json={json:?} msg={msg:?}");
match msg.as_object() { match msg.as_object() {
Some(obj) if obj.keys().all(|k| ["id", "error"].contains(&k.as_str())) => { Some(obj) if obj.keys().all(|k| ["id", "error"].contains(&k.as_str())) => {
@ -414,3 +445,23 @@ pub async fn ws_use(
} }
} }
} }
/// Check if the given message is a successful notification from LQ.
pub fn ws_msg_is_notification(msg: &serde_json::Value) -> bool {
// Example of LQ notification:
//
// Object {"result": Object {"action": String("CREATE"), "id": String("04460f07-b0e1-4339-92db-049a94aeec10"), "result": Object {"id": String("table_FD40A9A361884C56B5908A934164884A:⟨an-id-goes-here⟩"), "name": String("ok")}}}
msg.is_object()
&& msg["result"].is_object()
&& msg["result"]
.as_object()
.unwrap()
.keys()
.all(|k| ["id", "action", "result"].contains(&k.as_str()))
}
/// Check if the given message is a notification from LQ and comes from the given LQ ID.
pub fn ws_msg_is_notification_from_lq(msg: &serde_json::Value, id: &str) -> bool {
ws_msg_is_notification(msg)
&& msg["result"].as_object().unwrap().get("id").unwrap().as_str() == Some(id)
}

View file

@ -2,10 +2,11 @@
mod common; mod common;
use std::time::Duration;
use serde_json::json; use serde_json::json;
use test_log::test; use test_log::test;
use crate::common::error::TestError;
use crate::common::{PASS, USER}; use crate::common::{PASS, USER};
#[test(tokio::test)] #[test(tokio::test)]
@ -329,7 +330,7 @@ async fn live_live_endpoint() -> Result<(), Box<dyn std::error::Error>> {
let _ = common::ws_use(socket, Some(ns), Some(db)).await?; let _ = common::ws_use(socket, Some(ns), Some(db)).await?;
// LIVE query via live endpoint // LIVE query via live endpoint
let live_id = common::ws_send_msg_and_wait_response( let live_res = common::ws_send_msg_and_wait_response(
socket, socket,
serde_json::to_string(&json!({ serde_json::to_string(&json!({
"id": "1", "id": "1",
@ -341,61 +342,54 @@ async fn live_live_endpoint() -> Result<(), Box<dyn std::error::Error>> {
.unwrap(), .unwrap(),
) )
.await?; .await?;
let live_id = live_res["result"].as_str().unwrap();
// Create some data for notification // Create some data for notification
// Manually send the query and wait for multiple messages. Ordering of the messages is not guaranteed, so we could receive the notification before the query result.
let id = "an-id-goes-here"; let id = "an-id-goes-here";
let query = format!(r#"INSERT INTO {} {{"id": "{}", "name": "ok"}};"#, table_name, id); let query = format!(r#"INSERT INTO {} {{"id": "{}", "name": "ok"}};"#, table_name, id);
let created = common::ws_query(socket, query.as_str()).await.unwrap(); let json = json!({
assert_eq!(created.len(), 1); "id": "1",
"method": "query",
"params": [query],
});
// Receive notification common::ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?;
let res = common::ws_recv_msg(socket).await.unwrap();
// Verify response contains no error // Wait some time for all messages to arrive, and then search for the notification message
let msgs = common::ws_recv_all_msgs(socket, 2, Duration::from_millis(500)).await;
assert!(msgs.is_ok(), "Error waiting for messages: {:?}", msgs.err());
let msgs = msgs.unwrap();
assert!(msgs.iter().all(|v| v["error"].is_null()), "Unexpected error received: {:#?}", msgs);
let lq_notif = msgs.iter().find(|v| common::ws_msg_is_notification_from_lq(v, live_id));
assert!( assert!(
res.as_object() lq_notif.is_some(),
.ok_or(TestError::AssertionError { "Expected to find a notification for LQ id {}: {:#?}",
message: format!("Unable to retrieve object from result: {}", res) live_id,
}) msgs
.unwrap()
.keys()
.eq(["result"]),
"result: {}",
res
); );
// Extract the notification object
// Unwrap let lq_notif = lq_notif.unwrap();
let notification = &res let lq_notif = lq_notif["result"].as_object().unwrap();
.as_object()
.ok_or(TestError::NetworkError {
message: format!("missing json object, res: {:?}", res).to_string(),
})
.unwrap()["result"];
assert_eq!(
&notification["id"],
live_id["result"].as_str().unwrap(),
"expected a notification id to match the live query id: {} but was {}",
&notification,
live_id
);
let action = notification["action"].as_str().unwrap();
let result = notification["result"].as_object().unwrap();
// Verify message on individual keys since the notification ID is random // Verify message on individual keys since the notification ID is random
assert_eq!(action, &serde_json::to_value("CREATE").unwrap(), "result: {:?}", res); let action = lq_notif["action"].as_str().unwrap();
let result = lq_notif["result"].as_object().unwrap();
assert_eq!(action, "CREATE", "expected notification to be `CREATE`: {:?}", lq_notif);
let expected_id = format!("{}:⟨{}", table_name, id);
assert_eq!( assert_eq!(
result["id"].as_str().ok_or(TestError::AssertionError { result["id"].as_str(),
message: format!("missing id, res: {:?}", res).to_string(), Some(expected_id.as_str()),
})?, "expected notification to have id {:?}: {:?}",
format!("{}:⟨{}", table_name, id), expected_id,
"result: {:?}", lq_notif
res
); );
assert_eq!( assert_eq!(
result["name"].as_str().unwrap(), result["name"].as_str(),
serde_json::to_value("ok").unwrap(), Some("ok"),
"result: {:?}", "expected notification to have name `ok`: {:?}",
res lq_notif
); );
Ok(()) Ok(())
@ -416,68 +410,56 @@ async fn live_query_endpoint() -> Result<(), Box<dyn std::error::Error>> {
// LIVE query via query endpoint // LIVE query via query endpoint
let lq_res = let lq_res =
common::ws_query(socket, format!("LIVE SELECT * FROM {};", table_name).as_str()).await?; common::ws_query(socket, format!("LIVE SELECT * FROM {};", table_name).as_str()).await?;
assert_eq!(lq_res.len(), 1); assert_eq!(lq_res.len(), 1, "Expected 1 result got: {:?}", lq_res);
let live_id = lq_res let live_id = lq_res[0]["result"].as_str().unwrap();
.get(0)
.ok_or(TestError::AssertionError {
message: "Expected 1 result after len check".to_string(),
})
.unwrap();
// Create some data for notification // Create some data for notification
// Manually send the query and wait for multiple messages. Ordering of the messages is not guaranteed, so we could receive the notification before the query result.
let id = "an-id-goes-here"; let id = "an-id-goes-here";
let query = format!(r#"INSERT INTO {} {{"id": "{}", "name": "ok"}};"#, table_name, id); let query = format!(r#"INSERT INTO {} {{"id": "{}", "name": "ok"}};"#, table_name, id);
let created = common::ws_query(socket, query.as_str()).await.unwrap(); let json = json!({
assert_eq!(created.len(), 1); "id": "1",
"method": "query",
"params": [query],
});
// Receive notification common::ws_send_msg(socket, serde_json::to_string(&json).unwrap()).await?;
let res = common::ws_recv_msg(socket).await.unwrap();
// Verify response contains no error // Wait some time for all messages to arrive, and then search for the notification message
let msgs = common::ws_recv_all_msgs(socket, 2, Duration::from_millis(500)).await;
assert!(msgs.is_ok(), "Error waiting for messages: {:?}", msgs.err());
let msgs = msgs.unwrap();
assert!(msgs.iter().all(|v| v["error"].is_null()), "Unexpected error received: {:#?}", msgs);
let lq_notif = msgs.iter().find(|v| common::ws_msg_is_notification_from_lq(v, live_id));
assert!( assert!(
res.as_object() lq_notif.is_some(),
.ok_or(TestError::AssertionError { "Expected to find a notification for LQ id {}: {:#?}",
message: format!("Unable to retrieve object from result: {}", res) live_id,
}) msgs
.unwrap()
.keys()
.eq(["result"]),
"result: {}",
res
); );
// Unwrap // Extract the notification object
let notification = &res let lq_notif = lq_notif.unwrap();
.as_object() let lq_notif = lq_notif["result"].as_object().unwrap();
.ok_or(TestError::NetworkError {
message: format!("missing json object, res: {:?}", res).to_string(),
})
.unwrap()["result"];
assert_eq!(
&notification["id"],
live_id["result"].as_str().unwrap(),
"expected a notification id to match the live query id: {} but was {}",
&notification,
live_id
);
let action = notification["action"].as_str().unwrap();
let result = notification["result"].as_object().unwrap();
// Verify message on individual keys since the notification ID is random // Verify message on individual keys since the notification ID is random
assert_eq!(action, &serde_json::to_value("CREATE").unwrap(), "result: {:?}", res); let action = lq_notif["action"].as_str().unwrap();
let result = lq_notif["result"].as_object().unwrap();
assert_eq!(action, "CREATE", "expected notification to be `CREATE`: {:?}", lq_notif);
let expected_id = format!("{}:⟨{}", table_name, id);
assert_eq!( assert_eq!(
result["id"].as_str().ok_or(TestError::AssertionError { result["id"].as_str(),
message: format!("missing id, res: {:?}", res).to_string(), Some(expected_id.as_str()),
})?, "expected notification to have id {:?}: {:?}",
format!("{}:⟨{}", table_name, id), expected_id,
"result: {:?}", lq_notif
res
); );
assert_eq!( assert_eq!(
result["name"].as_str().unwrap(), result["name"].as_str(),
serde_json::to_value("ok").unwrap(), Some("ok"),
"result: {:?}", "expected notification to have name `ok`: {:?}",
res lq_notif
); );
Ok(()) Ok(())