diff --git a/core/src/cf/gc.rs b/core/src/cf/gc.rs index 0fc0eef6..b36fe9f7 100644 --- a/core/src/cf/gc.rs +++ b/core/src/cf/gc.rs @@ -1,5 +1,7 @@ use crate::err::Error; use crate::key::change; +#[cfg(debug_assertions)] +use crate::key::debug::sprint_key; use crate::kvs::Transaction; use crate::vs; use crate::vs::Versionstamp; @@ -27,10 +29,19 @@ pub async fn gc_ns( let dbs = tx.all_db(ns).await?; let dbs = dbs.as_ref(); for db in dbs { + // We get the expiration of the change feed defined on the database let db_cf_expiry = match &db.changefeed { None => 0, Some(cf) => cf.expiry.as_secs(), }; + #[cfg(debug_assertions)] + trace!( + "Performing garbage collection on ns {} db {} for ts {}. The cf expiration is {}", + ns, + db.name, + ts, + db_cf_expiry + ); let tbs = tx.all_tb(ns, db.name.as_str()).await?; let tbs = tbs.as_ref(); let max_tb_cf_expiry = tbs.iter().fold(0, |acc, tb| match &tb.changefeed { @@ -47,7 +58,10 @@ pub async fn gc_ns( if ts < cf_expiry { continue; } + // We only want to retain the expiry window, so we are going to delete everything before let watermark_ts = ts - cf_expiry; + #[cfg(debug_assertions)] + trace!("The watermark is {} after removing {cf_expiry} from {ts}", watermark_ts); let watermark_vs = tx.get_versionstamp_from_timestamp(watermark_ts, ns, db.name.as_str(), true).await?; if let Some(watermark_vs) = watermark_vs { @@ -67,6 +81,15 @@ pub async fn gc_db( ) -> Result<(), Error> { let beg: Vec = change::prefix_ts(ns, db, vs::u64_to_versionstamp(0)); let end = change::prefix_ts(ns, db, watermark); + #[cfg(debug_assertions)] + trace!( + "DB GC: ns: {}, db: {}, watermark: {:?}, prefix: {}, end: {}", + ns, + db, + watermark, + sprint_key(&beg), + sprint_key(&end) + ); let limit = limit.unwrap_or(100); diff --git a/core/src/cf/mutations.rs b/core/src/cf/mutations.rs index 897929b1..6505b135 100644 --- a/core/src/cf/mutations.rs +++ b/core/src/cf/mutations.rs @@ -3,7 +3,7 @@ use crate::sql::object::Object; use crate::sql::statements::DefineTableStatement; use crate::sql::thing::Thing; use crate::sql::value::Value; -use crate::vs::to_u128_be; +use crate::vs::versionstamp_to_u64; use derive::Store; use revision::revisioned; use serde::{Deserialize, Serialize}; @@ -104,7 +104,7 @@ impl DatabaseMutation { impl ChangeSet { pub fn into_value(self) -> Value { let mut m = BTreeMap::::new(); - let vs = to_u128_be(self.0); + let vs = versionstamp_to_u64(&self.0); m.insert("versionstamp".to_string(), Value::from(vs)); m.insert("changes".to_string(), self.1.into_value()); let so: Object = m.into(); @@ -174,7 +174,7 @@ mod tests { use super::*; use std::collections::HashMap; let cs = ChangeSet( - [0, 0, 0, 0, 0, 0, 0, 0, 0, 1], + [0, 0, 0, 0, 0, 0, 0, 1, 0, 0], DatabaseMutation(vec![TableMutations( "mytb".to_string(), vec![ @@ -209,7 +209,7 @@ mod tests { use super::*; use std::collections::HashMap; let cs = ChangeSet( - [0, 0, 0, 0, 0, 0, 0, 0, 0, 1], + [0, 0, 0, 0, 0, 0, 0, 1, 0, 0], DatabaseMutation(vec![TableMutations( "mytb".to_string(), vec![ diff --git a/core/src/cf/reader.rs b/core/src/cf/reader.rs index 952fefeb..c96f698e 100644 --- a/core/src/cf/reader.rs +++ b/core/src/cf/reader.rs @@ -1,6 +1,8 @@ use crate::cf::{ChangeSet, DatabaseMutation, TableMutations}; use crate::err::Error; use crate::key::change; +#[cfg(debug_assertions)] +use crate::key::debug::sprint_key; use crate::kvs::{Limit, ScanPage, Transaction}; use crate::sql::statements::show::ShowSince; use crate::vs; @@ -56,7 +58,8 @@ pub async fn read( let mut r = Vec::::new(); // iterate over _x and put decoded elements to r for (k, v) in scan.values { - trace!("read change feed; {k:?}"); + #[cfg(debug_assertions)] + trace!("read change feed; {}", sprint_key(&k)); let dec = crate::key::change::Cf::decode(&k).unwrap(); diff --git a/core/src/kvs/ds.rs b/core/src/kvs/ds.rs index de6a85e9..8e3f1e19 100644 --- a/core/src/kvs/ds.rs +++ b/core/src/kvs/ds.rs @@ -835,6 +835,7 @@ impl Datastore { // It is handy for testing, because it allows you to specify the timestamp, // without depending on a system clock. pub async fn tick_at(&self, ts: u64) -> Result<(), Error> { + trace!("Ticking at timestamp {}", ts); let _vs = self.save_timestamp_for_versionstamp(ts).await?; self.garbage_collect_stale_change_feeds(ts).await?; // TODO Add LQ GC @@ -843,6 +844,7 @@ impl Datastore { } // save_timestamp_for_versionstamp saves the current timestamp for the each database's current versionstamp. + // Note: the returned VS is flawed, as there are multiple {ts: vs} mappings per (ns, db) pub(crate) async fn save_timestamp_for_versionstamp( &self, ts: u64, @@ -902,7 +904,7 @@ impl Datastore { trace!( "There were no changes in the change feed for {:?} from versionstamp {:?}", selector, - vs + conv::versionstamp_to_u64(vs) ) } if let Some(change_set) = res.last() { @@ -1145,6 +1147,7 @@ impl Datastore { let dbs = dbs.as_ref(); for db in dbs { let db = db.name.as_str(); + // TODO(SUR-341): This is incorrect, it's a [ns,db] to vs pair vs = Some(tx.set_timestamp_for_versionstamp(ts, ns, db, true).await?); } } diff --git a/core/src/kvs/mem/mod.rs b/core/src/kvs/mem/mod.rs index 57a52b72..ae7483d8 100644 --- a/core/src/kvs/mem/mod.rs +++ b/core/src/kvs/mem/mod.rs @@ -1,10 +1,12 @@ #![cfg(feature = "kv-mem")] use crate::err::Error; +#[cfg(debug_assertions)] +use crate::key::debug::sprint_key; use crate::kvs::Check; use crate::kvs::Key; use crate::kvs::Val; -use crate::vs::{try_to_u64_be, u64_to_versionstamp, Versionstamp}; +use crate::vs::{u64_to_versionstamp, versionstamp_to_u64, Versionstamp}; use std::ops::Range; pub struct Datastore { @@ -168,11 +170,19 @@ impl Transaction { Some(prev) => { let slice = prev.as_slice(); let res: Result<[u8; 10], Error> = match slice.try_into() { - Ok(ba) => Ok(ba), + Ok(ba) => { + #[cfg(debug_assertions)] + trace!( + "Previous timestamp for key {} is {}", + sprint_key(&k), + sprint_key(&ba) + ); + Ok(ba) + } Err(e) => Err(Error::Ds(e.to_string())), }; let array = res?; - let prev = try_to_u64_be(array)?; + let prev = versionstamp_to_u64(&array); prev + 1 } None => 1, @@ -204,15 +214,22 @@ impl Transaction { } let ts_key: Key = ts_key.into(); + #[cfg(debug_assertions)] + let dbg_ts = sprint_key(&ts_key); let prefix: Key = prefix.into(); - let suffix: Key = suffix.into(); + #[cfg(debug_assertions)] + let dbg_prefix = sprint_key(&prefix); + let mut suffix: Key = suffix.into(); + #[cfg(debug_assertions)] + let dbg_suffix = sprint_key(&suffix); - let ts = self.get_timestamp(ts_key.clone())?; - let mut k: Vec = prefix.clone(); + let ts = self.get_timestamp(ts_key)?; + let mut k: Vec = prefix; k.append(&mut ts.to_vec()); - k.append(&mut suffix.clone()); + k.append(&mut suffix); - trace!("get_versionstamped_key; {ts_key:?} {prefix:?} {ts:?} {suffix:?} {k:?}",); + #[cfg(debug_assertions)] + trace!("get_versionstamped_key; prefix={dbg_prefix} ts={dbg_ts} suff={dbg_suffix}"); Ok(k) } diff --git a/core/src/kvs/tests/tx_test.rs b/core/src/kvs/tests/tx_test.rs index e04bd082..6822a52c 100644 --- a/core/src/kvs/tests/tx_test.rs +++ b/core/src/kvs/tests/tx_test.rs @@ -1,4 +1,7 @@ +use crate::key::debug::sprint_key; +use crate::key::error::KeyCategory; use crate::kvs::lq_structs::{KillEntry, LqEntry, TrackedResult}; +use crate::sql::Strand; #[tokio::test] #[serial] @@ -58,3 +61,82 @@ async fn kill_queries_sent_to_tx_are_received() { assert_eq!(live_queries.len(), 1); assert_eq!(live_queries[0], TrackedResult::KillQuery(kill_entry)); } + +#[tokio::test] +#[serial] +async fn delr_range_correct() { + let node_id = uuid::uuid!("d0f1a200-e24e-44fe-98c1-2271a5781da7"); + let clock = Arc::new(SizedClock::Fake(FakeClock::new(Timestamp::default()))); + let test = init(node_id, clock).await.unwrap(); + + // Create some data + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + tx.putc(b"hugh\x00\x10", Value::Strand(Strand::from("0010")), None).await.unwrap(); + tx.put(KeyCategory::ChangeFeed, b"hugh\x00\x10\x10", Value::Strand(Strand::from("001010"))) + .await + .unwrap(); + tx.putc(b"hugh\x00\x20", Value::Strand(Strand::from("0020")), None).await.unwrap(); + tx.commit().await.unwrap(); + + // Check we have all data + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + let vals = tx.scan(b"hugh\x00".to_vec()..b"hugh\xff".to_vec(), 100).await.unwrap(); + assert_eq!(vals.len(), 3); + tx.cancel().await.unwrap(); + + // Delete first range, inclusive of next key, without deleting next key + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + tx.delr(b"hugh\x00".to_vec()..b"hugh\x00\x10\x10".to_vec(), 100).await.unwrap(); + tx.commit().await.unwrap(); + + // Scan results + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + let vals = tx.scan(b"hugh\x00"..b"hugh\xff", 100).await.unwrap(); + assert_eq!(vals.len(), 2); + tx.cancel().await.unwrap(); + + // Delete second range, beyond next key but beyond limit + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + tx.delr(b"hugh\x00\x20".to_vec()..b"hugh\xff".to_vec(), 1).await.unwrap(); + tx.commit().await.unwrap(); + + // Scan results + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + let vals = tx.scan(b"hugh\x00"..b"hugh\xff", 100).await.unwrap(); + assert_eq!(vals.len(), 1); + tx.cancel().await.unwrap(); +} + +#[tokio::test] +#[serial] +async fn set_versionstamp_is_incremental() { + let node_id = uuid::uuid!("3988b179-6212-4a45-a496-4d9ee4cbd639"); + let clock = Arc::new(SizedClock::Fake(FakeClock::new(Timestamp::default()))); + let test = init(node_id, clock).await.unwrap(); + + // Create the first timestamped key + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + tx.set_versionstamped_key(b"ts_key", b"prefix", b"suffix", Value::from("'value'")) + .await + .unwrap(); + tx.commit().await.unwrap(); + + // Create the second timestamped key + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + tx.set_versionstamped_key(b"ts_key", b"prefix", b"suffix", Value::from("'value'")) + .await + .unwrap(); + tx.commit().await.unwrap(); + + // Scan the keys and validate versionstamps match expected + let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); + let found = tx.scan(b"prefix".to_vec()..b"prefix\xff".to_vec(), 1000).await.unwrap(); + tx.cancel().await.unwrap(); + assert_eq!(found.len(), 2); + let expected_keys = [ + b"prefix\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00suffix", + b"prefix\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00suffix", + ]; + assert_eq!(found[0].0, expected_keys[0], "key was {}", sprint_key(&found[0].0)); + assert_eq!(found[1].0, expected_keys[1], "key was {}", sprint_key(&found[1].0)); +} diff --git a/core/src/kvs/tx.rs b/core/src/kvs/tx.rs index 16c46b5e..b81a7963 100644 --- a/core/src/kvs/tx.rs +++ b/core/src/kvs/tx.rs @@ -29,6 +29,7 @@ use crate::dbs::node::ClusterMembership; use crate::dbs::node::Timestamp; use crate::err::Error; use crate::idg::u32::U32; +#[cfg(debug_assertions)] use crate::key::debug::sprint_key; use crate::key::error::KeyCategory; use crate::key::key_req::KeyRequirements; @@ -45,8 +46,8 @@ use crate::sql::paths::OUT; use crate::sql::thing::Thing; use crate::sql::Strand; use crate::sql::Value; -use crate::vs::Oracle; use crate::vs::Versionstamp; +use crate::vs::{conv, Oracle}; use super::kv::Add; use super::kv::Convert; @@ -363,7 +364,7 @@ impl Transaction { { let key = key.into(); #[cfg(debug_assertions)] - trace!("Del {:?}", sprint_key(&key)); + trace!("Del {}", sprint_key(&key)); match self { #[cfg(feature = "kv-mem")] Transaction { @@ -412,7 +413,7 @@ impl Transaction { K: Into + Debug + AsRef<[u8]>, { #[cfg(debug_assertions)] - trace!("Exi {:?}", sprint_key(&key)); + trace!("Exi {}", sprint_key(&key)); match self { #[cfg(feature = "kv-mem")] Transaction { @@ -462,7 +463,7 @@ impl Transaction { { let key = key.into(); #[cfg(debug_assertions)] - trace!("Get {:?}", sprint_key(&key)); + trace!("Get {}", sprint_key(&key)); match self { #[cfg(feature = "kv-mem")] Transaction { @@ -513,7 +514,7 @@ impl Transaction { { let key = key.into(); #[cfg(debug_assertions)] - trace!("Set {:?} => {:?}", sprint_key(&key), val); + trace!("Set {} => {:?}", sprint_key(&key), val); match self { #[cfg(feature = "kv-mem")] Transaction { @@ -568,7 +569,7 @@ impl Transaction { // We convert to byte slice as its easier at this level let key = key.into(); #[cfg(debug_assertions)] - trace!("Get Timestamp {:?}", sprint_key(&key)); + trace!("Get Timestamp {}", sprint_key(&key)); match self { #[cfg(feature = "kv-mem")] Transaction { @@ -646,6 +647,16 @@ impl Transaction { K: Into + Debug, V: Into + Debug, { + let ts_key = ts_key.into(); + let prefix = prefix.into(); + let suffix = suffix.into(); + #[cfg(debug_assertions)] + trace!( + "Set Versionstamped Key ts={} prefix={} suffix={}", + sprint_key(&prefix), + sprint_key(&ts_key), + sprint_key(&suffix) + ); match self { #[cfg(feature = "kv-mem")] Transaction { @@ -766,7 +777,7 @@ impl Transaction { end: rng.end.into(), }; #[cfg(debug_assertions)] - trace!("Scan {:?} - {:?}", sprint_key(&rng.start), sprint_key(&rng.end)); + trace!("Scan {} - {}", sprint_key(&rng.start), sprint_key(&rng.end)); match self { #[cfg(feature = "kv-mem")] Transaction { @@ -821,7 +832,7 @@ impl Transaction { K: Into + From> + AsRef<[u8]> + Debug + Clone, { #[cfg(debug_assertions)] - trace!("Scan {:?} - {:?}", sprint_key(&page.range.start), sprint_key(&page.range.end)); + trace!("Scan paged {} - {}", sprint_key(&page.range.start), sprint_key(&page.range.end)); let range = page.range.clone(); let res = match self { #[cfg(feature = "kv-mem")] @@ -895,7 +906,7 @@ impl Transaction { { let key = key.into(); #[cfg(debug_assertions)] - trace!("Putc {:?} if {:?} => {:?}", sprint_key(&key), chk, val); + trace!("Putc {} if {:?} => {:?}", sprint_key(&key), chk, val); match self { #[cfg(feature = "kv-mem")] Transaction { @@ -946,7 +957,7 @@ impl Transaction { { let key = key.into(); #[cfg(debug_assertions)] - trace!("Delc {:?} if {:?}", sprint_key(&key), chk); + trace!("Delc {} if {:?}", sprint_key(&key), chk); match self { #[cfg(feature = "kv-mem")] Transaction { @@ -1002,7 +1013,7 @@ impl Transaction { let beg: Key = rng.start.into(); let end: Key = rng.end.into(); #[cfg(debug_assertions)] - trace!("Getr {:?}..{:?} (limit: {limit})", sprint_key(&beg), sprint_key(&end)); + trace!("Getr {}..{} (limit: {limit})", sprint_key(&beg), sprint_key(&end)); let mut out: Vec<(Key, Val)> = vec![]; let mut next_page = Some(ScanPage { range: beg..end, @@ -1038,7 +1049,7 @@ impl Transaction { end: rng.end.into(), }; #[cfg(debug_assertions)] - trace!("Delr {:?}..{:?} (limit: {limit})", sprint_key(&rng.start), sprint_key(&rng.end)); + trace!("Delr {}..{} (limit: {limit})", sprint_key(&rng.start), sprint_key(&rng.end)); match self { #[cfg(feature = "kv-tikv")] Transaction { @@ -1076,11 +1087,14 @@ impl Transaction { let res = res.values; // Exit when settled if res.is_empty() { + trace!("Delr page was empty"); break; } // Loop over results for (k, _) in res.into_iter() { // Delete + #[cfg(debug_assertions)] + trace!("Delr key {}", sprint_key(&k)); self.del(k).await?; } } @@ -1096,7 +1110,7 @@ impl Transaction { let beg: Key = key.into(); let end: Key = beg.clone().add(0xff); #[cfg(debug_assertions)] - trace!("Getp {:?}-{:?} (limit: {limit})", sprint_key(&beg), sprint_key(&end)); + trace!("Getp {}-{} (limit: {limit})", sprint_key(&beg), sprint_key(&end)); let mut out: Vec<(Key, Val)> = vec![]; // Start processing let mut next_page = Some(ScanPage { @@ -1130,7 +1144,7 @@ impl Transaction { let beg: Key = key.into(); let end: Key = beg.clone().add(0xff); #[cfg(debug_assertions)] - trace!("Delp {:?}-{:?} (limit: {limit})", sprint_key(&beg), sprint_key(&end)); + trace!("Delp {}-{} (limit: {limit})", sprint_key(&beg), sprint_key(&end)); let min = beg.clone(); let max = end.clone(); self.delr(min..max, limit).await?; @@ -2022,7 +2036,7 @@ impl Transaction { ) -> Result { let key = crate::key::table::lq::new(ns, db, tb, *lv); let key_enc = crate::key::table::lq::Lq::encode(&key)?; - trace!("Getting lv ({:?}) {:?}", lv, sprint_key(&key_enc)); + trace!("Getting lv ({:?}) {}", lv, sprint_key(&key_enc)); let val = self.get(key_enc).await?.ok_or(Error::LvNotFound { value: lv.to_string(), })?; @@ -2040,7 +2054,7 @@ impl Transaction { ) -> Result { let key = crate::key::table::ev::new(ns, db, tb, ev); let key_enc = crate::key::table::ev::Ev::encode(&key)?; - trace!("Getting ev ({:?}) {:?}", ev, sprint_key(&key_enc)); + trace!("Getting ev ({:?}) {}", ev, sprint_key(&key_enc)); let val = self.get(key_enc).await?.ok_or(Error::EvNotFound { value: ev.to_string(), })?; @@ -2058,7 +2072,7 @@ impl Transaction { ) -> Result { let key = crate::key::table::fd::new(ns, db, tb, fd); let key_enc = crate::key::table::fd::Fd::encode(&key)?; - trace!("Getting fd ({:?}) {:?}", fd, sprint_key(&key_enc)); + trace!("Getting fd ({:?}) {}", fd, sprint_key(&key_enc)); let val = self.get(key_enc).await?.ok_or(Error::FdNotFound { value: fd.to_string(), })?; @@ -2076,7 +2090,7 @@ impl Transaction { ) -> Result { let key = crate::key::table::ix::new(ns, db, tb, ix); let key_enc = crate::key::table::ix::Ix::encode(&key)?; - trace!("Getting ix ({:?}) {:?}", ix, sprint_key(&key_enc)); + trace!("Getting ix ({:?}) {}", ix, sprint_key(&key_enc)); let val = self.get(key_enc).await?.ok_or(Error::IxNotFound { value: ix.to_string(), })?; @@ -2887,6 +2901,14 @@ impl Transaction { // This also works as an advisory lock on the ts keys so that there is // on other concurrent transactions that can write to the ts_key or the keys after it. let vs = self.get_timestamp(crate::key::database::vs::new(ns, db), lock).await?; + #[cfg(debug_assertions)] + trace!( + "Setting timestamp {} for versionstamp {:?} in ns: {}, db: {}", + ts, + conv::versionstamp_to_u64(&vs), + ns, + db + ); // Ensure there are no keys after the ts_key // Otherwise we can go back in time! @@ -2896,6 +2918,13 @@ impl Transaction { let ts_pairs: Vec<(Vec, Vec)> = self.getr(begin..end, u32::MAX).await?; let latest_ts_pair = ts_pairs.last(); if let Some((k, _)) = latest_ts_pair { + trace!( + "There already was a greater committed timestamp {} in ns: {}, db: {} found: {}", + ts, + ns, + db, + sprint_key(k) + ); let k = crate::key::database::ts::Ts::decode(k)?; let latest_ts = k.ts; if latest_ts >= ts { @@ -2979,6 +3008,23 @@ impl Transaction { _ => unreachable!(), } } + + #[cfg(debug_assertions)] + #[allow(unused)] + #[doc(hidden)] + pub async fn print_all(&mut self) { + let mut next_page = + Some(ScanPage::from(crate::key::root::ns::prefix()..b"\xff\xff\xff".to_vec())); + println!("Start print all"); + while next_page.is_some() { + let res = self.scan_paged(next_page.unwrap(), 1000).await.unwrap(); + for (k, _) in res.values { + println!("{}", sprint_key(&k)); + } + next_page = res.next_page; + } + println!("End print all"); + } } #[cfg(test)] diff --git a/core/src/sql/v1/statements/show.rs b/core/src/sql/v1/statements/show.rs index f2309e76..7020dc1a 100644 --- a/core/src/sql/v1/statements/show.rs +++ b/core/src/sql/v1/statements/show.rs @@ -70,8 +70,8 @@ impl ShowStatement { .await?; // Return the changes let mut a = Vec::::new(); - for r in r.iter() { - let v: Value = r.clone().into_value(); + for r in r.into_iter() { + let v: Value = r.into_value(); a.push(v); } let v: Value = Value::Array(crate::sql::array::Array(a)); diff --git a/core/src/vs/conv.rs b/core/src/vs/conv.rs index fab9b112..43b7e2ef 100644 --- a/core/src/vs/conv.rs +++ b/core/src/vs/conv.rs @@ -75,11 +75,14 @@ pub fn try_u128_to_versionstamp(v: u128) -> Result<[u8; 10], Error> { Ok(buf) } +/// Take the most significant, time-based bytes and ignores the last 2 bytes +#[doc(hidden)] pub fn versionstamp_to_u64(vs: &Versionstamp) -> u64 { u64::from_be_bytes(vs[..8].try_into().unwrap()) } // to_u128_be converts a 10-byte versionstamp to a u128 assuming big-endian. // This is handy for human comparing versionstamps. +// This is not the same as timestamp u64 representation as the tailing bytes are included #[allow(unused)] pub fn to_u128_be(vs: [u8; 10]) -> u128 { let mut buf = [0; 16]; @@ -135,7 +138,10 @@ pub fn to_u128_le(vs: [u8; 10]) -> u128 { u128::from_be_bytes(buf) } +#[cfg(test)] mod tests { + use crate::vs::{u64_to_versionstamp, versionstamp_to_u64}; + #[test] fn try_to_u64_be() { use super::*; @@ -161,4 +167,20 @@ mod tests { let res = try_u128_to_versionstamp(v).unwrap(); assert_eq!(res, [255, 255, 255, 255, 255, 255, 255, 255, 255, 255]); } + + #[test] + fn can_add_u64_conversion() { + let start = 5u64; + let vs = u64_to_versionstamp(start); + // The last 2 bytes are empty + assert_eq!("00000000000000050000", hex::encode(vs)); + let mid = versionstamp_to_u64(&vs); + assert_eq!(start, mid); + let mid = mid + 1; + let vs = u64_to_versionstamp(mid); + // The last 2 bytes are empty + assert_eq!("00000000000000060000", hex::encode(vs)); + let end = versionstamp_to_u64(&vs); + assert_eq!(end, 6); + } } diff --git a/core/src/vs/oracle.rs b/core/src/vs/oracle.rs index 3f564dfb..89d972a3 100644 --- a/core/src/vs/oracle.rs +++ b/core/src/vs/oracle.rs @@ -153,10 +153,9 @@ fn secs_since_unix_epoch() -> u64 { } } +#[cfg(test)] mod tests { - #[allow(unused)] use super::*; - #[allow(unused)] use crate::vs::to_u128_be; #[test] diff --git a/lib/tests/bootstrap.rs b/lib/tests/bootstrap.rs index e98e4b3d..74e421f0 100644 --- a/lib/tests/bootstrap.rs +++ b/lib/tests/bootstrap.rs @@ -49,7 +49,7 @@ async fn bootstrap_removes_unreachable_nodes() -> Result<(), Error> { dbs.bootstrap().await.unwrap(); // Declare a function that will assert - async fn try_validate(mut tx: &mut Transaction, bad_node: &uuid::Uuid) -> Result<(), String> { + async fn try_validate(tx: &mut Transaction, bad_node: &uuid::Uuid) -> Result<(), String> { let res = tx.scan_nd(1000).await.map_err(|e| e.to_string())?; tx.commit().await.map_err(|e| e.to_string())?; for node in &res { diff --git a/lib/tests/changefeeds.rs b/lib/tests/changefeeds.rs index e0ad292c..cbbbda79 100644 --- a/lib/tests/changefeeds.rs +++ b/lib/tests/changefeeds.rs @@ -6,19 +6,27 @@ use surrealdb::dbs::Session; use surrealdb::err::Error; use surrealdb::sql::Value; use surrealdb_core::fflags::FFLAGS; +use surrealdb_core::kvs::Datastore; +use surrealdb_core::kvs::LockType::Optimistic; +use surrealdb_core::kvs::TransactionType::Write; mod helpers; mod parse; -#[tokio::test] +#[test_log::test(tokio::test)] async fn database_change_feeds() -> Result<(), Error> { - let sql = " - DEFINE DATABASE test CHANGEFEED 1h; + // This is a unique shared identifier + let identifier = "alpaca"; + let ns = format!("namespace_{identifier}"); + let db = format!("database_{identifier}"); + let sql = format!( + " + DEFINE DATABASE {db} CHANGEFEED 1h; DEFINE TABLE person; DEFINE FIELD name ON TABLE person ASSERT IF $input THEN - $input = /^[A-Z]{1}[a-z]+$/ + $input = /^[A-Z]{{1}}[a-z]+$/ ELSE true END @@ -29,18 +37,22 @@ async fn database_change_feeds() -> Result<(), Error> { $value END ; + " + ); + let sql2 = " UPDATE person:test CONTENT { name: 'Tobie' }; DELETE person:test; SHOW CHANGES FOR TABLE person SINCE 0; "; let dbs = new_ds().await?; - let ses = Session::owner().with_ns("test").with_db("test"); - let start_ts = 0u64; - let end_ts = start_ts + 1; - dbs.tick_at(start_ts).await?; - let res = &mut dbs.execute(sql, &ses, None).await?; - dbs.tick_at(end_ts).await?; - assert_eq!(res.len(), 6); + let ses = Session::owner().with_ns(ns.as_str()).with_db(db.as_str()); + let mut current_time = 0u64; + dbs.tick_at(current_time).await?; + let res = &mut dbs.execute(sql.as_str(), &ses, None).await?; + // Increment by a second (sic) + current_time += 1; + dbs.tick_at(current_time).await?; + assert_eq!(res.len(), 3); // DEFINE DATABASE let tmp = res.remove(0).result; assert!(tmp.is_ok()); @@ -50,28 +62,12 @@ async fn database_change_feeds() -> Result<(), Error> { // DEFINE FIELD let tmp = res.remove(0).result; assert!(tmp.is_ok()); - // UPDATE CONTENT - let tmp = res.remove(0).result?; - let val = Value::parse( - "[ - { - id: person:test, - name: 'Name: Tobie', - } - ]", - ); - assert_eq!(tmp, val); - // DELETE - let tmp = res.remove(0).result?; - let val = Value::parse("[]"); - assert_eq!(tmp, val); - // SHOW CHANGES - let tmp = res.remove(0).result?; - let val = match FFLAGS.change_feed_live_queries.enabled() { + + let cf_val_arr = match FFLAGS.change_feed_live_queries.enabled() { true => Value::parse( "[ { - versionstamp: 65536, + versionstamp: 2, changes: [ { create: { @@ -82,7 +78,7 @@ async fn database_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 131072, + versionstamp: 3, changes: [ { delete: { @@ -96,7 +92,7 @@ async fn database_change_feeds() -> Result<(), Error> { false => Value::parse( "[ { - versionstamp: 65536, + versionstamp: 2, changes: [ { update: { @@ -107,7 +103,7 @@ async fn database_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 131072, + versionstamp: 3, changes: [ { delete: { @@ -119,17 +115,81 @@ async fn database_change_feeds() -> Result<(), Error> { ]", ), }; - assert_eq!(tmp, val); + + // Declare check that is repeatable + async fn check_test( + dbs: &Datastore, + sql2: &str, + ses: &Session, + cf_val_arr: &Value, + ) -> Result<(), String> { + let res = &mut dbs.execute(sql2, ses, None).await?; + assert_eq!(res.len(), 3); + // UPDATE CONTENT + let tmp = res.remove(0).result?; + let val = Value::parse( + "[ + { + id: person:test, + name: 'Name: Tobie', + } + ]", + ); + Some(&tmp) + .filter(|x| *x == &val) + .map(|v| ()) + .ok_or(format!("Expected UPDATE value:\nleft: {}\nright: {}", tmp, val))?; + // DELETE + let tmp = res.remove(0).result?; + let val = Value::parse("[]"); + Some(&tmp) + .filter(|x| *x == &val) + .map(|v| ()) + .ok_or(format!("Expected DELETE value:\nleft: {}\nright: {}", tmp, val))?; + // SHOW CHANGES + let tmp = res.remove(0).result?; + Some(&tmp) + .filter(|x| *x == cf_val_arr) + .map(|v| ()) + .ok_or(format!("Expected SHOW CHANGES value:\nleft: {}\nright: {}", tmp, cf_val_arr))?; + Ok(()) + } + + // Check the validation with repeats + let limit = 1; + for i in 0..limit { + let test_result = check_test(&dbs, sql2, &ses, &cf_val_arr).await; + match test_result { + Ok(_) => break, + Err(e) => { + if i == limit - 1 { + panic!("Failed after retries: {}", e); + } + println!("Failed after retry {}:\n{}", i, e); + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + } + } // Retain for 1h let sql = " SHOW CHANGES FOR TABLE person SINCE 0; "; - dbs.tick_at(end_ts + 3599).await?; + // This is neccessary to mark a point in time that can be GC'd + current_time += 1; + dbs.tick_at(current_time).await?; + let mut tx = dbs.transaction(Write, Optimistic).await?; + tx.print_all().await; + tx.cancel().await?; + let res = &mut dbs.execute(sql, &ses, None).await?; let tmp = res.remove(0).result?; - assert_eq!(tmp, val); + assert_eq!(tmp, cf_val_arr); + // GC after 1hs - dbs.tick_at(end_ts + 3600).await?; + let one_hour_in_secs = 3600; + current_time += one_hour_in_secs; + current_time += 1; + dbs.tick_at(current_time).await?; let res = &mut dbs.execute(sql, &ses, None).await?; let tmp = res.remove(0).result?; let val = Value::parse("[]"); @@ -166,7 +226,7 @@ async fn table_change_feeds() -> Result<(), Error> { SHOW CHANGES FOR TABLE person SINCE 0; "; let dbs = new_ds().await?; - let ses = Session::owner().with_ns("test").with_db("test"); + let ses = Session::owner().with_ns("test-tb-cf").with_db("test-tb-cf"); let start_ts = 0u64; let end_ts = start_ts + 1; dbs.tick_at(start_ts).await?; @@ -236,7 +296,7 @@ async fn table_change_feeds() -> Result<(), Error> { true => Value::parse( "[ { - versionstamp: 65536, + versionstamp: 1, changes: [ { define_table: { @@ -246,7 +306,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 131072, + versionstamp: 2, changes: [ { create: { @@ -257,7 +317,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 196608, + versionstamp: 3, changes: [ { update: { @@ -268,7 +328,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 262144, + versionstamp: 4, changes: [ { update: { @@ -279,7 +339,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 327680, + versionstamp: 5, changes: [ { delete: { @@ -289,7 +349,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 393216, + versionstamp: 6, changes: [ { create: { @@ -304,7 +364,7 @@ async fn table_change_feeds() -> Result<(), Error> { false => Value::parse( "[ { - versionstamp: 65536, + versionstamp: 1, changes: [ { define_table: { @@ -314,7 +374,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 131072, + versionstamp: 2, changes: [ { update: { @@ -325,7 +385,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 196608, + versionstamp: 3, changes: [ { update: { @@ -336,7 +396,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 262144, + versionstamp: 4, changes: [ { update: { @@ -347,7 +407,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 327680, + versionstamp: 5, changes: [ { delete: { @@ -357,7 +417,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 393216, + versionstamp: 6, changes: [ { update: { @@ -392,7 +452,7 @@ async fn table_change_feeds() -> Result<(), Error> { #[tokio::test] async fn changefeed_with_ts() -> Result<(), Error> { let db = new_ds().await?; - let ses = Session::owner().with_ns("test").with_db("test"); + let ses = Session::owner().with_ns("test-cf-ts").with_db("test-cf-ts"); // Enable change feeds let sql = " DEFINE TABLE user CHANGEFEED 1h; diff --git a/tests/cli_integration.rs b/tests/cli_integration.rs index bc230d27..b9b4542f 100644 --- a/tests/cli_integration.rs +++ b/tests/cli_integration.rs @@ -665,23 +665,39 @@ mod cli_integration { .input("SHOW CHANGES FOR TABLE thing SINCE 0 LIMIT 10;\n") .output() .unwrap(); - let output = remove_debug_info(output); - assert_eq!( - output, - "[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 65536 }, { changes: [{ create: { id: thing:one } }], versionstamp: 131072 }]]\n\n" - .to_owned(), - "failed to send sql: {args}"); + let output = remove_debug_info(output).replace('\n', ""); + let allowed = [ + "[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 1 }, { changes: [{ create: { id: thing:one } }], versionstamp: 2 }]]", + "[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 1 }, { changes: [{ create: { id: thing:one } }], versionstamp: 3 }]]", + "[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 2 }, { changes: [{ create: { id: thing:one } }], versionstamp: 3 }]]", + "[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 2 }, { changes: [{ create: { id: thing:one } }], versionstamp: 4 }]]", + ]; + allowed + .into_iter() + .find(|case| *case == output) + .ok_or(format!("Output didnt match an example output: {output}")) + .unwrap(); } else { let output = common::run(&args) .input("SHOW CHANGES FOR TABLE thing SINCE 0 LIMIT 10;\n") .output() .unwrap(); - let output = remove_debug_info(output); - assert_eq!( - output, - "[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 65536 }, { changes: [{ update: { id: thing:one } }], versionstamp: 131072 }]]\n\n" - .to_owned(), - "failed to send sql: {args}" ); + let output = remove_debug_info(output).replace('\n', ""); + let allowed = [ + "[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 1 }, { changes: [{ update: { id: thing:one } }], versionstamp: 2 }]]", + "[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 1 }, { changes: [{ update: { id: thing:one } }], versionstamp: 3 }]]", + "[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 2 }, { changes: [{ update: { id: thing:one } }], versionstamp: 3 }]]", + "[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 2 }, { changes: [{ update: { id: thing:one } }], versionstamp: 4 }]]", + ]; + allowed + .into_iter() + .find(|case| { + let a = *case == output; + println!("Comparing\n{case}\n{output}\n{a}"); + a + }) + .ok_or(format!("Output didnt match an example output: {output}")) + .unwrap(); } };