Change lq from uuid::Uuid to sql::Uuid (#2289)

This commit is contained in:
Przemyslaw Hugh Kaznowski 2023-07-24 17:15:45 +01:00 committed by GitHub
parent d36ece79de
commit e309ee6df2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 72 additions and 78 deletions

View file

@ -1,7 +1,6 @@
use crate::sql::{Object, Value};
use serde::{ser::SerializeStruct, Deserialize, Serialize};
use crate::sql::{Object, Uuid, Value};
use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug, Display};
use uuid::Uuid;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
@ -21,7 +20,7 @@ impl Display for Action {
}
}
#[derive(Clone, Debug, PartialEq, Deserialize)]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct Notification {
pub id: Uuid,
pub action: Action,
@ -39,16 +38,3 @@ impl Display for Notification {
write!(f, "{}", obj)
}
}
impl Serialize for Notification {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut val = serializer.serialize_struct("Notification", 3)?;
val.serialize_field("id", &self.id.to_string())?;
val.serialize_field("action", &self.action)?;
val.serialize_field("result", &self.result)?;
val.end()
}
}

View file

@ -5,7 +5,6 @@ use crate::dbs::Notification;
use crate::err::Error;
use channel::Sender;
use std::sync::Arc;
use uuid::Uuid;
/// An Options is passed around when processing a set of query
/// statements. An Options contains specific information for how
@ -16,7 +15,7 @@ use uuid::Uuid;
#[derive(Clone, Debug)]
pub struct Options {
/// Current Node ID
id: Option<Uuid>,
id: Option<uuid::Uuid>,
/// Currently selected NS
ns: Option<Arc<str>>,
/// Currently selected DB
@ -93,7 +92,7 @@ impl Options {
/// Set the Node ID for subsequent code which uses
/// this `Options`, with support for chaining.
pub fn with_id(mut self, id: Uuid) -> Self {
pub fn with_id(mut self, id: uuid::Uuid) -> Self {
self.id = Some(id);
self
}
@ -329,7 +328,7 @@ impl Options {
// --------------------------------------------------
/// Get current Node ID
pub fn id(&self) -> Result<Uuid, Error> {
pub fn id(&self) -> Result<uuid::Uuid, Error> {
self.id.ok_or(Error::Unreachable)
}

View file

@ -36,10 +36,10 @@ impl<'a> Document<'a> {
// Check what type of data change this is
if stm.is_delete() {
// Send a DELETE notification
if opt.id()? == lv.node {
if opt.id()? == lv.node.0 {
let thing = (*rid).clone();
chn.send(Notification {
id: lv.id.0,
id: lv.id.clone(),
action: Action::Delete,
result: Value::Thing(thing),
})
@ -49,9 +49,9 @@ impl<'a> Document<'a> {
}
} else if self.is_new() {
// Send a CREATE notification
if opt.id()? == lv.node {
if opt.id()? == lv.node.0 {
chn.send(Notification {
id: lv.id.0,
id: lv.id.clone(),
action: Action::Create,
result: self.pluck(ctx, opt, txn, &lq).await?,
})
@ -61,9 +61,9 @@ impl<'a> Document<'a> {
}
} else {
// Send a UPDATE notification
if opt.id()? == lv.node {
if opt.id()? == lv.node.0 {
chn.send(Notification {
id: lv.id.0,
id: lv.id.clone(),
action: Action::Update,
result: self.pluck(ctx, opt, txn, &lq).await?,
})

View file

@ -12,8 +12,8 @@ use crate::dbs::Variables;
use crate::err::Error;
use crate::key::root::hb::Hb;
use crate::sql;
use crate::sql::Query;
use crate::sql::Value;
use crate::sql::{Query, Uuid};
use channel::Receiver;
use channel::Sender;
use futures::lock::Mutex;
@ -22,7 +22,6 @@ use std::sync::Arc;
use std::time::Duration;
use tracing::instrument;
use tracing::trace;
use uuid::Uuid;
/// Used for cluster logic to move LQ data to LQ cleanup code
/// Not a stored struct; Used only in this module
@ -322,8 +321,8 @@ impl Datastore {
node_id: &Uuid,
timestamp: &Timestamp,
) -> Result<(), Error> {
tx.set_nd(*node_id).await?;
tx.set_hb(timestamp.clone(), *node_id).await?;
tx.set_nd(node_id.0).await?;
tx.set_hb(timestamp.clone(), node_id.0).await?;
Ok(())
}
@ -339,7 +338,7 @@ impl Datastore {
for hb in hbs {
trace!("Deleting node {}", &hb.nd);
tx.del_nd(hb.nd).await?;
nodes.push(hb.nd);
nodes.push(crate::sql::uuid::Uuid::from(hb.nd));
}
Ok(nodes)
}
@ -364,7 +363,8 @@ impl Datastore {
trace!("Found {} LQ entries for {:?}", node_lqs.len(), nd);
for lq in node_lqs {
trace!("Archiving query {:?}", &lq);
let node_archived_lqs = self.archive_lv_for_node(tx, &lq.nd, this_node_id).await?;
let node_archived_lqs =
self.archive_lv_for_node(tx, &lq.nd, this_node_id.clone()).await?;
for lq_value in node_archived_lqs {
archived.push(lq_value);
}
@ -380,10 +380,10 @@ impl Datastore {
) -> Result<(), Error> {
for lq in archived {
// Delete the cluster key, used for finding LQ associated with a node
let key = crate::key::node::lq::new(lq.nd, lq.lq, &lq.ns, &lq.db);
let key = crate::key::node::lq::new(lq.nd.0, lq.lq.0, &lq.ns, &lq.db);
tx.del(key).await?;
// Delete the table key, used for finding LQ associated with a table
let key = crate::key::table::lq::new(&lq.ns, &lq.db, &lq.tb, lq.lq);
let key = crate::key::table::lq::new(&lq.ns, &lq.db, &lq.tb, lq.lq.0);
tx.del(key).await?;
}
Ok(())
@ -401,7 +401,9 @@ impl Datastore {
trace!("Found dead hbs: {:?}", dead_heartbeats);
let mut archived: Vec<LqValue> = vec![];
for hb in dead_heartbeats {
let new_archived = self.archive_lv_for_node(tx, &hb.nd, this_node_id).await?;
let new_archived = self
.archive_lv_for_node(tx, &crate::sql::uuid::Uuid::from(hb.nd), this_node_id.clone())
.await?;
tx.del_nd(hb.nd).await?;
trace!("Deleted node {}", hb.nd);
for lq_value in new_archived {
@ -416,14 +418,14 @@ impl Datastore {
&self,
tx: &mut Transaction,
nd: &Uuid,
this_node_id: &Uuid,
this_node_id: Uuid,
) -> Result<Vec<LqValue>, Error> {
let lqs = tx.all_lq(nd).await?;
trace!("Archiving lqs and found {} LQ entries for {}", lqs.len(), nd);
let mut ret = vec![];
for lq in lqs {
let lvs = tx.get_lv(lq.ns.as_str(), lq.db.as_str(), lq.tb.as_str(), &lq.lq).await?;
let archived_lvs = lvs.clone().archive(*this_node_id);
let archived_lvs = lvs.clone().archive(this_node_id.clone());
tx.putc_lv(&lq.ns, &lq.db, &lq.tb, archived_lvs, Some(lvs)).await?;
ret.push(lq);
}
@ -452,7 +454,7 @@ impl Datastore {
pub async fn heartbeat(&self) -> Result<(), Error> {
let mut tx = self.transaction(true, false).await?;
let timestamp = tx.clock();
self.heartbeat_full(&mut tx, timestamp, self.id).await?;
self.heartbeat_full(&mut tx, timestamp, self.id.clone()).await?;
tx.commit().await
}
@ -466,7 +468,7 @@ impl Datastore {
timestamp: Timestamp,
node_id: Uuid,
) -> Result<(), Error> {
tx.set_hb(timestamp, node_id).await
tx.set_hb(timestamp, node_id.0).await
}
// -----
@ -587,7 +589,7 @@ impl Datastore {
) -> Result<Vec<Response>, Error> {
// Create a new query options
let opt = Options::default()
.with_id(self.id)
.with_id(self.id.0)
.with_ns(sess.ns())
.with_db(sess.db())
.with_live(sess.live())
@ -640,7 +642,7 @@ impl Datastore {
) -> Result<Value, Error> {
// Create a new query options
let opt = Options::default()
.with_id(self.id)
.with_id(self.id.0)
.with_ns(sess.ns())
.with_db(sess.db())
.with_live(sess.live())

View file

@ -19,32 +19,32 @@ async fn expired_nodes_are_garbage_collected() {
};
// Set up the first node at an early timestamp
let old_node = uuid::Uuid::new_v4();
let old_node = crate::sql::uuid::Uuid::new_v4();
let old_time = Timestamp {
value: 123,
};
test.bootstrap_at_time(&old_node, old_time.clone()).await.unwrap();
test.bootstrap_at_time(old_node, old_time.clone()).await.unwrap();
// Set up second node at a later timestamp
let new_node = uuid::Uuid::new_v4();
let new_node = crate::sql::uuid::Uuid::new_v4();
let new_time = Timestamp {
value: 456,
};
test.bootstrap_at_time(&new_node, new_time.clone()).await.unwrap();
test.bootstrap_at_time(new_node.clone(), new_time.clone()).await.unwrap();
// Now scan the heartbeats to validate there is only one node left
let mut tx = test.db.transaction(true, false).await.unwrap();
let scanned = tx.scan_hb(&new_time, 100).await.unwrap();
assert_eq!(scanned.len(), 1);
for hb in scanned.iter() {
assert_eq!(&hb.nd, &new_node);
assert_eq!(&hb.nd, &new_node.0);
}
// And scan the nodes to verify its just the latest also
let scanned = tx.scan_cl(100).await.unwrap();
assert_eq!(scanned.len(), 1);
for cl in scanned.iter() {
assert_eq!(&cl.name, &new_node.to_string());
assert_eq!(&cl.name, &new_node.0.to_string());
}
tx.commit().await.unwrap();
@ -59,11 +59,12 @@ async fn expired_nodes_get_live_queries_archived() {
};
// Set up the first node at an early timestamp
let old_node = uuid::Uuid::from_fields(0, 1, 2, &[3, 4, 5, 6, 7, 8, 9, 10]);
let old_node =
crate::sql::uuid::Uuid::from(uuid::Uuid::from_fields(0, 1, 2, &[3, 4, 5, 6, 7, 8, 9, 10]));
let old_time = Timestamp {
value: 123,
};
test.bootstrap_at_time(&old_node, old_time.clone()).await.unwrap();
test.bootstrap_at_time(old_node.clone(), old_time.clone()).await.unwrap();
// Set up live query
let ses = Session::for_kv()
@ -72,12 +73,12 @@ async fn expired_nodes_get_live_queries_archived() {
let table = "my_table";
let lq = LiveStatement {
id: sql::Uuid(uuid::Uuid::new_v4()),
node: Uuid::new_v4(),
node: crate::sql::uuid::Uuid::from(Uuid::new_v4()),
expr: Fields(vec![sql::Field::All], false),
what: Table(sql::Table::from(table)),
cond: None,
fetch: None,
archived: Some(old_node),
archived: Some(crate::sql::uuid::Uuid::from(old_node.0)),
};
let ctx = context::Context::background();
let (sender, _) = channel::unbounded();
@ -86,7 +87,7 @@ async fn expired_nodes_get_live_queries_archived() {
.with_db(ses.db())
.with_auth(Arc::new(Default::default()))
.with_live(true)
.with_id(old_node.clone());
.with_id(old_node.0);
let opt = Options::new_with_sender(&opt, sender);
let tx = Arc::new(Mutex::new(test.db.transaction(true, false).await.unwrap()));
let res = lq.compute(&ctx, &opt, &tx, None).await.unwrap();
@ -99,11 +100,16 @@ async fn expired_nodes_get_live_queries_archived() {
tx.lock().await.commit().await.unwrap();
// Set up second node at a later timestamp
let new_node = uuid::Uuid::from_fields(16, 17, 18, &[19, 20, 21, 22, 23, 24, 25, 26]);
let new_node = crate::sql::uuid::Uuid::from(uuid::Uuid::from_fields(
16,
17,
18,
&[19, 20, 21, 22, 23, 24, 25, 26],
));
let new_time = Timestamp {
value: 456,
}; // TODO These timestsamps are incorrect and should really be derived; Also check timestamp errors
test.bootstrap_at_time(&new_node, new_time.clone()).await.unwrap();
test.bootstrap_at_time(new_node, new_time.clone()).await.unwrap();
// Now validate lq was removed
let mut tx = test.db.transaction(true, false).await.unwrap();

View file

@ -14,11 +14,11 @@ pub struct TestContext {
impl TestContext {
pub(crate) async fn bootstrap_at_time(
&self,
node_id: &Uuid,
node_id: crate::sql::uuid::Uuid,
time: Timestamp,
) -> Result<(), Error> {
let mut tx = self.db.transaction(true, true).await?;
let archived = self.db.register_remove_and_archive(&mut tx, node_id, time).await?;
let archived = self.db.register_remove_and_archive(&mut tx, &node_id, time).await?;
tx.commit().await?;
let mut tx = self.db.transaction(true, true).await?;
self.db.remove_archived(&mut tx, archived).await?;

View file

@ -32,10 +32,10 @@ async fn scan_node_lq() {
let res = tx.scan_lq(&node_id, 100).await.unwrap();
assert_eq!(res.len(), 1);
for val in res {
assert_eq!(val.nd, node_id);
assert_eq!(val.nd.0, node_id.clone());
assert_eq!(val.ns, namespace);
assert_eq!(val.db, database);
assert_eq!(val.lq, live_query_id);
assert_eq!(val.lq.0, live_query_id.clone());
}
tx.commit().await.unwrap();

View file

@ -8,34 +8,35 @@ async fn archive_lv_for_node_archives() {
let namespace = "test_namespace";
let database = "test_database";
let table = "test_table";
let node_id = Uuid::from_bytes([
let node_id = crate::sql::uuid::Uuid::from(Uuid::from_bytes([
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E,
0x0F,
]);
tx.set_nd(node_id).await.unwrap();
]));
tx.set_nd(node_id.0).await.unwrap();
let lv_id = Uuid::from_bytes([
let lv_id = crate::sql::uuid::Uuid::from(Uuid::from_bytes([
0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E,
0x1F,
]);
]));
let key = crate::key::node::lq::new(node_id, lv_id, namespace, database);
let key = crate::key::node::lq::new(node_id.0.clone(), lv_id.0.clone(), namespace, database);
tx.putc(key, table, None).await.unwrap();
let (_, mut stm) = live(format!("LIVE SELECT * FROM {}", table).as_str()).unwrap();
stm.id = crate::sql::Uuid::from(lv_id);
stm.id = lv_id.clone();
tx.putc_lv(namespace, database, table, stm, None).await.unwrap();
let this_node_id = Uuid::from_bytes([
let this_node_id = crate::sql::uuid::Uuid::from(Uuid::from_bytes([
0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A, 0x2B, 0x2C, 0x2D, 0x2E,
0x2F,
]);
]));
// We commit after setup because otherwise in memory does not have read your own writes
// i.e. setup data is part of same transaction as required implementation checks
tx.commit().await.unwrap();
let mut tx = test.db.transaction(true, false).await.unwrap();
let results = test.db.archive_lv_for_node(&mut tx, &node_id, &this_node_id).await.unwrap();
let results =
test.db.archive_lv_for_node(&mut tx, &node_id, this_node_id.clone()).await.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].nd, node_id);
assert_eq!(results[0].ns, namespace);

View file

@ -1112,11 +1112,11 @@ impl Transaction {
let lq = crate::key::node::lq::Lq::decode(key.as_slice())?;
let tb: String = String::from_utf8(value).unwrap();
res.push(LqValue {
nd: lq.nd,
nd: crate::sql::uuid::Uuid::from(lq.nd),
ns: lq.ns.to_string(),
db: lq.db.to_string(),
tb,
lq: lq.lq,
lq: crate::sql::uuid::Uuid::from(lq.lq),
});
}
Ok(res)
@ -1506,11 +1506,11 @@ impl Transaction {
Error::Internal(format!("Failed to decode a value while reading LQ: {}", e))
})?;
let lqv = LqValue {
nd: *nd,
nd: crate::sql::uuid::Uuid::from(*nd),
ns: lq_key.ns.to_string(),
db: lq_key.db.to_string(),
tb: lq_value,
lq: lq_key.lq,
lq: crate::sql::uuid::Uuid::from(lq_key.lq),
};
lqs.push(lqv);
}

View file

@ -24,7 +24,7 @@ use std::fmt;
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store, Hash)]
pub struct LiveStatement {
pub id: Uuid,
pub node: uuid::Uuid,
pub node: Uuid,
pub expr: Fields,
pub what: Value,
pub cond: Option<Cond>,
@ -33,7 +33,7 @@ pub struct LiveStatement {
// Non-query properties that are necessary for storage or otherwise carrying information
// When a live query is archived, this should be the node ID that archived the query.
pub archived: Option<uuid::Uuid>,
pub archived: Option<Uuid>,
}
impl LiveStatement {
@ -62,7 +62,7 @@ impl LiveStatement {
if let Err(e) = opt.id() {
trace!("No ID for live query {:?}, error={:?}", stm, e)
}
stm.node = opt.id()?;
stm.node = Uuid(opt.id()?);
// Insert the node live query
let key = crate::key::node::lq::new(opt.id()?, self.id.0, opt.ns(), opt.db());
run.putc(key, tb.as_str(), None).await?;
@ -80,7 +80,7 @@ impl LiveStatement {
Ok(self.id.clone().into())
}
pub(crate) fn archive(mut self, node_id: uuid::Uuid) -> LiveStatement {
pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement {
self.archived = Some(node_id);
self
}
@ -113,7 +113,7 @@ pub fn live(i: &str) -> IResult<&str, LiveStatement> {
i,
LiveStatement {
id: Uuid::new_v4(),
node: uuid::Uuid::new_v4(),
node: Uuid::new_v4(),
expr,
what,
cond,