diff --git a/src/net/health.rs b/src/net/health.rs index 9285fca3..f4da6b55 100644 --- a/src/net/health.rs +++ b/src/net/health.rs @@ -23,6 +23,7 @@ async fn handler() -> impl IntoResponse { // The transaction was successful Ok(mut tx) => { // Cancel the transaction + trace!("Health endpoint cancelling transaction"); let _ = tx.cancel().await; // Return the response Ok(()) diff --git a/tests/common/error.rs b/tests/common/error.rs new file mode 100644 index 00000000..6bcf8cb4 --- /dev/null +++ b/tests/common/error.rs @@ -0,0 +1,15 @@ +use std::fmt::Debug; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum TestError { + #[error("A network error occurred: {message}")] + NetworkError { + message: String, + }, + + #[error("An assertion failed as part of an invocation stack: {message}")] + AssertionError { + message: String, + }, +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index ca24026a..8bff457f 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,4 +1,7 @@ #![allow(dead_code)] + +pub mod error; + use futures_util::{SinkExt, StreamExt, TryStreamExt}; use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; @@ -14,6 +17,8 @@ use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; use tracing::{error, info}; +use crate::common::error::TestError; + pub const USER: &str = "root"; pub const PASS: &str = "root"; @@ -190,12 +195,36 @@ pub async fn ws_send_msg( ws_send_msg_with_fmt(socket, msg_req, Format::Json).await } +pub async fn ws_recv_msg(socket: &mut WsStream) -> Result> { + ws_recv_msg_with_fmt(socket, Format::Json).await +} + pub enum Format { Json, Cbor, Pack, } +pub async fn ws_recv_msg_with_fmt( + socket: &mut WsStream, + format: Format, +) -> Result> { + // Parse and return response + let mut f = socket.try_filter(|msg| match format { + Format::Json => futures_util::future::ready(msg.is_text()), + Format::Pack | Format::Cbor => futures_util::future::ready(msg.is_binary()), + }); + let msg: serde_json::Value = tokio::select! { + _ = time::sleep(time::Duration::from_millis(2000)) => { + return Err(TestError::NetworkError{message: "timeout waiting for the response".to_string()}.into()); + } + msg = f.select_next_some() => { + serde_json::from_str(&msg?.to_string())? + } + }; + Ok(serde_json::from_str(&msg.to_string())?) +} + pub async fn ws_send_msg_with_fmt( socket: &mut WsStream, msg_req: String, diff --git a/tests/ws_integration.rs b/tests/ws_integration.rs index e5fe9bd8..5969b81e 100644 --- a/tests/ws_integration.rs +++ b/tests/ws_integration.rs @@ -6,6 +6,7 @@ use serde_json::json; use serial_test::serial; use test_log::test; +use crate::common::error::TestError; use crate::common::{PASS, USER}; #[test(tokio::test)] @@ -325,8 +326,170 @@ async fn kill() -> Result<(), Box> { #[test(tokio::test)] #[serial] -async fn live() -> Result<(), Box> { - // TODO: implement +async fn live_live_endpoint() -> Result<(), Box> { + let (addr, _server) = common::start_server(false, false, true).await.unwrap(); + let table_name = "table_FD40A9A361884C56B5908A934164884A".to_string(); + + let socket = &mut common::connect_ws(&addr).await?; + + let ns = "3498b03b44b5452a9d3f15252b454db1"; + let db = "2cf93e52ff0a42f39d271412404a01f6"; + let _ = common::ws_signin(socket, USER, PASS, None, None, None).await?; + let _ = common::ws_use(socket, Some(ns), Some(db)).await?; + + // LIVE query via live endpoint + let live_id = common::ws_send_msg( + socket, + serde_json::to_string(&json!({ + "id": "1", + "method": "live", + "params": [ + table_name + ], + })) + .unwrap(), + ) + .await?; + + // Create some data for notification + let id = "an-id-goes-here"; + let query = format!(r#"INSERT INTO {} {{"id": "{}", "name": "ok"}};"#, table_name, id); + let created = common::ws_query(socket, query.as_str()).await.unwrap(); + assert_eq!(created.len(), 1); + + // Receive notification + let res = common::ws_recv_msg(socket).await.unwrap(); + + // Verify response contains no error + assert!( + res.as_object() + .ok_or(TestError::AssertionError { + message: format!("Unable to retrieve object from result: {}", res) + }) + .unwrap() + .keys() + .eq(["result"]), + "result: {}", + res + ); + + // Unwrap + let notification = &res + .as_object() + .ok_or(TestError::NetworkError { + message: format!("missing json object, res: {:?}", res).to_string(), + }) + .unwrap()["result"]; + assert_eq!( + ¬ification["id"], + live_id["result"].as_str().unwrap(), + "expected a notification id to match the live query id: {} but was {}", + ¬ification, + live_id + ); + let action = notification["action"].as_str().unwrap(); + let result = notification["result"].as_object().unwrap(); + + // Verify message on individual keys since the notification ID is random + assert_eq!(action, &serde_json::to_value("CREATE").unwrap(), "result: {:?}", res); + assert_eq!( + result["id"].as_str().ok_or(TestError::AssertionError { + message: format!("missing id, res: {:?}", res).to_string(), + })?, + format!("{}:⟨{}⟩", table_name, id), + "result: {:?}", + res + ); + assert_eq!( + result["name"].as_str().unwrap(), + serde_json::to_value("ok").unwrap(), + "result: {:?}", + res + ); + + Ok(()) +} + +#[test(tokio::test)] +#[serial] +async fn live_query_endpoint() -> Result<(), Box> { + let (addr, _server) = common::start_server(false, false, true).await.unwrap(); + let table_name = "table_FD40A9A361884C56B5908A934164884A".to_string(); + + let socket = &mut common::connect_ws(&addr).await?; + + let ns = "3498b03b44b5452a9d3f15252b454db1"; + let db = "2cf93e52ff0a42f39d271412404a01f6"; + let _ = common::ws_signin(socket, USER, PASS, None, None, None).await?; + let _ = common::ws_use(socket, Some(ns), Some(db)).await?; + + // LIVE query via query endpoint + let lq_res = + common::ws_query(socket, format!("LIVE SELECT * FROM {};", table_name).as_str()).await?; + assert_eq!(lq_res.len(), 1); + let live_id = lq_res + .get(0) + .ok_or(TestError::AssertionError { + message: "Expected 1 result after len check".to_string(), + }) + .unwrap(); + + // Create some data for notification + let id = "an-id-goes-here"; + let query = format!(r#"INSERT INTO {} {{"id": "{}", "name": "ok"}};"#, table_name, id); + let created = common::ws_query(socket, query.as_str()).await.unwrap(); + assert_eq!(created.len(), 1); + + // Receive notification + let res = common::ws_recv_msg(socket).await.unwrap(); + + // Verify response contains no error + assert!( + res.as_object() + .ok_or(TestError::AssertionError { + message: format!("Unable to retrieve object from result: {}", res) + }) + .unwrap() + .keys() + .eq(["result"]), + "result: {}", + res + ); + + // Unwrap + let notification = &res + .as_object() + .ok_or(TestError::NetworkError { + message: format!("missing json object, res: {:?}", res).to_string(), + }) + .unwrap()["result"]; + assert_eq!( + ¬ification["id"], + live_id["result"].as_str().unwrap(), + "expected a notification id to match the live query id: {} but was {}", + ¬ification, + live_id + ); + let action = notification["action"].as_str().unwrap(); + let result = notification["result"].as_object().unwrap(); + + // Verify message on individual keys since the notification ID is random + assert_eq!(action, &serde_json::to_value("CREATE").unwrap(), "result: {:?}", res); + assert_eq!( + result["id"].as_str().ok_or(TestError::AssertionError { + message: format!("missing id, res: {:?}", res).to_string(), + })?, + format!("{}:⟨{}⟩", table_name, id), + "result: {:?}", + res + ); + assert_eq!( + result["name"].as_str().unwrap(), + serde_json::to_value("ok").unwrap(), + "result: {:?}", + res + ); + Ok(()) }