Add live query API (#2919)

Co-authored-by: Emmanuel Keller <keller.emmanuel@gmail.com>
Rushmore Mushambi 2023-11-13 19:19:47 +02:00 committed by GitHub
parent 57724a27ca
commit 2d19ac9f7a
54 changed files with 1517 additions and 527 deletions
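
In short, this commit lets any `select` be turned into a live query by chaining `.live()`, which returns a `futures::Stream` of change notifications. A minimal sketch of the new API (an embedded `mem://` engine is assumed here; see lib/examples/live/main.rs below for the full remote example):

use futures::StreamExt;
use surrealdb::engine::any::connect;
use surrealdb::opt::Resource;

#[tokio::main]
async fn main() -> surrealdb::Result<()> {
    let db = connect("mem://").await?;
    db.use_ns("namespace").use_db("database").await?;
    // Any select becomes a live query by chaining `.live()`
    let mut stream = db.select(Resource::from("person")).live().await?;
    // The stream yields one notification per CREATE, UPDATE or DELETE
    while let Some(notification) = stream.next().await {
        println!("{:?}: {:?}", notification.action, notification.data);
    }
    Ok(())
}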


@ -288,11 +288,6 @@ jobs:
run: |
sudo apt-get -y update
- name: Setup FoundationDB
uses: foundationdb-rs/foundationdb-actions-install@v2.1.0
with:
version: "7.1.30"
- name: Install cargo-make
run: cargo install --debug cargo-make
@ -330,11 +325,6 @@ jobs:
run: |
sudo apt-get -y update
- name: Setup FoundationDB
uses: foundationdb-rs/foundationdb-actions-install@v2.1.0
with:
version: "7.1.30"
- name: Install cargo-make
run: cargo install --debug cargo-make
@ -372,11 +362,6 @@ jobs:
run: |
sudo apt-get -y update
- name: Setup FoundationDB
uses: foundationdb-rs/foundationdb-actions-install@v2.1.0
with:
version: "7.1.30"
- name: Install cargo-make
run: cargo install --debug cargo-make

Cargo.lock (generated)

@ -1425,6 +1425,7 @@ dependencies = [
"serde_derive",
"serde_json",
"tinytemplate",
"tokio 1.32.0",
"walkdir",
]
@ -5258,6 +5259,7 @@ dependencies = [
"tracing-futures",
"tracing-opentelemetry",
"tracing-subscriber",
"ulid",
"urlencoding",
"uuid",
"wiremock",
@ -6250,9 +6252,9 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
[[package]]
name = "ulid"
version = "1.0.0"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13a3aaa69b04e5b66cc27309710a569ea23593612387d67daaf102e73aa974fd"
checksum = "7e37c4b6cbcc59a8dcd09a6429fbc7890286bcbb79215cea7b38a3c4c0921d93"
dependencies = [
"rand 0.8.5",
"serde",


@ -88,6 +88,7 @@ test-log = { version = "0.2.12", features = ["trace"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-tungstenite = { version = "0.18.0" }
tonic = "0.8.3"
ulid = "1.1.0"
wiremock = "0.5.19"
[package.metadata.deb]


@ -76,7 +76,7 @@ command = "cargo"
args = ["test", "--locked", "--package", "surrealdb", "--no-default-features", "--features", "${_TEST_FEATURES}", "--test", "api", "api_integration::${_TEST_API_ENGINE}"]
[tasks.ci-api-integration]
env = { _START_SURREALDB_PATH = "fdb:/etc/foundationdb/fdb.cluster" }
env = { _START_SURREALDB_PATH = "memory" }
private = true
run_task = { name = ["start-surrealdb", "test-api-integration", "stop-surrealdb"], fork = true }
@ -145,9 +145,9 @@ dependencies = ["build-surrealdb"]
script = """
#!/bin/bash -ex
target/debug/surreal start ${_START_SURREALDB_PATH} --allow-all &>/tmp/surrealdb.log &
target/debug/surreal start ${_START_SURREALDB_PATH} --allow-all &>/tmp/surrealdb-${_TEST_API_ENGINE}.log &
echo $! > /tmp/surreal.pid
echo $! > /tmp/surreal-${_TEST_API_ENGINE}.pid
set +e
echo "Waiting for surreal to be ready..."
@ -164,9 +164,9 @@ script = """
[tasks.stop-surrealdb]
category = "CI - SERVICES"
script = """
kill $(cat /tmp/surreal.pid) || true
kill $(cat /tmp/surreal-${_TEST_API_ENGINE}.pid) || true
sleep 5
kill -9 $(cat /tmp/surreal.pid) || true
kill -9 $(cat /tmp/surreal-${_TEST_API_ENGINE}.pid) || true
"""
[tasks.start-tikv]
@ -204,7 +204,7 @@ script = "kill $(cat /tmp/tiup.pid) || true"
[tasks.build-surrealdb]
category = "CI - BUILD"
command = "cargo"
args = ["build", "--locked", "--no-default-features", "--features", "storage-mem,storage-fdb"]
args = ["build", "--locked", "--no-default-features", "--features", "storage-mem"]
#
# Benchmarks


@ -117,7 +117,7 @@ ulid = { version = "1.0.0", features = ["serde"] }
url = "2.4.0"
[dev-dependencies]
criterion = { version="0.4", features= ["async_futures"] }
criterion = { version="0.4", features= ["async_tokio"] }
env_logger = "0.10.0"
pprof = { version = "0.11.1", features = ["flamegraph", "criterion"] }
serial_test = "2.0.0"


@ -1,4 +1,5 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use futures::Future;
use pprof::criterion::{Output, PProfProfiler};
use surrealdb::{
dbs::{Capabilities, Session},
@ -11,7 +12,7 @@ macro_rules! query {
};
($c: expr, $name: ident, $setup: expr, $query: expr) => {
$c.bench_function(stringify!($name), |b| {
let (dbs, ses) = futures::executor::block_on(async {
let (dbs, ses) = block_on(async {
let dbs =
Datastore::new("memory").await.unwrap().with_capabilities(Capabilities::all());
let ses = Session::owner().with_ns("test").with_db("test");
@ -23,7 +24,7 @@ macro_rules! query {
});
b.iter(|| {
futures::executor::block_on(async {
block_on(async {
black_box(dbs.execute(black_box($query), &ses, None).await).unwrap();
});
})
@ -31,6 +32,11 @@ macro_rules! query {
};
}
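// `#[tokio::main]` on a non-main function generates a synchronous wrapper
// that builds a Tokio runtime and blocks on the given future. Unlike the
// previous `futures::executor::block_on`, this can drive Tokio timers and
// IO, which the datastore futures now rely on (note the `tokio::spawn`
// calls added to the executor later in this commit).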
#[tokio::main]
async fn block_on<T>(future: impl Future<Output = T>) -> T {
future.await
}
fn bench_executor(c: &mut Criterion) {
let mut c = c.benchmark_group("executor");
c.throughput(Throughput::Elements(1));


@ -1,4 +1,3 @@
use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use rand::prelude::SliceRandom;
use rand::thread_rng;
@ -8,6 +7,7 @@ use surrealdb::idx::trees::bkeys::{BKeys, FstKeys, TrieKeys};
use surrealdb::idx::trees::btree::{BState, BTree, Payload};
use surrealdb::idx::trees::store::{TreeNodeProvider, TreeNodeStore, TreeStoreType};
use surrealdb::kvs::{Datastore, Key, LockType::*, TransactionType::*};
use tokio::runtime::Runtime;
macro_rules! get_key_value {
($idx:expr) => {{
(format!("{}", $idx).into(), ($idx * 10) as Payload)
@ -23,12 +23,12 @@ fn bench_index_btree(c: &mut Criterion) {
group.measurement_time(Duration::from_secs(30));
group.bench_function("trees-insertion-fst", |b| {
b.to_async(FuturesExecutor)
b.to_async(Runtime::new().unwrap())
.iter(|| bench::<_, FstKeys>(samples_len, |i| get_key_value!(samples[i])))
});
group.bench_function("trees-insertion-trie", |b| {
b.to_async(FuturesExecutor)
b.to_async(Runtime::new().unwrap())
.iter(|| bench::<_, TrieKeys>(samples_len, |i| get_key_value!(samples[i])))
});


@ -1,4 +1,3 @@
use criterion::async_executor::FuturesExecutor;
use criterion::measurement::WallTime;
use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion, Throughput};
use futures::executor::block_on;
@ -13,6 +12,7 @@ use surrealdb::kvs::Datastore;
use surrealdb::kvs::LockType::Optimistic;
use surrealdb::kvs::TransactionType::{Read, Write};
use surrealdb::sql::index::Distance;
use tokio::runtime::Runtime;
fn bench_index_mtree_dim_3(c: &mut Criterion) {
bench_index_mtree(c, 1_000, 100_000, 3, 120);
@ -51,7 +51,8 @@ fn bench_index_mtree(
let mut group = get_group(c, "index_mtree_insert", samples_len, measurement_secs);
let id = format!("len_{}_dim_{}", samples_len, vector_dimension);
group.bench_function(id, |b| {
b.to_async(FuturesExecutor).iter(|| insert_objects(&ds, samples_len, vector_dimension));
b.to_async(Runtime::new().unwrap())
.iter(|| insert_objects(&ds, samples_len, vector_dimension));
});
group.finish();
}
@ -62,7 +63,7 @@ fn bench_index_mtree(
for knn in [1, 10] {
let id = format!("knn_{}_len_{}_dim_{}", knn, samples_len, vector_dimension);
group.bench_function(id, |b| {
b.to_async(FuturesExecutor)
b.to_async(Runtime::new().unwrap())
.iter(|| knn_lookup_objects(&ds, 100_000, vector_dimension, knn));
});
}


@ -1,9 +1,9 @@
use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use std::time::Duration;
use surrealdb::dbs::Session;
use surrealdb::kvs::Datastore;
use surrealdb::sql::Value;
use tokio::runtime::Runtime;
fn bench_processor(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
@ -15,29 +15,31 @@ fn bench_processor(c: &mut Criterion) {
group.measurement_time(Duration::from_secs(15));
group.bench_function("table-iterator", |b| {
b.to_async(FuturesExecutor).iter(|| run(&i, "SELECT * FROM item", i.count * 5))
b.to_async(Runtime::new().unwrap()).iter(|| run(&i, "SELECT * FROM item", i.count * 5))
});
group.bench_function("table-iterator-parallel", |b| {
b.to_async(FuturesExecutor).iter(|| run(&i, "SELECT * FROM item PARALLEL", i.count * 5))
b.to_async(Runtime::new().unwrap())
.iter(|| run(&i, "SELECT * FROM item PARALLEL", i.count * 5))
});
group.bench_function("non-uniq-index-iterator", |b| {
b.to_async(FuturesExecutor).iter(|| run(&i, "SELECT * FROM item WHERE number=4", i.count))
b.to_async(Runtime::new().unwrap())
.iter(|| run(&i, "SELECT * FROM item WHERE number=4", i.count))
});
group.bench_function("non-uniq-index-iterator-parallel", |b| {
b.to_async(FuturesExecutor)
b.to_async(Runtime::new().unwrap())
.iter(|| run(&i, "SELECT * FROM item WHERE number=4 PARALLEL", i.count))
});
group.bench_function("full-text-index-iterator", |b| {
b.to_async(FuturesExecutor)
b.to_async(Runtime::new().unwrap())
.iter(|| run(&i, "SELECT * FROM item WHERE label @@ 'charlie'", i.count))
});
group.bench_function("full-text-index-iterator-parallel", |b| {
b.to_async(FuturesExecutor)
b.to_async(Runtime::new().unwrap())
.iter(|| run(&i, "SELECT * FROM item WHERE label @@ 'charlie' PARALLEL", i.count))
});

lib/examples/live/main.rs (new file)

@ -0,0 +1,49 @@
use futures::StreamExt;
use serde::Deserialize;
use surrealdb::engine::remote::ws::Ws;
use surrealdb::opt::auth::Root;
use surrealdb::sql::Thing;
use surrealdb::Notification;
use surrealdb::Result;
use surrealdb::Surreal;
const ACCOUNT: &str = "account";
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct Account {
id: Thing,
balance: String,
}
#[tokio::main]
async fn main() -> surrealdb::Result<()> {
let db = Surreal::new::<Ws>("localhost:8000").await?;
db.signin(Root {
username: "root",
password: "root",
})
.await?;
db.use_ns("namespace").use_db("database").await?;
let mut accounts = db.select(ACCOUNT).range("one".."two").live().await?;
while let Some(notification) = accounts.next().await {
print(notification);
}
Ok(())
}
fn print(result: Result<Notification<Account>>) {
match result {
Ok(notification) => {
let action = notification.action;
let account = notification.data;
println!("{action:?}: {account:?}");
}
Err(error) => eprintln!("{error}"),
}
}
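Note that this example assumes a SurrealDB server is already listening on localhost:8000 (for instance one started with `surreal start`), since it connects over the `Ws` engine.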


@ -5,6 +5,7 @@ use crate::api::opt::Endpoint;
use crate::api::ExtraFeatures;
use crate::api::Result;
use crate::api::Surreal;
use crate::dbs::Notification;
use crate::opt::from_value;
use crate::sql::Query;
use crate::sql::Value;
@ -113,49 +114,49 @@ pub enum DbResponse {
}
/// Holds the parameters given to the caller
#[derive(Debug)]
#[derive(Debug, Default)]
#[allow(dead_code)] // used by the embedded and remote connections
pub struct Param {
pub(crate) query: Option<(Query, BTreeMap<String, Value>)>,
pub(crate) other: Vec<Value>,
pub(crate) file: Option<PathBuf>,
pub(crate) sender: Option<channel::Sender<Result<Vec<u8>>>>,
pub(crate) bytes_sender: Option<channel::Sender<Result<Vec<u8>>>>,
pub(crate) notification_sender: Option<channel::Sender<Notification>>,
}
impl Param {
pub(crate) fn new(other: Vec<Value>) -> Self {
Self {
query: None,
other,
file: None,
sender: None,
..Default::default()
}
}
pub(crate) fn query(query: Query, bindings: BTreeMap<String, Value>) -> Self {
Self {
query: Some((query, bindings)),
other: Vec::new(),
file: None,
sender: None,
..Default::default()
}
}
pub(crate) fn file(file: PathBuf) -> Self {
Self {
query: None,
other: Vec::new(),
file: Some(file),
sender: None,
..Default::default()
}
}
pub(crate) fn sender(send: channel::Sender<Result<Vec<u8>>>) -> Self {
pub(crate) fn bytes_sender(send: channel::Sender<Result<Vec<u8>>>) -> Self {
Self {
query: None,
other: Vec::new(),
file: None,
sender: Some(send),
bytes_sender: Some(send),
..Default::default()
}
}
pub(crate) fn notification_sender(send: channel::Sender<Notification>) -> Self {
Self {
notification_sender: Some(send),
..Default::default()
}
}
}


@ -66,6 +66,7 @@ impl Connection for Any {
#[cfg(feature = "kv-fdb")]
{
features.insert(ExtraFeatures::Backup);
features.insert(ExtraFeatures::LiveQueries);
engine::local::native::router(address, conn_tx, route_rx);
conn_rx.into_recv_async().await??
}
@ -80,6 +81,7 @@ impl Connection for Any {
#[cfg(feature = "kv-mem")]
{
features.insert(ExtraFeatures::Backup);
features.insert(ExtraFeatures::LiveQueries);
engine::local::native::router(address, conn_tx, route_rx);
conn_rx.into_recv_async().await??
}
@ -94,6 +96,7 @@ impl Connection for Any {
#[cfg(feature = "kv-rocksdb")]
{
features.insert(ExtraFeatures::Backup);
features.insert(ExtraFeatures::LiveQueries);
engine::local::native::router(address, conn_tx, route_rx);
conn_rx.into_recv_async().await??
}
@ -109,6 +112,7 @@ impl Connection for Any {
#[cfg(feature = "kv-speedb")]
{
features.insert(ExtraFeatures::Backup);
features.insert(ExtraFeatures::LiveQueries);
engine::local::native::router(address, conn_tx, route_rx);
conn_rx.into_recv_async().await??
}
@ -124,6 +128,7 @@ impl Connection for Any {
#[cfg(feature = "kv-tikv")]
{
features.insert(ExtraFeatures::Backup);
features.insert(ExtraFeatures::LiveQueries);
engine::local::native::router(address, conn_tx, route_rx);
conn_rx.into_recv_async().await??
}
@ -169,6 +174,7 @@ impl Connection for Any {
"ws" | "wss" => {
#[cfg(feature = "protocol-ws")]
{
features.insert(ExtraFeatures::LiveQueries);
let url = address.url.join(engine::remote::ws::PATH)?;
#[cfg(any(feature = "native-tls", feature = "rustls"))]
let maybe_connector = address.config.tls_config.map(Connector::from);


@ -9,6 +9,7 @@ use crate::api::engine::any::Any;
use crate::api::err::Error;
use crate::api::opt::Endpoint;
use crate::api::DbResponse;
use crate::api::ExtraFeatures;
use crate::api::OnceLockExt;
use crate::api::Result;
use crate::api::Surreal;
@ -50,6 +51,7 @@ impl Connection for Any {
"fdb" => {
#[cfg(feature = "kv-fdb")]
{
features.insert(ExtraFeatures::LiveQueries);
engine::local::wasm::router(address, conn_tx, route_rx);
conn_rx.into_recv_async().await??;
}
@ -63,6 +65,7 @@ impl Connection for Any {
"indxdb" => {
#[cfg(feature = "kv-indxdb")]
{
features.insert(ExtraFeatures::LiveQueries);
engine::local::wasm::router(address, conn_tx, route_rx);
conn_rx.into_recv_async().await??;
}
@ -76,6 +79,7 @@ impl Connection for Any {
"mem" => {
#[cfg(feature = "kv-mem")]
{
features.insert(ExtraFeatures::LiveQueries);
engine::local::wasm::router(address, conn_tx, route_rx);
conn_rx.into_recv_async().await??;
}
@ -89,6 +93,7 @@ impl Connection for Any {
"file" | "rocksdb" => {
#[cfg(feature = "kv-rocksdb")]
{
features.insert(ExtraFeatures::LiveQueries);
engine::local::wasm::router(address, conn_tx, route_rx);
conn_rx.into_recv_async().await??;
}
@ -103,6 +108,7 @@ impl Connection for Any {
"speedb" => {
#[cfg(feature = "kv-speedb")]
{
features.insert(ExtraFeatures::LiveQueries);
engine::local::wasm::router(address, conn_tx, route_rx);
conn_rx.into_recv_async().await??;
}
@ -117,6 +123,7 @@ impl Connection for Any {
"tikv" => {
#[cfg(feature = "kv-tikv")]
{
features.insert(ExtraFeatures::LiveQueries);
engine::local::wasm::router(address, conn_tx, route_rx);
conn_rx.into_recv_async().await??;
}
@ -143,6 +150,7 @@ impl Connection for Any {
"ws" | "wss" => {
#[cfg(feature = "protocol-ws")]
{
features.insert(ExtraFeatures::LiveQueries);
let mut address = address;
address.url = address.url.join(engine::remote::ws::PATH)?;
engine::remote::ws::wasm::router(address, capacity, conn_tx, route_rx);


@ -41,20 +41,23 @@ use crate::api::Connect;
use crate::api::Response as QueryResponse;
use crate::api::Result;
use crate::api::Surreal;
#[cfg(not(target_arch = "wasm32"))]
use crate::channel;
use crate::dbs::Notification;
use crate::dbs::Response;
use crate::dbs::Session;
use crate::kvs::Datastore;
use crate::opt::IntoEndpoint;
use crate::sql::statements::KillStatement;
use crate::sql::Array;
use crate::sql::Query;
use crate::sql::Statement;
use crate::sql::Statements;
use crate::sql::Strand;
use crate::sql::Uuid;
use crate::sql::Value;
use channel::Sender;
use indexmap::IndexMap;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::mem;
#[cfg(not(target_arch = "wasm32"))]
@ -431,11 +434,25 @@ where
})
}
async fn kill_live_query(
kvs: &Datastore,
id: Uuid,
session: &Session,
vars: BTreeMap<String, Value>,
) -> Result<Value> {
let query = Query(Statements(vec![Statement::Kill(KillStatement {
id: id.into(),
})]));
let response = kvs.process(query, session, Some(vars)).await?;
take(true, response).await
}
async fn router(
(_, method, param): (i64, Method, Param),
kvs: &Arc<Datastore>,
session: &mut Session,
vars: &mut BTreeMap<String, Value>,
live_queries: &mut HashMap<Uuid, Sender<Notification>>,
) -> Result<DbResponse> {
let mut params = param.other;
@ -544,9 +561,9 @@ async fn router(
Method::Export => {
let ns = session.ns.clone().unwrap_or_default();
let db = session.db.clone().unwrap_or_default();
let (tx, rx) = channel::new(1);
let (tx, rx) = crate::channel::new(1);
match (param.file, param.sender) {
match (param.file, param.bytes_sender) {
(Some(path), None) => {
let (mut writer, mut reader) = io::duplex(10_240);
@ -660,27 +677,20 @@ async fn router(
Ok(DbResponse::Other(Value::None))
}
Method::Live => {
let table = match &mut params[..] {
[value] => mem::take(value),
_ => unreachable!(),
};
let mut vars = BTreeMap::new();
vars.insert("table".to_owned(), table);
let response = kvs
.execute("LIVE SELECT * FROM type::table($table)", &*session, Some(vars))
.await?;
let value = take(true, response).await?;
Ok(DbResponse::Other(value))
if let Some(sender) = param.notification_sender {
if let [Value::Uuid(id)] = &params[..1] {
live_queries.insert(*id, sender);
}
}
Ok(DbResponse::Other(Value::None))
}
Method::Kill => {
let id = match &mut params[..] {
[value] => mem::take(value),
let id = match &params[..] {
[Value::Uuid(id)] => *id,
_ => unreachable!(),
};
let mut vars = BTreeMap::new();
vars.insert("id".to_owned(), id);
let response = kvs.execute("KILL type::string($id)", &*session, Some(vars)).await?;
let value = take(true, response).await?;
live_queries.remove(&id);
let value = kill_live_query(kvs, id, session, vars.clone()).await?;
Ok(DbResponse::Other(value))
}
}


@ -18,9 +18,12 @@ use crate::kvs::Datastore;
use crate::opt::auth::Root;
use flume::Receiver;
use flume::Sender;
use futures::future::Either;
use futures::stream::poll_fn;
use futures::StreamExt;
use futures_concurrency::stream::Merge as _;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::future::Future;
use std::marker::PhantomData;
@ -28,6 +31,7 @@ use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::sync::OnceLock;
use std::task::Poll;
use std::time::Duration;
use tokio::time;
use tokio::time::MissedTickBehavior;
@ -59,6 +63,7 @@ impl Connection for Db {
let mut features = HashSet::new();
features.insert(ExtraFeatures::Backup);
features.insert(ExtraFeatures::LiveQueries);
Ok(Surreal {
router: Arc::new(OnceLock::with_value(Router {
@ -109,6 +114,10 @@ pub(crate) fn router(
let kvs = match Datastore::new(endpoint).await {
Ok(kvs) => {
if let Err(error) = kvs.bootstrap().await {
let _ = conn_tx.into_send_async(Err(error.into())).await;
return;
}
// If a root user is specified, set up the initial datastore credentials
if let Some(root) = configured_root {
if let Err(error) = kvs.setup_initial_creds(root).await {
@ -125,33 +134,68 @@ pub(crate) fn router(
}
};
let kvs = match address.config.capabilities.allows_live_query_notifications() {
true => kvs.with_notifications(),
false => kvs,
};
let kvs = kvs
.with_strict_mode(address.config.strict)
.with_query_timeout(address.config.query_timeout)
.with_transaction_timeout(address.config.transaction_timeout)
.with_capabilities(address.config.capabilities);
let kvs = match address.config.notifications {
true => kvs.with_notifications(),
false => kvs,
};
let kvs = Arc::new(kvs);
let mut vars = BTreeMap::new();
let mut stream = route_rx.into_stream();
let mut session = Session::default();
let mut live_queries = HashMap::new();
let mut session = Session::default().with_rt(true);
let (maintenance_tx, maintenance_rx) = flume::bounded::<()>(1);
let tick_interval = address.config.tick_interval.unwrap_or(DEFAULT_TICK_INTERVAL);
run_maintenance(kvs.clone(), tick_interval, maintenance_rx);
while let Some(Some(route)) = stream.next().await {
match super::router(route.request, &kvs, &mut session, &mut vars).await {
Ok(value) => {
let _ = route.response.into_send_async(Ok(value)).await;
let mut notifications = kvs.notifications();
let notification_stream = poll_fn(move |cx| match &mut notifications {
Some(rx) => rx.poll_next_unpin(cx),
None => Poll::Ready(None),
});
let streams = (route_rx.stream().map(Either::Left), notification_stream.map(Either::Right));
let mut merged = streams.merge();
while let Some(either) = merged.next().await {
match either {
Either::Left(None) => break, // Received a shutdown signal
Either::Left(Some(route)) => {
match super::router(
route.request,
&kvs,
&mut session,
&mut vars,
&mut live_queries,
)
.await
{
Ok(value) => {
let _ = route.response.into_send_async(Ok(value)).await;
}
Err(error) => {
let _ = route.response.into_send_async(Err(error)).await;
}
}
}
Err(error) => {
let _ = route.response.into_send_async(Err(error)).await;
Either::Right(notification) => {
let id = notification.id;
if let Some(sender) = live_queries.get(&id) {
if sender.send(notification).await.is_err() {
live_queries.remove(&id);
if let Err(error) =
super::kill_live_query(&kvs, id, &session, vars.clone()).await
{
warn!("Failed to kill live query '{id}'; {error}");
}
}
}
}
}
}
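
The loop above services two event sources at once. A minimal standalone sketch of the merge pattern (hypothetical `String`/`u64` payloads standing in for `Route` requests and `Notification`s):

use flume::unbounded;
use futures::future::Either;
use futures::StreamExt;
use futures_concurrency::stream::Merge as _;

#[tokio::main]
async fn main() {
    let (req_tx, req_rx) = unbounded::<String>();
    let (not_tx, not_rx) = unbounded::<u64>();
    req_tx.send("select".to_owned()).unwrap();
    not_tx.send(42).unwrap();
    // Close both channels so the merged stream terminates
    drop((req_tx, not_tx));
    // Tag each stream with `Either` so one loop can service both sources
    let streams = (
        req_rx.into_stream().map(Either::Left),
        not_rx.into_stream().map(Either::Right),
    );
    let mut merged = streams.merge();
    while let Some(either) = merged.next().await {
        match either {
            Either::Left(request) => println!("route request: {request}"),
            Either::Right(id) => println!("forward notification for live query {id}"),
        }
    }
}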


@ -7,6 +7,7 @@ use crate::api::conn::Router;
use crate::api::engine::local::Db;
use crate::api::engine::local::DEFAULT_TICK_INTERVAL;
use crate::api::opt::Endpoint;
use crate::api::ExtraFeatures;
use crate::api::OnceLockExt;
use crate::api::Result;
use crate::api::Surreal;
@ -17,9 +18,12 @@ use crate::kvs::Datastore;
use crate::opt::auth::Root;
use flume::Receiver;
use flume::Sender;
use futures::future::Either;
use futures::stream::poll_fn;
use futures::StreamExt;
use futures_concurrency::stream::Merge as _;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::future::Future;
use std::marker::PhantomData;
@ -27,6 +31,7 @@ use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::sync::OnceLock;
use std::task::Poll;
use std::time::Duration;
use wasm_bindgen_futures::spawn_local;
use wasmtimer::tokio as time;
@ -57,9 +62,12 @@ impl Connection for Db {
conn_rx.into_recv_async().await??;
let mut features = HashSet::new();
features.insert(ExtraFeatures::LiveQueries);
Ok(Surreal {
router: Arc::new(OnceLock::with_value(Router {
features: HashSet::new(),
features,
conn: PhantomData,
sender: route_tx,
last_id: AtomicI64::new(0),
@ -101,6 +109,10 @@ pub(crate) fn router(
let kvs = match Datastore::new(&address.path).await {
Ok(kvs) => {
if let Err(error) = kvs.bootstrap().await {
let _ = conn_tx.into_send_async(Err(error.into())).await;
return;
}
// If a root user is specified, set up the initial datastore credentials
if let Some(root) = configured_root {
if let Err(error) = kvs.setup_initial_creds(root).await {
@ -117,32 +129,68 @@ pub(crate) fn router(
}
};
let kvs = kvs
.with_strict_mode(address.config.strict)
.with_query_timeout(address.config.query_timeout)
.with_transaction_timeout(address.config.transaction_timeout);
let kvs = match address.config.notifications {
let kvs = match address.config.capabilities.allows_live_query_notifications() {
true => kvs.with_notifications(),
false => kvs,
};
let kvs = kvs
.with_strict_mode(address.config.strict)
.with_query_timeout(address.config.query_timeout)
.with_transaction_timeout(address.config.transaction_timeout)
.with_capabilities(address.config.capabilities);
let kvs = Arc::new(kvs);
let mut vars = BTreeMap::new();
let mut stream = route_rx.into_stream();
let mut session = Session::default();
let mut live_queries = HashMap::new();
let mut session = Session::default().with_rt(true);
let (maintenance_tx, maintenance_rx) = flume::bounded::<()>(1);
let tick_interval = address.config.tick_interval.unwrap_or(DEFAULT_TICK_INTERVAL);
run_maintenance(kvs.clone(), tick_interval, maintenance_rx);
while let Some(Some(route)) = stream.next().await {
match super::router(route.request, &kvs, &mut session, &mut vars).await {
Ok(value) => {
let _ = route.response.into_send_async(Ok(value)).await;
let mut notifications = kvs.notifications();
let notification_stream = poll_fn(move |cx| match &mut notifications {
Some(rx) => rx.poll_next_unpin(cx),
None => Poll::Ready(None),
});
let streams = (route_rx.stream().map(Either::Left), notification_stream.map(Either::Right));
let mut merged = streams.merge();
while let Some(either) = merged.next().await {
match either {
Either::Left(None) => break, // Received a shutdown signal
Either::Left(Some(route)) => {
match super::router(
route.request,
&kvs,
&mut session,
&mut vars,
&mut live_queries,
)
.await
{
Ok(value) => {
let _ = route.response.into_send_async(Ok(value)).await;
}
Err(error) => {
let _ = route.response.into_send_async(Err(error)).await;
}
}
}
Err(error) => {
let _ = route.response.into_send_async(Err(error)).await;
Either::Right(notification) => {
let id = notification.id;
if let Some(sender) = live_queries.get(&id) {
if sender.send(notification).await.is_err() {
live_queries.remove(&id);
if let Err(error) =
super::kill_live_query(&kvs, id, &session, vars.clone()).await
{
warn!("Failed to kill live query '{id}'; {error}");
}
}
}
}
}
}


@ -494,7 +494,7 @@ async fn router(
.headers(headers.clone())
.auth(auth)
.header(ACCEPT, "application/octet-stream");
let value = export(request, (param.file, param.sender)).await?;
let value = export(request, (param.file, param.bytes_sender)).await?;
Ok(DbResponse::Other(value))
}
#[cfg(not(target_arch = "wasm32"))]


@ -12,6 +12,7 @@ use crate::api::err::Error;
use crate::api::Connect;
use crate::api::Result;
use crate::api::Surreal;
use crate::dbs::Notification;
use crate::dbs::Status;
use crate::opt::IntoEndpoint;
use crate::sql::Strand;
@ -82,6 +83,7 @@ pub(crate) struct Failure {
pub(crate) enum Data {
Other(Value),
Query(Vec<QueryMethodResponse>),
Live(Notification),
}
type ServerResult = std::result::Result<Data, Failure>;
@ -123,6 +125,8 @@ impl DbResponse {
.enumerate()
.collect(),
))),
// Live notifications don't call this method
Data::Live(..) => unreachable!(),
}
}
}


@ -13,9 +13,11 @@ use crate::api::err::Error;
use crate::api::opt::Endpoint;
#[cfg(any(feature = "native-tls", feature = "rustls"))]
use crate::api::opt::Tls;
use crate::api::ExtraFeatures;
use crate::api::OnceLockExt;
use crate::api::Result;
use crate::api::Surreal;
use crate::engine::remote::ws::Data;
use crate::engine::IntervalStream;
use crate::sql::serde::{deserialize, serialize};
use crate::sql::Strand;
@ -27,14 +29,12 @@ use futures::StreamExt;
use futures_concurrency::stream::Merge as _;
use indexmap::IndexMap;
use serde::Deserialize;
use std::borrow::BorrowMut;
use std::collections::hash_map::Entry;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
@ -130,9 +130,12 @@ impl Connection for Client {
router(url, maybe_connector, capacity, config, socket, route_rx);
let mut features = HashSet::new();
features.insert(ExtraFeatures::LiveQueries);
Ok(Surreal {
router: Arc::new(OnceLock::with_value(Router {
features: HashSet::new(),
features,
conn: PhantomData,
sender: route_tx,
last_id: AtomicI64::new(0),
@ -189,6 +192,7 @@ pub(crate) fn router(
0 => HashMap::new(),
capacity => HashMap::with_capacity(capacity),
};
let mut live_queries = HashMap::new();
let mut interval = time::interval(PING_INTERVAL);
// don't bombard the server with pings if we miss some ticks
@ -231,6 +235,27 @@ pub(crate) fn router(
vars.remove(key);
}
}
Method::Live => {
if let Some(sender) = param.notification_sender {
if let [Value::Uuid(id)] = &params[..1] {
live_queries.insert(*id, sender);
}
}
if response
.into_send_async(Ok(DbResponse::Other(Value::None)))
.await
.is_err()
{
trace!("Receiver dropped");
}
// There is nothing to send to the server here
continue;
}
Method::Kill => {
if let [Value::Uuid(id)] = &params[..1] {
live_queries.remove(id);
}
}
_ => {}
}
let method_str = match method {
@ -262,6 +287,7 @@ pub(crate) fn router(
last_activity = Instant::now();
match routes.entry(id) {
Entry::Vacant(entry) => {
// Register query route
entry.insert((method, response));
}
Entry::Occupied(..) => {
@ -288,53 +314,120 @@ pub(crate) fn router(
Either::Response(result) => {
last_activity = Instant::now();
match result {
Ok(message) => match Response::try_from(&message) {
Ok(option) => {
if let Some(response) = option {
trace!("{response:?}");
if let Some(Ok(id)) =
response.id.map(Value::coerce_to_i64)
{
if let Some((_method, sender)) = routes.remove(&id)
{
let _res = sender
.into_send_async(DbResponse::from(
response.result,
))
.await;
}
}
}
}
Err(error) => {
#[derive(Deserialize)]
struct Response {
id: Option<Value>,
}
// Let's try to find out the ID of the response that failed to deserialise
if let Message::Binary(binary) = message {
if let Ok(Response {
id,
}) = deserialize(&binary)
{
// Return an error if an ID was returned
if let Some(Ok(id)) = id.map(Value::coerce_to_i64) {
if let Some((_method, sender)) =
routes.remove(&id)
{
let _res = sender
.into_send_async(Err(error))
.await;
Ok(message) => {
match Response::try_from(&message) {
Ok(option) => {
// We are only interested in responses that are not empty
if let Some(response) = option {
trace!("{response:?}");
match response.id {
// If `id` is set this is a normal response
Some(id) => {
if let Ok(id) = id.coerce_to_i64() {
// We can only route responses with IDs
if let Some((_method, sender)) =
routes.remove(&id)
{
// Send the response back to the caller
let _res = sender
.into_send_async(
DbResponse::from(
response.result,
),
)
.await;
}
}
}
// If `id` is not set, this may be a live query notification
None => match response.result {
Ok(Data::Live(notification)) => {
let live_query_id = notification.id;
// Check if this live query is registered
if let Some(sender) =
live_queries.get(&live_query_id)
{
// Send the notification back to the caller, or kill the live query if the receiver is already dropped
if sender
.send(notification)
.await
.is_err()
{
live_queries
.remove(&live_query_id);
let kill = {
let mut request =
BTreeMap::new();
request.insert(
"method".to_owned(),
Method::Kill
.as_str()
.into(),
);
request.insert(
"params".to_owned(),
vec![Value::from(
live_query_id,
)]
.into(),
);
let value =
Value::from(request);
let value =
serialize(&value)
.unwrap();
Message::Binary(value)
};
if let Err(error) =
socket_sink.send(kill).await
{
trace!("failed to send kill query to the server; {error:?}");
break;
}
}
}
}
Ok(..) => { /* Ignored responses like pings */
}
Err(error) => error!("{error:?}"),
},
}
}
}
Err(error) => {
#[derive(Deserialize)]
struct Response {
id: Option<Value>,
}
// Let's try to find out the ID of the response that failed to deserialise
if let Message::Binary(binary) = message {
if let Ok(Response {
id,
}) = deserialize(&binary)
{
// Return an error if an ID was returned
if let Some(Ok(id)) =
id.map(Value::coerce_to_i64)
{
if let Some((_method, sender)) =
routes.remove(&id)
{
let _res = sender
.into_send_async(Err(error))
.await;
}
}
} else {
// Unfortunately, we don't know which response failed to deserialize
warn!(
"Failed to deserialise message; {error:?}"
);
}
} else {
// Unfortunately, we don't know which response failed to deserialize
warn!("Failed to deserialise message; {error:?}");
}
}
}
},
}
Err(error) => {
match error {
WsError::ConnectionClosed => {
@ -358,7 +451,14 @@ pub(crate) fn router(
}
}
}
// Close connection request received
Either::Request(None) => {
match socket_sink.send(Message::Close(None)).await {
Ok(..) => trace!("Connection closed successfully"),
Err(error) => {
warn!("Failed to close database connection; {error}")
}
}
break 'router;
}
}
@ -441,18 +541,3 @@ impl Response {
}
pub struct Socket(Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>);
impl Drop for Socket {
fn drop(&mut self) {
if let Some(mut conn) = mem::take(&mut self.0) {
futures::executor::block_on(async move {
match conn.borrow_mut().close().await {
Ok(..) => trace!("Connection closed successfully"),
Err(error) => {
trace!("Failed to close database connection; {error}")
}
}
});
}
}
}


@ -11,9 +11,11 @@ use crate::api::engine::remote::ws::PING_INTERVAL;
use crate::api::engine::remote::ws::PING_METHOD;
use crate::api::err::Error;
use crate::api::opt::Endpoint;
use crate::api::ExtraFeatures;
use crate::api::OnceLockExt;
use crate::api::Result;
use crate::api::Surreal;
use crate::engine::remote::ws::Data;
use crate::engine::IntervalStream;
use crate::sql::serde::{deserialize, serialize};
use crate::sql::Strand;
@ -82,9 +84,12 @@ impl Connection for Client {
conn_rx.into_recv_async().await??;
let mut features = HashSet::new();
features.insert(ExtraFeatures::LiveQueries);
Ok(Surreal {
router: Arc::new(OnceLock::with_value(Router {
features: HashSet::new(),
features,
conn: PhantomData,
sender: route_tx,
last_id: AtomicI64::new(0),
@ -160,6 +165,7 @@ pub(crate) fn router(
0 => HashMap::new(),
capacity => HashMap::with_capacity(capacity),
};
let mut live_queries = HashMap::new();
let mut interval = time::interval(PING_INTERVAL);
// don't bombard the server with pings if we miss some ticks
@ -203,6 +209,27 @@ pub(crate) fn router(
vars.remove(key);
}
}
Method::Live => {
if let Some(sender) = param.notification_sender {
if let [Value::Uuid(id)] = &params[..1] {
live_queries.insert(*id, sender);
}
}
if response
.into_send_async(Ok(DbResponse::Other(Value::None)))
.await
.is_err()
{
trace!("Receiver dropped");
}
// There is nothing to send to the server here
continue;
}
Method::Kill => {
if let [Value::Uuid(id)] = &params[..1] {
live_queries.remove(id);
}
}
_ => {}
}
let method_str = match method {
@ -261,14 +288,63 @@ pub(crate) fn router(
last_activity = Instant::now();
match Response::try_from(&message) {
Ok(option) => {
// We are only interested in responses that are not empty
if let Some(response) = option {
trace!("{response:?}");
if let Some(Ok(id)) = response.id.map(Value::coerce_to_i64) {
if let Some((_method, sender)) = routes.remove(&id) {
let _res = sender
.into_send_async(DbResponse::from(response.result))
.await;
match response.id {
// If `id` is set this is a normal response
Some(id) => {
if let Ok(id) = id.coerce_to_i64() {
// We can only route responses with IDs
if let Some((_method, sender)) = routes.remove(&id)
{
// Send the response back to the caller
let _res = sender
.into_send_async(DbResponse::from(
response.result,
))
.await;
}
}
}
// If `id` is not set, this may be a live query notification
None => match response.result {
Ok(Data::Live(notification)) => {
let live_query_id = notification.id;
// Check if this live query is registered
if let Some(sender) =
live_queries.get(&live_query_id)
{
// Send the notification back to the caller, or kill the live query if the receiver is already dropped
if sender.send(notification).await.is_err() {
live_queries.remove(&live_query_id);
let kill = {
let mut request = BTreeMap::new();
request.insert(
"method".to_owned(),
Method::Kill.as_str().into(),
);
request.insert(
"params".to_owned(),
vec![Value::from(live_query_id)]
.into(),
);
let value = Value::from(request);
let value = serialize(&value).unwrap();
Message::Binary(value)
};
if let Err(error) =
socket_sink.send(kill).await
{
trace!("failed to send kill query to the server; {error:?}");
break;
}
}
}
}
Ok(..) => { /* Ignored responses like pings */ }
Err(error) => error!("{error:?}"),
},
}
}
}
@ -322,7 +398,14 @@ pub(crate) fn router(
}
}
}
// Close connection request received
Either::Request(None) => {
match ws.close().await {
Ok(..) => trace!("Connection closed successfully"),
Err(error) => {
warn!("Failed to close database connection; {error}")
}
}
break 'router;
}
}


@ -164,6 +164,23 @@ pub enum Error {
server_metadata: semver::BuildMetadata,
supported_metadata: semver::BuildMetadata,
},
/// The protocol or storage engine being used does not support live queries on the architecture
/// it's running on
#[error("The protocol or storage engine does not support live queries on this architecture")]
LiveQueriesNotSupported,
/// Tried to use a live query on an object
#[error("Live queries on objects not supported: {0}")]
LiveOnObject(Object),
/// Tried to use a live query on an array
#[error("Live queries on arrays not supported: {0}")]
LiveOnArray(Array),
/// Tried to use a live query on an edge or edges
#[error("Live queries on edges not supported: {0}")]
LiveOnEdges(Edges),
}
#[cfg(feature = "protocol-http")]


@ -43,7 +43,7 @@ macro_rules! into_future {
let content = to_value(content);
Box::pin(async move {
let param = match range {
Some(range) => resource?.with_range(range)?,
Some(range) => resource?.with_range(range)?.into(),
None => resource?.into(),
};
let mut conn = Client::new(method);


@ -34,7 +34,7 @@ macro_rules! into_future {
} = self;
Box::pin(async {
let param = match range {
Some(range) => resource?.with_range(range)?,
Some(range) => resource?.with_range(range)?.into(),
None => resource?.into(),
};
let mut conn = Client::new(Method::Delete);


@ -66,7 +66,7 @@ where
let ExportDestination::Memory = self.target else {
unreachable!();
};
conn.execute_unit(router, Param::sender(tx)).await?;
conn.execute_unit(router, Param::bytes_sender(tx)).await?;
Ok(Backup {
rx,
})


@ -1,32 +0,0 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::conn::Router;
use crate::api::Connection;
use crate::api::Result;
use crate::sql::Uuid;
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
/// A live query kill future
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Kill<'r, C: Connection> {
pub(super) router: Result<&'r Router<C>>,
pub(super) query_id: Uuid,
}
impl<'r, Client> IntoFuture for Kill<'r, Client>
where
Client: Connection,
{
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let mut conn = Client::new(Method::Kill);
conn.execute_unit(self.router?, Param::new(vec![self.query_id.into()])).await
})
}
}


@ -1,34 +1,324 @@
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::conn::Router;
use crate::api::err::Error;
use crate::api::opt::Range;
use crate::api::Connection;
use crate::api::ExtraFeatures;
use crate::api::Result;
use crate::dbs;
use crate::method::Query;
use crate::opt::from_value;
use crate::opt::Resource;
use crate::sql::cond::Cond;
use crate::sql::expression::Expression;
use crate::sql::field::Field;
use crate::sql::field::Fields;
use crate::sql::ident::Ident;
use crate::sql::idiom::Idiom;
use crate::sql::operator::Operator;
use crate::sql::part::Part;
use crate::sql::statement::Statement;
use crate::sql::statements::live::LiveStatement;
use crate::sql::Id;
use crate::sql::Table;
use crate::sql::Thing;
use crate::sql::Uuid;
use crate::sql::Value;
use crate::Notification;
use channel::Receiver;
use futures::StreamExt;
use serde::de::DeserializeOwned;
use std::future::Future;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::ops::Bound;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
const ID: &str = "id";
/// A live query future
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Live<'r, C: Connection> {
pub struct Live<'r, C: Connection, R> {
pub(super) router: Result<&'r Router<C>>,
pub(super) table_name: String,
pub(super) resource: Result<Resource>,
pub(super) range: Option<Range<Id>>,
pub(super) response_type: PhantomData<R>,
}
impl<'r, Client> IntoFuture for Live<'r, Client>
macro_rules! into_future {
() => {
fn into_future(self) -> Self::IntoFuture {
let Live {
router,
resource,
range,
..
} = self;
Box::pin(async move {
let router = router?;
if !router.features.contains(&ExtraFeatures::LiveQueries) {
return Err(Error::LiveQueriesNotSupported.into());
}
let mut stmt = LiveStatement {
id: Uuid::new_v4(),
node: Uuid::new_v4(),
expr: Fields(vec![Field::All], false),
..Default::default()
};
match range {
Some(range) => {
let range = resource?.with_range(range)?;
stmt.what = Table(range.tb.clone()).into();
stmt.cond = cond_from_range(range);
}
None => match resource? {
Resource::Table(table) => {
stmt.what = table.into();
}
Resource::RecordId(record) => {
stmt.what = Table(record.tb.clone()).into();
stmt.cond = Some(Cond(Value::Expression(Box::new(Expression::new(
Idiom(vec![Part::from(Ident(ID.to_owned()))]).into(),
Operator::Equal,
record.into(),
)))));
}
Resource::Object(object) => return Err(Error::LiveOnObject(object).into()),
Resource::Array(array) => return Err(Error::LiveOnArray(array).into()),
Resource::Edges(edges) => return Err(Error::LiveOnEdges(edges).into()),
},
}
let query = Query {
router: Ok(router),
query: vec![Ok(vec![Statement::Live(stmt)])],
bindings: Ok(Default::default()),
};
let id: Value = query.await?.take(0)?;
let mut conn = Client::new(Method::Live);
let (tx, rx) = channel::unbounded();
let mut param = Param::notification_sender(tx);
param.other = vec![id.clone()];
conn.execute_unit(router, param).await?;
Ok(Stream {
router,
id,
rx,
response_type: PhantomData,
})
})
}
};
}
fn cond_from_range(range: crate::sql::Range) -> Option<Cond> {
match (range.beg, range.end) {
(Bound::Unbounded, Bound::Unbounded) => None,
(Bound::Unbounded, Bound::Excluded(id)) => {
Some(Cond(Value::Expression(Box::new(Expression::new(
Idiom(vec![Part::from(Ident(ID.to_owned()))]).into(),
Operator::LessThan,
Thing::from((range.tb, id)).into(),
)))))
}
(Bound::Unbounded, Bound::Included(id)) => {
Some(Cond(Value::Expression(Box::new(Expression::new(
Idiom(vec![Part::from(Ident(ID.to_owned()))]).into(),
Operator::LessThanOrEqual,
Thing::from((range.tb, id)).into(),
)))))
}
(Bound::Excluded(id), Bound::Unbounded) => {
Some(Cond(Value::Expression(Box::new(Expression::new(
Idiom(vec![Part::from(Ident(ID.to_owned()))]).into(),
Operator::MoreThan,
Thing::from((range.tb, id)).into(),
)))))
}
(Bound::Included(id), Bound::Unbounded) => {
Some(Cond(Value::Expression(Box::new(Expression::new(
Idiom(vec![Part::from(Ident(ID.to_owned()))]).into(),
Operator::MoreThanOrEqual,
Thing::from((range.tb, id)).into(),
)))))
}
(Bound::Included(lid), Bound::Included(rid)) => {
Some(Cond(Value::Expression(Box::new(Expression::new(
Value::Expression(Box::new(Expression::new(
Idiom(vec![Part::from(Ident(ID.to_owned()))]).into(),
Operator::MoreThanOrEqual,
Thing::from((range.tb.clone(), lid)).into(),
))),
Operator::And,
Value::Expression(Box::new(Expression::new(
Idiom(vec![Part::from(Ident(ID.to_owned()))]).into(),
Operator::LessThanOrEqual,
Thing::from((range.tb, rid)).into(),
))),
)))))
}
(Bound::Included(lid), Bound::Excluded(rid)) => {
Some(Cond(Value::Expression(Box::new(Expression::new(
Value::Expression(Box::new(Expression::new(
Idiom(vec![Part::from(Ident(ID.to_owned()))]).into(),
Operator::MoreThanOrEqual,
Thing::from((range.tb.clone(), lid)).into(),
))),
Operator::And,
Value::Expression(Box::new(Expression::new(
Idiom(vec![Part::from(Ident(ID.to_owned()))]).into(),
Operator::LessThan,
Thing::from((range.tb, rid)).into(),
))),
)))))
}
(Bound::Excluded(lid), Bound::Included(rid)) => {
Some(Cond(Value::Expression(Box::new(Expression::new(
Value::Expression(Box::new(Expression::new(
Idiom(vec![Part::from(Ident(ID.to_owned()))]).into(),
Operator::MoreThan,
Thing::from((range.tb.clone(), lid)).into(),
))),
Operator::And,
Value::Expression(Box::new(Expression::new(
Idiom(vec![Part::from(Ident(ID.to_owned()))]).into(),
Operator::LessThanOrEqual,
Thing::from((range.tb, rid)).into(),
))),
)))))
}
(Bound::Excluded(lid), Bound::Excluded(rid)) => {
Some(Cond(Value::Expression(Box::new(Expression::new(
Value::Expression(Box::new(Expression::new(
Idiom(vec![Part::from(Ident(ID.to_owned()))]).into(),
Operator::MoreThan,
Thing::from((range.tb.clone(), lid)).into(),
))),
Operator::And,
Value::Expression(Box::new(Expression::new(
Idiom(vec![Part::from(Ident(ID.to_owned()))]).into(),
Operator::LessThan,
Thing::from((range.tb, rid)).into(),
))),
)))))
}
}
}
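// A reduced sketch of the mapping above: inclusive bounds become `>=` / `<=`
// and exclusive bounds become `>` / `<`, so
//     db.select("person").range("jane".."john").live()
// is equivalent to the SurrealQL
//     LIVE SELECT * FROM person WHERE id >= person:jane AND id < person:john
// since a Rust `a..b` range includes the start and excludes the end.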
impl<'r, Client> IntoFuture for Live<'r, Client, Value>
where
Client: Connection,
{
type Output = Result<Uuid>;
type Output = Result<Stream<'r, Client, Value>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let mut conn = Client::new(Method::Live);
conn.execute(self.router?, Param::new(vec![Value::Table(Table(self.table_name))])).await
})
into_future! {}
}
impl<'r, Client, R> IntoFuture for Live<'r, Client, Option<R>>
where
Client: Connection,
R: DeserializeOwned,
{
type Output = Result<Stream<'r, Client, Option<R>>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
into_future! {}
}
impl<'r, Client, R> IntoFuture for Live<'r, Client, Vec<R>>
where
Client: Connection,
R: DeserializeOwned,
{
type Output = Result<Stream<'r, Client, Vec<R>>>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
into_future! {}
}
/// A stream of live query notifications
#[derive(Debug)]
#[must_use = "streams do nothing unless you poll them"]
pub struct Stream<'r, C: Connection, R> {
router: &'r Router<C>,
id: Value,
rx: Receiver<dbs::Notification>,
response_type: PhantomData<R>,
}
impl<Client, R> Stream<'_, Client, R>
where
Client: Connection,
{
/// Close the live query stream
///
/// This kills the live query process responsible for this stream.
/// If the stream is dropped without calling this method, the process
/// will be killed next time it tries to send a notification to the stream.
pub async fn close(self) -> Result<()> {
let mut conn = Client::new(Method::Kill);
conn.execute_unit(self.router, Param::new(vec![self.id])).await
}
}
macro_rules! poll_next {
($action:ident, $result:ident => $body:expr) => {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.as_mut().rx.poll_next_unpin(cx) {
Poll::Ready(Some(dbs::Notification {
$action,
$result,
..
})) => $body,
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
};
}
impl<C> futures::Stream for Stream<'_, C, Value>
where
C: Connection,
{
type Item = Notification<Value>;
poll_next! {
action, result => Poll::Ready(Some(Notification { action: action.into(), data: result }))
}
}
macro_rules! poll_next_and_convert {
() => {
poll_next! {
action, result => match from_value(result) {
Ok(data) => Poll::Ready(Some(Ok(Notification { action: action.into(), data }))),
Err(error) => Poll::Ready(Some(Err(error.into()))),
}
}
};
}
impl<C, R> futures::Stream for Stream<'_, C, Option<R>>
where
C: Connection,
R: DeserializeOwned + Unpin,
{
type Item = Result<Notification<R>>;
poll_next_and_convert! {}
}
impl<C, R> futures::Stream for Stream<'_, C, Vec<R>>
where
C: Connection,
R: DeserializeOwned + Unpin,
{
type Item = Result<Notification<R>>;
poll_next_and_convert! {}
}
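
Dropping a `Stream` leaves the server-side live query running until the next delivery attempt fails; calling `close()` kills it eagerly. A minimal sketch (embedded engine assumed):

use surrealdb::engine::any::connect;
use surrealdb::opt::Resource;

#[tokio::main]
async fn main() -> surrealdb::Result<()> {
    let db = connect("mem://").await?;
    db.use_ns("namespace").use_db("database").await?;
    let stream = db.select(Resource::from("person")).live().await?;
    // Explicitly closing the stream issues the KILL immediately, rather
    // than waiting for the next failed notification delivery after a drop
    stream.close().await?;
    Ok(())
}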


@ -39,7 +39,7 @@ macro_rules! into_future {
let content = to_value(content);
Box::pin(async move {
let param = match range {
Some(range) => resource?.with_range(range)?,
Some(range) => resource?.with_range(range)?.into(),
None => resource?.into(),
};
let mut conn = Client::new(Method::Merge);


@ -13,7 +13,6 @@ mod export;
mod health;
mod import;
mod invalidate;
mod kill;
mod live;
mod merge;
mod patch;
@ -47,10 +46,8 @@ pub use export::Export;
pub use health::Health;
pub use import::Import;
pub use invalidate::Invalidate;
#[doc(hidden)] // Not supported yet
pub use kill::Kill;
#[doc(hidden)] // Not supported yet
pub use live::Live;
pub use live::Stream;
pub use merge::Merge;
pub use patch::Patch;
pub use query::Query;
@ -76,7 +73,6 @@ use crate::api::OnceLockExt;
use crate::api::Surreal;
use crate::opt::IntoExportDestination;
use crate::sql::to_value;
use crate::sql::Uuid;
use crate::sql::Value;
use serde::Serialize;
use std::marker::PhantomData;
@ -631,6 +627,8 @@ where
/// # Examples
///
/// ```no_run
/// # use futures::StreamExt;
/// # use surrealdb::opt::Resource;
/// # #[derive(serde::Deserialize)]
/// # struct Person;
/// #
@ -644,8 +642,21 @@ where
/// // Select all records from a table
/// let people: Vec<Person> = db.select("person").await?;
///
/// // Select a range of records from a table
/// let people: Vec<Person> = db.select("person").range("jane".."john").await?;
///
/// // Select a specific record from a table
/// let person: Option<Person> = db.select(("person", "h5wxrf2ewk8xjxosxtyc")).await?;
///
/// // To listen for updates as they happen on a record, a range of records,
/// // or an entire table, use a live query. This is done by simply calling
/// // `.live()` after this method, which gives you a stream of notifications
/// // you can listen to.
/// # let resource = Resource::from("person");
/// let mut stream = db.select(resource).live().await?;
///
/// while let Some(notification) = stream.next().await {
/// // Use the notification
/// }
/// #
/// # Ok(())
/// # }
@ -941,22 +952,6 @@ where
}
}
#[doc(hidden)] // Not supported yet
pub fn kill(&self, query_id: Uuid) -> Kill<C> {
Kill {
router: self.router.extract(),
query_id,
}
}
#[doc(hidden)] // Not supported yet
pub fn live(&self, table_name: impl Into<String>) -> Live<C> {
Live {
router: self.router.extract(),
table_name: table_name.into(),
}
}
/// Dumps the database contents to a file
///
/// # Support


@ -39,7 +39,7 @@ macro_rules! into_future {
} = self;
Box::pin(async move {
let param = match range {
Some(range) => resource?.with_range(range)?,
Some(range) => resource?.with_range(range)?.into(),
None => resource?.into(),
};
let mut vec = Vec::with_capacity(patches.len());


@ -5,6 +5,7 @@ use crate::api::opt::Range;
use crate::api::opt::Resource;
use crate::api::Connection;
use crate::api::Result;
use crate::method::Live;
use crate::sql::Id;
use crate::sql::Value;
use serde::de::DeserializeOwned;
@ -34,7 +35,7 @@ macro_rules! into_future {
} = self;
Box::pin(async move {
let param = match range {
Some(range) => resource?.with_range(range)?,
Some(range) => resource?.with_range(range)?.into(),
None => resource?.into(),
};
let mut conn = Client::new(Method::Select);
@ -76,7 +77,7 @@ where
into_future! {execute_vec}
}
impl<C> Select<'_, C, Value>
impl<'r, C> Select<'r, C, Value>
where
C: Connection,
{
@ -87,7 +88,7 @@ where
}
}
impl<C, R> Select<'_, C, Vec<R>>
impl<'r, C, R> Select<'r, C, Vec<R>>
where
C: Connection,
{
@ -97,3 +98,65 @@ where
self
}
}
impl<'r, C, R> Select<'r, C, R>
where
C: Connection,
R: DeserializeOwned,
{
/// Turns a normal select query into a live query
///
/// # Examples
///
/// ```no_run
/// # use futures::StreamExt;
/// # use surrealdb::opt::Resource;
/// # use surrealdb::Result;
/// # use surrealdb::Notification;
/// # #[derive(Debug, serde::Deserialize)]
/// # struct Person;
/// #
/// # #[tokio::main]
/// # async fn main() -> surrealdb::Result<()> {
/// # let db = surrealdb::engine::any::connect("mem://").await?;
/// #
/// // Select the namespace/database to use
/// db.use_ns("namespace").use_db("database").await?;
///
/// // Listen to all updates on a table
/// let mut stream = db.select("person").live().await?;
/// # let _: Option<Result<Notification<Person>>> = stream.next().await;
///
/// // Listen to updates on a range of records
/// let mut stream = db.select("person").range("jane".."john").live().await?;
/// # let _: Option<Result<Notification<Person>>> = stream.next().await;
///
/// // Listen to updates on a specific record
/// let mut stream = db.select(("person", "h5wxrf2ewk8xjxosxtyc")).live().await?;
///
/// // The returned stream implements `futures::Stream` so we can
/// // use it with `futures::StreamExt`, for example.
/// while let Some(result) = stream.next().await {
/// handle(result);
/// }
///
/// // Handle the result of the live query notification
/// fn handle(result: Result<Notification<Person>>) {
/// match result {
/// Ok(notification) => println!("{notification:?}"),
/// Err(error) => eprintln!("{error}"),
/// }
/// }
/// #
/// # Ok(())
/// # }
/// ```
pub fn live(self) -> Live<'r, C, R> {
Live {
router: self.router,
resource: self.resource,
range: self.range,
response_type: self.response_type,
}
}
}


@ -39,7 +39,7 @@ macro_rules! into_future {
} = self;
Box::pin(async move {
let param = match range {
Some(range) => resource?.with_range(range)?,
Some(range) => resource?.with_range(range)?.into(),
None => resource?.into(),
};
let mut conn = Client::new(Method::Update);


@ -123,6 +123,7 @@ where
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) enum ExtraFeatures {
Backup,
LiveQueries,
}
/// A database client instance for embedded or remote databases


@ -38,12 +38,20 @@ impl Config {
}
/// Set the notifications value of the config to the supplied value
#[deprecated(
since = "1.1.0",
note = "Moved to `Capabilities::with_live_query_notifications()`"
)]
pub fn set_notifications(mut self, notifications: bool) -> Self {
self.notifications = notifications;
self
}
/// Set the config to use notifications
#[deprecated(
since = "1.1.0",
note = "Moved to `Capabilities::with_live_query_notifications()`"
)]
pub fn notifications(mut self) -> Self {
self.notifications = true;
self
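
For callers migrating off these deprecated methods, the setting now lives on `Capabilities`. A hedged sketch, assuming the `Config::capabilities` builder:

use surrealdb::dbs::Capabilities;
use surrealdb::opt::Config;

fn main() {
    // Before (deprecated since 1.1.0):
    //     let config = Config::default().notifications();
    // After: toggle live query notifications through capabilities instead
    let _config = Config::default()
        .capabilities(Capabilities::default().with_live_query_notifications(true));
}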


@ -27,14 +27,13 @@ pub enum Resource {
}
impl Resource {
pub(crate) fn with_range(self, range: Range<Id>) -> Result<Value> {
pub(crate) fn with_range(self, range: Range<Id>) -> Result<sql::Range> {
match self {
Resource::Table(Table(table)) => Ok(sql::Range {
tb: table,
beg: range.start,
end: range.end,
}
.into()),
}),
Resource::RecordId(record_id) => Err(Error::RangeOnRecordId(record_id).into()),
Resource::Object(object) => Err(Error::RangeOnObject(object).into()),
Resource::Array(array) => Err(Error::RangeOnArray(array).into()),
@ -49,30 +48,60 @@ impl From<Table> for Resource {
}
}
impl From<&Table> for Resource {
fn from(table: &Table) -> Self {
Self::Table(table.clone())
}
}
impl From<Thing> for Resource {
fn from(thing: Thing) -> Self {
Self::RecordId(thing)
}
}
impl From<&Thing> for Resource {
fn from(thing: &Thing) -> Self {
Self::RecordId(thing.clone())
}
}
impl From<Object> for Resource {
fn from(object: Object) -> Self {
Self::Object(object)
}
}
impl From<&Object> for Resource {
fn from(object: &Object) -> Self {
Self::Object(object.clone())
}
}
impl From<Array> for Resource {
fn from(array: Array) -> Self {
Self::Array(array)
}
}
impl From<&Array> for Resource {
fn from(array: &Array) -> Self {
Self::Array(array.clone())
}
}
impl From<Edges> for Resource {
fn from(edges: Edges) -> Self {
Self::Edges(edges)
}
}
impl From<&Edges> for Resource {
fn from(edges: &Edges) -> Self {
Self::Edges(edges.clone())
}
}
impl From<&str> for Resource {
fn from(s: &str) -> Self {
match sql::thing(s) {


@ -216,6 +216,7 @@ impl<T: Target + Hash + Eq + PartialEq + std::fmt::Display> std::fmt::Display fo
pub struct Capabilities {
scripting: bool,
guest_access: bool,
live_query_notifications: bool,
allow_funcs: Arc<Targets<FuncTarget>>,
deny_funcs: Arc<Targets<FuncTarget>>,
@ -227,8 +228,8 @@ impl std::fmt::Display for Capabilities {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"scripting={}, guest_access={}, allow_funcs={}, deny_funcs={}, allow_net={}, deny_net={}",
self.scripting, self.guest_access, self.allow_funcs, self.deny_funcs, self.allow_net, self.deny_net
"scripting={}, guest_access={}, live_query_notifications={}, allow_funcs={}, deny_funcs={}, allow_net={}, deny_net={}",
self.scripting, self.guest_access, self.live_query_notifications, self.allow_funcs, self.deny_funcs, self.allow_net, self.deny_net
)
}
}
@ -238,6 +239,7 @@ impl Default for Capabilities {
Self {
scripting: false,
guest_access: false,
live_query_notifications: true,
allow_funcs: Arc::new(Targets::All),
deny_funcs: Arc::new(Targets::None),
@ -252,6 +254,7 @@ impl Capabilities {
Self {
scripting: true,
guest_access: true,
live_query_notifications: true,
allow_funcs: Arc::new(Targets::All),
deny_funcs: Arc::new(Targets::None),
@ -270,6 +273,11 @@ impl Capabilities {
self
}
pub fn with_live_query_notifications(mut self, live_query_notifications: bool) -> Self {
self.live_query_notifications = live_query_notifications;
self
}
pub fn with_functions(mut self, allow_funcs: Targets<FuncTarget>) -> Self {
self.allow_funcs = Arc::new(allow_funcs);
self
@ -298,6 +306,10 @@ impl Capabilities {
self.guest_access
}
pub fn allows_live_query_notifications(&self) -> bool {
self.live_query_notifications
}
pub fn allows_function(&self, target: &FuncTarget) -> bool {
self.allow_funcs.matches(target) && !self.deny_funcs.matches(target)
}
@ -530,6 +542,18 @@ mod tests {
assert!(!caps.allows_guest_access());
}
// When live query notifications are allowed
{
let cap = Capabilities::default().with_live_query_notifications(true);
assert!(cap.allows_live_query_notifications());
}
// When live query notifications are disabled
{
let cap = Capabilities::default().with_live_query_notifications(false);
assert!(!cap.allows_live_query_notifications());
}
// When all nets are allowed
{
let caps = Capabilities::default()

View file

@ -17,9 +17,14 @@ use crate::sql::value::Value;
use crate::sql::Base;
use channel::Receiver;
use futures::lock::Mutex;
use futures::StreamExt;
use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use tokio::spawn;
use tracing::instrument;
use trice::Instant;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures::spawn_local as spawn;
pub(crate) struct Executor<'a> {
err: bool,
@ -136,24 +141,27 @@ impl<'a> Executor<'a> {
}
/// Consume the live query notifications
async fn clear(&self, _: &Context<'_>, rcv: Receiver<Notification>) {
while rcv.try_recv().is_ok() {
// Ignore notification
}
async fn clear(&self, _: &Context<'_>, mut rcv: Receiver<Notification>) {
spawn(async move {
while rcv.next().await.is_some() {
// Ignore notification
}
});
}
/// Flush notifications from the live query buffer channel to the committed notification channel,
/// so that notifications from failed transactions are never broadcast to the user.
async fn flush(&self, ctx: &Context<'_>, rcv: Receiver<Notification>) {
if let Some(chn) = ctx.notifications() {
while let Ok(v) = rcv.try_recv() {
let _ = chn.send(v).await;
async fn flush(&self, ctx: &Context<'_>, mut rcv: Receiver<Notification>) {
let sender = ctx.notifications();
spawn(async move {
while let Some(notification) = rcv.next().await {
if let Some(chn) = &sender {
if chn.send(notification).await.is_err() {
break;
}
}
}
} else {
while rcv.try_recv().is_ok() {
// Ignore notification
}
}
});
}
async fn set_ns(&self, ctx: &mut Context<'_>, opt: &mut Options, ns: &str) {

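A self-contained sketch of the buffer-then-flush pattern that `flush` and `clear` implement above, assuming the `channel` import here is the async-channel crate (names and payloads are illustrative):

    use async_channel::unbounded;

    #[tokio::main]
    async fn main() {
        // Per-transaction buffer and the committed notification channel.
        let (buf_tx, buf_rx) = unbounded::<&'static str>();
        let (out_tx, out_rx) = unbounded::<&'static str>();
        // Statements executed during the transaction buffer their notifications.
        buf_tx.send("CREATE person:one").await.unwrap();
        drop(buf_tx);
        let committed = true;
        if committed {
            // Flush to subscribers only once the transaction has committed.
            while let Ok(n) = buf_rx.recv().await {
                out_tx.send(n).await.unwrap();
            }
        } else {
            // Drain and discard buffered notifications for failed transactions.
            while buf_rx.recv().await.is_ok() {}
        }
        drop(out_tx);
        while let Ok(n) = out_rx.recv().await {
            println!("notify: {n}");
        }
    }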
View file

@ -6,6 +6,7 @@ use crate::dbs::{Action, Transaction};
use crate::doc::CursorDoc;
use crate::doc::Document;
use crate::err::Error;
use crate::sql::paths::META;
use crate::sql::paths::SC;
use crate::sql::paths::SD;
use crate::sql::paths::TK;
@ -26,12 +27,8 @@ impl<'a> Document<'a> {
if !opt.force && !self.changed() {
return Ok(());
}
// Get the record id
let rid = self.id.as_ref().unwrap();
// Check if we can send notifications
if let Some(chn) = &opt.sender {
// Clone the sending channel
let chn = chn.clone();
// Loop through all live statements
for lv in self.lv(opt, txn).await?.iter() {
// Create a new statement
@ -103,11 +100,20 @@ impl<'a> Document<'a> {
if stm.is_delete() {
// Send a DELETE notification
if opt.id()? == lv.node.0 {
let thing = (*rid).clone();
chn.send(Notification {
id: lv.id.clone(),
id: lv.id,
action: Action::Delete,
result: Value::Thing(thing),
result: {
// Ensure futures are run
let lqopt: &Options = &lqopt.new_with_futures(true);
// Output the full document before any changes were applied
let mut value =
doc.doc.compute(&lqctx, lqopt, txn, Some(doc)).await?;
// Remove metadata fields on output
value.del(&lqctx, lqopt, txn, &*META).await?;
// Output result
value
},
})
.await?;
} else {
@ -117,7 +123,7 @@ impl<'a> Document<'a> {
// Send a CREATE notification
if opt.id()? == lv.node.0 {
chn.send(Notification {
id: lv.id.clone(),
id: lv.id,
action: Action::Create,
result: self.pluck(&lqctx, &lqopt, txn, &lq).await?,
})
@ -129,7 +135,7 @@ impl<'a> Document<'a> {
// Send an UPDATE notification
if opt.id()? == lv.node.0 {
chn.send(Notification {
id: lv.id.clone(),
id: lv.id,
action: Action::Update,
result: self.pluck(&lqctx, &lqopt, txn, &lq).await?,
})

View file

@ -404,8 +404,8 @@ mod tests {
#[cfg(all(feature = "scripting", feature = "kv-mem"))]
use crate::dbs::Capabilities;
#[test]
fn implementations_are_present() {
#[tokio::test]
async fn implementations_are_present() {
// Accumulate and display all problems at once to avoid a test -> fix -> test -> fix cycle.
let mut problems = Vec::new();
@ -428,7 +428,7 @@ mod tests {
}
#[cfg(all(feature = "scripting", feature = "kv-mem"))]
futures::executor::block_on(async {
{
use crate::sql::Value;
let name = name.replace("::", ".");
@ -446,7 +446,7 @@ mod tests {
} else if tmp != Value::from("function") {
problems.push(format!("function {name} not exported to JavaScript: {tmp:?}"));
}
});
}
}
if !problems.is_empty() {

View file

@ -81,7 +81,7 @@ impl<'js> FromJs<'js> for Value {
let borrow = v.borrow();
let v: &classes::uuid::Uuid = &borrow;
return match &v.value {
Some(v) => Ok(v.clone().into()),
Some(v) => Ok((*v).into()),
None => Ok(Value::None),
};
}

View file

@ -37,7 +37,12 @@ pub fn prefix(ns: &str, db: &str, tb: &str) -> Vec<u8> {
pub fn suffix(ns: &str, db: &str, tb: &str) -> Vec<u8> {
let mut k = super::all::new(ns, db, tb).encode().unwrap();
k.extend_from_slice(&[b'!', b'l', b'q', 0xff]);
k.extend_from_slice(&[b'!', b'l', b'q']);
k.extend_from_slice(Uuid::max().as_ref());
// We need the extra byte here because `getr()` only scans half-open ranges,
// which exclude keys equal to the upper bound, so a suffix ending in the max
// UUID would never match it. Appending one extra byte brings the max UUID into range.
k.push(0x00);
k
}
@ -95,6 +100,6 @@ mod tests {
#[test]
fn suffix() {
let val = super::suffix("testns", "testdb", "testtb");
assert_eq!(val, b"/*testns\x00*testdb\x00*testtb\x00!lq\xff")
assert_eq!(val, b"/*testns\x00*testdb\x00*testtb\x00!lq\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00")
}
}
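A byte-level sketch of why the trailing `0x00` matters, using plain slice comparison (the real keys carry the full namespace/database/table prefix):

    fn main() {
        let max = [0xffu8; 16]; // the bytes of Uuid::max()
        let mut key = b"!lq".to_vec();
        key.extend_from_slice(&max); // a stored key that uses the max UUID
        let mut end = b"!lq".to_vec();
        end.extend_from_slice(&max);
        // A half-open scan [beg, end) excludes keys equal to `end`,
        // so without the extra byte the max-UUID key is never matched.
        assert!(!(key.as_slice() < end.as_slice()));
        end.push(0x00); // now end > key, bringing the max UUID into range
        assert!(key.as_slice() < end.as_slice());
    }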

View file

@ -14,7 +14,9 @@ use crate::err::Error;
use crate::iam::ResourceKind;
use crate::iam::{Action, Auth, Error as IamError, Role};
use crate::key::root::hb::Hb;
use crate::kvs::clock::{SizedClock, SystemClock};
use crate::kvs::clock::SizedClock;
#[allow(unused_imports)]
use crate::kvs::clock::SystemClock;
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*, NO_LIMIT};
use crate::opt::auth::Root;
use crate::sql;
@ -212,10 +214,8 @@ impl Datastore {
#[allow(dead_code)]
async fn new_full_impl(
path: &str,
clock_override: Option<Arc<RwLock<SizedClock>>>,
#[allow(unused_variables)] clock_override: Option<Arc<RwLock<SizedClock>>>,
) -> Result<Datastore, Error> {
let default_clock: Arc<RwLock<SizedClock>> =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
// Initiate the desired datastore
let (inner, clock): (Result<Inner, Error>, Arc<RwLock<SizedClock>>) = match path {
"memory" => {
@ -223,6 +223,8 @@ impl Datastore {
{
info!("Starting kvs store in {}", path);
let v = super::mem::Datastore::new().await.map(Inner::Mem);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
info!("Started kvs store in {}", path);
Ok((v, clock))
@ -238,6 +240,8 @@ impl Datastore {
let s = s.trim_start_matches("file://");
let s = s.trim_start_matches("file:");
let v = super::rocksdb::Datastore::new(s).await.map(Inner::RocksDB);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
info!("Started kvs store at {}", path);
Ok((v, clock))
@ -254,6 +258,8 @@ impl Datastore {
let s = s.trim_start_matches("rocksdb:");
let v = super::rocksdb::Datastore::new(s).await.map(Inner::RocksDB);
info!("Started kvs store at {}", path);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
Ok((v, clock))
}
@ -269,6 +275,8 @@ impl Datastore {
let s = s.trim_start_matches("speedb:");
let v = super::speedb::Datastore::new(s).await.map(Inner::SpeeDB);
info!("Started kvs store at {}", path);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
Ok((v, clock))
}
@ -284,6 +292,8 @@ impl Datastore {
let s = s.trim_start_matches("indxdb:");
let v = super::indxdb::Datastore::new(s).await.map(Inner::IndxDB);
info!("Started kvs store at {}", path);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
Ok((v, clock))
}
@ -299,6 +309,8 @@ impl Datastore {
let s = s.trim_start_matches("tikv:");
let v = super::tikv::Datastore::new(s).await.map(Inner::TiKV);
info!("Connected to kvs store at {}", path);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
Ok((v, clock))
}
@ -314,6 +326,8 @@ impl Datastore {
let s = s.trim_start_matches("fdb:");
let v = super::fdb::Datastore::new(s).await.map(Inner::FoundationDB);
info!("Connected to kvs store at {}", path);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
Ok((v, clock))
}
@ -322,8 +336,8 @@ impl Datastore {
}
// The datastore path is not valid
_ => {
// use clock_override and default_clock to remove warning when no kv is enabled.
let _ = (clock_override, default_clock);
// Consume clock_override to avoid an unused-variable warning when no KV store is enabled.
let _ = clock_override;
info!("Unable to load the specified datastore {}", path);
Err(Error::Ds("Unable to load the specified datastore".into()))
}
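One note on the repeated default-clock construction above: the default was moved into each backend branch so that builds with no KV feature enabled never construct (and warn about) an unused clock. Within a branch, an `unwrap_or_else` closure achieves the same laziness in a single line (a sketch reusing the types already in scope):

    let clock = clock_override.unwrap_or_else(|| {
        Arc::new(RwLock::new(SizedClock::System(SystemClock::new())))
    });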
@ -579,7 +593,7 @@ impl Datastore {
for lq in node_lqs {
trace!("Archiving query {:?}", &lq);
let node_archived_lqs =
match self.archive_lv_for_node(tx, &lq.nd, this_node_id.clone()).await {
match self.archive_lv_for_node(tx, &lq.nd, *this_node_id).await {
Ok(lq) => lq,
Err(e) => {
error!("Error archiving lqs during bootstrap phase: {:?}", e);
@ -737,7 +751,7 @@ impl Datastore {
continue;
}
let lv = lv_res.unwrap();
let archived_lvs = lv.clone().archive(this_node_id.clone());
let archived_lvs = lv.clone().archive(this_node_id);
tx.putc_tblq(&lq.ns, &lq.db, &lq.tb, archived_lvs, Some(lv)).await?;
ret.push((lq, None));
}
@ -851,7 +865,7 @@ impl Datastore {
pub async fn heartbeat(&self) -> Result<(), Error> {
let mut tx = self.transaction(Write, Optimistic).await?;
let timestamp = tx.clock().await;
self.heartbeat_full(&mut tx, timestamp, self.id.clone()).await?;
self.heartbeat_full(&mut tx, timestamp, self.id).await?;
tx.commit().await
}

View file

@ -1,45 +1,53 @@
#[tokio::test]
#[serial]
async fn write_scan_tblq() {
let node_id = uuid::Uuid::parse_str("0bee25e0-34d7-448c-abc0-48cdf3db3a53").unwrap();
let clock = Arc::new(RwLock::new(SizedClock::Fake(FakeClock::new(Timestamp::default()))));
let test = init(node_id, clock).await.unwrap();
let node_id = uuid::uuid!("0bee25e0-34d7-448c-abc0-48cdf3db3a53");
let live_ids = [
uuid::Uuid::nil(),
uuid::uuid!("b5aab54e-d1ef-4a14-b537-9206dcde2209"),
uuid::Uuid::new_v4(),
uuid::Uuid::max(),
];
// Write some data
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let ns = "namespace";
let db = "database";
let tb = "table";
let live_id =
sql::Uuid::from(uuid::Uuid::parse_str("b5aab54e-d1ef-4a14-b537-9206dcde2209").unwrap());
let live_stm = LiveStatement {
id: live_id.clone(),
node: sql::Uuid::from(node_id),
expr: Default::default(),
what: Default::default(),
cond: None,
fetch: None,
archived: None,
session: Some(Value::None),
auth: None,
};
tx.putc_tblq(ns, db, tb, live_stm, None).await.unwrap();
tx.commit().await.unwrap();
for live_id in live_ids {
let clock = Arc::new(RwLock::new(SizedClock::Fake(FakeClock::new(Timestamp::default()))));
let test = init(node_id, clock).await.unwrap();
// Verify scan
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let res = tx.scan_tblq(ns, db, tb, 100).await.unwrap();
let no_limit = tx.scan_tblq(ns, db, tb, NO_LIMIT).await.unwrap();
tx.commit().await.unwrap();
assert_eq!(
res,
vec![LqValue {
nd: sql::Uuid::from(node_id),
ns: ns.to_string(),
db: db.to_string(),
tb: tb.to_string(),
lq: live_id
}]
);
assert_eq!(res, no_limit);
// Write some data
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let ns = "namespace";
let db = "database";
let tb = "table";
let live_id = sql::Uuid::from(live_id);
let live_stm = LiveStatement {
id: live_id.clone(),
node: sql::Uuid::from(node_id),
expr: Default::default(),
what: Default::default(),
cond: None,
fetch: None,
archived: None,
session: Some(Value::None),
auth: None,
};
tx.putc_tblq(ns, db, tb, live_stm, None).await.unwrap();
tx.commit().await.unwrap();
// Verify scan
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let res = tx.scan_tblq(ns, db, tb, 100).await.unwrap();
let no_limit = tx.scan_tblq(ns, db, tb, NO_LIMIT).await.unwrap();
tx.commit().await.unwrap();
assert_eq!(
res,
vec![LqValue {
nd: sql::Uuid::from(node_id),
ns: ns.to_string(),
db: db.to_string(),
tb: tb.to_string(),
lq: live_id
}]
);
assert_eq!(res, no_limit);
}
}

View file

@ -1324,7 +1324,7 @@ impl Transaction {
ns: lv.ns.to_string(),
db: lv.db.to_string(),
tb: lv.tb.to_string(),
lq: val.id.clone(),
lq: val.id,
});
// Count
if limit != NO_LIMIT {

View file

@ -166,6 +166,39 @@ pub mod error {
pub use crate::err::Error as Db;
}
/// The action performed on a record
///
/// This is used in live query notifications.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[non_exhaustive]
pub enum Action {
Create,
Update,
Delete,
}
impl From<dbs::Action> for Action {
fn from(action: dbs::Action) -> Self {
match action {
dbs::Action::Create => Self::Create,
dbs::Action::Update => Self::Update,
dbs::Action::Delete => Self::Delete,
}
}
}
/// A live query notification
///
/// Live queries return a stream of notifications. Each notification contains the `action` that triggered the change in the database record, along with the record `data` itself.
/// For deletions, the data is the record as it was before it was deleted. Otherwise, it is the newly created or updated record, depending on whether the action is create or update.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Notification<R> {
pub action: Action,
pub data: R,
}
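A hedged usage sketch of these types with the new `.live()` select API (embedded `Mem` engine; the `Person` shape is illustrative):

    use futures::StreamExt;
    use serde::Deserialize;
    use surrealdb::engine::local::Mem;
    use surrealdb::{Action, Notification, Surreal};

    #[derive(Debug, Deserialize)]
    struct Person {
        name: String,
    }

    #[tokio::main]
    async fn main() -> Result<(), surrealdb::Error> {
        let db = Surreal::new::<Mem>(()).await?;
        db.use_ns("test").use_db("test").await?;
        // Subscribe to all changes on the `person` table.
        let mut stream = db.select("person").live().await?;
        // A concurrent writer on `person` pushes notifications into the stream.
        if let Some(result) = stream.next().await {
            let notification: Notification<Person> = result?;
            match notification.action {
                Action::Create => println!("created: {:?}", notification.data),
                Action::Update => println!("updated: {:?}", notification.data),
                Action::Delete => println!("deleted: {:?}", notification.data),
                // Action is #[non_exhaustive], so downstream matches need a catch-all.
                _ => {}
            }
        }
        Ok(())
    }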
/// An error originating from the SurrealDB client library
#[derive(Debug, thiserror::Error, serde::Serialize)]
pub enum Error {

View file

@ -39,7 +39,7 @@ impl KillStatement {
opt.valid_for_db()?;
// Resolve live query id
let live_query_id = match &self.id {
Value::Uuid(id) => id.clone(),
Value::Uuid(id) => *id,
Value::Param(param) => match param.compute(ctx, opt, txn, None).await? {
Value::Uuid(id) => id,
_ => {

View file

@ -17,7 +17,9 @@ use std::str::FromStr;
pub(crate) const TOKEN: &str = "$surrealdb::private::sql::Uuid";
#[derive(Clone, Debug, Default, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
#[derive(
Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, Hash,
)]
#[serde(rename = "$surrealdb::private::sql::Uuid")]
#[revisioned(revision = 1)]
pub struct Uuid(pub uuid::Uuid);

View file

@ -5,6 +5,7 @@ mod api_integration {
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use serial_test::serial;
use std::borrow::Cow;
use std::ops::Bound;
use std::sync::Arc;
@ -28,6 +29,8 @@ mod api_integration {
use surrealdb::sql::Value;
use surrealdb::Error;
use surrealdb::Surreal;
use tokio::sync::Semaphore;
use tokio::sync::SemaphorePermit;
use tracing_subscriber::filter::EnvFilter;
use tracing_subscriber::fmt;
use tracing_subscriber::layer::SubscriberExt;
@ -38,16 +41,13 @@ mod api_integration {
const ROOT_USER: &str = "root";
const ROOT_PASS: &str = "root";
const TICK_INTERVAL: Duration = Duration::from_secs(1);
// Used to ensure that only one test at a time is setting up the underlying datastore.
// When auth is enabled, multiple tests may try to create the same root user at the same time.
static SETUP_MUTEX: Lazy<Arc<Mutex<()>>> = Lazy::new(|| Arc::new(Mutex::new(())));
#[derive(Debug, Serialize)]
struct Record<'a> {
name: &'a str,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
struct RecordId {
id: Thing,
}
@ -57,7 +57,7 @@ mod api_integration {
name: String,
}
#[derive(Debug, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
struct RecordBuf {
id: Thing,
name: String,
@ -69,23 +69,14 @@ mod api_integration {
pass: &'a str,
}
fn init_logger() {
let test_writer = fmt::layer().with_test_writer();
let builder = fmt::Subscriber::builder().with_env_filter(EnvFilter::from_default_env());
let subscriber = builder.finish();
let _ = subscriber.with(test_writer).try_init();
}
#[cfg(feature = "protocol-ws")]
mod ws {
use super::*;
use surrealdb::engine::remote::ws::Client;
use surrealdb::engine::remote::ws::Ws;
#[allow(clippy::await_holding_lock)]
async fn new_db() -> Surreal<Client> {
let _guard = SETUP_MUTEX.lock().unwrap();
init_logger();
async fn new_db() -> (SemaphorePermit<'static>, Surreal<Client>) {
let permit = PERMITS.acquire().await.unwrap();
let db = Surreal::new::<Ws>("127.0.0.1:8000").await.unwrap();
db.signin(Root {
username: ROOT_USER,
@ -93,16 +84,18 @@ mod api_integration {
})
.await
.unwrap();
db
(permit, db)
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn any_engine_can_connect() {
init_logger();
let permit = PERMITS.acquire().await.unwrap();
surrealdb::engine::any::connect("ws://127.0.0.1:8000").await.unwrap();
drop(permit);
}
include!("api/mod.rs");
include!("api/live.rs");
}
#[cfg(feature = "protocol-http")]
@ -111,10 +104,8 @@ mod api_integration {
use surrealdb::engine::remote::http::Client;
use surrealdb::engine::remote::http::Http;
#[allow(clippy::await_holding_lock)]
async fn new_db() -> Surreal<Client> {
let _guard = SETUP_MUTEX.lock().unwrap();
init_logger();
async fn new_db() -> (SemaphorePermit<'static>, Surreal<Client>) {
let permit = PERMITS.acquire().await.unwrap();
let db = Surreal::new::<Http>("127.0.0.1:8000").await.unwrap();
db.signin(Root {
username: ROOT_USER,
@ -122,13 +113,14 @@ mod api_integration {
})
.await
.unwrap();
db
(permit, db)
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn any_engine_can_connect() {
init_logger();
let permit = PERMITS.acquire().await.unwrap();
surrealdb::engine::any::connect("http://127.0.0.1:8000").await.unwrap();
drop(permit);
}
include!("api/mod.rs");
@ -143,8 +135,8 @@ mod api_integration {
use surrealdb::engine::local::Mem;
use surrealdb::iam;
async fn new_db() -> Surreal<Db> {
init_logger();
async fn new_db() -> (SemaphorePermit<'static>, Surreal<Db>) {
let permit = PERMITS.acquire().await.unwrap();
let root = Root {
username: ROOT_USER,
password: ROOT_PASS,
@ -155,25 +147,22 @@ mod api_integration {
.capabilities(Capabilities::all());
let db = Surreal::new::<Mem>(config).await.unwrap();
db.signin(root).await.unwrap();
db
(permit, db)
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn memory_allowed_as_address() {
init_logger();
surrealdb::engine::any::connect("memory").await.unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn any_engine_can_connect() {
init_logger();
surrealdb::engine::any::connect("mem://").await.unwrap();
surrealdb::engine::any::connect("memory").await.unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn signin_first_not_necessary() {
init_logger();
let db = Surreal::new::<Mem>(()).await.unwrap();
db.use_ns("namespace").use_db("database").await.unwrap();
let Some(record): Option<RecordId> = db.create(("item", "foo")).await.unwrap() else {
@ -182,9 +171,8 @@ mod api_integration {
assert_eq!(record.id.to_string(), "item:foo");
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn cant_sign_into_default_root_account() {
init_logger();
let db = Surreal::new::<Mem>(()).await.unwrap();
let Error::Db(DbError::InvalidAuth) = db
.signin(Root {
@ -198,9 +186,8 @@ mod api_integration {
};
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn credentials_activate_authentication() {
init_logger();
let config = Config::new().user(Root {
username: ROOT_USER,
password: ROOT_PASS,
@ -218,7 +205,7 @@ mod api_integration {
};
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn surreal_clone() {
use surrealdb::engine::any::Any;
@ -232,6 +219,7 @@ mod api_integration {
}
include!("api/mod.rs");
include!("api/live.rs");
include!("api/backup.rs");
}
@ -241,10 +229,8 @@ mod api_integration {
use surrealdb::engine::local::Db;
use surrealdb::engine::local::File;
#[allow(clippy::await_holding_lock)]
async fn new_db() -> Surreal<Db> {
let _guard = SETUP_MUTEX.lock().unwrap();
init_logger();
async fn new_db() -> (SemaphorePermit<'static>, Surreal<Db>) {
let permit = PERMITS.acquire().await.unwrap();
let path = format!("/tmp/{}.db", Ulid::new());
let root = Root {
username: ROOT_USER,
@ -256,18 +242,19 @@ mod api_integration {
.capabilities(Capabilities::all());
let db = Surreal::new::<File>((path, config)).await.unwrap();
db.signin(root).await.unwrap();
db
(permit, db)
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn any_engine_can_connect() {
init_logger();
let path = Ulid::new();
surrealdb::engine::any::connect(format!("file://{path}.db")).await.unwrap();
surrealdb::engine::any::connect(format!("file:///tmp/{path}.db")).await.unwrap();
let path = format!("{}.db", Ulid::new());
surrealdb::engine::any::connect(format!("file://{path}")).await.unwrap();
surrealdb::engine::any::connect(format!("file:///tmp/{path}")).await.unwrap();
tokio::fs::remove_dir_all(path).await.unwrap();
}
include!("api/mod.rs");
include!("api/live.rs");
include!("api/backup.rs");
}
@ -277,10 +264,8 @@ mod api_integration {
use surrealdb::engine::local::Db;
use surrealdb::engine::local::RocksDb;
#[allow(clippy::await_holding_lock)]
async fn new_db() -> Surreal<Db> {
let _guard = SETUP_MUTEX.lock().unwrap();
init_logger();
async fn new_db() -> (SemaphorePermit<'static>, Surreal<Db>) {
let permit = PERMITS.acquire().await.unwrap();
let path = format!("/tmp/{}.db", Ulid::new());
let root = Root {
username: ROOT_USER,
@ -292,18 +277,19 @@ mod api_integration {
.capabilities(Capabilities::all());
let db = Surreal::new::<RocksDb>((path, config)).await.unwrap();
db.signin(root).await.unwrap();
db
(permit, db)
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn any_engine_can_connect() {
init_logger();
let path = Ulid::new();
surrealdb::engine::any::connect(format!("rocksdb://{path}.db")).await.unwrap();
surrealdb::engine::any::connect(format!("rocksdb:///tmp/{path}.db")).await.unwrap();
let path = format!("{}.db", Ulid::new());
surrealdb::engine::any::connect(format!("rocksdb://{path}")).await.unwrap();
surrealdb::engine::any::connect(format!("rocksdb:///tmp/{path}")).await.unwrap();
tokio::fs::remove_dir_all(path).await.unwrap();
}
include!("api/mod.rs");
include!("api/live.rs");
include!("api/backup.rs");
}
@ -313,10 +299,8 @@ mod api_integration {
use surrealdb::engine::local::Db;
use surrealdb::engine::local::SpeeDb;
#[allow(clippy::await_holding_lock)]
async fn new_db() -> Surreal<Db> {
let _guard = SETUP_MUTEX.lock().unwrap();
init_logger();
async fn new_db() -> (SemaphorePermit<'static>, Surreal<Db>) {
let permit = PERMITS.acquire().await.unwrap();
let path = format!("/tmp/{}.db", Ulid::new());
let root = Root {
username: ROOT_USER,
@ -328,18 +312,19 @@ mod api_integration {
.capabilities(Capabilities::all());
let db = Surreal::new::<SpeeDb>((path, config)).await.unwrap();
db.signin(root).await.unwrap();
db
(permit, db)
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn any_engine_can_connect() {
init_logger();
let path = Ulid::new();
surrealdb::engine::any::connect(format!("speedb://{path}.db")).await.unwrap();
surrealdb::engine::any::connect(format!("speedb:///tmp/{path}.db")).await.unwrap();
let path = format!("{}.db", Ulid::new());
surrealdb::engine::any::connect(format!("speedb://{path}")).await.unwrap();
surrealdb::engine::any::connect(format!("speedb:///tmp/{path}")).await.unwrap();
tokio::fs::remove_dir_all(path).await.unwrap();
}
include!("api/mod.rs");
include!("api/live.rs");
include!("api/backup.rs");
}
@ -349,10 +334,8 @@ mod api_integration {
use surrealdb::engine::local::Db;
use surrealdb::engine::local::TiKv;
#[allow(clippy::await_holding_lock)]
async fn new_db() -> Surreal<Db> {
let _guard = SETUP_MUTEX.lock().unwrap();
init_logger();
async fn new_db() -> (SemaphorePermit<'static>, Surreal<Db>) {
let permit = PERMITS.acquire().await.unwrap();
let root = Root {
username: ROOT_USER,
password: ROOT_PASS,
@ -363,16 +346,18 @@ mod api_integration {
.capabilities(Capabilities::all());
let db = Surreal::new::<TiKv>(("127.0.0.1:2379", config)).await.unwrap();
db.signin(root).await.unwrap();
db
(permit, db)
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn any_engine_can_connect() {
init_logger();
let permit = PERMITS.acquire().await.unwrap();
surrealdb::engine::any::connect("tikv://127.0.0.1:2379").await.unwrap();
drop(permit);
}
include!("api/mod.rs");
include!("api/live.rs");
include!("api/backup.rs");
}
@ -382,10 +367,8 @@ mod api_integration {
use surrealdb::engine::local::Db;
use surrealdb::engine::local::FDb;
#[allow(clippy::await_holding_lock)]
async fn new_db() -> Surreal<Db> {
let _guard = SETUP_MUTEX.lock().unwrap();
init_logger();
async fn new_db() -> (SemaphorePermit<'static>, Surreal<Db>) {
let permit = PERMITS.acquire().await.unwrap();
let root = Root {
username: ROOT_USER,
password: ROOT_PASS,
@ -400,10 +383,11 @@ mod api_integration {
.unwrap();
let db = Surreal::new::<FDb>((path, config)).await.unwrap();
db.signin(root).await.unwrap();
db
(permit, db)
}
include!("api/mod.rs");
include!("api/live.rs");
include!("api/backup.rs");
}
@ -412,10 +396,8 @@ mod api_integration {
use super::*;
use surrealdb::engine::any::Any;
#[allow(clippy::await_holding_lock)]
async fn new_db() -> Surreal<Any> {
let _guard = SETUP_MUTEX.lock().unwrap();
init_logger();
async fn new_db() -> (SemaphorePermit<'static>, Surreal<Any>) {
let permit = PERMITS.acquire().await.unwrap();
let db = surrealdb::engine::any::connect("http://127.0.0.1:8000").await.unwrap();
db.signin(Root {
username: ROOT_USER,
@ -423,7 +405,7 @@ mod api_integration {
})
.await
.unwrap();
db
(permit, db)
}
include!("api/mod.rs");

View file

@ -3,9 +3,9 @@
use tokio::fs::remove_file;
#[tokio::test]
#[test_log::test(tokio::test)]
async fn export_import() {
let db = new_db().await;
let (permit, db) = new_db().await;
let db_name = Ulid::new().to_string();
db.use_ns(NS).use_db(&db_name).await.unwrap();
for i in 0..10 {
@ -17,6 +17,7 @@ async fn export_import() {
.await
.unwrap();
}
drop(permit);
let file = format!("{db_name}.sql");
db.export(&file).await.unwrap();
db.import(&file).await.unwrap();

176
lib/tests/api/live.rs Normal file
View file

@ -0,0 +1,176 @@
// Tests for running live queries
// Supported by the storage engines and the WS protocol
use futures::StreamExt;
use futures::TryStreamExt;
use surrealdb::Action;
use surrealdb::Notification;
#[test_log::test(tokio::test)]
async fn live_select_table() {
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
{
let table = Ulid::new().to_string();
// Start listening
let mut users = db.select(&table).live().await.unwrap();
// Create a record
let created: Vec<RecordId> = db.create(table).await.unwrap();
// Pull the notification
let notification: Notification<RecordId> = users.next().await.unwrap().unwrap();
// The returned record should match the created record
assert_eq!(created, vec![notification.data.clone()]);
// It should be newly created
assert_eq!(notification.action, Action::Create);
// Update the record
let _: Option<RecordId> =
db.update(&notification.data.id).content(json!({"foo": "bar"})).await.unwrap();
// Pull the notification
let notification: Notification<RecordId> = users.next().await.unwrap().unwrap();
// It should be updated
assert_eq!(notification.action, Action::Update);
// Delete the record
let _: Option<RecordId> = db.delete(&notification.data.id).await.unwrap();
// Pull the notification
let notification: Notification<RecordId> = users.next().await.unwrap().unwrap();
// It should be deleted
assert_eq!(notification.action, Action::Delete);
}
{
let table = Ulid::new().to_string();
// Start listening
let mut users = db.select(Resource::from(&table)).live().await.unwrap();
// Create a record
db.create(Resource::from(&table)).await.unwrap();
// Pull the notification
let notification = users.next().await.unwrap();
// The returned record should be an object
assert!(notification.data.is_object());
// It should be newly created
assert_eq!(notification.action, Action::Create);
}
drop(permit);
}
#[test_log::test(tokio::test)]
async fn live_select_record_id() {
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
{
let record_id = Thing::from((Ulid::new().to_string(), "john".to_owned()));
// Start listening
let mut users = db.select(&record_id).live().await.unwrap();
// Create a record
let created: Option<RecordId> = db.create(record_id).await.unwrap();
// Pull the notification
let notification: Notification<RecordId> = users.try_next().await.unwrap().unwrap();
// The returned record should match the created record
assert_eq!(created, Some(notification.data.clone()));
// It should be newly created
assert_eq!(notification.action, Action::Create);
// Update the record
let _: Option<RecordId> =
db.update(&notification.data.id).content(json!({"foo": "bar"})).await.unwrap();
// Pull the notification
let notification: Notification<RecordId> = users.try_next().await.unwrap().unwrap();
// It should be updated
assert_eq!(notification.action, Action::Update);
// Delete the record
let _: Option<RecordId> = db.delete(&notification.data.id).await.unwrap();
// Pull the notification
let notification: Notification<RecordId> = users.try_next().await.unwrap().unwrap();
// It should be deleted
assert_eq!(notification.action, Action::Delete);
}
{
let record_id = Thing::from((Ulid::new().to_string(), "john".to_owned()));
// Start listening
let mut users = db.select(Resource::from(&record_id)).live().await.unwrap();
// Create a record
db.create(Resource::from(record_id)).await.unwrap();
// Pull the notification
let notification = users.next().await.unwrap();
// The returned record should be an object
assert!(notification.data.is_object());
// It should be newly created
assert_eq!(notification.action, Action::Create);
}
drop(permit);
}
#[test_log::test(tokio::test)]
async fn live_select_record_ranges() {
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
{
let table = Ulid::new().to_string();
// Start listening
let mut users = db.select(&table).range("jane".."john").live().await.unwrap();
// Create a record
let created: Option<RecordId> = db.create((table, "jane")).await.unwrap();
// Pull the notification
let notification: Notification<RecordId> = users.try_next().await.unwrap().unwrap();
// The returned record should match the created record
assert_eq!(created, Some(notification.data.clone()));
// It should be newly created
assert_eq!(notification.action, Action::Create);
// Update the record
let _: Option<RecordId> =
db.update(&notification.data.id).content(json!({"foo": "bar"})).await.unwrap();
// Pull the notification
let notification: Notification<RecordId> = users.try_next().await.unwrap().unwrap();
// It should be updated
assert_eq!(notification.action, Action::Update);
// Delete the record
let _: Option<RecordId> = db.delete(&notification.data.id).await.unwrap();
// Pull the notification
let notification: Notification<RecordId> = users.try_next().await.unwrap().unwrap();
// It should be deleted
assert_eq!(notification.action, Action::Delete);
}
{
let table = Ulid::new().to_string();
// Start listening
let mut users =
db.select(Resource::from(&table)).range("jane".."john").live().await.unwrap();
// Create a record
db.create(Resource::from((table, "job"))).await.unwrap();
// Pull the notification
let notification = users.next().await.unwrap();
// The returned record should be an object
assert!(notification.data.is_object());
// It should be newly created
assert_eq!(notification.action, Action::Create);
}
drop(permit);
}

View file

@ -1,14 +1,17 @@
// Tests common to all protocols and storage engines
#[tokio::test]
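// Limit how many tests run concurrently against the shared datastore;
// with auth enabled, concurrent tests could otherwise race to create
// the same root user during setup.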
static PERMITS: Semaphore = Semaphore::const_new(1);
#[test_log::test(tokio::test)]
async fn connect() {
let db = new_db().await;
let (permit, db) = new_db().await;
drop(permit);
db.health().await.unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn yuse() {
let db = new_db().await;
let (permit, db) = new_db().await;
let item = Ulid::new().to_string();
match db.create(Resource::from(item.as_str())).await.unwrap_err() {
// Local engines return this error
@ -27,12 +30,14 @@ async fn yuse() {
}
db.use_db(item.as_str()).await.unwrap();
db.create(Resource::from(item)).await.unwrap();
drop(permit);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn invalidate() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
db.invalidate().await.unwrap();
let error = db.create::<Option<RecordId>>(("user", "john")).await.unwrap_err();
assert!(
@ -42,9 +47,9 @@ async fn invalidate() {
);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn signup_scope() {
let db = new_db().await;
let (permit, db) = new_db().await;
let database = Ulid::new().to_string();
db.use_ns(NS).use_db(&database).await.unwrap();
let scope = Ulid::new().to_string();
@ -56,6 +61,7 @@ async fn signup_scope() {
"
);
let response = db.query(sql).await.unwrap();
drop(permit);
response.check().unwrap();
db.signup(Scope {
namespace: NS,
@ -70,14 +76,15 @@ async fn signup_scope() {
.unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn signin_ns() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
let user = Ulid::new().to_string();
let pass = "password123";
let sql = format!("DEFINE USER {user} ON NAMESPACE PASSWORD '{pass}'");
let response = db.query(sql).await.unwrap();
drop(permit);
response.check().unwrap();
db.signin(Namespace {
namespace: NS,
@ -88,15 +95,16 @@ async fn signin_ns() {
.unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn signin_db() {
let db = new_db().await;
let (permit, db) = new_db().await;
let database = Ulid::new().to_string();
db.use_ns(NS).use_db(&database).await.unwrap();
let user = Ulid::new().to_string();
let pass = "password123";
let sql = format!("DEFINE USER {user} ON DATABASE PASSWORD '{pass}'");
let response = db.query(sql).await.unwrap();
drop(permit);
response.check().unwrap();
db.signin(Database {
namespace: NS,
@ -108,9 +116,9 @@ async fn signin_db() {
.unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn signin_scope() {
let db = new_db().await;
let (permit, db) = new_db().await;
let database = Ulid::new().to_string();
db.use_ns(NS).use_db(&database).await.unwrap();
let scope = Ulid::new().to_string();
@ -124,6 +132,7 @@ async fn signin_scope() {
"
);
let response = db.query(sql).await.unwrap();
drop(permit);
response.check().unwrap();
db.signup(Scope {
namespace: NS,
@ -149,9 +158,9 @@ async fn signin_scope() {
.unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn scope_throws_error() {
let db = new_db().await;
let (permit, db) = new_db().await;
let database = Ulid::new().to_string();
db.use_ns(NS).use_db(&database).await.unwrap();
let scope = Ulid::new().to_string();
@ -165,6 +174,7 @@ async fn scope_throws_error() {
"
);
let response = db.query(sql).await.unwrap();
drop(permit);
response.check().unwrap();
match db
@ -216,9 +226,9 @@ async fn scope_throws_error() {
};
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn scope_invalid_query() {
let db = new_db().await;
let (permit, db) = new_db().await;
let database = Ulid::new().to_string();
db.use_ns(NS).use_db(&database).await.unwrap();
let scope = Ulid::new().to_string();
@ -232,6 +242,7 @@ async fn scope_invalid_query() {
"
);
let response = db.query(sql).await.unwrap();
drop(permit);
response.check().unwrap();
match db
@ -281,14 +292,15 @@ async fn scope_invalid_query() {
};
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn authenticate() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
let user = Ulid::new().to_string();
let pass = "password123";
let sql = format!("DEFINE USER {user} ON NAMESPACE PASSWORD '{pass}'");
let response = db.query(sql).await.unwrap();
drop(permit);
response.check().unwrap();
let token = db
.signin(Namespace {
@ -301,10 +313,11 @@ async fn authenticate() {
db.authenticate(token).await.unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn query() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let _ = db
.query(
"
@ -323,9 +336,9 @@ async fn query() {
assert_eq!(name, "John Doe");
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn query_decimals() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
let sql = "
DEFINE TABLE foo;
@ -333,12 +346,14 @@ async fn query_decimals() {
CREATE foo CONTENT { bar: 42.69 };
";
let _ = db.query(sql).await.unwrap().check().unwrap();
drop(permit);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn query_binds() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let mut response =
db.query("CREATE user:john SET name = $name").bind(("name", "John Doe")).await.unwrap();
let Some(record): Option<RecordName> = response.take(0).unwrap() else {
@ -367,10 +382,11 @@ async fn query_binds() {
assert_eq!(record.name, "John Doe");
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn query_chaining() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let response = db
.query(BeginStatement)
.query("CREATE account:one SET balance = 135605.16")
@ -383,37 +399,41 @@ async fn query_chaining() {
response.check().unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn mixed_results_query() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let sql = "CREATE bar SET baz = rand('a'); CREATE foo;";
let mut response = db.query(sql).await.unwrap();
response.take::<Value>(0).unwrap_err();
let _: Option<RecordId> = response.take(1).unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn create_record_no_id() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let _: Vec<RecordId> = db.create("user").await.unwrap();
let _: Value = db.create(Resource::from("user")).await.unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn create_record_with_id() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let _: Option<RecordId> = db.create(("user", "jane")).await.unwrap();
let _: Value = db.create(Resource::from(("user", "john"))).await.unwrap();
let _: Value = db.create(Resource::from("user:doe")).await.unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn create_record_no_id_with_content() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let _: Vec<RecordId> = db
.create("user")
.content(Record {
@ -430,10 +450,11 @@ async fn create_record_no_id_with_content() {
.unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn create_record_with_id_with_content() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let record: Option<RecordId> = db
.create(("user", "john"))
.content(Record {
@ -452,10 +473,11 @@ async fn create_record_with_id_with_content() {
assert_eq!(value.record(), thing("user:jane").ok());
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn select_table() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let table = "user";
let _: Vec<RecordId> = db.create(table).await.unwrap();
let _: Vec<RecordId> = db.create(table).await.unwrap();
@ -464,10 +486,11 @@ async fn select_table() {
assert_eq!(users.len(), 3);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn select_record_id() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let record_id = ("user", "john");
let _: Option<RecordId> = db.create(record_id).await.unwrap();
let Some(record): Option<RecordId> = db.select(record_id).await.unwrap() else {
@ -478,10 +501,11 @@ async fn select_record_id() {
assert_eq!(value.record(), thing("user:john").ok());
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn select_record_ranges() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let table = "user";
let _: Option<RecordId> = db.create((table, "amos")).await.unwrap();
let _: Option<RecordId> = db.create((table, "jane")).await.unwrap();
@ -513,10 +537,11 @@ async fn select_record_ranges() {
assert_eq!(convert(users), vec!["john"]);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn update_table() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let table = "user";
let _: Vec<RecordId> = db.create(table).await.unwrap();
let _: Vec<RecordId> = db.create(table).await.unwrap();
@ -525,10 +550,11 @@ async fn update_table() {
assert_eq!(users.len(), 2);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn update_record_id() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let table = "user";
let _: Option<RecordId> = db.create((table, "john")).await.unwrap();
let _: Option<RecordId> = db.create((table, "jane")).await.unwrap();
@ -536,10 +562,11 @@ async fn update_record_id() {
assert_eq!(users.len(), 2);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn update_table_with_content() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let sql = "
CREATE type::thing($table, 'amos') SET name = 'Amos';
CREATE type::thing($table, 'jane') SET name = 'Jane';
@ -579,10 +606,11 @@ async fn update_table_with_content() {
assert_eq!(users, expected);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn update_record_range_with_content() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let sql = "
CREATE type::thing($table, 'amos') SET name = 'Amos';
CREATE type::thing($table, 'jane') SET name = 'Jane';
@ -637,10 +665,11 @@ async fn update_record_range_with_content() {
);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn update_record_id_with_content() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let record_id = ("user", "john");
let user: Option<RecordName> = db
.create(record_id)
@ -677,10 +706,11 @@ struct Person {
marketing: bool,
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn merge_record_id() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let record_id = ("person", "jaime");
let mut jaime: Option<Person> = db
.create(record_id)
@ -713,7 +743,7 @@ async fn merge_record_id() {
);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn patch_record_id() {
#[derive(Debug, Deserialize, Eq, PartialEq)]
struct Record {
@ -722,8 +752,9 @@ async fn patch_record_id() {
hello: Vec<String>,
}
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let id = "john";
let _: Option<RecordId> = db
.create(("user", id))
@ -751,10 +782,11 @@ async fn patch_record_id() {
);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn delete_table() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let table = "user";
let _: Vec<RecordId> = db.create(table).await.unwrap();
let _: Vec<RecordId> = db.create(table).await.unwrap();
@ -767,10 +799,11 @@ async fn delete_table() {
assert!(users.is_empty());
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn delete_record_id() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let record_id = ("user", "john");
let _: Option<RecordId> = db.create(record_id).await.unwrap();
let _: Option<RecordId> = db.select(record_id).await.unwrap();
@ -785,10 +818,11 @@ async fn delete_record_id() {
assert_eq!(value, Value::None);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn delete_record_range() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let sql = "
CREATE type::thing($table, 'amos') SET name = 'Amos';
CREATE type::thing($table, 'jane') SET name = 'Jane';
@ -828,15 +862,16 @@ async fn delete_record_range() {
);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn changefeed() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
// Enable change feeds
let sql = "
DEFINE TABLE user CHANGEFEED 1h;
";
let response = db.query(sql).await.unwrap();
drop(permit);
response.check().unwrap();
// Create and update users
let sql = "
@ -1001,16 +1036,18 @@ async fn changefeed() {
);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn version() {
let db = new_db().await;
let (permit, db) = new_db().await;
drop(permit);
db.version().await.unwrap();
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn set_unset() {
let db = new_db().await;
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let (key, value) = ("name", "Doe");
let sql = "RETURN $name";
db.set(key, value).await.unwrap();
@ -1025,10 +1062,11 @@ async fn set_unset() {
assert!(name.is_none());
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn return_bool() {
let db = new_db().await;
let (permit, db) = new_db().await;
let mut response = db.query("RETURN true").await.unwrap();
drop(permit);
let Some(boolean): Option<bool> = response.take(0).unwrap() else {
panic!("record not found");
};

View file

@ -1,13 +1,12 @@
mod parse;
use parse::Parse;
use channel::{Receiver, TryRecvError};
mod helpers;
use helpers::new_ds;
use surrealdb::dbs::{Action, Notification, Session};
use surrealdb::err::Error;
use surrealdb::iam::Role;
use surrealdb::sql::{Id, Thing, Value};
use surrealdb::sql::Value;
#[tokio::test]
async fn delete() -> Result<(), Error> {
@ -410,36 +409,20 @@ async fn delete_filtered_live_notification() -> Result<(), Error> {
assert_eq!(tmp, val);
// Validate notification
let notifications = dbs.notifications();
let notifications = match notifications {
Some(notifications) => notifications,
None => panic!("expected notifications"),
};
let not = recv_notification(&notifications, 10, std::time::Duration::from_millis(100)).unwrap();
let notifications = dbs.notifications().expect("expected notifications");
let notification = notifications.recv().await.unwrap();
assert_eq!(
not,
notification,
Notification {
id: live_id,
action: Action::Delete,
result: Value::Thing(Thing {
tb: "person".to_string(),
id: Id::String("test_true".to_string()),
}),
result: Value::parse(
"{
id: person:test_true,
condition: true,
}"
),
}
);
Ok(())
}
fn recv_notification(
notifications: &Receiver<Notification>,
tries: u8,
poll_rate: std::time::Duration,
) -> Result<Notification, TryRecvError> {
for _ in 0..tries {
if let Ok(not) = notifications.try_recv() {
return Ok(not);
}
std::thread::sleep(poll_rate);
}
notifications.try_recv()
}

View file

@ -255,9 +255,9 @@ impl Connection {
// Serialize the message to send
let message = success(None, notification);
// Get the current output format
let format = rpc.read().await.processor.format.clone();
let format = rpc.read().await.processor.format;
// Send the notification to the client
message.send(format, ws.clone()).await
message.send(format, ws).await
}
}
}
@ -271,7 +271,7 @@ impl Connection {
/// Handle individual WebSocket messages
async fn handle_msg(rpc: Arc<RwLock<Connection>>, msg: Message, chn: Sender<Message>) {
// Get the current output format
let mut out_fmt = rpc.read().await.processor.format.clone();
let mut out_fmt = rpc.read().await.processor.format;
// Prepare Span and Otel context
let span = span_for_request(&rpc.read().await.ws_id);
@ -283,8 +283,12 @@ impl Connection {
match parse_request(msg).await {
Ok(req) => {
if let Some(_out_fmt) = req.out_fmt {
out_fmt = _out_fmt;
if let Some(fmt) = req.out_fmt {
if out_fmt != fmt {
// Update the default format
rpc.write().await.processor.format = fmt;
out_fmt = fmt;
}
}
// Now that we know the method, we can update the span and create otel context
@ -303,11 +307,11 @@ impl Connection {
rpc.write().await.processor.process_request(&req.method, req.params).await;
// Process the response
res.into_response(req.id).send(out_fmt, chn).with_context(otel_cx).await
res.into_response(req.id).send(out_fmt, &chn).with_context(otel_cx).await
}
Err(err) => {
// Process the response
failure(None, err).send(out_fmt, chn).with_context(otel_cx.clone()).await
failure(None, err).send(out_fmt, &chn).with_context(otel_cx.clone()).await
}
}
}

View file

@ -14,7 +14,7 @@ use crate::err;
use crate::rpc::CONN_CLOSED_ERR;
use crate::telemetry::metrics::ws::record_rpc;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum OutputFormat {
Json, // JSON
Cbor, // CBOR
@ -62,7 +62,6 @@ impl From<Notification> for Data {
#[derive(Debug, Serialize)]
pub struct Response {
#[serde(skip_serializing_if = "Option::is_none")]
id: Option<Value>,
result: Result<Data, Failure>,
}
@ -93,7 +92,7 @@ impl Response {
}
/// Send the response to the WebSocket channel
pub async fn send(self, out: OutputFormat, chn: Sender<Message>) {
pub async fn send(self, out: OutputFormat, chn: &Sender<Message>) {
let span = Span::current();
debug!("Process RPC response");

View file

@ -8,6 +8,7 @@ mod http_integration {
use reqwest::Client;
use serde_json::json;
use test_log::test;
use ulid::Ulid;
use super::common::{self, PASS, USER};
@ -870,7 +871,7 @@ mod http_integration {
#[test(tokio::test)]
async fn key_endpoint_modify_all() -> Result<(), Box<dyn std::error::Error>> {
let (addr, _server) = common::start_server_with_defaults().await.unwrap();
let table_name = "table";
let table_name = Ulid::new().to_string();
let num_records = 10;
let url = &format!("http://{addr}/key/{table_name}");
@ -884,7 +885,7 @@ mod http_integration {
.default_headers(headers)
.build()?;
seed_table(&client, &addr, table_name, num_records).await?;
seed_table(&client, &addr, &table_name, num_records).await?;
// Modify all records
{