diff --git a/lib/src/dbs/notification.rs b/lib/src/dbs/notification.rs index dd6d8b92..1fbb4862 100644 --- a/lib/src/dbs/notification.rs +++ b/lib/src/dbs/notification.rs @@ -1,7 +1,6 @@ -use crate::sql::{Object, Value}; -use serde::{ser::SerializeStruct, Deserialize, Serialize}; +use crate::sql::{Object, Uuid, Value}; +use serde::{Deserialize, Serialize}; use std::fmt::{self, Debug, Display}; -use uuid::Uuid; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "UPPERCASE")] @@ -21,7 +20,7 @@ impl Display for Action { } } -#[derive(Clone, Debug, PartialEq, Deserialize)] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] pub struct Notification { pub id: Uuid, pub action: Action, @@ -39,16 +38,3 @@ impl Display for Notification { write!(f, "{}", obj) } } - -impl Serialize for Notification { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let mut val = serializer.serialize_struct("Notification", 3)?; - val.serialize_field("id", &self.id.to_string())?; - val.serialize_field("action", &self.action)?; - val.serialize_field("result", &self.result)?; - val.end() - } -} diff --git a/lib/src/dbs/options.rs b/lib/src/dbs/options.rs index 913dac8a..0f4ab5bc 100644 --- a/lib/src/dbs/options.rs +++ b/lib/src/dbs/options.rs @@ -5,7 +5,6 @@ use crate::dbs::Notification; use crate::err::Error; use channel::Sender; use std::sync::Arc; -use uuid::Uuid; /// An Options is passed around when processing a set of query /// statements. An Options contains specific information for how @@ -16,7 +15,7 @@ use uuid::Uuid; #[derive(Clone, Debug)] pub struct Options { /// Current Node ID - id: Option, + id: Option, /// Currently selected NS ns: Option>, /// Currently selected DB @@ -93,7 +92,7 @@ impl Options { /// Set the Node ID for subsequent code which uses /// this `Options`, with support for chaining. - pub fn with_id(mut self, id: Uuid) -> Self { + pub fn with_id(mut self, id: uuid::Uuid) -> Self { self.id = Some(id); self } @@ -329,7 +328,7 @@ impl Options { // -------------------------------------------------- /// Get current Node ID - pub fn id(&self) -> Result { + pub fn id(&self) -> Result { self.id.ok_or(Error::Unreachable) } diff --git a/lib/src/doc/lives.rs b/lib/src/doc/lives.rs index 8fb3c35b..7b0677ba 100644 --- a/lib/src/doc/lives.rs +++ b/lib/src/doc/lives.rs @@ -36,10 +36,10 @@ impl<'a> Document<'a> { // Check what type of data change this is if stm.is_delete() { // Send a DELETE notification - if opt.id()? == lv.node { + if opt.id()? == lv.node.0 { let thing = (*rid).clone(); chn.send(Notification { - id: lv.id.0, + id: lv.id.clone(), action: Action::Delete, result: Value::Thing(thing), }) @@ -49,9 +49,9 @@ impl<'a> Document<'a> { } } else if self.is_new() { // Send a CREATE notification - if opt.id()? == lv.node { + if opt.id()? == lv.node.0 { chn.send(Notification { - id: lv.id.0, + id: lv.id.clone(), action: Action::Create, result: self.pluck(ctx, opt, txn, &lq).await?, }) @@ -61,9 +61,9 @@ impl<'a> Document<'a> { } } else { // Send a UPDATE notification - if opt.id()? == lv.node { + if opt.id()? == lv.node.0 { chn.send(Notification { - id: lv.id.0, + id: lv.id.clone(), action: Action::Update, result: self.pluck(ctx, opt, txn, &lq).await?, }) diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index 5f7c5853..56ddd71f 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -12,8 +12,8 @@ use crate::dbs::Variables; use crate::err::Error; use crate::key::root::hb::Hb; use crate::sql; -use crate::sql::Query; use crate::sql::Value; +use crate::sql::{Query, Uuid}; use channel::Receiver; use channel::Sender; use futures::lock::Mutex; @@ -22,7 +22,6 @@ use std::sync::Arc; use std::time::Duration; use tracing::instrument; use tracing::trace; -use uuid::Uuid; /// Used for cluster logic to move LQ data to LQ cleanup code /// Not a stored struct; Used only in this module @@ -322,8 +321,8 @@ impl Datastore { node_id: &Uuid, timestamp: &Timestamp, ) -> Result<(), Error> { - tx.set_nd(*node_id).await?; - tx.set_hb(timestamp.clone(), *node_id).await?; + tx.set_nd(node_id.0).await?; + tx.set_hb(timestamp.clone(), node_id.0).await?; Ok(()) } @@ -339,7 +338,7 @@ impl Datastore { for hb in hbs { trace!("Deleting node {}", &hb.nd); tx.del_nd(hb.nd).await?; - nodes.push(hb.nd); + nodes.push(crate::sql::uuid::Uuid::from(hb.nd)); } Ok(nodes) } @@ -364,7 +363,8 @@ impl Datastore { trace!("Found {} LQ entries for {:?}", node_lqs.len(), nd); for lq in node_lqs { trace!("Archiving query {:?}", &lq); - let node_archived_lqs = self.archive_lv_for_node(tx, &lq.nd, this_node_id).await?; + let node_archived_lqs = + self.archive_lv_for_node(tx, &lq.nd, this_node_id.clone()).await?; for lq_value in node_archived_lqs { archived.push(lq_value); } @@ -380,10 +380,10 @@ impl Datastore { ) -> Result<(), Error> { for lq in archived { // Delete the cluster key, used for finding LQ associated with a node - let key = crate::key::node::lq::new(lq.nd, lq.lq, &lq.ns, &lq.db); + let key = crate::key::node::lq::new(lq.nd.0, lq.lq.0, &lq.ns, &lq.db); tx.del(key).await?; // Delete the table key, used for finding LQ associated with a table - let key = crate::key::table::lq::new(&lq.ns, &lq.db, &lq.tb, lq.lq); + let key = crate::key::table::lq::new(&lq.ns, &lq.db, &lq.tb, lq.lq.0); tx.del(key).await?; } Ok(()) @@ -401,7 +401,9 @@ impl Datastore { trace!("Found dead hbs: {:?}", dead_heartbeats); let mut archived: Vec = vec![]; for hb in dead_heartbeats { - let new_archived = self.archive_lv_for_node(tx, &hb.nd, this_node_id).await?; + let new_archived = self + .archive_lv_for_node(tx, &crate::sql::uuid::Uuid::from(hb.nd), this_node_id.clone()) + .await?; tx.del_nd(hb.nd).await?; trace!("Deleted node {}", hb.nd); for lq_value in new_archived { @@ -416,14 +418,14 @@ impl Datastore { &self, tx: &mut Transaction, nd: &Uuid, - this_node_id: &Uuid, + this_node_id: Uuid, ) -> Result, Error> { let lqs = tx.all_lq(nd).await?; trace!("Archiving lqs and found {} LQ entries for {}", lqs.len(), nd); let mut ret = vec![]; for lq in lqs { let lvs = tx.get_lv(lq.ns.as_str(), lq.db.as_str(), lq.tb.as_str(), &lq.lq).await?; - let archived_lvs = lvs.clone().archive(*this_node_id); + let archived_lvs = lvs.clone().archive(this_node_id.clone()); tx.putc_lv(&lq.ns, &lq.db, &lq.tb, archived_lvs, Some(lvs)).await?; ret.push(lq); } @@ -452,7 +454,7 @@ impl Datastore { pub async fn heartbeat(&self) -> Result<(), Error> { let mut tx = self.transaction(true, false).await?; let timestamp = tx.clock(); - self.heartbeat_full(&mut tx, timestamp, self.id).await?; + self.heartbeat_full(&mut tx, timestamp, self.id.clone()).await?; tx.commit().await } @@ -466,7 +468,7 @@ impl Datastore { timestamp: Timestamp, node_id: Uuid, ) -> Result<(), Error> { - tx.set_hb(timestamp, node_id).await + tx.set_hb(timestamp, node_id.0).await } // ----- @@ -587,7 +589,7 @@ impl Datastore { ) -> Result, Error> { // Create a new query options let opt = Options::default() - .with_id(self.id) + .with_id(self.id.0) .with_ns(sess.ns()) .with_db(sess.db()) .with_live(sess.live()) @@ -640,7 +642,7 @@ impl Datastore { ) -> Result { // Create a new query options let opt = Options::default() - .with_id(self.id) + .with_id(self.id.0) .with_ns(sess.ns()) .with_db(sess.db()) .with_live(sess.live()) diff --git a/lib/src/kvs/tests/cluster_init.rs b/lib/src/kvs/tests/cluster_init.rs index 00e2e234..9eefa19d 100644 --- a/lib/src/kvs/tests/cluster_init.rs +++ b/lib/src/kvs/tests/cluster_init.rs @@ -19,32 +19,32 @@ async fn expired_nodes_are_garbage_collected() { }; // Set up the first node at an early timestamp - let old_node = uuid::Uuid::new_v4(); + let old_node = crate::sql::uuid::Uuid::new_v4(); let old_time = Timestamp { value: 123, }; - test.bootstrap_at_time(&old_node, old_time.clone()).await.unwrap(); + test.bootstrap_at_time(old_node, old_time.clone()).await.unwrap(); // Set up second node at a later timestamp - let new_node = uuid::Uuid::new_v4(); + let new_node = crate::sql::uuid::Uuid::new_v4(); let new_time = Timestamp { value: 456, }; - test.bootstrap_at_time(&new_node, new_time.clone()).await.unwrap(); + test.bootstrap_at_time(new_node.clone(), new_time.clone()).await.unwrap(); // Now scan the heartbeats to validate there is only one node left let mut tx = test.db.transaction(true, false).await.unwrap(); let scanned = tx.scan_hb(&new_time, 100).await.unwrap(); assert_eq!(scanned.len(), 1); for hb in scanned.iter() { - assert_eq!(&hb.nd, &new_node); + assert_eq!(&hb.nd, &new_node.0); } // And scan the nodes to verify its just the latest also let scanned = tx.scan_cl(100).await.unwrap(); assert_eq!(scanned.len(), 1); for cl in scanned.iter() { - assert_eq!(&cl.name, &new_node.to_string()); + assert_eq!(&cl.name, &new_node.0.to_string()); } tx.commit().await.unwrap(); @@ -59,11 +59,12 @@ async fn expired_nodes_get_live_queries_archived() { }; // Set up the first node at an early timestamp - let old_node = uuid::Uuid::from_fields(0, 1, 2, &[3, 4, 5, 6, 7, 8, 9, 10]); + let old_node = + crate::sql::uuid::Uuid::from(uuid::Uuid::from_fields(0, 1, 2, &[3, 4, 5, 6, 7, 8, 9, 10])); let old_time = Timestamp { value: 123, }; - test.bootstrap_at_time(&old_node, old_time.clone()).await.unwrap(); + test.bootstrap_at_time(old_node.clone(), old_time.clone()).await.unwrap(); // Set up live query let ses = Session::for_kv() @@ -72,12 +73,12 @@ async fn expired_nodes_get_live_queries_archived() { let table = "my_table"; let lq = LiveStatement { id: sql::Uuid(uuid::Uuid::new_v4()), - node: Uuid::new_v4(), + node: crate::sql::uuid::Uuid::from(Uuid::new_v4()), expr: Fields(vec![sql::Field::All], false), what: Table(sql::Table::from(table)), cond: None, fetch: None, - archived: Some(old_node), + archived: Some(crate::sql::uuid::Uuid::from(old_node.0)), }; let ctx = context::Context::background(); let (sender, _) = channel::unbounded(); @@ -86,7 +87,7 @@ async fn expired_nodes_get_live_queries_archived() { .with_db(ses.db()) .with_auth(Arc::new(Default::default())) .with_live(true) - .with_id(old_node.clone()); + .with_id(old_node.0); let opt = Options::new_with_sender(&opt, sender); let tx = Arc::new(Mutex::new(test.db.transaction(true, false).await.unwrap())); let res = lq.compute(&ctx, &opt, &tx, None).await.unwrap(); @@ -99,11 +100,16 @@ async fn expired_nodes_get_live_queries_archived() { tx.lock().await.commit().await.unwrap(); // Set up second node at a later timestamp - let new_node = uuid::Uuid::from_fields(16, 17, 18, &[19, 20, 21, 22, 23, 24, 25, 26]); + let new_node = crate::sql::uuid::Uuid::from(uuid::Uuid::from_fields( + 16, + 17, + 18, + &[19, 20, 21, 22, 23, 24, 25, 26], + )); let new_time = Timestamp { value: 456, }; // TODO These timestsamps are incorrect and should really be derived; Also check timestamp errors - test.bootstrap_at_time(&new_node, new_time.clone()).await.unwrap(); + test.bootstrap_at_time(new_node, new_time.clone()).await.unwrap(); // Now validate lq was removed let mut tx = test.db.transaction(true, false).await.unwrap(); diff --git a/lib/src/kvs/tests/helper.rs b/lib/src/kvs/tests/helper.rs index ba0769e7..49e3e1d2 100644 --- a/lib/src/kvs/tests/helper.rs +++ b/lib/src/kvs/tests/helper.rs @@ -14,11 +14,11 @@ pub struct TestContext { impl TestContext { pub(crate) async fn bootstrap_at_time( &self, - node_id: &Uuid, + node_id: crate::sql::uuid::Uuid, time: Timestamp, ) -> Result<(), Error> { let mut tx = self.db.transaction(true, true).await?; - let archived = self.db.register_remove_and_archive(&mut tx, node_id, time).await?; + let archived = self.db.register_remove_and_archive(&mut tx, &node_id, time).await?; tx.commit().await?; let mut tx = self.db.transaction(true, true).await?; self.db.remove_archived(&mut tx, archived).await?; diff --git a/lib/src/kvs/tests/lq.rs b/lib/src/kvs/tests/lq.rs index 1f7a3972..1fb87dd2 100644 --- a/lib/src/kvs/tests/lq.rs +++ b/lib/src/kvs/tests/lq.rs @@ -32,10 +32,10 @@ async fn scan_node_lq() { let res = tx.scan_lq(&node_id, 100).await.unwrap(); assert_eq!(res.len(), 1); for val in res { - assert_eq!(val.nd, node_id); + assert_eq!(val.nd.0, node_id.clone()); assert_eq!(val.ns, namespace); assert_eq!(val.db, database); - assert_eq!(val.lq, live_query_id); + assert_eq!(val.lq.0, live_query_id.clone()); } tx.commit().await.unwrap(); diff --git a/lib/src/kvs/tests/nq.rs b/lib/src/kvs/tests/nq.rs index 83a1df3a..f12c2b03 100644 --- a/lib/src/kvs/tests/nq.rs +++ b/lib/src/kvs/tests/nq.rs @@ -8,34 +8,35 @@ async fn archive_lv_for_node_archives() { let namespace = "test_namespace"; let database = "test_database"; let table = "test_table"; - let node_id = Uuid::from_bytes([ + let node_id = crate::sql::uuid::Uuid::from(Uuid::from_bytes([ 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, - ]); - tx.set_nd(node_id).await.unwrap(); + ])); + tx.set_nd(node_id.0).await.unwrap(); - let lv_id = Uuid::from_bytes([ + let lv_id = crate::sql::uuid::Uuid::from(Uuid::from_bytes([ 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, - ]); + ])); - let key = crate::key::node::lq::new(node_id, lv_id, namespace, database); + let key = crate::key::node::lq::new(node_id.0.clone(), lv_id.0.clone(), namespace, database); tx.putc(key, table, None).await.unwrap(); let (_, mut stm) = live(format!("LIVE SELECT * FROM {}", table).as_str()).unwrap(); - stm.id = crate::sql::Uuid::from(lv_id); + stm.id = lv_id.clone(); tx.putc_lv(namespace, database, table, stm, None).await.unwrap(); - let this_node_id = Uuid::from_bytes([ + let this_node_id = crate::sql::uuid::Uuid::from(Uuid::from_bytes([ 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A, 0x2B, 0x2C, 0x2D, 0x2E, 0x2F, - ]); + ])); // We commit after setup because otherwise in memory does not have read your own writes // i.e. setup data is part of same transaction as required implementation checks tx.commit().await.unwrap(); let mut tx = test.db.transaction(true, false).await.unwrap(); - let results = test.db.archive_lv_for_node(&mut tx, &node_id, &this_node_id).await.unwrap(); + let results = + test.db.archive_lv_for_node(&mut tx, &node_id, this_node_id.clone()).await.unwrap(); assert_eq!(results.len(), 1); assert_eq!(results[0].nd, node_id); assert_eq!(results[0].ns, namespace); diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index aa629058..b1af53ed 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -1112,11 +1112,11 @@ impl Transaction { let lq = crate::key::node::lq::Lq::decode(key.as_slice())?; let tb: String = String::from_utf8(value).unwrap(); res.push(LqValue { - nd: lq.nd, + nd: crate::sql::uuid::Uuid::from(lq.nd), ns: lq.ns.to_string(), db: lq.db.to_string(), tb, - lq: lq.lq, + lq: crate::sql::uuid::Uuid::from(lq.lq), }); } Ok(res) @@ -1506,11 +1506,11 @@ impl Transaction { Error::Internal(format!("Failed to decode a value while reading LQ: {}", e)) })?; let lqv = LqValue { - nd: *nd, + nd: crate::sql::uuid::Uuid::from(*nd), ns: lq_key.ns.to_string(), db: lq_key.db.to_string(), tb: lq_value, - lq: lq_key.lq, + lq: crate::sql::uuid::Uuid::from(lq_key.lq), }; lqs.push(lqv); } diff --git a/lib/src/sql/statements/live.rs b/lib/src/sql/statements/live.rs index d43cf74d..5fdfa08a 100644 --- a/lib/src/sql/statements/live.rs +++ b/lib/src/sql/statements/live.rs @@ -24,7 +24,7 @@ use std::fmt; #[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store, Hash)] pub struct LiveStatement { pub id: Uuid, - pub node: uuid::Uuid, + pub node: Uuid, pub expr: Fields, pub what: Value, pub cond: Option, @@ -33,7 +33,7 @@ pub struct LiveStatement { // Non-query properties that are necessary for storage or otherwise carrying information // When a live query is archived, this should be the node ID that archived the query. - pub archived: Option, + pub archived: Option, } impl LiveStatement { @@ -62,7 +62,7 @@ impl LiveStatement { if let Err(e) = opt.id() { trace!("No ID for live query {:?}, error={:?}", stm, e) } - stm.node = opt.id()?; + stm.node = Uuid(opt.id()?); // Insert the node live query let key = crate::key::node::lq::new(opt.id()?, self.id.0, opt.ns(), opt.db()); run.putc(key, tb.as_str(), None).await?; @@ -80,7 +80,7 @@ impl LiveStatement { Ok(self.id.clone().into()) } - pub(crate) fn archive(mut self, node_id: uuid::Uuid) -> LiveStatement { + pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement { self.archived = Some(node_id); self } @@ -113,7 +113,7 @@ pub fn live(i: &str) -> IResult<&str, LiveStatement> { i, LiveStatement { id: Uuid::new_v4(), - node: uuid::Uuid::new_v4(), + node: Uuid::new_v4(), expr, what, cond,