// RUST_LOG=warn cargo make ci-ws-integration
mod common;

mod ws_integration {
	use std::time::Duration;

	use serde_json::json;
	use test_log::test;

	use super::common::{self, PASS, USER};
	use crate::common::error::TestError;

	/// Sends a `ping` request and checks that the reply is a JSON object carrying
	/// no keys other than `id` and `result`.
	#[test(tokio::test)]
	async fn ping() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Serialise the request up front, then send it and wait for the reply
		let request = json!({
			"id": "1",
			"method": "ping",
		})
		.to_string();
		let reply = common::ws_send_msg_and_wait_response(socket, request).await;
		assert!(reply.is_ok(), "result: {reply:?}");

		let reply = reply.unwrap();
		assert!(reply.is_object(), "result: {reply:?}");

		// Any key besides `id` and `result` makes the check fail
		let fields = reply.as_object().unwrap();
		assert!(
			fields.keys().all(|key| ["id", "result"].contains(&key.as_str())),
			"result: {fields:?}"
		);

		Ok(())
	}

	/// Signs in as a scope user and checks that the `info` method returns an object
	/// whose `user` field equals `"user"`.
	#[test(tokio::test)]
	async fn info() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Prepare the connection: sign in with USER / PASS, then select N / D
		let signed_in = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signed_in.is_ok(), "result: {signed_in:?}");
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {used:?}");

		// Setup operations: the table, the scope, and one user record, run in that order
		let define_scope = r#"
			DEFINE SCOPE scope SESSION 24h
				SIGNUP ( CREATE user SET user = $user, pass = crypto::argon2::generate($pass) )
				SIGNIN ( SELECT * FROM user WHERE user = $user AND crypto::argon2::compare(pass, $pass) )
			;
			"#;
		let create_user = r#"
			CREATE user CONTENT {
				user: 'user',
				pass: crypto::argon2::generate('pass')
			};
			"#;
		for statement in ["DEFINE TABLE user PERMISSIONS FULL", define_scope, create_user] {
			let outcome = common::ws_query(socket, statement).await;
			assert!(outcome.is_ok(), "result: {outcome:?}");
		}

		// Sign in as the scope user created above
		let scope_signin =
			common::ws_signin(socket, "user", "pass", Some("N"), Some("D"), Some("scope")).await;
		assert!(scope_signin.is_ok(), "result: {scope_signin:?}");

		// Send the info command
		let request = json!({
			"id": "1",
			"method": "info",
		})
		.to_string();
		let reply = common::ws_send_msg_and_wait_response(socket, request).await;
		assert!(reply.is_ok(), "result: {reply:?}");

		// Verify the response contains the expected info
		let reply = reply.unwrap();
		assert!(reply["result"].is_object(), "result: {reply:?}");
		let record = reply["result"].as_object().unwrap();
		assert_eq!(record["user"], "user", "result: {record:?}");

		Ok(())
	}

	/// Defines a scope with SIGNUP / SIGNIN clauses and checks that the `signup`
	/// method answers with a string token starting with the expected prefix.
	#[test(tokio::test)]
	async fn signup() -> Result<(), Box<dyn std::error::Error>> {
		// Base64 of the JSON header `{"typ":"JWT","alg":"HS512"}`
		const TOKEN_PREFIX: &str = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9";

		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Prepare the connection: sign in with USER / PASS, then select N / D
		let signed_in = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signed_in.is_ok(), "result: {signed_in:?}");
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {used:?}");

		// Setup scope
		let define_scope = r#"
			DEFINE SCOPE scope SESSION 24h
				SIGNUP ( CREATE user SET email = $email, pass = crypto::argon2::generate($pass) )
				SIGNIN ( SELECT * FROM user WHERE email = $email AND crypto::argon2::compare(pass, $pass) )
			;"#;
		let defined = common::ws_query(socket, define_scope).await;
		assert!(defined.is_ok(), "result: {defined:?}");

		// Signup
		let request = json!({
			"id": "1",
			"method": "signup",
			"params": [{
				"ns": "N",
				"db": "D",
				"sc": "scope",
				"email": "email@email.com",
				"pass": "pass",
			}],
		})
		.to_string();
		let reply = common::ws_send_msg_and_wait_response(socket, request).await;
		assert!(reply.is_ok(), "result: {reply:?}");
		let reply = reply.unwrap();
		assert!(reply.is_object(), "result: {reply:?}");
		let fields = reply.as_object().unwrap();

		// Verify response contains no error: only `id` and `result` keys are accepted
		assert!(
			fields.keys().all(|key| ["id", "result"].contains(&key.as_str())),
			"result: {fields:?}"
		);

		// Verify it returns a token
		assert!(fields["result"].is_string(), "result: {fields:?}");
		let token = fields["result"].as_str().unwrap();
		assert!(token.starts_with(TOKEN_PREFIX), "result: {token}");

		Ok(())
	}

	/// Registers a scope user through `signup`, then checks that the `signin` method
	/// with the same credentials answers with a string token starting with the
	/// expected prefix.
	#[test(tokio::test)]
	async fn signin() -> Result<(), Box<dyn std::error::Error>> {
		// Base64 of the JSON header `{"typ":"JWT","alg":"HS512"}`
		const TOKEN_PREFIX: &str = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9";

		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Prepare the connection: sign in with USER / PASS, then select N / D
		let signed_in = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signed_in.is_ok(), "result: {signed_in:?}");
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {used:?}");

		// Setup scope
		let define_scope = r#"
			DEFINE SCOPE scope SESSION 24h
				SIGNUP ( CREATE user SET email = $email, pass = crypto::argon2::generate($pass) )
				SIGNIN ( SELECT * FROM user WHERE email = $email AND crypto::argon2::compare(pass, $pass) )
			;"#;
		let defined = common::ws_query(socket, define_scope).await;
		assert!(defined.is_ok(), "result: {defined:?}");

		// The signup and signin requests carry the same parameter object
		let credentials = json!({
			"ns": "N",
			"db": "D",
			"sc": "scope",
			"email": "email@email.com",
			"pass": "pass",
		});

		// Signup
		let signup_request = json!({
			"id": "1",
			"method": "signup",
			"params": [credentials],
		})
		.to_string();
		let signed_up = common::ws_send_msg_and_wait_response(socket, signup_request).await;
		assert!(signed_up.is_ok(), "result: {signed_up:?}");

		// Sign in
		let signin_request = json!({
			"id": "1",
			"method": "signin",
			"params": [credentials],
		})
		.to_string();
		let reply = common::ws_send_msg_and_wait_response(socket, signin_request).await;
		assert!(reply.is_ok(), "result: {reply:?}");
		let reply = reply.unwrap();
		assert!(reply.is_object(), "result: {reply:?}");
		let fields = reply.as_object().unwrap();

		// Verify response contains no error: only `id` and `result` keys are accepted
		assert!(
			fields.keys().all(|key| ["id", "result"].contains(&key.as_str())),
			"result: {fields:?}"
		);

		// Verify it returns a token
		assert!(fields["result"].is_string(), "result: {fields:?}");
		let token = fields["result"].as_str().unwrap();
		assert!(token.starts_with(TOKEN_PREFIX), "result: {token}");

		Ok(())
	}

	/// Starts a live query from a scope session defined with `SESSION 2s`, waits two
	/// seconds, inserts a record from a second connection, and checks that receiving
	/// on the first socket fails with `TestError::NetworkError` instead of yielding a
	/// message.
	// NOTE(review): `NetworkError` is presumably how `common::ws_recv_msg` reports a
	// receive timeout — confirm against the helper.
	#[test(tokio::test)]
	async fn variable_auth_live_query() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		//
		// Prepare the connection
		//
		let res = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(res.is_ok(), "result: {:?}", res);
		let res = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(res.is_ok(), "result: {:?}", res);

		// Setup scope
		let res = common::ws_query(socket, r#"
        DEFINE SCOPE scope SESSION 2s
            SIGNUP ( CREATE user SET user = $user, pass = crypto::argon2::generate($pass) )
            SIGNIN ( SELECT * FROM user WHERE user = $user AND crypto::argon2::compare(pass, $pass) )
        ;"#).await;
		assert!(res.is_ok(), "result: {:?}", res);

		// Signup
		let res = common::ws_send_msg_and_wait_response(
			socket,
			serde_json::to_string(&json!({
				"id": "1",
				"method": "signup",
				"params": [{
					"ns": "N",
					"db": "D",
					"sc": "scope",
					"user": "user",
					"pass": "pass",
				}],
			}))
			.unwrap(),
		)
		.await;
		assert!(res.is_ok(), "result: {:?}", res);

		// Sign in
		let res =
			common::ws_signin(socket, "user", "pass", Some("N"), Some("D"), Some("scope")).await;
		assert!(res.is_ok(), "result: {:?}", res);

		// Start Live Query
		let table_name = "test_tableBB4B0A788C7E46E798720AEF938CBCF6";
		let live_query_response = common::ws_send_msg_and_wait_response(
			socket,
			serde_json::to_string(&json!({
					"id": "66BB05C8-EF4B-4338-BCCD-8F8A19223CB1",
					"method": "live",
					"params": [
						table_name
					],
			}))
			.unwrap(),
		)
		.await
		.unwrap_or_else(|e| panic!("Error sending message: {}", e));
		// Own the response and report it on failure, rather than the unrelated sign-in result
		assert!(
			live_query_response.is_object(),
			"Expected object, got {:?}",
			live_query_response
		);

		// Wait 2 seconds for auth to expire
		tokio::time::sleep(Duration::from_secs(2)).await;

		// Start second connection
		let socket2 = &mut common::connect_ws(&addr).await?;

		// Signin
		let res =
			common::ws_signin(socket2, "user", "pass", Some("N"), Some("D"), Some("scope")).await;
		assert!(res.is_ok(), "result: {:?}", res);

		// Insert
		let id = "A23A05ABC15C420E9A7E13D2C8657890";
		let query = format!(r#"INSERT INTO {} {{"id": "{}", "name": "ok"}};"#, table_name, id);
		let created = common::ws_query(socket2, query.as_str()).await.unwrap();
		assert_eq!(created.len(), 1);

		// Validate that the live query from the first session didn't produce a result
		let res = common::ws_recv_msg(socket).await;
		match &res {
			// The only accepted outcome: a network error on the first socket
			Err(e)
				if matches!(
					e.downcast_ref::<TestError>(),
					Some(TestError::NetworkError { .. })
				) => {}
			Err(e) => panic!("Expected a network error, but got: {:?}", e),
			Ok(v) => panic!("Expected a network error, but got: {:?}", v),
		}
		Ok(())
	}

	/// Checks that the `invalidate` method removes the privileges of a root session.
	///
	/// After signing in with USER / PASS a `DEFINE NAMESPACE` statement must report
	/// status `OK`; after `invalidate`, the same kind of statement must report status
	/// `ERR` with the IAM permission error message.
	#[test(tokio::test)]
	async fn invalidate() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		//
		// Prepare the connection
		//
		let res = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(res.is_ok(), "result: {:?}", res);

		// Verify we have a ROOT session. The statement status is checked too: without it
		// the permission error asserted below would also be produced by a session that
		// never had root access, and the test would pass without exercising `invalidate`.
		let res = common::ws_query(socket, "DEFINE NAMESPACE NS").await;
		assert!(res.is_ok(), "result: {:?}", res);
		let res = res.unwrap();
		assert_eq!(res[0]["status"], "OK", "result: {:?}", res);

		// Invalidate session
		let res = common::ws_send_msg_and_wait_response(
			socket,
			serde_json::to_string(&json!({
				"id": "1",
				"method": "invalidate",
			}))
			.unwrap(),
		)
		.await;
		assert!(res.is_ok(), "result: {:?}", res);

		// Verify we invalidated the root session
		let res = common::ws_query(socket, "DEFINE NAMESPACE NS2").await;
		assert!(res.is_ok(), "result: {:?}", res);
		let res = res.unwrap();

		assert_eq!(res[0]["status"], "ERR", "result: {:?}", res);
		assert_eq!(
			res[0]["result"], "IAM error: Not enough permissions to perform this action",
			"result: {:?}",
			res
		);
		Ok(())
	}

	/// Obtains a token through a USER / PASS sign-in, reconnects, and checks that
	/// after calling `authenticate` with that token a `DEFINE NAMESPACE` statement
	/// reports status `OK`.
	#[test(tokio::test)]
	async fn authenticate() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Prepare the connection: the sign-in result is the token used further down
		let signed_in = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signed_in.is_ok(), "result: {signed_in:?}");
		let token = signed_in.unwrap();

		// Reconnect so we start with an empty session
		socket.close(None).await?;
		let socket = &mut common::connect_ws(&addr).await?;

		// Authenticate with the token
		let request = json!({
			"id": "1",
			"method": "authenticate",
			"params": [token],
		})
		.to_string();
		let authenticated = common::ws_send_msg_and_wait_response(socket, request).await;
		assert!(authenticated.is_ok(), "result: {authenticated:?}");

		// Verify we have a ROOT session
		let defined = common::ws_query(socket, "DEFINE NAMESPACE D2").await;
		assert!(defined.is_ok(), "result: {defined:?}");
		let statements = defined.unwrap();
		assert_eq!(statements[0]["status"], "OK", "result: {statements:?}");

		Ok(())
	}

	/// Starts a live query through the `live` method, kills it through the `kill`
	/// method, and checks that the messages received after a later insert contain no
	/// notification for that live query.
	#[test(tokio::test)]
	async fn kill_kill_endpoint() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let table_name = "table_D250F804BC244558982DB7D8712F6BE3".to_string();

		let socket = &mut common::connect_ws(&addr).await?;

		let ns = "DE4E65C08E7248FB851CBB4D939C13C7";
		let db = "D7C40F656162434DB4888E334032B52C";
		common::ws_signin(socket, USER, PASS, None, None, None).await?;
		common::ws_use(socket, Some(ns), Some(db)).await?;

		// LIVE query via live endpoint
		let live_request = json!({
			"id": "1",
			"method": "live",
			"params": [table_name],
		})
		.to_string();
		let live_reply = common::ws_send_msg_and_wait_response(socket, live_request).await?;
		let live_id = live_reply["result"].as_str().unwrap();

		// KILL query via kill endpoint
		let kill_request = json!({
			"id": "1",
			"method": "kill",
			"params": [live_id],
		})
		.to_string();
		common::ws_send_msg(socket, kill_request).await?;

		// Verify we killed the query
		let kill_msgs = common::ws_recv_all_msgs(socket, 1, Duration::from_millis(1000)).await?;
		assert!(
			kill_msgs.iter().all(|m| m["error"].is_null()),
			"Unexpected error received: {kill_msgs:#?}"
		);
		let kill_reply = kill_msgs.first().unwrap();
		assert!(kill_reply["status"].is_null(), "unexpected status: {kill_reply:?}");

		// Create some data for notification: the statement is run once through
		// `ws_query`, then sent a second time as a raw `query` message
		let id = "an-id-goes-here";
		let query = format!(r#"INSERT INTO {table_name} {{"id": "{id}", "name": "ok"}};"#);
		let _ = common::ws_query(socket, &query).await.unwrap();
		let insert_request = json!({
			"id": "1",
			"method": "query",
			"params": [query],
		})
		.to_string();
		common::ws_send_msg(socket, insert_request).await?;

		// Wait some time for all messages to arrive, and then verify we didn't get any notification
		let msgs = common::ws_recv_all_msgs(socket, 1, Duration::from_millis(500)).await?;
		assert!(
			msgs.iter().all(|m| m["error"].is_null()),
			"Unexpected error received: {msgs:#?}"
		);
		let notified = msgs.iter().any(|m| common::ws_msg_is_notification_from_lq(m, live_id));
		assert!(!notified, "Expected to find no notifications, found 1: {msgs:#?}");

		Ok(())
	}

	/// Starts a live query through the `live` method, kills it with a `KILL`
	/// statement sent through the `query` method, and checks that the messages
	/// received after a later insert contain no notification for that live query.
	#[test(tokio::test)]
	async fn kill_query_endpoint() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let table_name = "table_8B5E5635869E4FF2A35C94E8FC2CAE9A".to_string();

		let socket = &mut common::connect_ws(&addr).await?;

		let ns = "3CB1D26373AF45F78D836EF2F78384A2";
		let db = "622772B60DEB46958B6450EE43ED2515";
		common::ws_signin(socket, USER, PASS, None, None, None).await?;
		common::ws_use(socket, Some(ns), Some(db)).await?;

		// LIVE query via live endpoint
		let live_request = json!({
			"id": "1",
			"method": "live",
			"params": [table_name],
		})
		.to_string();
		let live_reply = common::ws_send_msg_and_wait_response(socket, live_request).await?;
		let live_id = live_reply["result"].as_str().unwrap();

		// KILL query via query endpoint
		let kill_query = format!("KILL '{live_id}'");
		let kill_request = json!({
			"id": "1",
			"method": "query",
			"params": [kill_query],
		})
		.to_string();
		common::ws_send_msg(socket, kill_request).await?;

		// Verify we killed the query
		let kill_msgs = common::ws_recv_all_msgs(socket, 1, Duration::from_millis(1000)).await?;
		assert!(
			kill_msgs.iter().all(|m| m["error"].is_null()),
			"Unexpected error received: {kill_msgs:#?}"
		);
		let kill_reply = kill_msgs.first().unwrap();
		assert!(kill_reply["status"].is_null(), "unexpected status: {kill_reply:?}");

		// Create some data for notification: the statement is run once through
		// `ws_query`, then sent a second time as a raw `query` message
		let id = "an-id-goes-here";
		let query = format!(r#"INSERT INTO {table_name} {{"id": "{id}", "name": "ok"}};"#);
		let _ = common::ws_query(socket, &query).await.unwrap();
		let insert_request = json!({
			"id": "1",
			"method": "query",
			"params": [query],
		})
		.to_string();
		common::ws_send_msg(socket, insert_request).await?;

		// Wait some time for all messages to arrive, and then verify we didn't get any notification
		let msgs = common::ws_recv_all_msgs(socket, 1, Duration::from_millis(500)).await?;
		assert!(
			msgs.iter().all(|m| m["error"].is_null()),
			"Unexpected error received: {msgs:#?}"
		);
		let notified = msgs.iter().any(|m| common::ws_msg_is_notification_from_lq(m, live_id));
		assert!(!notified, "Expected to find no notifications, found 1: {msgs:#?}");

		Ok(())
	}

	/// On a server started without auth, registers a live query through the `live`
	/// method, inserts a record, and checks that a `CREATE` notification for that
	/// live query arrives carrying the inserted record's `id` and `name`.
	#[test(tokio::test)]
	async fn live_live_endpoint() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_without_auth().await.unwrap();
		let table_name = "table_FD40A9A361884C56B5908A934164884A".to_string();

		let socket = &mut common::connect_ws(&addr).await?;

		let ns = "3498b03b44b5452a9d3f15252b454db1";
		let db = "2cf93e52ff0a42f39d271412404a01f6";
		common::ws_signin(socket, USER, PASS, None, None, None).await?;
		common::ws_use(socket, Some(ns), Some(db)).await?;

		// LIVE query via live endpoint
		let live_request = json!({
			"id": "1",
			"method": "live",
			"params": [table_name],
		})
		.to_string();
		let live_reply = common::ws_send_msg_and_wait_response(socket, live_request).await?;
		let live_id = live_reply["result"].as_str().unwrap();

		// Create some data for notification.
		// The query is sent manually and several messages are awaited: message ordering
		// is not guaranteed, so the notification may arrive before the query result.
		let id = "an-id-goes-here";
		let query = format!(r#"INSERT INTO {table_name} {{"id": "{id}", "name": "ok"}};"#);
		let insert_request = json!({
			"id": "1",
			"method": "query",
			"params": [query],
		})
		.to_string();
		common::ws_send_msg(socket, insert_request).await?;

		// Wait some time for all messages to arrive, and then search for the notification message
		let received = common::ws_recv_all_msgs(socket, 2, Duration::from_millis(500)).await;
		assert!(received.is_ok(), "Error waiting for messages: {:?}", received.err());
		let msgs = received.unwrap();
		assert!(
			msgs.iter().all(|m| m["error"].is_null()),
			"Unexpected error received: {msgs:#?}"
		);

		let found = msgs.iter().find(|m| common::ws_msg_is_notification_from_lq(m, live_id));
		assert!(
			found.is_some(),
			"Expected to find a notification for LQ id {live_id}: {msgs:#?}"
		);

		// Extract the notification object
		let notification = found.unwrap()["result"].as_object().unwrap();

		// Verify message on individual keys since the notification ID is random
		let action = notification["action"].as_str().unwrap();
		let record = notification["result"].as_object().unwrap();
		assert_eq!(action, "CREATE", "expected notification to be `CREATE`: {notification:?}");
		let expected_id = format!("{table_name}:⟨{id}⟩");
		assert_eq!(
			record["id"].as_str(),
			Some(expected_id.as_str()),
			"expected notification to have id {expected_id:?}: {notification:?}"
		);
		assert_eq!(
			record["name"].as_str(),
			Some("ok"),
			"expected notification to have name `ok`: {notification:?}"
		);

		Ok(())
	}

	/// On a server started without auth, registers a live query with a
	/// `LIVE SELECT` statement sent through `ws_query`, inserts a record, and checks
	/// that a `CREATE` notification for that live query arrives carrying the inserted
	/// record's `id` and `name`.
	#[test(tokio::test)]
	async fn live_query_endpoint() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_without_auth().await.unwrap();
		let table_name = "table_FD40A9A361884C56B5908A934164884A".to_string();

		let socket = &mut common::connect_ws(&addr).await?;

		let ns = "3498b03b44b5452a9d3f15252b454db1";
		let db = "2cf93e52ff0a42f39d271412404a01f6";
		common::ws_signin(socket, USER, PASS, None, None, None).await?;
		common::ws_use(socket, Some(ns), Some(db)).await?;

		// LIVE query via query endpoint
		let live_statement = format!("LIVE SELECT * FROM {table_name};");
		let live_results = common::ws_query(socket, &live_statement).await?;
		assert_eq!(live_results.len(), 1, "Expected 1 result got: {live_results:?}");
		let live_id = live_results[0]["result"].as_str().unwrap();

		// Create some data for notification.
		// The query is sent manually and several messages are awaited: message ordering
		// is not guaranteed, so the notification may arrive before the query result.
		let id = "an-id-goes-here";
		let query = format!(r#"INSERT INTO {table_name} {{"id": "{id}", "name": "ok"}};"#);
		let insert_request = json!({
			"id": "1",
			"method": "query",
			"params": [query],
		})
		.to_string();
		common::ws_send_msg(socket, insert_request).await?;

		// Wait some time for all messages to arrive, and then search for the notification message
		let received = common::ws_recv_all_msgs(socket, 2, Duration::from_millis(500)).await;
		assert!(received.is_ok(), "Error waiting for messages: {:?}", received.err());
		let msgs = received.unwrap();
		assert!(
			msgs.iter().all(|m| m["error"].is_null()),
			"Unexpected error received: {msgs:#?}"
		);

		let found = msgs.iter().find(|m| common::ws_msg_is_notification_from_lq(m, live_id));
		assert!(
			found.is_some(),
			"Expected to find a notification for LQ id {live_id}: {msgs:#?}"
		);

		// Extract the notification object
		let notification = found.unwrap()["result"].as_object().unwrap();

		// Verify message on individual keys since the notification ID is random
		let action = notification["action"].as_str().unwrap();
		let record = notification["result"].as_object().unwrap();
		assert_eq!(action, "CREATE", "expected notification to be `CREATE`: {notification:?}");
		let expected_id = format!("{table_name}:⟨{id}⟩");
		assert_eq!(
			record["id"].as_str(),
			Some(expected_id.as_str()),
			"expected notification to have id {expected_id:?}: {notification:?}"
		);
		assert_eq!(
			record["name"].as_str(),
			Some("ok"),
			"expected notification to have name `ok`: {notification:?}"
		);

		Ok(())
	}

	/// Defines one variable with the `let` method and one with the `set` method,
	/// then checks that selecting from both yields `["let_value", "set_value"]`.
	#[test(tokio::test)]
	async fn let_and_set() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Prepare the connection
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {used:?}");

		// Define one variable using let, then one using set
		let definitions = [("let", "let_var", "let_value"), ("set", "set_var", "set_value")];
		for (method, name, value) in definitions {
			let request = json!({
				"id": "1",
				"method": method,
				"params": [name, value],
			})
			.to_string();
			let defined = common::ws_send_msg_and_wait_response(socket, request).await;
			assert!(defined.is_ok(), "result: {defined:?}");
		}

		// Verify the variables are set
		let selected = common::ws_query(socket, "SELECT * FROM $let_var, $set_var").await?;
		let expected = json!(["let_value", "set_value"]);
		assert_eq!(selected[0]["result"], expected, "result: {selected:?}");

		Ok(())
	}

	/// Defines a variable with `let`, removes it with `unset`, and checks through
	/// `SELECT * FROM $let_var` that the first row is `"let_value"` before the removal
	/// and null after it.
	#[test(tokio::test)]
	async fn unset() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Prepare the connection
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {used:?}");

		// Define variable
		let let_request = json!({
			"id": "1",
			"method": "let",
			"params": ["let_var", "let_value"],
		})
		.to_string();
		let defined = common::ws_send_msg_and_wait_response(socket, let_request).await;
		assert!(defined.is_ok(), "result: {defined:?}");

		// Verify the variable is set
		let before = common::ws_query(socket, "SELECT * FROM $let_var").await;
		assert!(before.is_ok(), "result: {before:?}");
		let before = before.unwrap();
		assert!(before[0]["result"].is_array(), "result: {before:?}");
		let rows_before = before[0]["result"].as_array().unwrap();
		assert_eq!(rows_before[0], "let_value", "result: {rows_before:?}");

		// Unset variable
		let unset_request = json!({
			"id": "1",
			"method": "unset",
			"params": ["let_var"],
		})
		.to_string();
		let removed = common::ws_send_msg_and_wait_response(socket, unset_request).await;
		assert!(removed.is_ok(), "result: {removed:?}");

		// Verify the variable is unset
		let after = common::ws_query(socket, "SELECT * FROM $let_var").await;
		assert!(after.is_ok(), "result: {after:?}");
		let after = after.unwrap();
		assert!(after[0]["result"].is_array(), "result: {after:?}");
		let rows_after = after[0]["result"].as_array().unwrap();
		assert!(rows_after[0].is_null(), "result: {rows_after:?}");

		Ok(())
	}

	/// Creates one `foo` record and checks that the `select` method on `foo`
	/// returns an array with exactly one element.
	#[test(tokio::test)]
	async fn select() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Prepare the connection: sign in with USER / PASS, then select N / D
		let signed_in = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signed_in.is_ok(), "result: {signed_in:?}");
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {used:?}");

		// Setup the database
		let created = common::ws_query(socket, "CREATE foo").await;
		assert!(created.is_ok(), "result: {created:?}");

		// Select data
		let request = json!({
			"id": "1",
			"method": "select",
			"params": ["foo"],
		})
		.to_string();
		let reply = common::ws_send_msg_and_wait_response(socket, request).await;
		assert!(reply.is_ok(), "result: {reply:?}");
		let reply = reply.unwrap();
		assert!(reply["result"].is_array(), "result: {reply:?}");

		// Verify the response contains the output of the select
		let rows = reply["result"].as_array().unwrap();
		assert_eq!(rows.len(), 1, "result: {rows:?}");

		Ok(())
	}

	/// Inserts a record into `table` through the `insert` method and checks that a
	/// follow-up `SELECT` returns its `name` and `value` fields.
	#[test(tokio::test)]
	async fn insert() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Prepare the connection: sign in with USER / PASS, then select N / D
		let signed_in = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signed_in.is_ok(), "result: {signed_in:?}");
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {used:?}");

		// Insert data
		let request = json!({
			"id": "1",
			"method": "insert",
			"params": [
				"table",
				{
					"name": "foo",
					"value": "bar",
				}
			],
		})
		.to_string();
		let inserted = common::ws_send_msg_and_wait_response(socket, request).await;
		assert!(inserted.is_ok(), "result: {inserted:?}");

		// Verify the data was inserted and can be queried
		let selected = common::ws_query(socket, "SELECT * FROM table").await;
		assert!(selected.is_ok(), "result: {selected:?}");
		let selected = selected.unwrap();
		assert!(selected[0]["result"].is_array(), "result: {selected:?}");
		let rows = selected[0]["result"].as_array().unwrap();

		assert_eq!(rows[0]["name"], "foo", "result: {rows:?}");
		assert_eq!(rows[0]["value"], "bar", "result: {rows:?}");

		Ok(())
	}

	/// Creates a record in `table` through the `create` method and checks that a
	/// follow-up `SELECT` returns exactly one row.
	#[test(tokio::test)]
	async fn create() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Prepare the connection: sign in with USER / PASS, then select N / D
		let signed_in = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signed_in.is_ok(), "result: {signed_in:?}");
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {used:?}");

		// Create data
		let request = json!({
			"id": "1",
			"method": "create",
			"params": ["table"],
		})
		.to_string();
		let created = common::ws_send_msg_and_wait_response(socket, request).await;
		assert!(created.is_ok(), "result: {created:?}");

		// Verify the data was created and can be queried
		let selected = common::ws_query(socket, "SELECT * FROM table").await;
		assert!(selected.is_ok(), "result: {selected:?}");
		let selected = selected.unwrap();
		assert!(selected[0]["result"].is_array(), "result: {selected:?}");
		let rows = selected[0]["result"].as_array().unwrap();
		assert_eq!(rows.len(), 1, "result: {rows:?}");

		Ok(())
	}

	/// Creates a record with a `name` field, calls the `update` method on `table`
	/// with a `value` field only, and checks that afterwards `name` is null and
	/// `value` is `"bar"`.
	#[test(tokio::test)]
	async fn update() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Prepare the connection: sign in with USER / PASS, then select N / D
		let signed_in = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signed_in.is_ok(), "result: {signed_in:?}");
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {used:?}");

		// Setup the database
		let created = common::ws_query(socket, r#"CREATE table SET name = "foo""#).await;
		assert!(created.is_ok(), "result: {created:?}");

		// Update data
		let request = json!({
			"id": "1",
			"method": "update",
			"params": [
				"table",
				{
					"value": "bar",
				}
			],
		})
		.to_string();
		let updated = common::ws_send_msg_and_wait_response(socket, request).await;
		assert!(updated.is_ok(), "result: {updated:?}");

		// Verify the data was updated
		let selected = common::ws_query(socket, "SELECT * FROM table").await;
		assert!(selected.is_ok(), "result: {selected:?}");
		let selected = selected.unwrap();
		assert!(selected[0]["result"].is_array(), "result: {selected:?}");
		let record = &selected[0]["result"].as_array().unwrap()[0];
		assert!(record["name"].is_null(), "result: {record:?}");
		assert_eq!(record["value"], "bar", "result: {record:?}");

		Ok(())
	}

	/// Creates a `foo` record, merges two fields into it through the `merge` method,
	/// and checks that selecting the record by id returns both merged fields.
	#[test(tokio::test)]
	async fn merge() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Prepare the connection: sign in with USER / PASS, then select N / D
		let signed_in = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signed_in.is_ok(), "result: {signed_in:?}");
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {used:?}");

		// Setup the database and remember the id of the created record
		let created = common::ws_query(socket, "CREATE foo").await;
		assert!(created.is_ok(), "result: {created:?}");
		let created = created.unwrap();
		assert!(created[0]["result"].is_array(), "result: {created:?}");
		let new_record = &created[0]["result"].as_array().unwrap()[0];

		assert!(new_record["id"].is_string(), "result: {new_record:?}");
		let what = new_record["id"].as_str().unwrap();

		// Merge data
		let request = json!({
			"id": "1",
			"method": "merge",
			"params": [
				what, {
					"name_for_merge": "foo",
					"value_for_merge": "bar",
				}
			]
		})
		.to_string();
		let merged = common::ws_send_msg_and_wait_response(socket, request).await;
		assert!(merged.is_ok(), "result: {merged:?}");

		// Get the data and verify the output
		let lookup = format!("SELECT * FROM foo WHERE id = {what}");
		let selected = common::ws_query(socket, &lookup).await;
		assert!(selected.is_ok(), "result: {selected:?}");
		let selected = selected.unwrap();
		assert!(selected[0]["result"].is_array(), "result: {selected:?}");
		let record = &selected[0]["result"].as_array().unwrap()[0];

		assert_eq!(record["id"], what);
		assert_eq!(record["name_for_merge"], "foo");
		assert_eq!(record["value_for_merge"], "bar");

		Ok(())
	}

	/// Creates a record with an `original_name` field, applies an `add` and a
	/// `remove` operation through the `patch` method, and checks both the method's
	/// reply and a follow-up `SELECT` for the added and removed fields.
	#[test(tokio::test)]
	async fn patch() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Prepare the connection: sign in with USER / PASS, then select N / D
		let signed_in = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signed_in.is_ok(), "result: {signed_in:?}");
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {used:?}");

		// Setup the database and remember the id of the created record
		let created =
			common::ws_query(socket, r#"CREATE table SET original_name = "original_value""#).await;
		assert!(created.is_ok(), "result: {created:?}");
		let created = created.unwrap();
		assert!(created[0]["result"].is_array(), "result: {created:?}");
		let new_record = &created[0]["result"].as_array().unwrap()[0];

		let what = new_record["id"].as_str().unwrap();

		// Patch data: add one field, remove the original one
		let ops = json!([
			{
				"op": "add",
				"path": "patch_name",
				"value": "patch_value"
			},
			{
				"op": "remove",
				"path": "original_name",
			}
		]);
		let request = json!({
			"id": "1",
			"method": "patch",
			"params": [what, ops]
		})
		.to_string();
		let reply = common::ws_send_msg_and_wait_response(socket, request).await;
		assert!(reply.is_ok(), "result: {reply:?}");
		let reply = reply.unwrap();
		assert!(reply["result"].is_object(), "result: {reply:?}");
		let patched = reply["result"].as_object().unwrap();
		let expected = json!("patch_value");
		assert_eq!(patched.get("patch_name"), Some(&expected), "result: {patched:?}");

		// Get the data and verify the output
		let lookup = format!("SELECT * FROM table WHERE id = {what}");
		let selected = common::ws_query(socket, &lookup).await;
		assert!(selected.is_ok(), "result: {selected:?}");
		let selected = selected.unwrap();
		assert!(selected[0]["result"].is_array(), "result: {selected:?}");
		let record = &selected[0]["result"].as_array().unwrap()[0];

		assert_eq!(record["id"], what);
		assert!(record["original_name"].is_null());
		assert_eq!(record["patch_name"], "patch_value");

		Ok(())
	}

	/// Checks the `delete` RPC method end to end.
	///
	/// The record `table:id` is created and confirmed to be selectable, a `delete`
	/// request is sent for it, and a second `SELECT` must then return no rows.
	#[test(tokio::test)]
	async fn delete() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Sign in, then select `N` / `D` through `ws_use`
		let signin = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signin.is_ok(), "result: {:?}", signin);
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {:?}", used);

		// Seed the record that is going to be deleted
		let created = common::ws_query(socket, "CREATE table:id").await;
		assert!(created.is_ok(), "result: {:?}", created);

		// The record has to be visible before the delete is issued
		let before = common::ws_query(socket, "SELECT * FROM table:id").await;
		assert!(before.is_ok(), "result: {:?}", before);
		let before = before.unwrap();
		let before_result = &before[0]["result"];
		assert!(before_result.is_array(), "result: {:?}", before);
		let rows_before = before_result.as_array().unwrap();
		assert_eq!(rows_before.len(), 1, "result: {:?}", rows_before);

		// Issue the `delete` request for the seeded record
		let request = json!({
			"id": "1",
			"method": "delete",
			"params": ["table:id"],
		});
		let deleted =
			common::ws_send_msg_and_wait_response(socket, serde_json::to_string(&request).unwrap())
				.await;
		assert!(deleted.is_ok(), "result: {:?}", deleted);

		// The same query must now come back empty
		let after = common::ws_query(socket, "SELECT * FROM table:id").await;
		assert!(after.is_ok(), "result: {:?}", after);
		let after = after.unwrap();
		let after_result = &after[0]["result"];
		assert!(after_result.is_array(), "result: {:?}", after);
		let rows_after = after_result.as_array().unwrap();
		assert_eq!(rows_after.len(), 0, "result: {:?}", rows_after);

		Ok(())
	}

	/// Checks that queries still work after selecting the `json` format.
	///
	/// The record `table:id` is created, the `format` RPC method is called with `json`,
	/// and a `SELECT` issued through `ws_query` must return exactly one row.
	#[test(tokio::test)]
	async fn format_json() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Sign in, then select `N` / `D` through `ws_use`
		let signin = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signin.is_ok(), "result: {:?}", signin);
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {:?}", used);

		// Seed one record to query later
		let created = common::ws_query(socket, "CREATE table:id").await;
		assert!(created.is_ok(), "result: {:?}", created);

		// Ask the server to use the JSON format
		let switched = common::ws_send_msg_and_wait_response(
			socket,
			serde_json::to_string(&json!({
				"id": "1",
				"method": "format",
				"params": ["json"],
			}))
			.unwrap(),
		)
		.await;
		assert!(switched.is_ok(), "result: {:?}", switched);

		// Read the seeded record back
		let selected = common::ws_query(socket, "SELECT * FROM table:id").await;
		assert!(selected.is_ok(), "result: {:?}", selected);
		let selected = selected.unwrap();
		let first = &selected[0]["result"];
		assert!(first.is_array(), "result: {:?}", selected);
		let rows = first.as_array().unwrap();
		assert_eq!(rows.len(), 1, "result: {:?}", rows);

		Ok(())
	}

	/// Checks that a query reply can be read after selecting the `cbor` format.
	///
	/// The record `table:id` is created, the `format` RPC method is called with `cbor`,
	/// and a `query` request is sent with `ws_send_msg`. The reply is read with
	/// `ws_recv_msg_with_fmt` using `common::Format::Cbor` and must hold exactly one row.
	/// The requests themselves are still serialized with `serde_json`.
	#[test(tokio::test)]
	async fn format_cbor() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Sign in, then select `N` / `D` through `ws_use`
		let signin = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signin.is_ok(), "result: {:?}", signin);
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {:?}", used);

		// Seed one record to query later
		let created = common::ws_query(socket, "CREATE table:id").await;
		assert!(created.is_ok(), "result: {:?}", created);

		// Ask the server to use the CBOR format
		let format_request = json!({
			"id": "1",
			"method": "format",
			"params": ["cbor"],
		});
		let switched = common::ws_send_msg_and_wait_response(
			socket,
			serde_json::to_string(&format_request).unwrap(),
		)
		.await;
		assert!(switched.is_ok(), "result: {:?}", switched);

		// Send the query without waiting, so the reply can be read with an explicit format
		let query_request = json!({
			"id": "1",
			"method": "query",
			"params": ["SELECT * FROM table:id"],
		});
		let sent = common::ws_send_msg(socket, serde_json::to_string(&query_request).unwrap()).await;
		assert!(sent.is_ok(), "result: {:?}", sent);

		// Read the reply as CBOR and walk down to the rows of the first statement
		let reply = common::ws_recv_msg_with_fmt(socket, common::Format::Cbor).await;
		assert!(reply.is_ok(), "result: {:?}", reply);
		let reply = reply.unwrap();
		assert!(reply["result"].is_array(), "result: {:?}", reply);
		let statements = reply["result"].as_array().unwrap();
		let first = &statements[0]["result"];
		assert!(first.is_array(), "result: {:?}", statements);
		let rows = first.as_array().unwrap();
		assert_eq!(rows.len(), 1, "result: {:?}", rows);

		Ok(())
	}

	/// Checks that a query reply can be read after selecting the `pack` format.
	///
	/// The record `table:id` is created, the `format` RPC method is called with `pack`,
	/// and a `query` request is sent with `ws_send_msg`. The reply is read with
	/// `ws_recv_msg_with_fmt` using `common::Format::Pack` and must hold exactly one row.
	/// The requests themselves are still serialized with `serde_json`.
	#[test(tokio::test)]
	async fn format_pack() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Sign in, then select `N` / `D` through `ws_use`
		let signin = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signin.is_ok(), "result: {:?}", signin);
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {:?}", used);

		// Seed one record to query later
		let created = common::ws_query(socket, "CREATE table:id").await;
		assert!(created.is_ok(), "result: {:?}", created);

		// Ask the server to use the PACK format
		let switched = common::ws_send_msg_and_wait_response(
			socket,
			serde_json::to_string(&json!({
				"id": "1",
				"method": "format",
				"params": ["pack"],
			}))
			.unwrap(),
		)
		.await;
		assert!(switched.is_ok(), "result: {:?}", switched);

		// Send the query without waiting, so the reply can be read with an explicit format
		let sent = common::ws_send_msg(
			socket,
			serde_json::to_string(&json!({
				"id": "1",
				"method": "query",
				"params": ["SELECT * FROM table:id"],
			}))
			.unwrap(),
		)
		.await;
		assert!(sent.is_ok(), "result: {:?}", sent);

		// Read the reply as PACK and walk down to the rows of the first statement
		let reply = common::ws_recv_msg_with_fmt(socket, common::Format::Pack).await;
		assert!(reply.is_ok(), "result: {:?}", reply);
		let reply = reply.unwrap();
		assert!(reply["result"].is_array(), "result: {:?}", reply);
		let statements = reply["result"].as_array().unwrap();
		let first = &statements[0]["result"];
		assert!(first.is_array(), "result: {:?}", statements);
		let rows = first.as_array().unwrap();
		assert_eq!(rows.len(), 1, "result: {:?}", rows);

		Ok(())
	}

	/// Checks the `query` RPC method.
	///
	/// A `CREATE foo` statement is sent as a raw `query` request, after which a
	/// `SELECT * FROM foo` issued through `ws_query` must return exactly one row.
	#[test(tokio::test)]
	async fn query() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Sign in, then select `N` / `D` through `ws_use`
		let signin = common::ws_signin(socket, USER, PASS, None, None, None).await;
		assert!(signin.is_ok(), "result: {:?}", signin);
		let used = common::ws_use(socket, Some("N"), Some("D")).await;
		assert!(used.is_ok(), "result: {:?}", used);

		// Run the CREATE statement through a hand-built `query` request
		let request = json!({
			"id": "1",
			"method": "query",
			"params": ["CREATE foo"],
		});
		let created =
			common::ws_send_msg_and_wait_response(socket, serde_json::to_string(&request).unwrap())
				.await;
		assert!(created.is_ok(), "result: {:?}", created);

		// The created record must be returned by a follow-up SELECT
		let selected = common::ws_query(socket, "SELECT * FROM foo").await;
		assert!(selected.is_ok(), "result: {:?}", selected);
		let selected = selected.unwrap();
		let first = &selected[0]["result"];
		assert!(first.is_array(), "result: {:?}", selected);
		let rows = first.as_array().unwrap();
		assert_eq!(rows.len(), 1, "result: {:?}", rows);

		Ok(())
	}

	/// Checks that the `version` RPC method answers with a string starting with `surrealdb-`.
	#[test(tokio::test)]
	async fn version() -> Result<(), Box<dyn std::error::Error>> {
		let (addr, _server) = common::start_server_with_defaults().await.unwrap();
		let socket = &mut common::connect_ws(&addr).await?;

		// Build and send the request; this test performs no sign-in beforehand
		let request = json!({
			"id": "1",
			"method": "version",
			"params": [],
		});
		let reply =
			common::ws_send_msg_and_wait_response(socket, serde_json::to_string(&request).unwrap())
				.await;
		assert!(reply.is_ok(), "result: {:?}", reply);
		let reply = reply.unwrap();

		// The payload must be a string carrying the expected prefix
		let payload = &reply["result"];
		assert!(payload.is_string(), "result: {:?}", reply);
		let version = payload.as_str().unwrap();
		assert!(version.starts_with("surrealdb-"), "result: {}", version);

		Ok(())
	}
}