Fix api live tests for lq v2 (#3900)
This commit is contained in:
parent
10c6e3edc7
commit
52dc064005
1 changed files with 121 additions and 21 deletions
|
@ -1,12 +1,20 @@
|
|||
// Tests for running live queries
|
||||
// Supported by the storage engines and the WS protocol
|
||||
|
||||
use futures::Stream;
|
||||
use futures::StreamExt;
|
||||
use futures::TryStreamExt;
|
||||
use std::ops::DerefMut;
|
||||
use surrealdb::method::QueryStream;
|
||||
use surrealdb::Action;
|
||||
use surrealdb::Notification;
|
||||
use surrealdb_core::sql::Object;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::info;
|
||||
|
||||
const LQ_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const LQ_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
const MAX_NOTIFICATIONS: usize = 100;
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn live_select_table() {
|
||||
|
@ -16,6 +24,13 @@ async fn live_select_table() {
|
|||
|
||||
{
|
||||
let table = format!("table_{}", Ulid::new());
|
||||
if FFLAGS.change_feed_live_queries.enabled() {
|
||||
db.query(format!("DEFINE TABLE {table} CHANGEFEED 10m INCLUDE ORIGINAL"))
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
db.query(format!("DEFINE TABLE {table}")).await.unwrap();
|
||||
}
|
||||
|
||||
// Start listening
|
||||
let mut users = db.select(&table).live().await.unwrap();
|
||||
|
@ -49,6 +64,13 @@ async fn live_select_table() {
|
|||
|
||||
{
|
||||
let table = format!("table_{}", Ulid::new());
|
||||
if FFLAGS.change_feed_live_queries.enabled() {
|
||||
db.query(format!("DEFINE TABLE {table} CHANGEFEED 10m INCLUDE ORIGINAL"))
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
db.query(format!("DEFINE TABLE {table}")).await.unwrap();
|
||||
}
|
||||
|
||||
// Start listening
|
||||
let mut users = db.select(Resource::from(&table)).live().await.unwrap();
|
||||
|
@ -73,7 +95,15 @@ async fn live_select_record_id() {
|
|||
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
|
||||
|
||||
{
|
||||
let record_id = Thing::from((Ulid::new().to_string(), "john".to_owned()));
|
||||
let table = format!("table_{}", Ulid::new());
|
||||
if FFLAGS.change_feed_live_queries.enabled() {
|
||||
db.query(format!("DEFINE TABLE {table} CHANGEFEED 10m INCLUDE ORIGINAL"))
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
db.query(format!("DEFINE TABLE {table}")).await.unwrap();
|
||||
}
|
||||
let record_id = Thing::from((table, "john".to_owned()));
|
||||
|
||||
// Start listening
|
||||
let mut users = db.select(&record_id).live().await.unwrap();
|
||||
|
@ -107,7 +137,15 @@ async fn live_select_record_id() {
|
|||
}
|
||||
|
||||
{
|
||||
let record_id = Thing::from((Ulid::new().to_string(), "john".to_owned()));
|
||||
let table = format!("table_{}", Ulid::new());
|
||||
if FFLAGS.change_feed_live_queries.enabled() {
|
||||
db.query(format!("DEFINE TABLE {table} CHANGEFEED 10m INCLUDE ORIGINAL"))
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
db.query(format!("DEFINE TABLE {table}")).await.unwrap();
|
||||
}
|
||||
let record_id = Thing::from((table, "john".to_owned()));
|
||||
|
||||
// Start listening
|
||||
let mut users = db.select(Resource::from(&record_id)).live().await.unwrap();
|
||||
|
@ -134,6 +172,13 @@ async fn live_select_record_ranges() {
|
|||
|
||||
{
|
||||
let table = format!("table_{}", Ulid::new());
|
||||
if FFLAGS.change_feed_live_queries.enabled() {
|
||||
db.query(format!("DEFINE TABLE {table} CHANGEFEED 10m INCLUDE ORIGINAL"))
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
db.query(format!("DEFINE TABLE {table}")).await.unwrap();
|
||||
}
|
||||
|
||||
// Start listening
|
||||
let mut users = db.select(&table).range("jane".."john").live().await.unwrap();
|
||||
|
@ -168,6 +213,13 @@ async fn live_select_record_ranges() {
|
|||
|
||||
{
|
||||
let table = format!("table_{}", Ulid::new());
|
||||
if FFLAGS.change_feed_live_queries.enabled() {
|
||||
db.query(format!("DEFINE TABLE {table} CHANGEFEED 10m INCLUDE ORIGINAL"))
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
db.query(format!("DEFINE TABLE {table}")).await.unwrap();
|
||||
}
|
||||
|
||||
// Start listening
|
||||
let mut users =
|
||||
|
@ -195,46 +247,70 @@ async fn live_select_query() {
|
|||
|
||||
{
|
||||
let table = format!("table_{}", Ulid::new());
|
||||
if FFLAGS.change_feed_live_queries.enabled() {
|
||||
db.query(format!("DEFINE TABLE {table} CHANGEFEED 10m INCLUDE ORIGINAL"))
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
db.query(format!("DEFINE TABLE {table}")).await.unwrap();
|
||||
}
|
||||
|
||||
// Start listening
|
||||
let mut users = db
|
||||
info!("Starting live query");
|
||||
let users: QueryStream<Notification<RecordId>> = db
|
||||
.query(format!("LIVE SELECT * FROM {table}"))
|
||||
.await
|
||||
.unwrap()
|
||||
.stream::<Notification<_>>(0)
|
||||
.unwrap();
|
||||
let users = Arc::new(RwLock::new(users));
|
||||
|
||||
// Create a record
|
||||
info!("Creating record");
|
||||
let created: Vec<RecordId> = db.create(table).await.unwrap();
|
||||
// Pull the notification
|
||||
let notification: Notification<RecordId> =
|
||||
tokio::time::timeout(LQ_TIMEOUT, users.next()).await.unwrap().unwrap().unwrap();
|
||||
// The returned record should match the created record
|
||||
assert_eq!(created, vec![notification.data.clone()]);
|
||||
let notifications = receive_all_pending_notifications(users.clone(), LQ_TIMEOUT).await;
|
||||
// It should be newly created
|
||||
assert_eq!(notification.action, Action::Create);
|
||||
assert_eq!(
|
||||
notifications.iter().map(|n| n.action).collect::<Vec<_>>(),
|
||||
vec![Action::Create],
|
||||
"{:?}",
|
||||
notifications
|
||||
);
|
||||
// The returned record should match the created record
|
||||
assert_eq!(created, vec![notifications[0].data.clone()]);
|
||||
|
||||
// Update the record
|
||||
info!("Updating record");
|
||||
let _: Option<RecordId> =
|
||||
db.update(¬ification.data.id).content(json!({"foo": "bar"})).await.unwrap();
|
||||
// Pull the notification
|
||||
let notification =
|
||||
tokio::time::timeout(LQ_TIMEOUT, users.next()).await.unwrap().unwrap().unwrap();
|
||||
db.update(¬ifications[0].data.id).content(json!({"foo": "bar"})).await.unwrap();
|
||||
let notifications = receive_all_pending_notifications(users.clone(), LQ_TIMEOUT).await;
|
||||
|
||||
// It should be updated
|
||||
assert_eq!(notification.action, Action::Update);
|
||||
assert_eq!(
|
||||
notifications.iter().map(|n| n.action).collect::<Vec<_>>(),
|
||||
[Action::Update],
|
||||
"{:?}",
|
||||
notifications
|
||||
);
|
||||
|
||||
// Delete the record
|
||||
let _: Option<RecordId> = db.delete(¬ification.data.id).await.unwrap();
|
||||
info!("Deleting record");
|
||||
let _: Option<RecordId> = db.delete(¬ifications[0].data.id).await.unwrap();
|
||||
// Pull the notification
|
||||
let notification: Notification<RecordId> =
|
||||
tokio::time::timeout(LQ_TIMEOUT, users.next()).await.unwrap().unwrap().unwrap();
|
||||
let notifications = receive_all_pending_notifications(users.clone(), LQ_TIMEOUT).await;
|
||||
// It should be deleted
|
||||
assert_eq!(notification.action, Action::Delete);
|
||||
assert_eq!(
|
||||
notifications.iter().map(|n| n.action).collect::<Vec<_>>(),
|
||||
[Action::Delete],
|
||||
"{:?}",
|
||||
notifications
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
let table = format!("table_{}", Ulid::new());
|
||||
db.query(format!("DEFINE TABLE {table} CHANGEFEED 10m INCLUDE ORIGINAL")).await.unwrap();
|
||||
|
||||
// Start listening
|
||||
let mut users = db
|
||||
|
@ -257,6 +333,7 @@ async fn live_select_query() {
|
|||
|
||||
{
|
||||
let table = format!("table_{}", Ulid::new());
|
||||
db.query(format!("DEFINE TABLE {table} CHANGEFEED 10m INCLUDE ORIGINAL")).await.unwrap();
|
||||
|
||||
// Start listening
|
||||
let mut users = db
|
||||
|
@ -274,7 +351,7 @@ async fn live_select_query() {
|
|||
// The returned record should match the created record
|
||||
assert_eq!(created, vec![notification.data.clone()]);
|
||||
// It should be newly created
|
||||
assert_eq!(notification.action, Action::Create);
|
||||
assert_eq!(notification.action, Action::Create, "{:?}", notification);
|
||||
|
||||
// Update the record
|
||||
let _: Option<RecordId> =
|
||||
|
@ -283,7 +360,7 @@ async fn live_select_query() {
|
|||
let notification: Notification<RecordId> =
|
||||
tokio::time::timeout(LQ_TIMEOUT, users.next()).await.unwrap().unwrap().unwrap();
|
||||
// It should be updated
|
||||
assert_eq!(notification.action, Action::Update);
|
||||
assert_eq!(notification.action, Action::Update, "{:?}", notification);
|
||||
|
||||
// Delete the record
|
||||
let _: Option<RecordId> = db.delete(¬ification.data.id).await.unwrap();
|
||||
|
@ -291,11 +368,12 @@ async fn live_select_query() {
|
|||
let notification: Notification<RecordId> =
|
||||
tokio::time::timeout(LQ_TIMEOUT, users.next()).await.unwrap().unwrap().unwrap();
|
||||
// It should be deleted
|
||||
assert_eq!(notification.action, Action::Delete);
|
||||
assert_eq!(notification.action, Action::Delete, "{:?}", notification);
|
||||
}
|
||||
|
||||
{
|
||||
let table = format!("table_{}", Ulid::new());
|
||||
db.query(format!("DEFINE TABLE {table} CHANGEFEED 10m INCLUDE ORIGINAL")).await.unwrap();
|
||||
|
||||
// Start listening
|
||||
let mut users = db
|
||||
|
@ -319,3 +397,25 @@ async fn live_select_query() {
|
|||
|
||||
drop(permit);
|
||||
}
|
||||
|
||||
async fn receive_all_pending_notifications<
|
||||
S: Stream<Item = Result<Notification<I>, Error>> + Unpin,
|
||||
I,
|
||||
>(
|
||||
mut stream: Arc<RwLock<S>>,
|
||||
timeout: Duration,
|
||||
) -> Vec<Notification<I>> {
|
||||
let (send, mut recv) = channel::<Notification<I>>(MAX_NOTIFICATIONS);
|
||||
let we_expect_timeout = tokio::time::timeout(timeout, async move {
|
||||
while let Some(notification) = stream.write().await.next().await {
|
||||
send.send(notification.unwrap()).await.unwrap();
|
||||
}
|
||||
})
|
||||
.await;
|
||||
assert!(we_expect_timeout.is_err());
|
||||
let mut results = Vec::new();
|
||||
while let Ok(notification) = recv.try_recv() {
|
||||
results.push(notification);
|
||||
}
|
||||
results
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue