Rebased live query changes (#2136)

This commit is contained in:
Przemyslaw Hugh Kaznowski 2023-06-20 23:50:26 +01:00 committed by GitHub
parent 5485883d3c
commit 64adb2e913
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 601 additions and 75 deletions

19
lib/src/dbs/cl.rs Normal file
View file

@ -0,0 +1,19 @@
use derive::Store;
use serde::{Deserialize, Serialize};
// NOTE: This is not a statement, but as per layering, keeping it here till we
// have a better structure.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Hash, Store)]
pub struct ClusterMembership {
pub name: String,
// TiKV = TiKV TSO Timestamp as u64
// not TiKV = local nanos as u64
pub heartbeat: Timestamp,
}
// This struct is meant to represent a timestamp that can be used to partially order
// events in a cluster. It should be derived from a timestamp oracle, such as the
// one available in TiKV via the client `TimestampExt` implementation.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Hash, Store)]
pub struct Timestamp {
pub value: u64,
}

View file

@ -1,11 +1,12 @@
use crate::cnf::PROTECTED_PARAM_NAMES;
use crate::ctx::Context;
use crate::dbs::response::Response;
use crate::dbs::Auth;
use crate::dbs::Level;
use crate::dbs::Notification;
use crate::dbs::Options;
use crate::dbs::Transaction;
use crate::dbs::LOG;
use crate::dbs::{Auth, QueryType};
use crate::err::Error;
use crate::kvs::Datastore;
use crate::sql::paths::DB;
@ -13,6 +14,7 @@ use crate::sql::paths::NS;
use crate::sql::query::Query;
use crate::sql::statement::Statement;
use crate::sql::value::Value;
use channel::{Receiver, Sender};
use futures::lock::Mutex;
use std::sync::Arc;
use tracing::instrument;
@ -97,6 +99,7 @@ impl<'a> Executor<'a> {
Response {
time: v.time,
result: Err(Error::QueryCancelled),
query_type: QueryType::Other,
}
}
@ -113,11 +116,27 @@ impl<'a> Executor<'a> {
.unwrap_or(Error::QueryNotExecuted)),
Err(e) => Err(e),
},
query_type: QueryType::Other,
},
_ => v,
}
}
/// Consume the live query notifications
async fn clear(&self, _: Sender<Notification>, rcv: Receiver<Notification>) {
while rcv.try_recv().is_ok() {
// Ignore notification
}
}
/// Flush notifications from a buffer channel (live queries) to the committed notification channel.
/// This is because we don't want to broadcast notifications to the user for failed transactions.
async fn flush(&self, chn: Sender<Notification>, rcv: Receiver<Notification>) {
while let Ok(v) = rcv.try_recv() {
let _ = chn.send(v).await;
}
}
async fn set_ns(&self, ctx: &mut Context<'_>, opt: &mut Options, ns: &str) {
let mut session = ctx.value("session").unwrap_or(&Value::None).clone();
session.put(NS.as_ref(), ns.to_owned().into());
@ -136,9 +155,15 @@ impl<'a> Executor<'a> {
pub async fn execute(
&mut self,
mut ctx: Context<'_>,
mut opt: Options,
opt: Options,
qry: Query,
) -> Result<Vec<Response>, Error> {
// Take the notification channel
let chn = opt.sender.clone();
// Create a notification channel
let (send, recv) = channel::unbounded();
// Swap the notification channel
let mut opt = opt.sender(send);
// Initialise buffer of responses
let mut buf: Vec<Response> = vec![];
// Initialise array of responses
@ -156,7 +181,7 @@ impl<'a> Executor<'a> {
// Check if this is a RETURN statement
let clr = matches!(stm, Statement::Output(_));
// Process a single statement
let res = match stm {
let res = match stm.clone() {
// Specify runtime options
Statement::Option(mut stm) => {
// Selected DB?
@ -185,17 +210,21 @@ impl<'a> Executor<'a> {
// Cancel a running transaction
Statement::Cancel(_) => {
self.cancel(true).await;
self.clear(chn.clone(), recv.clone()).await;
buf = buf.into_iter().map(|v| self.buf_cancel(v)).collect();
out.append(&mut buf);
debug_assert!(self.txn.is_none(), "cancel(true) should have unset txn");
self.txn = None;
continue;
}
// Commit a running transaction
Statement::Commit(_) => {
let commit_error = self.commit(true).await.err();
buf = buf.into_iter().map(|v| self.buf_commit(v, &commit_error)).collect();
self.flush(chn.clone(), recv.clone()).await;
out.append(&mut buf);
debug_assert!(self.txn.is_none(), "commit(true) should have unset txn");
self.txn = None;
continue;
}
// Switch to a different NS or DB
@ -263,13 +292,22 @@ impl<'a> Executor<'a> {
// Finalise transaction, returning nothing unless it couldn't commit
if writeable {
match self.commit(loc).await {
Err(e) => Err(Error::QueryNotExecutedDetail {
Err(e) => {
// Clear live query notifications
self.clear(chn.clone(), recv.clone()).await;
Err(Error::QueryNotExecutedDetail {
message: e.to_string(),
}),
Ok(_) => Ok(Value::None),
})
}
Ok(_) => {
// Flush live query notifications
self.flush(chn.clone(), recv.clone()).await;
Ok(Value::None)
}
}
} else {
self.cancel(loc).await;
self.clear(chn.clone(), recv.clone()).await;
Ok(Value::None)
}
}
@ -327,18 +365,23 @@ impl<'a> Executor<'a> {
// Finalise transaction and return the result.
if res.is_ok() && stm.writeable() {
if let Err(e) = self.commit(loc).await {
// Clear live query notification details
self.clear(chn.clone(), recv.clone()).await;
// The commit failed
Err(Error::QueryNotExecutedDetail {
message: e.to_string(),
})
} else {
// Flush the live query change notifications
self.flush(chn.clone(), recv.clone()).await;
// Successful, committed result
res
}
} else {
self.cancel(loc).await;
// An error
// Clear live query notification details
self.clear(chn.clone(), recv.clone()).await;
// Return an error
res
}
}
@ -356,6 +399,11 @@ impl<'a> Executor<'a> {
self.err = true;
e
}),
query_type: match stm {
Statement::Live(_) => QueryType::Live,
Statement::Kill(_) => QueryType::Kill,
_ => QueryType::Other,
},
};
// Output the response
if self.txn.is_some() {

View file

@ -6,6 +6,7 @@ mod auth;
mod executor;
mod iterate;
mod iterator;
mod notification;
mod options;
mod response;
mod session;
@ -20,6 +21,7 @@ pub use self::session::*;
pub(crate) use self::executor::*;
pub(crate) use self::iterator::*;
pub(crate) use self::notification::*;
pub(crate) use self::statement::*;
pub(crate) use self::transaction::*;
pub(crate) use self::variables::*;
@ -30,6 +32,8 @@ mod channel;
#[cfg(not(target_arch = "wasm32"))]
pub use self::channel::*;
pub mod cl;
#[cfg(test)]
pub(crate) mod test;

View file

@ -0,0 +1,39 @@
use crate::sql::Value;
use serde::{Deserialize, Serialize};
use std::fmt;
use uuid::Uuid;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Notification {
pub id: Uuid,
pub action: Action,
pub result: Value,
}
impl fmt::Display for Notification {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Notification {{ id: {}, action: {}, result: {} }}",
self.id, self.action, self.result
)
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum Action {
Create,
Update,
Delete,
}
impl fmt::Display for Action {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Action::Create => write!(f, "CREATE"),
Action::Update => write!(f, "UPDATE"),
Action::Delete => write!(f, "DELETE"),
}
}
}

View file

@ -1,8 +1,11 @@
use crate::cnf;
use crate::dbs::Auth;
use crate::dbs::Level;
use crate::dbs::Notification;
use crate::err::Error;
use channel::Sender;
use std::sync::Arc;
use uuid::Uuid;
/// An Options is passed around when processing a set of query
/// statements. An Options contains specific information for how
@ -11,8 +14,10 @@ use std::sync::Arc;
/// whether field/event/table queries should be processed (useful
/// when importing data, where these queries might fail).
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug)]
pub struct Options {
/// Current Node ID
pub id: Arc<Uuid>,
/// Currently selected NS
pub ns: Option<Arc<str>>,
/// Currently selected DB
@ -39,18 +44,21 @@ pub struct Options {
pub indexes: bool,
/// Should we process function futures?
pub futures: bool,
///
pub sender: Sender<Notification>,
}
impl Default for Options {
fn default() -> Self {
Options::new(Auth::No)
Options::new(Arc::new(Uuid::new_v4()), channel::unbounded().0, Arc::new(Auth::No))
}
}
impl Options {
/// Create a new Options object
pub fn new(auth: Auth) -> Options {
pub fn new(id: Arc<Uuid>, send: Sender<Notification>, auth: Arc<Auth>) -> Options {
Options {
id,
ns: None,
db: None,
dive: 0,
@ -63,10 +71,16 @@ impl Options {
tables: true,
indexes: true,
futures: false,
auth: Arc::new(auth),
sender: send,
auth,
}
}
/// Get current Node ID
pub fn id(&self) -> &Uuid {
self.id.as_ref()
}
/// Get currently selected NS
pub fn ns(&self) -> &str {
self.ns.as_ref().unwrap()
@ -85,7 +99,9 @@ impl Options {
let dive = self.dive.saturating_add(cost);
if dive <= *cnf::MAX_COMPUTATION_DEPTH {
Ok(Options {
sender: self.sender.clone(),
auth: self.auth.clone(),
id: self.id.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
dive,
@ -99,7 +115,9 @@ impl Options {
/// Create a new Options object for a subquery
pub fn force(&self, v: bool) -> Options {
Options {
sender: self.sender.clone(),
auth: self.auth.clone(),
id: self.id.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
force: v,
@ -110,7 +128,9 @@ impl Options {
/// Create a new Options object for a subquery
pub fn perms(&self, v: bool) -> Options {
Options {
sender: self.sender.clone(),
auth: self.auth.clone(),
id: self.id.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
perms: v,
@ -121,7 +141,9 @@ impl Options {
/// Create a new Options object for a subquery
pub fn fields(&self, v: bool) -> Options {
Options {
sender: self.sender.clone(),
auth: self.auth.clone(),
id: self.id.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
fields: v,
@ -132,7 +154,9 @@ impl Options {
/// Create a new Options object for a subquery
pub fn events(&self, v: bool) -> Options {
Options {
sender: self.sender.clone(),
auth: self.auth.clone(),
id: self.id.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
events: v,
@ -143,7 +167,9 @@ impl Options {
/// Create a new Options object for a subquery
pub fn tables(&self, v: bool) -> Options {
Options {
sender: self.sender.clone(),
auth: self.auth.clone(),
id: self.id.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
tables: v,
@ -154,7 +180,9 @@ impl Options {
/// Create a new Options object for a subquery
pub fn indexes(&self, v: bool) -> Options {
Options {
sender: self.sender.clone(),
auth: self.auth.clone(),
id: self.id.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
indexes: v,
@ -165,7 +193,9 @@ impl Options {
/// Create a new Options object for a subquery
pub fn import(&self, v: bool) -> Options {
Options {
sender: self.sender.clone(),
auth: self.auth.clone(),
id: self.id.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
fields: !v,
@ -178,7 +208,9 @@ impl Options {
/// Create a new Options object for a subquery
pub fn strict(&self, v: bool) -> Options {
Options {
sender: self.sender.clone(),
auth: self.auth.clone(),
id: self.id.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
strict: v,
@ -189,7 +221,9 @@ impl Options {
/// Create a new Options object for a subquery
pub fn futures(&self, v: bool) -> Options {
Options {
sender: self.sender.clone(),
auth: self.auth.clone(),
id: self.id.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
futures: v,
@ -197,6 +231,18 @@ impl Options {
}
}
/// Create a new Options object for a subquery
pub fn sender(&self, v: Sender<Notification>) -> Options {
Options {
auth: self.auth.clone(),
id: self.id.clone(),
ns: self.ns.clone(),
db: self.db.clone(),
sender: v,
..*self
}
}
/// Check whether realtime queries are supported
pub fn realtime(&self) -> Result<(), Error> {
if !self.live {

View file

@ -6,11 +6,23 @@ use std::time::Duration;
pub(crate) const TOKEN: &str = "$surrealdb::private::sql::Response";
#[derive(Debug)]
pub enum QueryType {
// Any kind of query
Other,
// Indicates that the response live query id must be tracked
Live,
// Indicates that the live query should be removed from tracking
Kill,
}
/// The return value when running a query set on the database.
#[derive(Debug)]
pub struct Response {
pub time: Duration,
pub result: Result<Value, Error>,
// Record the query type in case processing the response is necessary (such as tracking live queries).
pub query_type: QueryType,
}
impl Response {

View file

@ -1,12 +1,13 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::{Auth, Options};
use crate::kvs::Datastore;
use futures::lock::Mutex;
use std::sync::Arc;
use uuid::Uuid;
pub async fn mock<'a>() -> (Context<'a>, Options) {
let mut ctx = Context::default();
let opt = Options::default();
let opt = Options::new(Arc::new(Uuid::new_v4()), channel::unbounded().0, Arc::new(Auth::Kv));
let kvs = Datastore::new("memory").await.unwrap();
let txn = kvs.transaction(true, false).await.unwrap();
let txn = Arc::new(Mutex::new(txn));

View file

@ -1,15 +1,18 @@
use crate::ctx::Context;
use crate::dbs::Action;
use crate::dbs::Notification;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::doc::Document;
use crate::err::Error;
use crate::sql::Value;
impl<'a> Document<'a> {
pub async fn lives(
&self,
ctx: &Context<'_>,
opt: &Options,
_stm: &Statement<'_>,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Check if forced
if !opt.force && !self.changed() {
@ -18,24 +21,56 @@ impl<'a> Document<'a> {
// Clone transaction
let txn = ctx.clone_transaction()?;
// Get the record id
let _ = self.id.as_ref().unwrap();
let id = self.id.as_ref().unwrap();
// Loop through all index statements
for lv in self.lv(opt, &txn).await?.iter() {
// Create a new statement
let stm = Statement::from(lv);
let lq = Statement::from(lv);
// Check LIVE SELECT where condition
if self.check(ctx, opt, &stm).await.is_err() {
if self.check(ctx, opt, stm).await.is_err() {
continue;
}
// Check what type of data change this is
if stm.is_delete() {
// Send a DELETE notification to the WebSocket
} else if self.is_new() {
// Process the CREATE notification to send
let _ = self.pluck(ctx, opt, &stm).await?;
// Send a DELETE notification
if opt.id() == &lv.node.0 {
let thing = (*id).clone();
opt.sender
.send(Notification {
id: lv.id.0,
action: Action::Delete,
result: Value::Thing(thing),
})
.await?;
} else {
// Process the CREATE notification to send
let _ = self.pluck(ctx, opt, &stm).await?;
// TODO: Send to storage
}
} else if self.is_new() {
// Send a CREATE notification
if opt.id() == &lv.node.0 {
opt.sender
.send(Notification {
id: lv.id.0,
action: Action::Create,
result: self.pluck(ctx, opt, &lq).await?,
})
.await?;
} else {
// TODO: Send to storage
}
} else {
// Send a UPDATE notification
if opt.id() == &lv.node.0 {
opt.sender
.send(Notification {
id: lv.id.0,
action: Action::Update,
result: self.pluck(ctx, opt, &lq).await?,
})
.await?;
} else {
// TODO: Send to storage
}
};
}
// Carry on

View file

@ -247,6 +247,18 @@ pub enum Error {
value: String,
},
// The cluster node already exists
#[error("The node '{value}' already exists")]
ClAlreadyExists {
value: String,
},
// The cluster node does not exist
#[error("The node '{value}' does not exist")]
ClNotFound {
value: String,
},
/// The requested scope token does not exist
#[error("The scope token '{value}' does not exist")]
StNotFound {

41
lib/src/key/cl.rs Normal file
View file

@ -0,0 +1,41 @@
use derive::Key;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
// Represents cluster information.
// In the future, this could also include broadcast addresses and other information.
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Cl {
__: u8,
_a: u8,
_b: u8,
_c: u8,
pub nd: Uuid,
}
impl Cl {
pub fn new(nd: Uuid) -> Self {
Self {
__: 0x2f, // /
_a: 0x21, // !
_b: 0x63, // c
_c: 0x6c, // l
nd,
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Cl::new(
Uuid::default(),
);
let enc = Cl::encode(&val).unwrap();
let dec = Cl::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

45
lib/src/key/hb.rs Normal file
View file

@ -0,0 +1,45 @@
use crate::dbs::cl::Timestamp;
use derive::Key;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Hb {
__: u8,
_a: u8,
_b: u8,
_c: u8,
_d: u8,
pub hb: Timestamp,
pub nd: Uuid,
}
impl Hb {
pub fn new(hb: Timestamp, nd: Uuid) -> Self {
Self {
__: 0x2f, // /
_a: 0x21, // !
_b: 0x68, // h
_c: 0x62, // b
hb,
_d: 0x2f, // /
nd,
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Hb::new(
Timestamp { value: 123 },
Uuid::default(),
);
let enc = Hb::encode(&val).unwrap();
let dec = Hb::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

View file

@ -1,35 +1,43 @@
use crate::sql::uuid::Uuid;
use derive::Key;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Lq<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_c: u8,
pub nd: Uuid,
_d: u8,
pub ns: &'a str,
_e: u8,
pub db: &'a str,
_f: u8,
_g: u8,
_h: u8,
pub lq: Uuid,
}
pub fn new<'a>(ns: &'a str, db: &'a str, lq: &Uuid) -> Lq<'a> {
Lq::new(ns, db, lq.to_owned())
pub fn new<'a>(nd: &Uuid, ns: &'a str, db: &'a str, lq: &Uuid) -> Lq<'a> {
Lq::new(nd.to_owned(), ns, db, lq.to_owned())
}
impl<'a> Lq<'a> {
pub fn new(ns: &'a str, db: &'a str, lq: Uuid) -> Self {
pub fn new(nd: Uuid, ns: &'a str, db: &'a str, lq: Uuid) -> Self {
Self {
__: b'/',
_a: b'*',
_a: b'!',
_b: b'n',
_c: b'd',
nd,
_d: b'*',
ns,
_b: b'*',
_e: b'*',
db,
_c: b'!',
_d: b'l',
_e: b'v',
_f: b'!',
_g: b'l',
_h: b'v',
lq,
}
}
@ -42,6 +50,7 @@ mod tests {
use super::*;
#[rustfmt::skip]
let val = Lq::new(
Uuid::default(),
"test",
"test",
Uuid::default(),

View file

@ -1,6 +1,6 @@
use crate::sql::uuid::Uuid;
use derive::Key;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Lv<'a> {

View file

@ -1,6 +1,12 @@
//! How the keys are structured in the key value store
///
/// KV /
///
/// ND /!nd{nd}
/// LQ /!nd{nd}*{ns}*{db}!lq{lq}
///
/// HB /!hb{ts}/{nd}
///
/// NS /!ns{ns}
///
/// Namespace /*{ns}
@ -15,7 +21,6 @@
/// PA /*{ns}*{db}!pa{pa}
/// SC /*{ns}*{db}!sc{sc}
/// TB /*{ns}*{db}!tb{tb}
/// LQ /*{ns}*{db}!lq{lq}
///
/// Scope /*{ns}*{db}±{sc}
/// ST /*{ns}*{db}±{sc}!st{tk}
@ -56,6 +61,7 @@ pub mod bp; // Stores BTree nodes for postings
pub mod bs; // Stores FullText index states
pub mod bt; // Stores BTree nodes for terms
pub mod bu; // Stores terms for term_ids
pub mod cl; // Stores cluster membership information
pub mod database; // Stores the key prefix for all keys under a database
pub mod db; // Stores a DEFINE DATABASE config definition
pub mod dl; // Stores a DEFINE LOGIN ON DATABASE config definition
@ -65,6 +71,7 @@ pub mod fc; // Stores a DEFINE FUNCTION config definition
pub mod fd; // Stores a DEFINE FIELD config definition
pub mod ft; // Stores a DEFINE TABLE AS config definition
pub mod graph; // Stores a graph edge pointer
pub mod hb; // Stores a heartbeat per registered cluster node
pub mod index; // Stores an index entry
pub mod ix; // Stores a DEFINE INDEX config definition
pub mod kv; // Stores the key prefix for all keys

View file

@ -2,6 +2,7 @@ use super::tx::Transaction;
use crate::ctx::Context;
use crate::dbs::Attach;
use crate::dbs::Executor;
use crate::dbs::Notification;
use crate::dbs::Options;
use crate::dbs::Response;
use crate::dbs::Session;
@ -11,17 +12,22 @@ use crate::kvs::LOG;
use crate::sql;
use crate::sql::Query;
use crate::sql::Value;
use channel::Receiver;
use channel::Sender;
use futures::lock::Mutex;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tracing::instrument;
use uuid::Uuid;
/// The underlying datastore instance which stores the dataset.
#[allow(dead_code)]
pub struct Datastore {
pub(super) id: Arc<Uuid>,
pub(super) inner: Inner,
pub(super) send: Sender<Notification>,
pub(super) recv: Receiver<Notification>,
query_timeout: Option<Duration>,
}
@ -204,8 +210,13 @@ impl Datastore {
Err(Error::Ds("Unable to load the specified datastore".into()))
}
};
// Create a live query notification channel
let (send, recv) = channel::bounded(100);
inner.map(|inner| Self {
id: Arc::new(Uuid::new_v4()),
inner,
send,
recv,
query_timeout: None,
})
}
@ -216,6 +227,24 @@ impl Datastore {
self
}
// Adds entries to the KV store indicating membership information
pub async fn register_membership(&self) -> Result<(), Error> {
let mut tx = self.transaction(true, false).await?;
tx.set_cl(sql::Uuid::from(*self.id.as_ref())).await?;
tx.set_hb(sql::Uuid::from(*self.id.as_ref())).await?;
tx.commit().await?;
Ok(())
}
// Creates a heartbeat entry for the member indicating to the cluster
// that the node is alive
pub async fn heartbeat(&self) -> Result<(), Error> {
let mut tx = self.transaction(true, false).await?;
tx.set_hb(sql::Uuid::from(*self.id.as_ref())).await?;
tx.commit().await?;
Ok(())
}
/// Create a new transaction on this datastore
///
/// ```rust,no_run
@ -343,6 +372,8 @@ impl Datastore {
let ctx = sess.context(ctx);
// Store the query variables
let ctx = vars.attach(ctx)?;
// Setup the notification channel
opt.sender = self.send.clone();
// Setup the auth options
opt.auth = sess.au.clone();
// Setup the live options
@ -400,6 +431,8 @@ impl Datastore {
let ctx = sess.context(ctx);
// Store the query variables
let ctx = vars.attach(ctx)?;
// Setup the notification channel
opt.sender = self.send.clone();
// Setup the auth options
opt.auth = sess.au.clone();
// Set current NS and DB
@ -418,6 +451,28 @@ impl Datastore {
Ok(res)
}
/// Subscribe to live notifications
///
/// ```rust,no_run
/// use surrealdb::kvs::Datastore;
/// use surrealdb::err::Error;
/// use surrealdb::dbs::Session;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
/// let ds = Datastore::new("memory").await?;
/// let ses = Session::for_kv();
/// while let Ok(v) = ds.notifications().recv().await {
/// println!("Received notification: {v}");
/// }
/// Ok(())
/// }
/// ```
#[instrument(skip_all)]
pub fn notifications(&self) -> Receiver<Notification> {
self.recv.clone()
}
/// Performs a full database export as SQL
#[instrument(skip(self, chn))]
pub async fn export(&self, ns: String, db: String, chn: Sender<Vec<u8>>) -> Result<(), Error> {

View file

@ -2,6 +2,8 @@ use super::kv::Add;
use super::kv::Convert;
use super::Key;
use super::Val;
use crate::dbs::cl::ClusterMembership;
use crate::dbs::cl::Timestamp;
use crate::err::Error;
use crate::key::thing;
use crate::kvs::cache::Cache;
@ -11,7 +13,7 @@ use crate::sql::paths::EDGE;
use crate::sql::paths::IN;
use crate::sql::paths::OUT;
use crate::sql::thing::Thing;
use crate::sql::Value;
use crate::sql::{Uuid, Value};
use channel::Sender;
use sql::permission::Permissions;
use sql::statements::DefineAnalyzerStatement;
@ -31,6 +33,7 @@ use std::fmt;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
#[cfg(debug_assertions)]
const LOG: &str = "surrealdb::txn";
@ -789,6 +792,60 @@ impl Transaction {
Ok(())
}
// Register cluster membership
// NOTE: Setting cluster membership sets the heartbeat
// Remember to set the heartbeat as well
pub async fn set_cl(&mut self, id: Uuid) -> Result<(), Error> {
let key = crate::key::cl::Cl::new(id.0);
match self.get_cl(id.clone()).await? {
Some(_) => Err(Error::ClAlreadyExists {
value: id.0.to_string(),
}),
None => {
let value = ClusterMembership {
name: id.0.to_string(),
heartbeat: self.clock(),
};
self.put(key, value).await?;
Ok(())
}
}
}
// Retrieve cluster information
pub async fn get_cl(&mut self, id: Uuid) -> Result<Option<ClusterMembership>, Error> {
let key = crate::key::cl::Cl::new(id.0);
let val = self.get(key).await?;
match val {
Some(v) => Ok(Some::<ClusterMembership>(v.into())),
None => Ok(None),
}
}
fn clock(&self) -> Timestamp {
// Use a timestamp oracle if available
let now: u128 = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
Timestamp {
value: now as u64,
}
}
// Set heartbeat
pub async fn set_hb(&mut self, id: Uuid) -> Result<(), Error> {
let now = self.clock();
let key = crate::key::hb::Hb::new(now.clone(), id.0);
// We do not need to do a read, we always want to overwrite
self.put(
key,
ClusterMembership {
name: id.0.to_string(),
heartbeat: now,
},
)
.await?;
Ok(())
}
/// Retrieve all namespace definitions in a datastore.
pub async fn all_ns(&mut self) -> Result<Arc<[DefineNamespaceStatement]>, Error> {
let key = crate::key::ns::prefix();

View file

@ -29,14 +29,14 @@ impl KillStatement {
let txn = ctx.clone_transaction()?;
// Claim transaction
let mut run = txn.lock().await;
// Create the live query key
let key = crate::key::lq::new(opt.ns(), opt.db(), &self.id);
// Fetch the live query key
let key = crate::key::lq::new(opt.id(), opt.ns(), opt.db(), &self.id);
// Fetch the live query key if it exists
match run.get(key).await? {
Some(val) => match std::str::from_utf8(&val) {
Ok(tb) => {
// Delete the live query
let key = crate::key::lq::new(opt.ns(), opt.db(), &self.id);
// Delete the node live query
let key = crate::key::lq::new(opt.id(), opt.ns(), opt.db(), &self.id);
run.del(key).await?;
// Delete the table live query
let key = crate::key::lv::new(opt.ns(), opt.db(), tb, &self.id);

View file

@ -9,8 +9,8 @@ use crate::sql::fetch::{fetch, Fetchs};
use crate::sql::field::{fields, Fields};
use crate::sql::param::param;
use crate::sql::table::table;
use crate::sql::uuid::Uuid;
use crate::sql::value::Value;
use crate::sql::Uuid;
use derive::Store;
use nom::branch::alt;
use nom::bytes::complete::tag_no_case;
@ -23,6 +23,7 @@ use std::fmt;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store, Hash)]
pub struct LiveStatement {
pub id: Uuid,
pub node: Uuid,
pub expr: Fields,
pub what: Value,
pub cond: Option<Cond>,
@ -45,12 +46,16 @@ impl LiveStatement {
// Process the live query table
match self.what.compute(ctx, opt).await? {
Value::Table(tb) => {
// Insert the live query
let key = crate::key::lq::new(opt.ns(), opt.db(), &self.id);
// Clone the current statement
let mut stm = self.clone();
// Store the current Node ID
stm.node = Uuid(*opt.id);
// Insert the node live query
let key = crate::key::lq::new(opt.id(), opt.ns(), opt.db(), &self.id);
run.putc(key, tb.as_str(), None).await?;
// Insert the table live query
let key = crate::key::lv::new(opt.ns(), opt.db(), &tb, &self.id);
run.putc(key, self.clone(), None).await?;
run.putc(key, stm, None).await?;
}
v => {
return Err(Error::LiveStatement {
@ -59,7 +64,7 @@ impl LiveStatement {
}
};
// Return the query id
Ok(self.id.clone().into())
Ok(Value::Uuid(self.id.clone()))
}
}
@ -89,7 +94,8 @@ pub fn live(i: &str) -> IResult<&str, LiveStatement> {
Ok((
i,
LiveStatement {
id: Uuid::new(),
id: Uuid::new_v4(),
node: Uuid::default(),
expr,
what,
cond,

View file

@ -58,11 +58,12 @@ pub fn sleep(i: &str) -> IResult<&str, SleepStatement> {
#[cfg(test)]
mod tests {
use super::*;
use crate::dbs::test::mock;
use crate::dbs::Auth;
use std::sync::Arc;
use std::time::SystemTime;
use uuid::Uuid;
#[test]
fn test_sleep_statement_sec() {
@ -86,7 +87,8 @@ mod tests {
async fn test_sleep_compute() {
let sql = "SLEEP 500ms";
let time = SystemTime::now();
let opt = Options::new(Auth::Kv);
let opt =
Options::new(Arc::new(Uuid::new_v4()), channel::unbounded().0, Arc::new(Auth::Kv));
let (ctx, _) = mock().await;
let (_, stm) = sleep(sql).unwrap();
let value = stm.compute(&ctx, &opt).await.unwrap();

View file

@ -14,13 +14,13 @@ use crate::rpc::res::Failure;
use crate::rpc::res::Output;
use futures::{SinkExt, StreamExt};
use once_cell::sync::Lazy;
use serde::Serialize;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::sync::Arc;
use surrealdb::channel;
use surrealdb::channel::Sender;
use surrealdb::dbs::Session;
use surrealdb::dbs::{QueryType, Response, Session};
use surrealdb::opt::auth::Root;
use surrealdb::sql::Array;
use surrealdb::sql::Object;
@ -33,8 +33,11 @@ use warp::ws::{Message, WebSocket, Ws};
use warp::Filter;
type WebSockets = RwLock<HashMap<Uuid, Sender<Message>>>;
// Mapping of LiveQueryID to WebSocketID
type LiveQueries = RwLock<HashMap<Uuid, Uuid>>;
static WEBSOCKETS: Lazy<WebSockets> = Lazy::new(WebSockets::default);
static LIVE_QUERIES: Lazy<LiveQueries> = Lazy::new(LiveQueries::default);
#[allow(opaque_hidden_inferred_bound)]
pub fn config() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
@ -119,6 +122,44 @@ impl Rpc {
}
}
});
// Send notifications to the client
let moved_rpc = rpc.clone();
tokio::task::spawn(async move {
let rpc = moved_rpc;
while let Ok(v) = DB.get().unwrap().notifications().recv().await {
trace!(target: LOG, "Received notification: {:?}", v);
// Find which websocket the notification belongs to
match LIVE_QUERIES.read().await.get(&v.id) {
Some(ws_id) => {
// Send the notification to the client
let msg_text = res::success(None, v.clone());
let ws_write = WEBSOCKETS.write().await;
match ws_write.get(ws_id) {
None => {
error!(
target: LOG,
"Tracked WebSocket {:?} not found for lq: {:?}", ws_id, &v.id
);
}
Some(ws_sender) => {
msg_text
.send(rpc.read().await.format.clone(), ws_sender.clone())
.await;
trace!(
target: LOG,
"Sent notification to WebSocket {:?} for lq: {:?}",
ws_id,
&v.id
);
}
}
}
None => {
error!(target: LOG, "Unknown websocket for live query: {:?}", v.id);
}
}
}
});
// Get messages from the client
while let Some(msg) = wrx.next().await {
match msg {
@ -172,6 +213,18 @@ impl Rpc {
trace!(target: LOG, "WebSocket {} disconnected", id);
// Remove this WebSocket from the list of WebSockets
WEBSOCKETS.write().await.remove(&id);
// Remove all live queries
let mut locked_lq_map = LIVE_QUERIES.write().await;
let mut live_query_to_gc: Vec<Uuid> = vec![];
for (key, value) in locked_lq_map.iter() {
if value == &id {
trace!(target: LOG, "Removing live query: {}", key);
live_query_to_gc.push(*key);
}
}
for key in live_query_to_gc {
locked_lq_map.remove(&key);
}
}
/// Call RPC methods from the WebSocket
@ -463,44 +516,40 @@ impl Rpc {
#[instrument(skip_all, name = "rpc kill", fields(websocket=self.uuid.to_string()))]
async fn kill(&self, id: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Get local copy of options
let opt = CF.get().unwrap();
// Specify the SQL query string
let sql = "KILL $id";
// Specify the query parameters
let var = Some(map! {
let var = map! {
String::from("id") => id,
=> &self.vars
});
};
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var, opt.strict).await?;
let mut res = self.query_with(Strand::from(sql), Object::from(var)).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
let response = res.remove(0);
match response.result {
Ok(v) => Ok(v),
Err(e) => Err(Error::from(e)),
}
}
#[instrument(skip_all, name = "rpc live", fields(websocket=self.uuid.to_string()))]
async fn live(&self, tb: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Get local copy of options
let opt = CF.get().unwrap();
// Specify the SQL query string
let sql = "LIVE SELECT * FROM $tb";
// Specify the query parameters
let var = Some(map! {
let var = map! {
String::from("tb") => tb.could_be_table(),
=> &self.vars
});
};
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var, opt.strict).await?;
let mut res = self.query_with(Strand::from(sql), Object::from(var)).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
let response = res.remove(0);
match response.result {
Ok(v) => Ok(v),
Err(e) => Err(Error::from(e)),
}
}
// ------------------------------
@ -687,12 +736,43 @@ impl Rpc {
Ok(res)
}
async fn handle_live_query_results(&self, res: &Response) {
match &res.query_type {
QueryType::Live => {
if let Ok(Value::Uuid(lqid)) = &res.result {
// Match on Uuid type
LIVE_QUERIES.write().await.insert(lqid.0, self.uuid);
trace!(
target: LOG,
"Registered live query {} on websocket {}",
lqid,
self.uuid
);
}
}
QueryType::Kill => {
if let Ok(Value::Uuid(lqid)) = &res.result {
let ws_id = LIVE_QUERIES.write().await.remove(&lqid.0);
if let Some(ws_id) = ws_id {
trace!(
target: LOG,
"Unregistered live query {} on websocket {}",
lqid,
ws_id
);
}
}
}
_ => {}
}
}
// ------------------------------
// Methods for querying
// ------------------------------
#[instrument(skip_all, name = "rpc query", fields(websocket=self.uuid.to_string()))]
async fn query(&self, sql: Strand) -> Result<impl Serialize, Error> {
async fn query(&self, sql: Strand) -> Result<Vec<Response>, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Get local copy of options
@ -701,12 +781,16 @@ impl Rpc {
let var = Some(self.vars.clone());
// Execute the query on the database
let res = kvs.execute(&sql, &self.session, var, opt.strict).await?;
// Post-process hooks for web layer
for response in &res {
self.handle_live_query_results(response).await;
}
// Return the result to the client
Ok(res)
}
#[instrument(skip_all, name = "rpc query_with", fields(websocket=self.uuid.to_string()))]
async fn query_with(&self, sql: Strand, mut vars: Object) -> Result<impl Serialize, Error> {
async fn query_with(&self, sql: Strand, mut vars: Object) -> Result<Vec<Response>, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Get local copy of options
@ -715,6 +799,10 @@ impl Rpc {
let var = Some(mrg! { vars.0, &self.vars });
// Execute the query on the database
let res = kvs.execute(&sql, &self.session, var, opt.strict).await?;
// Post-process hooks for web layer
for response in &res {
self.handle_live_query_results(response).await;
}
// Return the result to the client
Ok(res)
}

View file

@ -117,7 +117,7 @@ mod cli_integration {
let _server = run(&start_args);
std::thread::sleep(std::time::Duration::from_millis(500));
std::thread::sleep(std::time::Duration::from_millis(5000));
assert!(run(&format!("isready --conn http://{addr}")).output().is_ok());