Add tests for Live Queries over web sockets (#2382)
Co-authored-by: Salvador Girones <salvadorgirones@gmail.com> Co-authored-by: Tobie Morgan Hitchcock <tobie@surrealdb.com>
This commit is contained in:
parent
c48cc4affc
commit
abbf9819c4
4 changed files with 210 additions and 2 deletions
|
@ -23,6 +23,7 @@ async fn handler() -> impl IntoResponse {
|
||||||
// The transaction was successful
|
// The transaction was successful
|
||||||
Ok(mut tx) => {
|
Ok(mut tx) => {
|
||||||
// Cancel the transaction
|
// Cancel the transaction
|
||||||
|
trace!("Health endpoint cancelling transaction");
|
||||||
let _ = tx.cancel().await;
|
let _ = tx.cancel().await;
|
||||||
// Return the response
|
// Return the response
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
15
tests/common/error.rs
Normal file
15
tests/common/error.rs
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
use std::fmt::Debug;
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum TestError {
|
||||||
|
#[error("A network error occurred: {message}")]
|
||||||
|
NetworkError {
|
||||||
|
message: String,
|
||||||
|
},
|
||||||
|
|
||||||
|
#[error("An assertion failed as part of an invocation stack: {message}")]
|
||||||
|
AssertionError {
|
||||||
|
message: String,
|
||||||
|
},
|
||||||
|
}
|
|
@ -1,4 +1,7 @@
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
pub mod error;
|
||||||
|
|
||||||
use futures_util::{SinkExt, StreamExt, TryStreamExt};
|
use futures_util::{SinkExt, StreamExt, TryStreamExt};
|
||||||
use rand::{thread_rng, Rng};
|
use rand::{thread_rng, Rng};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
@ -14,6 +17,8 @@ use tokio_tungstenite::tungstenite::Message;
|
||||||
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
|
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
|
|
||||||
|
use crate::common::error::TestError;
|
||||||
|
|
||||||
pub const USER: &str = "root";
|
pub const USER: &str = "root";
|
||||||
pub const PASS: &str = "root";
|
pub const PASS: &str = "root";
|
||||||
|
|
||||||
|
@ -190,12 +195,36 @@ pub async fn ws_send_msg(
|
||||||
ws_send_msg_with_fmt(socket, msg_req, Format::Json).await
|
ws_send_msg_with_fmt(socket, msg_req, Format::Json).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn ws_recv_msg(socket: &mut WsStream) -> Result<serde_json::Value, Box<dyn Error>> {
|
||||||
|
ws_recv_msg_with_fmt(socket, Format::Json).await
|
||||||
|
}
|
||||||
|
|
||||||
pub enum Format {
|
pub enum Format {
|
||||||
Json,
|
Json,
|
||||||
Cbor,
|
Cbor,
|
||||||
Pack,
|
Pack,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn ws_recv_msg_with_fmt(
|
||||||
|
socket: &mut WsStream,
|
||||||
|
format: Format,
|
||||||
|
) -> Result<serde_json::Value, Box<dyn Error>> {
|
||||||
|
// Parse and return response
|
||||||
|
let mut f = socket.try_filter(|msg| match format {
|
||||||
|
Format::Json => futures_util::future::ready(msg.is_text()),
|
||||||
|
Format::Pack | Format::Cbor => futures_util::future::ready(msg.is_binary()),
|
||||||
|
});
|
||||||
|
let msg: serde_json::Value = tokio::select! {
|
||||||
|
_ = time::sleep(time::Duration::from_millis(2000)) => {
|
||||||
|
return Err(TestError::NetworkError{message: "timeout waiting for the response".to_string()}.into());
|
||||||
|
}
|
||||||
|
msg = f.select_next_some() => {
|
||||||
|
serde_json::from_str(&msg?.to_string())?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(serde_json::from_str(&msg.to_string())?)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn ws_send_msg_with_fmt(
|
pub async fn ws_send_msg_with_fmt(
|
||||||
socket: &mut WsStream,
|
socket: &mut WsStream,
|
||||||
msg_req: String,
|
msg_req: String,
|
||||||
|
|
|
@ -6,6 +6,7 @@ use serde_json::json;
|
||||||
use serial_test::serial;
|
use serial_test::serial;
|
||||||
use test_log::test;
|
use test_log::test;
|
||||||
|
|
||||||
|
use crate::common::error::TestError;
|
||||||
use crate::common::{PASS, USER};
|
use crate::common::{PASS, USER};
|
||||||
|
|
||||||
#[test(tokio::test)]
|
#[test(tokio::test)]
|
||||||
|
@ -325,8 +326,170 @@ async fn kill() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
|
||||||
#[test(tokio::test)]
|
#[test(tokio::test)]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn live() -> Result<(), Box<dyn std::error::Error>> {
|
async fn live_live_endpoint() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
// TODO: implement
|
let (addr, _server) = common::start_server(false, false, true).await.unwrap();
|
||||||
|
let table_name = "table_FD40A9A361884C56B5908A934164884A".to_string();
|
||||||
|
|
||||||
|
let socket = &mut common::connect_ws(&addr).await?;
|
||||||
|
|
||||||
|
let ns = "3498b03b44b5452a9d3f15252b454db1";
|
||||||
|
let db = "2cf93e52ff0a42f39d271412404a01f6";
|
||||||
|
let _ = common::ws_signin(socket, USER, PASS, None, None, None).await?;
|
||||||
|
let _ = common::ws_use(socket, Some(ns), Some(db)).await?;
|
||||||
|
|
||||||
|
// LIVE query via live endpoint
|
||||||
|
let live_id = common::ws_send_msg(
|
||||||
|
socket,
|
||||||
|
serde_json::to_string(&json!({
|
||||||
|
"id": "1",
|
||||||
|
"method": "live",
|
||||||
|
"params": [
|
||||||
|
table_name
|
||||||
|
],
|
||||||
|
}))
|
||||||
|
.unwrap(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Create some data for notification
|
||||||
|
let id = "an-id-goes-here";
|
||||||
|
let query = format!(r#"INSERT INTO {} {{"id": "{}", "name": "ok"}};"#, table_name, id);
|
||||||
|
let created = common::ws_query(socket, query.as_str()).await.unwrap();
|
||||||
|
assert_eq!(created.len(), 1);
|
||||||
|
|
||||||
|
// Receive notification
|
||||||
|
let res = common::ws_recv_msg(socket).await.unwrap();
|
||||||
|
|
||||||
|
// Verify response contains no error
|
||||||
|
assert!(
|
||||||
|
res.as_object()
|
||||||
|
.ok_or(TestError::AssertionError {
|
||||||
|
message: format!("Unable to retrieve object from result: {}", res)
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
|
.keys()
|
||||||
|
.eq(["result"]),
|
||||||
|
"result: {}",
|
||||||
|
res
|
||||||
|
);
|
||||||
|
|
||||||
|
// Unwrap
|
||||||
|
let notification = &res
|
||||||
|
.as_object()
|
||||||
|
.ok_or(TestError::NetworkError {
|
||||||
|
message: format!("missing json object, res: {:?}", res).to_string(),
|
||||||
|
})
|
||||||
|
.unwrap()["result"];
|
||||||
|
assert_eq!(
|
||||||
|
¬ification["id"],
|
||||||
|
live_id["result"].as_str().unwrap(),
|
||||||
|
"expected a notification id to match the live query id: {} but was {}",
|
||||||
|
¬ification,
|
||||||
|
live_id
|
||||||
|
);
|
||||||
|
let action = notification["action"].as_str().unwrap();
|
||||||
|
let result = notification["result"].as_object().unwrap();
|
||||||
|
|
||||||
|
// Verify message on individual keys since the notification ID is random
|
||||||
|
assert_eq!(action, &serde_json::to_value("CREATE").unwrap(), "result: {:?}", res);
|
||||||
|
assert_eq!(
|
||||||
|
result["id"].as_str().ok_or(TestError::AssertionError {
|
||||||
|
message: format!("missing id, res: {:?}", res).to_string(),
|
||||||
|
})?,
|
||||||
|
format!("{}:⟨{}⟩", table_name, id),
|
||||||
|
"result: {:?}",
|
||||||
|
res
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
result["name"].as_str().unwrap(),
|
||||||
|
serde_json::to_value("ok").unwrap(),
|
||||||
|
"result: {:?}",
|
||||||
|
res
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test(tokio::test)]
|
||||||
|
#[serial]
|
||||||
|
async fn live_query_endpoint() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let (addr, _server) = common::start_server(false, false, true).await.unwrap();
|
||||||
|
let table_name = "table_FD40A9A361884C56B5908A934164884A".to_string();
|
||||||
|
|
||||||
|
let socket = &mut common::connect_ws(&addr).await?;
|
||||||
|
|
||||||
|
let ns = "3498b03b44b5452a9d3f15252b454db1";
|
||||||
|
let db = "2cf93e52ff0a42f39d271412404a01f6";
|
||||||
|
let _ = common::ws_signin(socket, USER, PASS, None, None, None).await?;
|
||||||
|
let _ = common::ws_use(socket, Some(ns), Some(db)).await?;
|
||||||
|
|
||||||
|
// LIVE query via query endpoint
|
||||||
|
let lq_res =
|
||||||
|
common::ws_query(socket, format!("LIVE SELECT * FROM {};", table_name).as_str()).await?;
|
||||||
|
assert_eq!(lq_res.len(), 1);
|
||||||
|
let live_id = lq_res
|
||||||
|
.get(0)
|
||||||
|
.ok_or(TestError::AssertionError {
|
||||||
|
message: "Expected 1 result after len check".to_string(),
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Create some data for notification
|
||||||
|
let id = "an-id-goes-here";
|
||||||
|
let query = format!(r#"INSERT INTO {} {{"id": "{}", "name": "ok"}};"#, table_name, id);
|
||||||
|
let created = common::ws_query(socket, query.as_str()).await.unwrap();
|
||||||
|
assert_eq!(created.len(), 1);
|
||||||
|
|
||||||
|
// Receive notification
|
||||||
|
let res = common::ws_recv_msg(socket).await.unwrap();
|
||||||
|
|
||||||
|
// Verify response contains no error
|
||||||
|
assert!(
|
||||||
|
res.as_object()
|
||||||
|
.ok_or(TestError::AssertionError {
|
||||||
|
message: format!("Unable to retrieve object from result: {}", res)
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
|
.keys()
|
||||||
|
.eq(["result"]),
|
||||||
|
"result: {}",
|
||||||
|
res
|
||||||
|
);
|
||||||
|
|
||||||
|
// Unwrap
|
||||||
|
let notification = &res
|
||||||
|
.as_object()
|
||||||
|
.ok_or(TestError::NetworkError {
|
||||||
|
message: format!("missing json object, res: {:?}", res).to_string(),
|
||||||
|
})
|
||||||
|
.unwrap()["result"];
|
||||||
|
assert_eq!(
|
||||||
|
¬ification["id"],
|
||||||
|
live_id["result"].as_str().unwrap(),
|
||||||
|
"expected a notification id to match the live query id: {} but was {}",
|
||||||
|
¬ification,
|
||||||
|
live_id
|
||||||
|
);
|
||||||
|
let action = notification["action"].as_str().unwrap();
|
||||||
|
let result = notification["result"].as_object().unwrap();
|
||||||
|
|
||||||
|
// Verify message on individual keys since the notification ID is random
|
||||||
|
assert_eq!(action, &serde_json::to_value("CREATE").unwrap(), "result: {:?}", res);
|
||||||
|
assert_eq!(
|
||||||
|
result["id"].as_str().ok_or(TestError::AssertionError {
|
||||||
|
message: format!("missing id, res: {:?}", res).to_string(),
|
||||||
|
})?,
|
||||||
|
format!("{}:⟨{}⟩", table_name, id),
|
||||||
|
"result: {:?}",
|
||||||
|
res
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
result["name"].as_str().unwrap(),
|
||||||
|
serde_json::to_value("ok").unwrap(),
|
||||||
|
"result: {:?}",
|
||||||
|
res
|
||||||
|
);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue