Sur 191 self garbage collection 2 ()

Co-authored-by: Tobie Morgan Hitchcock <tobie@surrealdb.com>
This commit is contained in:
Przemyslaw Hugh Kaznowski 2023-07-13 14:44:54 +01:00 committed by GitHub
parent ee3a1c211f
commit ac213d69bb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 1105 additions and 163 deletions

View file

@ -1,5 +1,9 @@
use derive::Store;
use crate::err::Error;
use crate::err::Error::TimestampOverflow;
use derive::{Key, Store};
use serde::{Deserialize, Serialize};
use std::ops::{Add, Sub};
use std::time::Duration;
// NOTE: This is not a statement, but as per layering, keeping it here till we
// have a better structure.
@ -17,3 +21,45 @@ pub struct ClusterMembership {
pub struct Timestamp {
pub value: u64,
}
// This struct is to be used only when storing keys as the macro currently
// conflicts when you have Store and Key derive macros.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Hash, Key)]
pub struct KeyTimestamp {
pub value: u64,
}
impl From<&Timestamp> for KeyTimestamp {
fn from(ts: &Timestamp) -> Self {
KeyTimestamp {
value: ts.value,
}
}
}
impl Add<Duration> for Timestamp {
type Output = Timestamp;
fn add(self, rhs: Duration) -> Timestamp {
Timestamp {
value: self.value + rhs.as_secs(),
}
}
}
impl Sub<Duration> for Timestamp {
type Output = Result<Timestamp, Error>;
fn sub(self, rhs: Duration) -> Self::Output {
let millis = rhs.as_secs();
if self.value <= millis {
// Removing the duration from this timestamp will cause it to overflow
return Err(TimestampOverflow(format!(
"Failed to subtract {} from {}",
&millis, &self.value
)));
}
Ok(Timestamp {
value: self.value - millis,
})
}
}
// TODO test

View file

@ -1,6 +1,7 @@
use crate::sql::Value;
use crate::sql::{Object, Value};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fmt::Debug;
use uuid::Uuid;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@ -12,11 +13,13 @@ pub struct Notification {
impl fmt::Display for Notification {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Notification {{ id: {}, action: {}, result: {} }}",
self.id, self.action, self.result
)
let obj: Object = map! {
"id".to_string() => self.id.to_string().into(),
"action".to_string() => self.action.to_string().into(),
"result".to_string() => self.result.clone(),
}
.into();
write!(f, "{}", obj)
}
}

View file

@ -36,7 +36,7 @@ impl<'a> Document<'a> {
// Check what type of data change this is
if stm.is_delete() {
// Send a DELETE notification
if opt.id()? == lv.node.0 {
if opt.id()? == lv.node {
let thing = (*rid).clone();
chn.send(Notification {
id: lv.id.0,
@ -49,7 +49,7 @@ impl<'a> Document<'a> {
}
} else if self.is_new() {
// Send a CREATE notification
if opt.id()? == lv.node.0 {
if opt.id()? == lv.node {
chn.send(Notification {
id: lv.id.0,
action: Action::Create,
@ -61,7 +61,7 @@ impl<'a> Document<'a> {
}
} else {
// Send a UPDATE notification
if opt.id()? == lv.node.0 {
if opt.id()? == lv.node {
chn.send(Notification {
id: lv.id.0,
action: Action::Update,

View file

@ -278,6 +278,18 @@ pub enum Error {
value: String,
},
/// The requested live query does not exist
#[error("The live query '{value}' does not exist")]
LvNotFound {
value: String,
},
/// The requested cluster live query does not exist
#[error("The cluster live query '{value}' does not exist")]
LqNotFound {
value: String,
},
/// The requested analyzer does not exist
#[error("The analyzer '{value}' does not exist")]
AzNotFound {
@ -504,6 +516,20 @@ pub enum Error {
DuplicatedMatchRef {
mr: MatchRef,
},
/// Represents a failure in timestamp arithmetic related to database internals
#[error("Timestamp arithmetic error: {0}")]
TimestampOverflow(String),
/// Internal server error
/// This should be used extremely sporadically, since we lose the type of error as a consequence
/// There will be times when it is useful, such as with unusual type conversion errors
#[error("Internal database error: {0}")]
Internal(String),
/// Unimplemented functionality
#[error("Unimplemented functionality: {0}")]
Unimplemented(String),
}
impl From<Error> for String {

View file

@ -17,13 +17,25 @@ pub struct Cl {
impl Cl {
pub fn new(nd: Uuid) -> Self {
Self {
__: 0x2f, // /
_a: 0x21, // !
_b: 0x63, // c
_c: 0x6c, // l
__: b'/',
_a: b'!',
_b: b'c',
_c: b'l',
nd,
}
}
pub fn prefix() -> Vec<u8> {
let mut k = super::kv::new().encode().unwrap();
k.extend_from_slice(&[b'!', b'c', b'l', 0x00]);
k
}
pub fn suffix() -> Vec<u8> {
let mut k = super::kv::new().encode().unwrap();
k.extend_from_slice(&[b'!', b'c', b'l', 0xff]);
k
}
}
#[cfg(test)]
@ -31,12 +43,21 @@ mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Cl::new(
Uuid::default(),
);
let val = Cl::new(Uuid::default());
let enc = Cl::encode(&val).unwrap();
let dec = Cl::decode(&enc).unwrap();
assert_eq!(val, dec);
}
#[test]
fn test_prefix() {
let val = super::Cl::prefix();
assert_eq!(val, b"/!cl\0")
}
#[test]
fn test_suffix() {
let val = super::Cl::suffix();
assert_eq!(val, b"/!cl\xff")
}
}

13
lib/src/key/debug.rs Normal file
View file

@ -0,0 +1,13 @@
use crate::kvs::Key;
/// Helpers for debugging keys
/// sprint_key converts a key to an escaped string.
/// This is used for logging and debugging tests and should not be used in implementation code.
pub fn sprint_key(key: &Key) -> String {
key.clone()
.iter()
.flat_map(|&byte| std::ascii::escape_default(byte))
.map(|byte| byte as char)
.collect::<String>()
}

View file

@ -1,4 +1,4 @@
use crate::dbs::cl::Timestamp;
use crate::dbs::cl::{KeyTimestamp, Timestamp};
use derive::Key;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@ -9,8 +9,8 @@ pub struct Hb {
_a: u8,
_b: u8,
_c: u8,
_d: u8,
pub hb: Timestamp,
_d: u8,
#[serde(with = "uuid::serde::compact")]
pub nd: Uuid,
}
@ -18,15 +18,40 @@ pub struct Hb {
impl Hb {
pub fn new(hb: Timestamp, nd: Uuid) -> Self {
Self {
__: 0x2f, // /
_a: 0x21, // !
_b: 0x68, // h
_c: 0x62, // b
__: b'/',
_a: b'!',
_b: b'h',
_c: b'b',
hb,
_d: 0x2f, // /
_d: b'/',
nd,
}
}
pub fn prefix() -> Vec<u8> {
let mut k = super::kv::new().encode().unwrap();
k.extend_from_slice(&[b'!', b'h', b'b', 0x00]);
k
}
pub fn suffix(ts: &Timestamp) -> Vec<u8> {
// Add one to timestmap so we get a complete range inclusive of provided timestamp
// Also convert type
let tskey: KeyTimestamp = KeyTimestamp {
value: ts.value + 1,
};
let mut k = super::kv::new().encode().unwrap();
k.extend_from_slice(&[b'!', b'h', b'b']);
k.extend_from_slice(tskey.encode().unwrap().as_ref());
k
}
}
impl From<Timestamp> for Hb {
fn from(ts: Timestamp) -> Self {
let empty_uuid = uuid::Uuid::nil();
Self::new(ts, empty_uuid)
}
}
#[cfg(test)]
@ -35,12 +60,32 @@ mod tests {
fn key() {
use super::*;
#[rustfmt::skip]
let val = Hb::new(
let val = Hb::new(
Timestamp { value: 123 },
Uuid::default(),
Uuid::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16])
);
let enc = Hb::encode(&val).unwrap();
assert_eq!(
enc,
b"/!hb\x00\x00\x00\x00\x00\x00\x00\x7b/\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10");
let dec = Hb::decode(&enc).unwrap();
assert_eq!(val, dec);
}
#[test]
fn prefix() {
use super::*;
let actual = Hb::prefix();
assert_eq!(actual, b"/!hb\x00")
}
#[test]
fn suffix() {
use super::*;
let ts: Timestamp = Timestamp {
value: 456,
};
let actual = Hb::suffix(&ts);
assert_eq!(actual, b"/!hb\x00\x00\x00\x00\x00\x00\x01\xc9") // 457, because we add 1 to the timestamp
}
}

View file

@ -8,6 +8,7 @@ pub struct Lq<'a> {
_a: u8,
_b: u8,
_c: u8,
#[serde(with = "uuid::serde::compact")]
pub nd: Uuid,
_d: u8,
pub ns: &'a str,
@ -24,6 +25,20 @@ pub fn new<'a>(nd: Uuid, ns: &'a str, db: &'a str, lq: Uuid) -> Lq<'a> {
Lq::new(nd, ns, db, lq)
}
pub fn prefix_nd(nd: &Uuid) -> Vec<u8> {
let mut k = [b'/', b'!', b'n', b'd'].to_vec();
k.extend_from_slice(nd.as_bytes());
k.extend_from_slice(&[0x00]);
k
}
pub fn suffix_nd(nd: &Uuid) -> Vec<u8> {
let mut k = [b'/', b'!', b'n', b'd'].to_vec();
k.extend_from_slice(nd.as_bytes());
k.extend_from_slice(&[0xff]);
k
}
impl<'a> Lq<'a> {
pub fn new(nd: Uuid, ns: &'a str, db: &'a str, lq: Uuid) -> Self {
Self {
@ -46,18 +61,44 @@ impl<'a> Lq<'a> {
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Lq::new(
Uuid::default(),
"test",
"test",
Uuid::default(),
);
let nd = Uuid::from_bytes([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10]);
#[rustfmt::skip]
let lv = Uuid::from_bytes([0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f, 0x20]);
let val = Lq::new(nd, "testns", "testdb", lv);
let enc = Lq::encode(&val).unwrap();
assert_eq!(enc, b"/!nd\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10*testns\x00*testdb\x00!lv\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f\x20" );
let dec = Lq::decode(&enc).unwrap();
assert_eq!(val, dec);
}
#[test]
fn prefix_nd() {
use super::*;
let nd = Uuid::from_bytes([
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e,
0x0f, 0x10,
]);
let val = prefix_nd(&nd);
assert_eq!(
val,
b"/!nd\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10\x00"
);
}
#[test]
fn suffix_nd() {
use super::*;
let nd = Uuid::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);
let val = suffix_nd(&nd);
assert_eq!(
val,
b"/!nd\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10\xff"
);
}
}

View file

@ -54,18 +54,34 @@ impl<'a> Lv<'a> {
#[cfg(test)]
mod tests {
use crate::key::debug;
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Lv::new(
"test",
"test",
"test",
Uuid::default(),
);
let live_query_id = Uuid::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);
let val = Lv::new("testns", "testdb", "testtb", live_query_id);
let enc = Lv::encode(&val).unwrap();
println!("{:?}", debug::sprint_key(&enc));
assert_eq!(
enc,
b"/*testns\x00*testdb\x00*testtb\x00!lv\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10"
);
let dec = Lv::decode(&enc).unwrap();
assert_eq!(val, dec);
}
#[test]
fn prefix() {
let val = super::prefix("testns", "testdb", "testtb");
assert_eq!(val, b"/*testns\x00*testdb\x00*testtb\x00!lv\x00")
}
#[test]
fn suffix() {
let val = super::suffix("testns", "testdb", "testtb");
assert_eq!(val, b"/*testns\x00*testdb\x00*testtb\x00!lv\xff")
}
}

View file

@ -2,11 +2,11 @@
///
/// KV /
///
/// ND /!nd{nd}
/// LQ /!nd{nd}*{ns}*{db}!lq{lq}
///
/// HB /!hb{ts}/{nd}
///
/// ND /!nd{nd}
/// NQ /!nd{nd}*{ns}*{db}!lq{lq}
///
/// NS /!ns{ns}
///
/// Namespace /*{ns}
@ -16,7 +16,7 @@
///
/// Database /*{ns}*{db}
/// AZ /*{ns}*{db}!az{az}
/// CF /*{ns}*{db}!cf{ts}
/// CF /*{ns}*{db}!cf{ts}
/// DL /*{ns}*{db}!dl{us}
/// DT /*{ns}*{db}!dt{tk}
/// PA /*{ns}*{db}!pa{pa}
@ -67,6 +67,7 @@ pub mod cf; // Stores change feeds
pub mod cl; // Stores cluster membership information
pub mod database; // Stores the key prefix for all keys under a database
pub mod db; // Stores a DEFINE DATABASE config definition
pub mod debug; // Debug purposes only. It may be used in logs. Not for key handling in implementation code.
pub mod dl; // Stores a DEFINE LOGIN ON DATABASE config definition
pub mod dt; // Stores a DEFINE LOGIN ON DATABASE config definition
pub mod dv; // Stores database versionstamps
@ -91,7 +92,7 @@ pub mod scope; // Stores the key prefix for all keys under a scope
pub mod st; // Stores a DEFINE TOKEN ON SCOPE config definition
pub mod table; // Stores the key prefix for all keys under a table
pub mod tb; // Stores a DEFINE TABLE config definition
pub mod thing;
pub mod thing; // Stores a record id
const CHAR_PATH: u8 = 0xb1; // ±
const CHAR_INDEX: u8 = 0xa4; // ¤

View file

@ -1,5 +1,5 @@
use super::tx::Transaction;
use crate::ctx::Context;
use crate::dbs::cl::Timestamp;
use crate::dbs::Attach;
use crate::dbs::Executor;
use crate::dbs::Notification;
@ -8,6 +8,9 @@ use crate::dbs::Response;
use crate::dbs::Session;
use crate::dbs::Variables;
use crate::err::Error;
use crate::key::hb::Hb;
use crate::key::lq;
use crate::key::lv::Lv;
use crate::sql;
use crate::sql::Query;
use crate::sql::Value;
@ -18,8 +21,22 @@ use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tracing::instrument;
use tracing::trace;
use uuid::Uuid;
use super::tx::Transaction;
/// Used for cluster logic to move LQ data to LQ cleanup code
/// Not a stored struct; Used only in this module
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct LqValue {
pub cl: Uuid,
pub ns: String,
pub db: String,
pub tb: String,
pub lq: Uuid,
}
/// The underlying datastore instance which stores the dataset.
#[allow(dead_code)]
pub struct Datastore {
@ -114,6 +131,13 @@ impl Datastore {
/// # }
/// ```
pub async fn new(path: &str) -> Result<Datastore, Error> {
let id = Uuid::new_v4();
Self::new_full(path, id).await
}
// For testing
pub async fn new_full(path: &str, node_id: Uuid) -> Result<Datastore, Error> {
// Initiate the desired datastore
let inner = match path {
"memory" => {
#[cfg(feature = "kv-mem")]
@ -218,7 +242,7 @@ impl Datastore {
};
// Set the properties on the datastore
inner.map(|inner| Self {
id: Uuid::default(),
id: node_id,
inner,
strict: false,
query_timeout: None,
@ -251,24 +275,204 @@ impl Datastore {
self
}
// Adds entries to the KV store indicating membership information
pub async fn register_membership(&self) -> Result<(), Error> {
/// Creates a new datastore instance
///
/// Use this for clustered environments.
pub async fn new_with_bootstrap(path: &str) -> Result<Datastore, Error> {
let ds = Datastore::new(path).await?;
ds.bootstrap().await?;
Ok(ds)
}
// Initialise bootstrap with implicit values intended for runtime
pub async fn bootstrap(&self) -> Result<(), Error> {
self.bootstrap_full(&self.id).await
}
// Initialise bootstrap with artificial values, intended for testing
pub async fn bootstrap_full(&self, node_id: &Uuid) -> Result<(), Error> {
trace!("Bootstrapping {}", self.id);
let mut tx = self.transaction(true, false).await?;
tx.set_cl(self.id).await?;
tx.set_hb(self.id).await?;
let now = tx.clock();
let archived = self.register_remove_and_archive(&mut tx, node_id, now).await?;
tx.commit().await?;
let mut tx = self.transaction(true, false).await?;
self.remove_archived(&mut tx, archived).await?;
tx.commit().await
}
// Node registration + "mark" stage of mark-and-sweep gc
pub async fn register_remove_and_archive(
&self,
tx: &mut Transaction,
node_id: &Uuid,
timestamp: Timestamp,
) -> Result<Vec<LqValue>, Error> {
trace!("Registering node {}", node_id);
self.register_membership(tx, node_id, &timestamp).await?;
// Determine the timeout for when a cluster node is expired
let ts_expired = (timestamp.clone() - std::time::Duration::from_secs(5))?;
let dead = self.remove_dead_nodes(tx, &ts_expired).await?;
self.archive_dead_lqs(tx, &dead, node_id).await
}
// Adds entries to the KV store indicating membership information
pub async fn register_membership(
&self,
tx: &mut Transaction,
node_id: &Uuid,
timestamp: &Timestamp,
) -> Result<(), Error> {
tx.set_cl(*node_id).await?;
tx.set_hb(timestamp.clone(), *node_id).await?;
Ok(())
}
// Creates a heartbeat entry for the member indicating to the cluster
// that the node is alive
pub async fn heartbeat(&self) -> Result<(), Error> {
let mut tx = self.transaction(true, false).await?;
tx.set_hb(self.id).await?;
tx.commit().await?;
/// Delete dead heartbeats and nodes
/// Returns node IDs
pub async fn remove_dead_nodes(
&self,
tx: &mut Transaction,
ts: &Timestamp,
) -> Result<Vec<Uuid>, Error> {
let hbs = self.delete_dead_heartbeats(tx, ts).await?;
let mut nodes = vec![];
for hb in hbs {
trace!("Deleting node {}", &hb.nd);
tx.del_cl(hb.nd).await?;
nodes.push(hb.nd);
}
Ok(nodes)
}
/// Accepts cluster IDs
/// Archives related live queries
/// Returns live query keys that can be used for deletes
///
/// The reason we archive first is to stop other nodes from picking it up for further updates
/// This means it will be easier to wipe the range in a subsequent transaction
pub async fn archive_dead_lqs(
&self,
tx: &mut Transaction,
nodes: &[Uuid],
this_node_id: &Uuid,
) -> Result<Vec<LqValue>, Error> {
let mut archived = vec![];
for nd in nodes.iter() {
trace!("Archiving node {}", &nd);
// Scan on node prefix for LQ space
let node_lqs = tx.scan_lq(nd, 1000).await?;
trace!("Found {} LQ entries for {:?}", node_lqs.len(), nd);
for lq in node_lqs {
trace!("Archiving query {:?}", &lq);
let node_archived_lqs = self.archive_lv_for_node(tx, &lq.cl, this_node_id).await?;
for lq_value in node_archived_lqs {
archived.push(lq_value);
}
}
}
Ok(archived)
}
pub async fn remove_archived(
&self,
tx: &mut Transaction,
archived: Vec<LqValue>,
) -> Result<(), Error> {
for lq in archived {
// Delete the cluster key, used for finding LQ associated with a node
tx.del(lq::new(lq.cl, lq.ns.as_str(), lq.db.as_str(), lq.lq)).await?;
// Delete the table key, used for finding LQ associated with a table
tx.del(Lv::new(lq.ns.as_str(), lq.db.as_str(), lq.tb.as_str(), lq.lq)).await?;
}
Ok(())
}
pub async fn _garbage_collect(
// TODO not invoked
// But this is garbage collection outside of bootstrap
&self,
tx: &mut Transaction,
watermark: &Timestamp,
this_node_id: &Uuid,
) -> Result<(), Error> {
let dead_heartbeats = self.delete_dead_heartbeats(tx, watermark).await?;
trace!("Found dead hbs: {:?}", dead_heartbeats);
let mut archived: Vec<LqValue> = vec![];
for hb in dead_heartbeats {
let new_archived = self.archive_lv_for_node(tx, &hb.nd, this_node_id).await?;
tx.del_cl(hb.nd).await?;
trace!("Deleted node {}", hb.nd);
for lq_value in new_archived {
archived.push(lq_value);
}
}
Ok(())
}
// Returns a list of live query IDs
pub async fn archive_lv_for_node(
&self,
tx: &mut Transaction,
nd: &Uuid,
this_node_id: &Uuid,
) -> Result<Vec<LqValue>, Error> {
let lqs = tx.all_lq(nd).await?;
trace!("Archiving lqs and found {} LQ entries for {}", lqs.len(), nd);
let mut ret = vec![];
for lq in lqs {
let lvs = tx.get_lv(lq.ns.as_str(), lq.db.as_str(), lq.tb.as_str(), &lq.lq).await?;
let archived_lvs = lvs.clone().archive(*this_node_id);
tx.putc_lv(&lq.ns, &lq.db, &lq.tb, archived_lvs, Some(lvs)).await?;
ret.push(lq);
}
Ok(ret)
}
/// Given a timestamp, delete all the heartbeats that have expired
/// Return the removed heartbeats as they will contain node information
pub async fn delete_dead_heartbeats(
&self,
tx: &mut Transaction,
ts: &Timestamp,
) -> Result<Vec<Hb>, Error> {
let limit = 1000;
let dead = tx.scan_hb(ts, limit).await?;
tx.delr_hb(dead.clone(), 1000).await?;
for dead_node in dead.clone() {
tx.del_cl(dead_node.nd).await?;
}
Ok::<Vec<Hb>, Error>(dead)
}
// Creates a heartbeat entry for the member indicating to the cluster
// that the node is alive.
// This is the preferred way of creating heartbeats inside the database, so try to use this.
pub async fn heartbeat(&self) -> Result<(), Error> {
let mut tx = self.transaction(true, false).await?;
let timestamp = tx.clock();
self.heartbeat_full(&mut tx, timestamp, self.id).await?;
tx.commit().await
}
// Creates a heartbeat entry for the member indicating to the cluster
// that the node is alive. Intended for testing.
// This includes all dependencies that are hard to control and is done in such a way for testing.
// Inside the database, try to use the heartbeat() function instead.
pub async fn heartbeat_full(
&self,
tx: &mut Transaction,
timestamp: Timestamp,
node_id: Uuid,
) -> Result<(), Error> {
tx.set_hb(timestamp, node_id).await
}
// -----
// End cluster helpers, storage functions here
// -----
/// Create a new transaction on this datastore
///
/// ```rust,no_run

View file

@ -0,0 +1,114 @@
use futures::lock::Mutex;
use std::sync::Arc;
use crate::ctx::context;
use crate::dbs::{Options, Session};
use crate::sql;
use crate::sql::statements::LiveStatement;
use crate::sql::Value::Table;
use crate::sql::{Fields, Value};
use uuid;
#[tokio::test]
#[serial]
async fn expired_nodes_are_garbage_collected() {
let test = match init().await {
Ok(test) => test,
Err(e) => panic!("{}", e),
};
// Set up the first node at an early timestamp
let old_node = uuid::Uuid::new_v4();
let old_time = Timestamp {
value: 123,
};
test.bootstrap_at_time(&old_node, old_time.clone()).await.unwrap();
// Set up second node at a later timestamp
let new_node = uuid::Uuid::new_v4();
let new_time = Timestamp {
value: 456,
};
test.bootstrap_at_time(&new_node, new_time.clone()).await.unwrap();
// Now scan the heartbeats to validate there is only one node left
let mut tx = test.db.transaction(true, false).await.unwrap();
let scanned = tx.scan_hb(&new_time, 100).await.unwrap();
assert_eq!(scanned.len(), 1);
for hb in scanned.iter() {
assert_eq!(&hb.nd, &new_node);
}
// And scan the nodes to verify its just the latest also
let scanned = tx.scan_cl(100).await.unwrap();
assert_eq!(scanned.len(), 1);
for cl in scanned.iter() {
assert_eq!(&cl.name, &new_node.to_string());
}
tx.commit().await.unwrap();
}
#[tokio::test]
#[serial]
async fn expired_nodes_get_live_queries_archived() {
let test = match init().await {
Ok(test) => test,
Err(e) => panic!("{}", e),
};
// Set up the first node at an early timestamp
let old_node = uuid::Uuid::from_fields(0, 1, 2, &[3, 4, 5, 6, 7, 8, 9, 10]);
let old_time = Timestamp {
value: 123,
};
test.bootstrap_at_time(&old_node, old_time.clone()).await.unwrap();
// Set up live query
let ses = Session::for_kv()
.with_ns(test.test_str("testns").as_str())
.with_db(test.test_str("testdb").as_str());
let table = "my_table";
let lq = LiveStatement {
id: sql::Uuid(uuid::Uuid::new_v4()),
node: Uuid::new_v4(),
expr: Fields(vec![sql::Field::All], false),
what: Table(sql::Table::from(table)),
cond: None,
fetch: None,
archived: Some(old_node),
};
let ctx = context::Context::background();
let (sender, _) = channel::unbounded();
let opt = Options::new()
.with_ns(ses.ns())
.with_db(ses.db())
.with_auth(Arc::new(Default::default()))
.with_live(true)
.with_id(old_node.clone());
let opt = Options::new_with_sender(&opt, sender);
let tx = Arc::new(Mutex::new(test.db.transaction(true, false).await.unwrap()));
let res = lq.compute(&ctx, &opt, &tx, None).await.unwrap();
match res {
Value::Uuid(_) => {}
_ => {
panic!("Not a uuid: {:?}", res);
}
}
tx.lock().await.commit().await.unwrap();
// Set up second node at a later timestamp
let new_node = uuid::Uuid::from_fields(16, 17, 18, &[19, 20, 21, 22, 23, 24, 25, 26]);
let new_time = Timestamp {
value: 456,
}; // TODO These timestsamps are incorrect and should really be derived; Also check timestamp errors
test.bootstrap_at_time(&new_node, new_time.clone()).await.unwrap();
// Now validate lq was removed
let mut tx = test.db.transaction(true, false).await.unwrap();
let scanned =
tx.all_lv(ses.ns().unwrap().as_ref(), ses.db().unwrap().as_ref(), table).await.unwrap();
assert_eq!(scanned.len(), 0);
tx.commit().await.unwrap();
}

View file

@ -0,0 +1,63 @@
use crate::dbs::cl::Timestamp;
use crate::err::Error;
use std::sync::Once;
use tracing::Level;
use tracing_subscriber;
pub struct TestContext {
pub(crate) db: Datastore,
// A string identifier for this context.
// It will usually be a uuid or combination of uuid and fixed string identifier.
// It is useful for separating test setups when environments are shared.
pub(crate) context_id: String,
}
static INIT: Once = Once::new();
/// TestContext is a container for an initialised test context
/// Anything stateful (such as storage layer and logging) can be tied with this
impl TestContext {
pub(crate) async fn bootstrap_at_time(
&self,
node_id: &Uuid,
time: Timestamp,
) -> Result<(), Error> {
let mut tx = self.db.transaction(true, true).await?;
let archived = self.db.register_remove_and_archive(&mut tx, node_id, time).await?;
tx.commit().await?;
let mut tx = self.db.transaction(true, true).await?;
self.db.remove_archived(&mut tx, archived).await?;
Ok(tx.commit().await?)
}
// Use this to generate strings that have the test uuid associated with it
pub fn test_str(&self, prefix: &str) -> String {
return format!("{}-{}", prefix, self.context_id);
}
}
/// Initialise logging and prepare a useable datastore
/// In the future it would be nice to handle multiple datastores
pub(crate) async fn init() -> Result<TestContext, Error> {
// Set tracing for tests for debug, but only do it once
INIT.call_once(|| {
let _subscriber = tracing_subscriber::fmt().with_max_level(Level::TRACE).try_init();
});
let db = new_ds().await;
return Ok(TestContext {
db,
context_id: Uuid::new_v4().to_string(), // The context does not always have to be a uuid
});
}
/// Scan the entire storage layer displaying keys
/// Useful to debug scans ;)
async fn _debug_scan(tx: &mut Transaction, message: &str) {
let r = tx.scan(vec![0]..vec![u8::MAX], 100000).await.unwrap();
println!("START OF RANGE SCAN - {}", message);
for (k, _v) in r.iter() {
println!("{}", crate::key::debug::sprint_key(k.as_ref()));
}
println!("END OF RANGE SCAN - {}", message);
}

42
lib/src/kvs/tests/lq.rs Normal file
View file

@ -0,0 +1,42 @@
use uuid::Uuid;
#[tokio::test]
#[serial]
async fn scan_node_lq() {
let test = init().await.unwrap();
let mut tx = test.db.transaction(true, true).await.unwrap();
let node_id = Uuid::from_bytes([
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E,
0x0F,
]);
let namespace = "test_namespace";
let database = "test_database";
let live_query_id = Uuid::from_bytes([
0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E,
0x1F,
]);
let key = crate::key::lq::new(node_id, namespace, database, live_query_id);
trace!(
"Inserting key: {}",
key.encode()
.unwrap()
.iter()
.flat_map(|byte| std::ascii::escape_default(byte.clone()))
.map(|byte| byte as char)
.collect::<String>()
);
let _ = tx.putc(key, "value", None).await.unwrap();
tx.commit().await.unwrap();
let mut tx = test.db.transaction(true, true).await.unwrap();
let res = tx.scan_lq(&node_id, 100).await.unwrap();
assert_eq!(res.len(), 1);
for val in res {
assert_eq!(val.cl, node_id);
assert_eq!(val.ns, namespace);
assert_eq!(val.db, database);
assert_eq!(val.lq, live_query_id);
}
tx.commit().await.unwrap();
}

52
lib/src/kvs/tests/lv.rs Normal file
View file

@ -0,0 +1,52 @@
use crate::sql::statements::live::live;
#[tokio::test]
#[serial]
async fn archive_lv_for_node_archives() {
let test = init().await.unwrap();
let mut tx = test.db.transaction(true, true).await.unwrap();
let namespace = "test_namespace";
let database = "test_database";
let table = "test_table";
let node_id = Uuid::from_bytes([
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E,
0x0F,
]);
tx.set_cl(node_id).await.unwrap();
let lv_id = Uuid::from_bytes([
0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E,
0x1F,
]);
let key = crate::key::lq::new(node_id, namespace, database, lv_id);
tx.putc(key, table, None).await.unwrap();
let (_, mut stm) = live(format!("LIVE SELECT * FROM {}", table).as_str()).unwrap();
stm.id = crate::sql::Uuid::from(lv_id);
tx.putc_lv(namespace, database, table, stm, None).await.unwrap();
let this_node_id = Uuid::from_bytes([
0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A, 0x2B, 0x2C, 0x2D, 0x2E,
0x2F,
]);
// We commit after setup because otherwise in memory does not have read your own writes
// i.e. setup data is part of same transaction as required implementation checks
tx.commit().await.unwrap();
let mut tx = test.db.transaction(true, false).await.unwrap();
let results = test.db.archive_lv_for_node(&mut tx, &node_id, &this_node_id).await.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].cl, node_id);
assert_eq!(results[0].ns, namespace);
assert_eq!(results[0].db, database);
assert_eq!(results[0].tb, table);
assert_eq!(results[0].lq, lv_id);
tx.commit().await.unwrap();
let mut tx = test.db.transaction(true, false).await.unwrap();
let lv = tx.all_lv(namespace, database, table).await.unwrap();
assert_eq!(lv.len(), 1, "{:?}", lv);
assert_eq!(lv[0].archived, Some(this_node_id));
tx.commit().await.unwrap();
}

View file

@ -13,8 +13,13 @@ mod mem {
new_ds().await.transaction(write, lock).await.unwrap()
}
include!("helper.rs");
include!("cluster_init.rs");
include!("lq.rs");
include!("lv.rs");
include!("raw.rs");
include!("snapshot.rs");
include!("tb.rs");
include!("multireader.rs");
}
@ -35,8 +40,13 @@ mod rocksdb {
new_ds().await.transaction(write, lock).await.unwrap()
}
include!("helper.rs");
include!("cluster_init.rs");
include!("lq.rs");
include!("lv.rs");
include!("raw.rs");
include!("snapshot.rs");
include!("tb.rs");
include!("multireader.rs");
include!("multiwriter_different_keys.rs");
include!("multiwriter_same_keys_conflict.rs");
@ -59,8 +69,13 @@ mod speedb {
new_ds().await.transaction(write, lock).await.unwrap()
}
include!("helper.rs");
include!("cluster_init.rs");
include!("lq.rs");
include!("lv.rs");
include!("raw.rs");
include!("snapshot.rs");
include!("tb.rs");
include!("multireader.rs");
include!("multiwriter_different_keys.rs");
include!("multiwriter_same_keys_conflict.rs");
@ -87,8 +102,13 @@ mod tikv {
new_ds().await.transaction(write, lock).await.unwrap()
}
include!("cluster_init.rs");
include!("helper.rs");
include!("lq.rs");
include!("lv.rs");
include!("raw.rs");
include!("snapshot.rs");
include!("tb.rs");
include!("multireader.rs");
include!("multiwriter_different_keys.rs");
include!("multiwriter_same_keys_conflict.rs");
@ -115,8 +135,13 @@ mod fdb {
new_ds().await.transaction(write, lock).await.unwrap()
}
include!("cluster_init.rs");
include!("helper.rs");
include!("lq.rs");
include!("lv.rs");
include!("raw.rs");
include!("snapshot.rs");
include!("tb.rs");
include!("multireader.rs");
include!("multiwriter_different_keys.rs");
include!("multiwriter_same_keys_allow.rs");

View file

@ -1,103 +1,90 @@
#[cfg(feature = "kv-mem")]
pub(crate) mod table {
use crate::err::Error;
use crate::key::tb;
use crate::key::tb::Tb;
use crate::kvs::Datastore;
use crate::sql::statements::DefineTableStatement;
use crate::key::tb;
use crate::key::tb::Tb;
use crate::sql::statements::DefineTableStatement;
struct TestContext {
db: Datastore,
}
#[tokio::test]
#[serial]
async fn table_definitions_can_be_scanned() {
// Setup
let test = match init().await {
Ok(ctx) => ctx,
Err(e) => panic!("{:?}", e),
};
let mut tx = match test.db.transaction(true, false).await {
Ok(tx) => tx,
Err(e) => panic!("{:?}", e),
};
async fn init() -> Result<TestContext, Error> {
let db = Datastore::new("memory").await?;
return Ok(TestContext {
db,
});
}
// Create a table definition
let namespace = "test_namespace";
let database = "test_database";
let table = "test_table";
let key = Tb::new(namespace, database, table);
let value = DefineTableStatement {
name: Default::default(),
drop: false,
full: false,
view: None,
permissions: Default::default(),
changefeed: None,
};
match tx.set(&key, &value).await {
Ok(_) => {}
Err(e) => panic!("{:?}", e),
};
#[tokio::test]
#[rustfmt::skip]
async fn table_definitions_can_be_scanned() {
// Setup
let test = match init().await {
Ok(ctx) => ctx,
Err(e) => panic!("{:?}", e),
};
let mut tx = match test.db.transaction(true, false).await {
Ok(tx) => tx,
Err(e) => panic!("{:?}", e),
};
// Create a table definition
let namespace = "test_namespace";
let database = "test_database";
let table = "test_table";
let key = Tb::new(namespace, database, table);
let value = DefineTableStatement {
name: Default::default(),
drop: false,
full: false,
view: None,
permissions: Default::default(),
};
match tx.set(&key, &value).await {
Ok(_) => {}
Err(e) => panic!("{:?}", e),
};
// Validate with scan
match tx.scan(tb::prefix(namespace, database)..tb::suffix(namespace, database), 1000).await {
Ok(scan) => {
assert_eq!(scan.len(), 1);
let read = DefineTableStatement::from(&scan[0].1);
assert_eq!(&read, &value);
}
Err(e) => panic!("{:?}", e),
// Validate with scan
match tx.scan(tb::prefix(namespace, database)..tb::suffix(namespace, database), 1000).await {
Ok(scan) => {
assert_eq!(scan.len(), 1);
let read = DefineTableStatement::from(&scan[0].1);
assert_eq!(&read, &value);
}
}
#[tokio::test]
async fn table_definitions_can_be_deleted() {
// Setup
let test = match init().await {
Ok(ctx) => ctx,
Err(e) => panic!("{:?}", e),
};
let mut tx = match test.db.transaction(true, false).await {
Ok(tx) => tx,
Err(e) => panic!("{:?}", e),
};
// Create a table definition
let namespace = "test_namespace";
let database = "test_database";
let table = "test_table";
let key = Tb::new(namespace, database, table);
let value = DefineTableStatement {
name: Default::default(),
drop: false,
full: false,
view: None,
permissions: Default::default(),
};
match tx.set(&key, &value).await {
Ok(_) => {}
Err(e) => panic!("{:?}", e),
};
// Validate delete
match tx.del(&key).await {
Ok(_) => {}
Err(e) => panic!("{:?}", e),
};
// Should not exist
match tx.get(&key).await {
Ok(None) => {}
Ok(Some(o)) => panic!("Should not exist but was {:?}", o),
Err(e) => panic!("Unexpected error on get {:?}", e),
};
Err(e) => panic!("{:?}", e),
}
}
#[tokio::test]
#[serial]
async fn table_definitions_can_be_deleted() {
// Setup
let test = match init().await {
Ok(ctx) => ctx,
Err(e) => panic!("{:?}", e),
};
let mut tx = match test.db.transaction(true, false).await {
Ok(tx) => tx,
Err(e) => panic!("{:?}", e),
};
// Create a table definition
let namespace = "test_namespace";
let database = "test_database";
let table = "test_table";
let key = Tb::new(namespace, database, table);
let value = DefineTableStatement {
name: Default::default(),
drop: false,
full: false,
view: None,
permissions: Default::default(),
changefeed: None,
};
match tx.set(&key, &value).await {
Ok(_) => {}
Err(e) => panic!("{:?}", e),
};
// Validate delete
match tx.del(&key).await {
Ok(_) => {}
Err(e) => panic!("{:?}", e),
};
// Should not exist
match tx.get(&key).await {
Ok(None) => {}
Ok(Some(o)) => panic!("Should not exist but was {:?}", o),
Err(e) => panic!("Unexpected error on get {:?}", e),
};
}

View file

@ -5,15 +5,20 @@ use super::Val;
use crate::dbs::cl::ClusterMembership;
use crate::dbs::cl::Timestamp;
use crate::err::Error;
use crate::key::thing;
use crate::key::hb::Hb;
use crate::key::lq::Lq;
use crate::key::lv::Lv;
use crate::key::{lq, thing};
use crate::kvs::cache::Cache;
use crate::kvs::cache::Entry;
use crate::kvs::LqValue;
use crate::sql;
use crate::sql::paths::EDGE;
use crate::sql::paths::IN;
use crate::sql::paths::OUT;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use crate::sql::Strand;
use channel::Sender;
use sql::permission::Permissions;
use sql::statements::DefineAnalyzerStatement;
@ -820,7 +825,7 @@ impl Transaction {
}
}
fn clock(&self) -> Timestamp {
pub(crate) fn clock(&self) -> Timestamp {
// Use a timestamp oracle if available
let now: u128 = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
Timestamp {
@ -829,21 +834,190 @@ impl Transaction {
}
// Set heartbeat
pub async fn set_hb(&mut self, id: Uuid) -> Result<(), Error> {
let now = self.clock();
let key = crate::key::hb::Hb::new(now.clone(), id);
pub async fn set_hb(&mut self, timestamp: Timestamp, id: Uuid) -> Result<(), Error> {
let key = Hb::new(timestamp.clone(), id);
// We do not need to do a read, we always want to overwrite
self.put(
key,
ClusterMembership {
name: id.to_string(),
heartbeat: now,
heartbeat: timestamp,
},
)
.await?;
Ok(())
}
// Delete a cluster registration entry
pub async fn del_cl(&mut self, node: uuid::Uuid) -> Result<(), Error> {
let key = crate::key::cl::Cl::new(node);
self.del(key).await
}
// Delete the live query notification registry on the table
// Return the Table ID
pub async fn del_cllv(&mut self, cl: &Uuid) -> Result<Uuid, Error> {
// This isn't implemented because it is covered by del_cl
// Will add later for remote node kill
Err(Error::ClNotFound {
value: format!("Missing cluster {:?}", cl),
})
}
// Scans up until the heartbeat timestamp and returns the discovered nodes
pub async fn scan_hb(&mut self, time_to: &Timestamp, limit: u32) -> Result<Vec<Hb>, Error> {
let beg = crate::key::hb::Hb::prefix();
let end = crate::key::hb::Hb::suffix(time_to);
trace!("Scan start: {} ({:?})", String::from_utf8_lossy(&beg).to_string(), &beg);
trace!("Scan end: {} ({:?})", String::from_utf8_lossy(&end).to_string(), &end);
let mut nxt: Option<Key> = None;
let mut num = limit;
let mut out: Vec<Hb> = vec![];
// Start processing
while num > 0 {
// Get records batch
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
}
};
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (k, _)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
out.push(Hb::decode(k.as_slice())?);
// Count
num -= 1;
}
}
trace!("scan_hb: {:?}", out);
Ok(out)
}
pub async fn scan_cl(&mut self, limit: u32) -> Result<Vec<ClusterMembership>, Error> {
let beg = crate::key::cl::Cl::prefix();
let end = crate::key::cl::Cl::suffix();
trace!("Scan start: {} ({:?})", String::from_utf8_lossy(&beg).to_string(), &beg);
trace!("Scan end: {} ({:?})", String::from_utf8_lossy(&end).to_string(), &end);
let mut nxt: Option<Key> = None;
let mut num = limit;
let mut out: Vec<ClusterMembership> = vec![];
// Start processing
while num > 0 {
// Get records batch
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
}
};
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (k, v)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
out.push((&v).into());
// Count
num -= 1;
}
}
trace!("scan_hb: {:?}", out);
Ok(out)
}
pub async fn delr_hb(&mut self, ts: Vec<Hb>, limit: u32) -> Result<(), Error> {
trace!("delr_hb: ts={:?} limit={:?}", ts, limit);
for hb in ts.into_iter() {
self.del(hb).await?;
}
Ok(())
}
pub async fn del_lv(&mut self, ns: &str, db: &str, tb: &str, lv: Uuid) -> Result<(), Error> {
trace!("del_lv: ns={:?} db={:?} tb={:?} lv={:?}", ns, db, tb, lv);
let key = crate::key::lv::new(ns, db, tb, lv);
self.cache.del(&key.clone().into());
self.del(key).await
}
pub async fn scan_lq<'a>(
&mut self,
node: &uuid::Uuid,
limit: u32,
) -> Result<Vec<LqValue>, Error> {
let pref = lq::prefix_nd(node);
let suff = lq::suffix_nd(node);
trace!(
"Scanning range from pref={}, suff={}",
crate::key::debug::sprint_key(&pref),
crate::key::debug::sprint_key(&suff),
);
let rng = pref..suff;
let scanned = self.scan(rng, limit).await?;
let mut res: Vec<LqValue> = vec![];
for (key, value) in scanned {
trace!("scan_lq: key={:?} value={:?}", &key, &value);
let lq = Lq::decode(key.as_slice())?;
let tb: String = String::from_utf8(value).unwrap();
res.push(LqValue {
cl: lq.nd,
ns: lq.ns.to_string(),
db: lq.db.to_string(),
tb,
lq: lq.lq,
});
}
Ok(res)
}
pub async fn putc_lv(
&mut self,
ns: &str,
db: &str,
tb: &str,
live_stm: LiveStatement,
expected: Option<LiveStatement>,
) -> Result<(), Error> {
let key = crate::key::lv::new(ns, db, tb, live_stm.id.0);
let key_enc = Lv::encode(&key)?;
trace!("putc_lv ({:?}): key={:?}", &live_stm.id, crate::key::debug::sprint_key(&key_enc));
self.putc(key_enc, live_stm, expected).await
}
/// Retrieve all namespace definitions in a datastore.
pub async fn all_ns(&mut self) -> Result<Arc<[DefineNamespaceStatement]>, Error> {
let key = crate::key::ns::prefix();
@ -1201,6 +1375,30 @@ impl Transaction {
val
})
}
pub async fn all_lq(&mut self, nd: &uuid::Uuid) -> Result<Vec<LqValue>, Error> {
let beg = crate::key::lq::prefix_nd(nd);
let end = crate::key::lq::suffix_nd(nd);
let lq_pairs = self.getr(beg..end, u32::MAX).await?;
let mut lqs = vec![];
for (key, value) in lq_pairs {
let lq_key = Lq::decode(key.as_slice())?;
trace!("Value is {:?}", &value);
let lq_value = String::from_utf8(value).map_err(|e| {
Error::Internal(format!("Failed to decode a value while reading LQ: {}", e))
})?;
let lqv = LqValue {
cl: *nd,
ns: lq_key.ns.to_string(),
db: lq_key.db.to_string(),
tb: lq_value,
lq: lq_key.lq,
};
lqs.push(lqv);
}
Ok(lqs)
}
/// Retrieve all analyzer definitions for a specific database.
pub async fn all_az(
&mut self,
@ -1331,6 +1529,37 @@ impl Transaction {
Ok(val.into())
}
/// Return the table stored at the lq address
pub async fn get_lq(
&mut self,
nd: Uuid,
ns: &str,
db: &str,
lq: Uuid,
) -> Result<Strand, Error> {
let key = lq::new(nd, ns, db, lq);
let val = self.get(key).await?.ok_or(Error::LqNotFound {
value: lq.to_string(),
})?;
Value::from(val).convert_to_strand()
}
pub async fn get_lv(
&mut self,
ns: &str,
db: &str,
tb: &str,
lv: &Uuid,
) -> Result<LiveStatement, Error> {
let key = crate::key::lv::new(ns, db, tb, *lv);
let key_enc = Lv::encode(&key)?;
trace!("Getting lv ({:?}) {:?}", lv, crate::key::debug::sprint_key(&key_enc));
let val = self.get(key_enc).await?.ok_or(Error::LvNotFound {
value: lv.to_string(),
})?;
Ok(val.into())
}
/// Retrieve a specific param definition.
pub async fn get_pa(
&mut self,

View file

@ -77,7 +77,6 @@ impl Constant {
Self::TimeEpoch => ConstantValue::Datetime(Datetime(Utc.timestamp_nanos(0))),
}
}
/// Process this type returning a computed simple Value
pub(crate) async fn compute(
&self,

View file

@ -24,11 +24,16 @@ use std::fmt;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store, Hash)]
pub struct LiveStatement {
pub id: Uuid,
pub node: Uuid,
pub node: uuid::Uuid,
pub expr: Fields,
pub what: Value,
pub cond: Option<Cond>,
pub fetch: Option<Fetchs>,
// Non-query properties that are necessary for storage or otherwise carrying information
// When a live query is archived, this should be the node ID that archived the query.
pub archived: Option<uuid::Uuid>,
}
impl LiveStatement {
@ -54,7 +59,10 @@ impl LiveStatement {
// Clone the current statement
let mut stm = self.clone();
// Store the current Node ID
stm.node = Uuid(opt.id()?);
if let Err(e) = opt.id() {
trace!("No ID for live query {:?}, error={:?}", stm, e)
}
stm.node = opt.id()?;
// Insert the node live query
let key = crate::key::lq::new(opt.id()?, opt.ns(), opt.db(), self.id.0);
run.putc(key, tb.as_str(), None).await?;
@ -71,6 +79,11 @@ impl LiveStatement {
// Return the query id
Ok(self.id.clone().into())
}
pub(crate) fn archive(mut self, node_id: uuid::Uuid) -> LiveStatement {
self.archived = Some(node_id);
self
}
}
impl fmt::Display for LiveStatement {
@ -100,11 +113,12 @@ pub fn live(i: &str) -> IResult<&str, LiveStatement> {
i,
LiveStatement {
id: Uuid::new_v4(),
node: Uuid::default(),
node: uuid::Uuid::new_v4(),
expr,
what,
cond,
fetch,
archived: None,
},
))
}

View file

@ -49,6 +49,7 @@ pub async fn init(
.with_strict_mode(strict_mode)
.with_query_timeout(query_timeout)
.with_transaction_timeout(transaction_timeout);
dbs.bootstrap().await?;
// Store database instance
let _ = DB.set(dbs);
// All ok