Remove race conditions in cf tests (#3669)

This commit is contained in:
Przemyslaw Hugh Kaznowski 2024-03-13 12:12:35 +00:00 committed by GitHub
parent 9e2a0c75ca
commit 4804426648
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 371 additions and 100 deletions

View file

@ -1,5 +1,7 @@
use crate::err::Error;
use crate::key::change;
#[cfg(debug_assertions)]
use crate::key::debug::sprint_key;
use crate::kvs::Transaction;
use crate::vs;
use crate::vs::Versionstamp;
@ -27,10 +29,19 @@ pub async fn gc_ns(
let dbs = tx.all_db(ns).await?;
let dbs = dbs.as_ref();
for db in dbs {
// We get the expiration of the change feed defined on the database
let db_cf_expiry = match &db.changefeed {
None => 0,
Some(cf) => cf.expiry.as_secs(),
};
#[cfg(debug_assertions)]
trace!(
"Performing garbage collection on ns {} db {} for ts {}. The cf expiration is {}",
ns,
db.name,
ts,
db_cf_expiry
);
let tbs = tx.all_tb(ns, db.name.as_str()).await?;
let tbs = tbs.as_ref();
let max_tb_cf_expiry = tbs.iter().fold(0, |acc, tb| match &tb.changefeed {
@ -47,7 +58,10 @@ pub async fn gc_ns(
if ts < cf_expiry {
continue;
}
// We only want to retain the expiry window, so we are going to delete everything before
let watermark_ts = ts - cf_expiry;
#[cfg(debug_assertions)]
trace!("The watermark is {} after removing {cf_expiry} from {ts}", watermark_ts);
let watermark_vs =
tx.get_versionstamp_from_timestamp(watermark_ts, ns, db.name.as_str(), true).await?;
if let Some(watermark_vs) = watermark_vs {
@ -67,6 +81,15 @@ pub async fn gc_db(
) -> Result<(), Error> {
let beg: Vec<u8> = change::prefix_ts(ns, db, vs::u64_to_versionstamp(0));
let end = change::prefix_ts(ns, db, watermark);
#[cfg(debug_assertions)]
trace!(
"DB GC: ns: {}, db: {}, watermark: {:?}, prefix: {}, end: {}",
ns,
db,
watermark,
sprint_key(&beg),
sprint_key(&end)
);
let limit = limit.unwrap_or(100);

View file

@ -3,7 +3,7 @@ use crate::sql::object::Object;
use crate::sql::statements::DefineTableStatement;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use crate::vs::to_u128_be;
use crate::vs::versionstamp_to_u64;
use derive::Store;
use revision::revisioned;
use serde::{Deserialize, Serialize};
@ -104,7 +104,7 @@ impl DatabaseMutation {
impl ChangeSet {
pub fn into_value(self) -> Value {
let mut m = BTreeMap::<String, Value>::new();
let vs = to_u128_be(self.0);
let vs = versionstamp_to_u64(&self.0);
m.insert("versionstamp".to_string(), Value::from(vs));
m.insert("changes".to_string(), self.1.into_value());
let so: Object = m.into();
@ -174,7 +174,7 @@ mod tests {
use super::*;
use std::collections::HashMap;
let cs = ChangeSet(
[0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
[0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
DatabaseMutation(vec![TableMutations(
"mytb".to_string(),
vec![
@ -209,7 +209,7 @@ mod tests {
use super::*;
use std::collections::HashMap;
let cs = ChangeSet(
[0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
[0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
DatabaseMutation(vec![TableMutations(
"mytb".to_string(),
vec![

View file

@ -1,6 +1,8 @@
use crate::cf::{ChangeSet, DatabaseMutation, TableMutations};
use crate::err::Error;
use crate::key::change;
#[cfg(debug_assertions)]
use crate::key::debug::sprint_key;
use crate::kvs::{Limit, ScanPage, Transaction};
use crate::sql::statements::show::ShowSince;
use crate::vs;
@ -56,7 +58,8 @@ pub async fn read(
let mut r = Vec::<ChangeSet>::new();
// iterate over _x and put decoded elements to r
for (k, v) in scan.values {
trace!("read change feed; {k:?}");
#[cfg(debug_assertions)]
trace!("read change feed; {}", sprint_key(&k));
let dec = crate::key::change::Cf::decode(&k).unwrap();

View file

@ -835,6 +835,7 @@ impl Datastore {
// It is handy for testing, because it allows you to specify the timestamp,
// without depending on a system clock.
pub async fn tick_at(&self, ts: u64) -> Result<(), Error> {
trace!("Ticking at timestamp {}", ts);
let _vs = self.save_timestamp_for_versionstamp(ts).await?;
self.garbage_collect_stale_change_feeds(ts).await?;
// TODO Add LQ GC
@ -843,6 +844,7 @@ impl Datastore {
}
// save_timestamp_for_versionstamp saves the current timestamp for each database's current versionstamp.
// Note: the returned VS is flawed, as there are multiple {ts: vs} mappings per (ns, db)
pub(crate) async fn save_timestamp_for_versionstamp(
&self,
ts: u64,
@ -902,7 +904,7 @@ impl Datastore {
trace!(
"There were no changes in the change feed for {:?} from versionstamp {:?}",
selector,
vs
conv::versionstamp_to_u64(vs)
)
}
if let Some(change_set) = res.last() {
@ -1145,6 +1147,7 @@ impl Datastore {
let dbs = dbs.as_ref();
for db in dbs {
let db = db.name.as_str();
// TODO(SUR-341): This is incorrect, it's a [ns,db] to vs pair
vs = Some(tx.set_timestamp_for_versionstamp(ts, ns, db, true).await?);
}
}

View file

@ -1,10 +1,12 @@
#![cfg(feature = "kv-mem")]
use crate::err::Error;
#[cfg(debug_assertions)]
use crate::key::debug::sprint_key;
use crate::kvs::Check;
use crate::kvs::Key;
use crate::kvs::Val;
use crate::vs::{try_to_u64_be, u64_to_versionstamp, Versionstamp};
use crate::vs::{u64_to_versionstamp, versionstamp_to_u64, Versionstamp};
use std::ops::Range;
pub struct Datastore {
@ -168,11 +170,19 @@ impl Transaction {
Some(prev) => {
let slice = prev.as_slice();
let res: Result<[u8; 10], Error> = match slice.try_into() {
Ok(ba) => Ok(ba),
Ok(ba) => {
#[cfg(debug_assertions)]
trace!(
"Previous timestamp for key {} is {}",
sprint_key(&k),
sprint_key(&ba)
);
Ok(ba)
}
Err(e) => Err(Error::Ds(e.to_string())),
};
let array = res?;
let prev = try_to_u64_be(array)?;
let prev = versionstamp_to_u64(&array);
prev + 1
}
None => 1,
@ -204,15 +214,22 @@ impl Transaction {
}
let ts_key: Key = ts_key.into();
#[cfg(debug_assertions)]
let dbg_ts = sprint_key(&ts_key);
let prefix: Key = prefix.into();
let suffix: Key = suffix.into();
#[cfg(debug_assertions)]
let dbg_prefix = sprint_key(&prefix);
let mut suffix: Key = suffix.into();
#[cfg(debug_assertions)]
let dbg_suffix = sprint_key(&suffix);
let ts = self.get_timestamp(ts_key.clone())?;
let mut k: Vec<u8> = prefix.clone();
let ts = self.get_timestamp(ts_key)?;
let mut k: Vec<u8> = prefix;
k.append(&mut ts.to_vec());
k.append(&mut suffix.clone());
k.append(&mut suffix);
trace!("get_versionstamped_key; {ts_key:?} {prefix:?} {ts:?} {suffix:?} {k:?}",);
#[cfg(debug_assertions)]
trace!("get_versionstamped_key; prefix={dbg_prefix} ts={dbg_ts} suff={dbg_suffix}");
Ok(k)
}

View file

@ -1,4 +1,7 @@
use crate::key::debug::sprint_key;
use crate::key::error::KeyCategory;
use crate::kvs::lq_structs::{KillEntry, LqEntry, TrackedResult};
use crate::sql::Strand;
#[tokio::test]
#[serial]
@ -58,3 +61,82 @@ async fn kill_queries_sent_to_tx_are_received() {
assert_eq!(live_queries.len(), 1);
assert_eq!(live_queries[0], TrackedResult::KillQuery(kill_entry));
}
#[tokio::test]
#[serial]
async fn delr_range_correct() {
let node_id = uuid::uuid!("d0f1a200-e24e-44fe-98c1-2271a5781da7");
let clock = Arc::new(SizedClock::Fake(FakeClock::new(Timestamp::default())));
let test = init(node_id, clock).await.unwrap();
// Create some data
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
tx.putc(b"hugh\x00\x10", Value::Strand(Strand::from("0010")), None).await.unwrap();
tx.put(KeyCategory::ChangeFeed, b"hugh\x00\x10\x10", Value::Strand(Strand::from("001010")))
.await
.unwrap();
tx.putc(b"hugh\x00\x20", Value::Strand(Strand::from("0020")), None).await.unwrap();
tx.commit().await.unwrap();
// Check we have all data
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let vals = tx.scan(b"hugh\x00".to_vec()..b"hugh\xff".to_vec(), 100).await.unwrap();
assert_eq!(vals.len(), 3);
tx.cancel().await.unwrap();
// Delete first range, inclusive of next key, without deleting next key
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
tx.delr(b"hugh\x00".to_vec()..b"hugh\x00\x10\x10".to_vec(), 100).await.unwrap();
tx.commit().await.unwrap();
// Scan results
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let vals = tx.scan(b"hugh\x00"..b"hugh\xff", 100).await.unwrap();
assert_eq!(vals.len(), 2);
tx.cancel().await.unwrap();
// Delete second range, beyond next key but beyond limit
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
tx.delr(b"hugh\x00\x20".to_vec()..b"hugh\xff".to_vec(), 1).await.unwrap();
tx.commit().await.unwrap();
// Scan results
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let vals = tx.scan(b"hugh\x00"..b"hugh\xff", 100).await.unwrap();
assert_eq!(vals.len(), 1);
tx.cancel().await.unwrap();
}
#[tokio::test]
#[serial]
async fn set_versionstamp_is_incremental() {
let node_id = uuid::uuid!("3988b179-6212-4a45-a496-4d9ee4cbd639");
let clock = Arc::new(SizedClock::Fake(FakeClock::new(Timestamp::default())));
let test = init(node_id, clock).await.unwrap();
// Create the first timestamped key
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
tx.set_versionstamped_key(b"ts_key", b"prefix", b"suffix", Value::from("'value'"))
.await
.unwrap();
tx.commit().await.unwrap();
// Create the second timestamped key
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
tx.set_versionstamped_key(b"ts_key", b"prefix", b"suffix", Value::from("'value'"))
.await
.unwrap();
tx.commit().await.unwrap();
// Scan the keys and validate versionstamps match expected
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let found = tx.scan(b"prefix".to_vec()..b"prefix\xff".to_vec(), 1000).await.unwrap();
tx.cancel().await.unwrap();
assert_eq!(found.len(), 2);
let expected_keys = [
b"prefix\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00suffix",
b"prefix\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00suffix",
];
assert_eq!(found[0].0, expected_keys[0], "key was {}", sprint_key(&found[0].0));
assert_eq!(found[1].0, expected_keys[1], "key was {}", sprint_key(&found[1].0));
}

View file

@ -29,6 +29,7 @@ use crate::dbs::node::ClusterMembership;
use crate::dbs::node::Timestamp;
use crate::err::Error;
use crate::idg::u32::U32;
#[cfg(debug_assertions)]
use crate::key::debug::sprint_key;
use crate::key::error::KeyCategory;
use crate::key::key_req::KeyRequirements;
@ -45,8 +46,8 @@ use crate::sql::paths::OUT;
use crate::sql::thing::Thing;
use crate::sql::Strand;
use crate::sql::Value;
use crate::vs::Oracle;
use crate::vs::Versionstamp;
use crate::vs::{conv, Oracle};
use super::kv::Add;
use super::kv::Convert;
@ -363,7 +364,7 @@ impl Transaction {
{
let key = key.into();
#[cfg(debug_assertions)]
trace!("Del {:?}", sprint_key(&key));
trace!("Del {}", sprint_key(&key));
match self {
#[cfg(feature = "kv-mem")]
Transaction {
@ -412,7 +413,7 @@ impl Transaction {
K: Into<Key> + Debug + AsRef<[u8]>,
{
#[cfg(debug_assertions)]
trace!("Exi {:?}", sprint_key(&key));
trace!("Exi {}", sprint_key(&key));
match self {
#[cfg(feature = "kv-mem")]
Transaction {
@ -462,7 +463,7 @@ impl Transaction {
{
let key = key.into();
#[cfg(debug_assertions)]
trace!("Get {:?}", sprint_key(&key));
trace!("Get {}", sprint_key(&key));
match self {
#[cfg(feature = "kv-mem")]
Transaction {
@ -513,7 +514,7 @@ impl Transaction {
{
let key = key.into();
#[cfg(debug_assertions)]
trace!("Set {:?} => {:?}", sprint_key(&key), val);
trace!("Set {} => {:?}", sprint_key(&key), val);
match self {
#[cfg(feature = "kv-mem")]
Transaction {
@ -568,7 +569,7 @@ impl Transaction {
// We convert to byte slice as it's easier at this level
let key = key.into();
#[cfg(debug_assertions)]
trace!("Get Timestamp {:?}", sprint_key(&key));
trace!("Get Timestamp {}", sprint_key(&key));
match self {
#[cfg(feature = "kv-mem")]
Transaction {
@ -646,6 +647,16 @@ impl Transaction {
K: Into<Key> + Debug,
V: Into<Val> + Debug,
{
let ts_key = ts_key.into();
let prefix = prefix.into();
let suffix = suffix.into();
#[cfg(debug_assertions)]
trace!(
"Set Versionstamped Key ts={} prefix={} suffix={}",
sprint_key(&prefix),
sprint_key(&ts_key),
sprint_key(&suffix)
);
match self {
#[cfg(feature = "kv-mem")]
Transaction {
@ -766,7 +777,7 @@ impl Transaction {
end: rng.end.into(),
};
#[cfg(debug_assertions)]
trace!("Scan {:?} - {:?}", sprint_key(&rng.start), sprint_key(&rng.end));
trace!("Scan {} - {}", sprint_key(&rng.start), sprint_key(&rng.end));
match self {
#[cfg(feature = "kv-mem")]
Transaction {
@ -821,7 +832,7 @@ impl Transaction {
K: Into<Key> + From<Vec<u8>> + AsRef<[u8]> + Debug + Clone,
{
#[cfg(debug_assertions)]
trace!("Scan {:?} - {:?}", sprint_key(&page.range.start), sprint_key(&page.range.end));
trace!("Scan paged {} - {}", sprint_key(&page.range.start), sprint_key(&page.range.end));
let range = page.range.clone();
let res = match self {
#[cfg(feature = "kv-mem")]
@ -895,7 +906,7 @@ impl Transaction {
{
let key = key.into();
#[cfg(debug_assertions)]
trace!("Putc {:?} if {:?} => {:?}", sprint_key(&key), chk, val);
trace!("Putc {} if {:?} => {:?}", sprint_key(&key), chk, val);
match self {
#[cfg(feature = "kv-mem")]
Transaction {
@ -946,7 +957,7 @@ impl Transaction {
{
let key = key.into();
#[cfg(debug_assertions)]
trace!("Delc {:?} if {:?}", sprint_key(&key), chk);
trace!("Delc {} if {:?}", sprint_key(&key), chk);
match self {
#[cfg(feature = "kv-mem")]
Transaction {
@ -1002,7 +1013,7 @@ impl Transaction {
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
#[cfg(debug_assertions)]
trace!("Getr {:?}..{:?} (limit: {limit})", sprint_key(&beg), sprint_key(&end));
trace!("Getr {}..{} (limit: {limit})", sprint_key(&beg), sprint_key(&end));
let mut out: Vec<(Key, Val)> = vec![];
let mut next_page = Some(ScanPage {
range: beg..end,
@ -1038,7 +1049,7 @@ impl Transaction {
end: rng.end.into(),
};
#[cfg(debug_assertions)]
trace!("Delr {:?}..{:?} (limit: {limit})", sprint_key(&rng.start), sprint_key(&rng.end));
trace!("Delr {}..{} (limit: {limit})", sprint_key(&rng.start), sprint_key(&rng.end));
match self {
#[cfg(feature = "kv-tikv")]
Transaction {
@ -1076,11 +1087,14 @@ impl Transaction {
let res = res.values;
// Exit when settled
if res.is_empty() {
trace!("Delr page was empty");
break;
}
// Loop over results
for (k, _) in res.into_iter() {
// Delete
#[cfg(debug_assertions)]
trace!("Delr key {}", sprint_key(&k));
self.del(k).await?;
}
}
@ -1096,7 +1110,7 @@ impl Transaction {
let beg: Key = key.into();
let end: Key = beg.clone().add(0xff);
#[cfg(debug_assertions)]
trace!("Getp {:?}-{:?} (limit: {limit})", sprint_key(&beg), sprint_key(&end));
trace!("Getp {}-{} (limit: {limit})", sprint_key(&beg), sprint_key(&end));
let mut out: Vec<(Key, Val)> = vec![];
// Start processing
let mut next_page = Some(ScanPage {
@ -1130,7 +1144,7 @@ impl Transaction {
let beg: Key = key.into();
let end: Key = beg.clone().add(0xff);
#[cfg(debug_assertions)]
trace!("Delp {:?}-{:?} (limit: {limit})", sprint_key(&beg), sprint_key(&end));
trace!("Delp {}-{} (limit: {limit})", sprint_key(&beg), sprint_key(&end));
let min = beg.clone();
let max = end.clone();
self.delr(min..max, limit).await?;
@ -2022,7 +2036,7 @@ impl Transaction {
) -> Result<LiveStatement, Error> {
let key = crate::key::table::lq::new(ns, db, tb, *lv);
let key_enc = crate::key::table::lq::Lq::encode(&key)?;
trace!("Getting lv ({:?}) {:?}", lv, sprint_key(&key_enc));
trace!("Getting lv ({:?}) {}", lv, sprint_key(&key_enc));
let val = self.get(key_enc).await?.ok_or(Error::LvNotFound {
value: lv.to_string(),
})?;
@ -2040,7 +2054,7 @@ impl Transaction {
) -> Result<DefineEventStatement, Error> {
let key = crate::key::table::ev::new(ns, db, tb, ev);
let key_enc = crate::key::table::ev::Ev::encode(&key)?;
trace!("Getting ev ({:?}) {:?}", ev, sprint_key(&key_enc));
trace!("Getting ev ({:?}) {}", ev, sprint_key(&key_enc));
let val = self.get(key_enc).await?.ok_or(Error::EvNotFound {
value: ev.to_string(),
})?;
@ -2058,7 +2072,7 @@ impl Transaction {
) -> Result<DefineFieldStatement, Error> {
let key = crate::key::table::fd::new(ns, db, tb, fd);
let key_enc = crate::key::table::fd::Fd::encode(&key)?;
trace!("Getting fd ({:?}) {:?}", fd, sprint_key(&key_enc));
trace!("Getting fd ({:?}) {}", fd, sprint_key(&key_enc));
let val = self.get(key_enc).await?.ok_or(Error::FdNotFound {
value: fd.to_string(),
})?;
@ -2076,7 +2090,7 @@ impl Transaction {
) -> Result<DefineIndexStatement, Error> {
let key = crate::key::table::ix::new(ns, db, tb, ix);
let key_enc = crate::key::table::ix::Ix::encode(&key)?;
trace!("Getting ix ({:?}) {:?}", ix, sprint_key(&key_enc));
trace!("Getting ix ({:?}) {}", ix, sprint_key(&key_enc));
let val = self.get(key_enc).await?.ok_or(Error::IxNotFound {
value: ix.to_string(),
})?;
@ -2887,6 +2901,14 @@ impl Transaction {
// This also works as an advisory lock on the ts keys so that there are
// no other concurrent transactions that can write to the ts_key or the keys after it.
let vs = self.get_timestamp(crate::key::database::vs::new(ns, db), lock).await?;
#[cfg(debug_assertions)]
trace!(
"Setting timestamp {} for versionstamp {:?} in ns: {}, db: {}",
ts,
conv::versionstamp_to_u64(&vs),
ns,
db
);
// Ensure there are no keys after the ts_key
// Otherwise we can go back in time!
@ -2896,6 +2918,13 @@ impl Transaction {
let ts_pairs: Vec<(Vec<u8>, Vec<u8>)> = self.getr(begin..end, u32::MAX).await?;
let latest_ts_pair = ts_pairs.last();
if let Some((k, _)) = latest_ts_pair {
trace!(
"There already was a greater committed timestamp {} in ns: {}, db: {} found: {}",
ts,
ns,
db,
sprint_key(k)
);
let k = crate::key::database::ts::Ts::decode(k)?;
let latest_ts = k.ts;
if latest_ts >= ts {
@ -2979,6 +3008,23 @@ impl Transaction {
_ => unreachable!(),
}
}
#[cfg(debug_assertions)]
#[allow(unused)]
#[doc(hidden)]
pub async fn print_all(&mut self) {
let mut next_page =
Some(ScanPage::from(crate::key::root::ns::prefix()..b"\xff\xff\xff".to_vec()));
println!("Start print all");
while next_page.is_some() {
let res = self.scan_paged(next_page.unwrap(), 1000).await.unwrap();
for (k, _) in res.values {
println!("{}", sprint_key(&k));
}
next_page = res.next_page;
}
println!("End print all");
}
}
#[cfg(test)]

View file

@ -70,8 +70,8 @@ impl ShowStatement {
.await?;
// Return the changes
let mut a = Vec::<Value>::new();
for r in r.iter() {
let v: Value = r.clone().into_value();
for r in r.into_iter() {
let v: Value = r.into_value();
a.push(v);
}
let v: Value = Value::Array(crate::sql::array::Array(a));

View file

@ -75,11 +75,14 @@ pub fn try_u128_to_versionstamp(v: u128) -> Result<[u8; 10], Error> {
Ok(buf)
}
/// Takes the most significant, time-based bytes and ignores the last 2 bytes
#[doc(hidden)]
pub fn versionstamp_to_u64(vs: &Versionstamp) -> u64 {
u64::from_be_bytes(vs[..8].try_into().unwrap())
}
// to_u128_be converts a 10-byte versionstamp to a u128 assuming big-endian.
// This is handy for human comparing versionstamps.
// This is not the same as the timestamp u64 representation as the trailing bytes are included
#[allow(unused)]
pub fn to_u128_be(vs: [u8; 10]) -> u128 {
let mut buf = [0; 16];
@ -135,7 +138,10 @@ pub fn to_u128_le(vs: [u8; 10]) -> u128 {
u128::from_be_bytes(buf)
}
#[cfg(test)]
mod tests {
use crate::vs::{u64_to_versionstamp, versionstamp_to_u64};
#[test]
fn try_to_u64_be() {
use super::*;
@ -161,4 +167,20 @@ mod tests {
let res = try_u128_to_versionstamp(v).unwrap();
assert_eq!(res, [255, 255, 255, 255, 255, 255, 255, 255, 255, 255]);
}
#[test]
fn can_add_u64_conversion() {
let start = 5u64;
let vs = u64_to_versionstamp(start);
// The last 2 bytes are empty
assert_eq!("00000000000000050000", hex::encode(vs));
let mid = versionstamp_to_u64(&vs);
assert_eq!(start, mid);
let mid = mid + 1;
let vs = u64_to_versionstamp(mid);
// The last 2 bytes are empty
assert_eq!("00000000000000060000", hex::encode(vs));
let end = versionstamp_to_u64(&vs);
assert_eq!(end, 6);
}
}

View file

@ -153,10 +153,9 @@ fn secs_since_unix_epoch() -> u64 {
}
}
#[cfg(test)]
mod tests {
#[allow(unused)]
use super::*;
#[allow(unused)]
use crate::vs::to_u128_be;
#[test]

View file

@ -49,7 +49,7 @@ async fn bootstrap_removes_unreachable_nodes() -> Result<(), Error> {
dbs.bootstrap().await.unwrap();
// Declare a function that will assert
async fn try_validate(mut tx: &mut Transaction, bad_node: &uuid::Uuid) -> Result<(), String> {
async fn try_validate(tx: &mut Transaction, bad_node: &uuid::Uuid) -> Result<(), String> {
let res = tx.scan_nd(1000).await.map_err(|e| e.to_string())?;
tx.commit().await.map_err(|e| e.to_string())?;
for node in &res {

View file

@ -6,19 +6,27 @@ use surrealdb::dbs::Session;
use surrealdb::err::Error;
use surrealdb::sql::Value;
use surrealdb_core::fflags::FFLAGS;
use surrealdb_core::kvs::Datastore;
use surrealdb_core::kvs::LockType::Optimistic;
use surrealdb_core::kvs::TransactionType::Write;
mod helpers;
mod parse;
#[tokio::test]
#[test_log::test(tokio::test)]
async fn database_change_feeds() -> Result<(), Error> {
let sql = "
DEFINE DATABASE test CHANGEFEED 1h;
// This is a unique shared identifier
let identifier = "alpaca";
let ns = format!("namespace_{identifier}");
let db = format!("database_{identifier}");
let sql = format!(
"
DEFINE DATABASE {db} CHANGEFEED 1h;
DEFINE TABLE person;
DEFINE FIELD name ON TABLE person
ASSERT
IF $input THEN
$input = /^[A-Z]{1}[a-z]+$/
$input = /^[A-Z]{{1}}[a-z]+$/
ELSE
true
END
@ -29,18 +37,22 @@ async fn database_change_feeds() -> Result<(), Error> {
$value
END
;
"
);
let sql2 = "
UPDATE person:test CONTENT { name: 'Tobie' };
DELETE person:test;
SHOW CHANGES FOR TABLE person SINCE 0;
";
let dbs = new_ds().await?;
let ses = Session::owner().with_ns("test").with_db("test");
let start_ts = 0u64;
let end_ts = start_ts + 1;
dbs.tick_at(start_ts).await?;
let res = &mut dbs.execute(sql, &ses, None).await?;
dbs.tick_at(end_ts).await?;
assert_eq!(res.len(), 6);
let ses = Session::owner().with_ns(ns.as_str()).with_db(db.as_str());
let mut current_time = 0u64;
dbs.tick_at(current_time).await?;
let res = &mut dbs.execute(sql.as_str(), &ses, None).await?;
// Increment by a second (sic)
current_time += 1;
dbs.tick_at(current_time).await?;
assert_eq!(res.len(), 3);
// DEFINE DATABASE
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
@ -50,28 +62,12 @@ async fn database_change_feeds() -> Result<(), Error> {
// DEFINE FIELD
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
// UPDATE CONTENT
let tmp = res.remove(0).result?;
let val = Value::parse(
"[
{
id: person:test,
name: 'Name: Tobie',
}
]",
);
assert_eq!(tmp, val);
// DELETE
let tmp = res.remove(0).result?;
let val = Value::parse("[]");
assert_eq!(tmp, val);
// SHOW CHANGES
let tmp = res.remove(0).result?;
let val = match FFLAGS.change_feed_live_queries.enabled() {
let cf_val_arr = match FFLAGS.change_feed_live_queries.enabled() {
true => Value::parse(
"[
{
versionstamp: 65536,
versionstamp: 2,
changes: [
{
create: {
@ -82,7 +78,7 @@ async fn database_change_feeds() -> Result<(), Error> {
]
},
{
versionstamp: 131072,
versionstamp: 3,
changes: [
{
delete: {
@ -96,7 +92,7 @@ async fn database_change_feeds() -> Result<(), Error> {
false => Value::parse(
"[
{
versionstamp: 65536,
versionstamp: 2,
changes: [
{
update: {
@ -107,7 +103,7 @@ async fn database_change_feeds() -> Result<(), Error> {
]
},
{
versionstamp: 131072,
versionstamp: 3,
changes: [
{
delete: {
@ -119,17 +115,81 @@ async fn database_change_feeds() -> Result<(), Error> {
]",
),
};
assert_eq!(tmp, val);
// Declare check that is repeatable
async fn check_test(
dbs: &Datastore,
sql2: &str,
ses: &Session,
cf_val_arr: &Value,
) -> Result<(), String> {
let res = &mut dbs.execute(sql2, ses, None).await?;
assert_eq!(res.len(), 3);
// UPDATE CONTENT
let tmp = res.remove(0).result?;
let val = Value::parse(
"[
{
id: person:test,
name: 'Name: Tobie',
}
]",
);
Some(&tmp)
.filter(|x| *x == &val)
.map(|v| ())
.ok_or(format!("Expected UPDATE value:\nleft: {}\nright: {}", tmp, val))?;
// DELETE
let tmp = res.remove(0).result?;
let val = Value::parse("[]");
Some(&tmp)
.filter(|x| *x == &val)
.map(|v| ())
.ok_or(format!("Expected DELETE value:\nleft: {}\nright: {}", tmp, val))?;
// SHOW CHANGES
let tmp = res.remove(0).result?;
Some(&tmp)
.filter(|x| *x == cf_val_arr)
.map(|v| ())
.ok_or(format!("Expected SHOW CHANGES value:\nleft: {}\nright: {}", tmp, cf_val_arr))?;
Ok(())
}
// Check the validation with repeats
let limit = 1;
for i in 0..limit {
let test_result = check_test(&dbs, sql2, &ses, &cf_val_arr).await;
match test_result {
Ok(_) => break,
Err(e) => {
if i == limit - 1 {
panic!("Failed after retries: {}", e);
}
println!("Failed after retry {}:\n{}", i, e);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
}
// Retain for 1h
let sql = "
SHOW CHANGES FOR TABLE person SINCE 0;
";
dbs.tick_at(end_ts + 3599).await?;
// This is necessary to mark a point in time that can be GC'd
current_time += 1;
dbs.tick_at(current_time).await?;
let mut tx = dbs.transaction(Write, Optimistic).await?;
tx.print_all().await;
tx.cancel().await?;
let res = &mut dbs.execute(sql, &ses, None).await?;
let tmp = res.remove(0).result?;
assert_eq!(tmp, val);
assert_eq!(tmp, cf_val_arr);
// GC after 1hs
dbs.tick_at(end_ts + 3600).await?;
let one_hour_in_secs = 3600;
current_time += one_hour_in_secs;
current_time += 1;
dbs.tick_at(current_time).await?;
let res = &mut dbs.execute(sql, &ses, None).await?;
let tmp = res.remove(0).result?;
let val = Value::parse("[]");
@ -166,7 +226,7 @@ async fn table_change_feeds() -> Result<(), Error> {
SHOW CHANGES FOR TABLE person SINCE 0;
";
let dbs = new_ds().await?;
let ses = Session::owner().with_ns("test").with_db("test");
let ses = Session::owner().with_ns("test-tb-cf").with_db("test-tb-cf");
let start_ts = 0u64;
let end_ts = start_ts + 1;
dbs.tick_at(start_ts).await?;
@ -236,7 +296,7 @@ async fn table_change_feeds() -> Result<(), Error> {
true => Value::parse(
"[
{
versionstamp: 65536,
versionstamp: 1,
changes: [
{
define_table: {
@ -246,7 +306,7 @@ async fn table_change_feeds() -> Result<(), Error> {
]
},
{
versionstamp: 131072,
versionstamp: 2,
changes: [
{
create: {
@ -257,7 +317,7 @@ async fn table_change_feeds() -> Result<(), Error> {
]
},
{
versionstamp: 196608,
versionstamp: 3,
changes: [
{
update: {
@ -268,7 +328,7 @@ async fn table_change_feeds() -> Result<(), Error> {
]
},
{
versionstamp: 262144,
versionstamp: 4,
changes: [
{
update: {
@ -279,7 +339,7 @@ async fn table_change_feeds() -> Result<(), Error> {
]
},
{
versionstamp: 327680,
versionstamp: 5,
changes: [
{
delete: {
@ -289,7 +349,7 @@ async fn table_change_feeds() -> Result<(), Error> {
]
},
{
versionstamp: 393216,
versionstamp: 6,
changes: [
{
create: {
@ -304,7 +364,7 @@ async fn table_change_feeds() -> Result<(), Error> {
false => Value::parse(
"[
{
versionstamp: 65536,
versionstamp: 1,
changes: [
{
define_table: {
@ -314,7 +374,7 @@ async fn table_change_feeds() -> Result<(), Error> {
]
},
{
versionstamp: 131072,
versionstamp: 2,
changes: [
{
update: {
@ -325,7 +385,7 @@ async fn table_change_feeds() -> Result<(), Error> {
]
},
{
versionstamp: 196608,
versionstamp: 3,
changes: [
{
update: {
@ -336,7 +396,7 @@ async fn table_change_feeds() -> Result<(), Error> {
]
},
{
versionstamp: 262144,
versionstamp: 4,
changes: [
{
update: {
@ -347,7 +407,7 @@ async fn table_change_feeds() -> Result<(), Error> {
]
},
{
versionstamp: 327680,
versionstamp: 5,
changes: [
{
delete: {
@ -357,7 +417,7 @@ async fn table_change_feeds() -> Result<(), Error> {
]
},
{
versionstamp: 393216,
versionstamp: 6,
changes: [
{
update: {
@ -392,7 +452,7 @@ async fn table_change_feeds() -> Result<(), Error> {
#[tokio::test]
async fn changefeed_with_ts() -> Result<(), Error> {
let db = new_ds().await?;
let ses = Session::owner().with_ns("test").with_db("test");
let ses = Session::owner().with_ns("test-cf-ts").with_db("test-cf-ts");
// Enable change feeds
let sql = "
DEFINE TABLE user CHANGEFEED 1h;

View file

@ -665,23 +665,39 @@ mod cli_integration {
.input("SHOW CHANGES FOR TABLE thing SINCE 0 LIMIT 10;\n")
.output()
.unwrap();
let output = remove_debug_info(output);
assert_eq!(
output,
"[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 65536 }, { changes: [{ create: { id: thing:one } }], versionstamp: 131072 }]]\n\n"
.to_owned(),
"failed to send sql: {args}");
let output = remove_debug_info(output).replace('\n', "");
let allowed = [
"[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 1 }, { changes: [{ create: { id: thing:one } }], versionstamp: 2 }]]",
"[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 1 }, { changes: [{ create: { id: thing:one } }], versionstamp: 3 }]]",
"[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 2 }, { changes: [{ create: { id: thing:one } }], versionstamp: 3 }]]",
"[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 2 }, { changes: [{ create: { id: thing:one } }], versionstamp: 4 }]]",
];
allowed
.into_iter()
.find(|case| *case == output)
.ok_or(format!("Output didnt match an example output: {output}"))
.unwrap();
} else {
let output = common::run(&args)
.input("SHOW CHANGES FOR TABLE thing SINCE 0 LIMIT 10;\n")
.output()
.unwrap();
let output = remove_debug_info(output);
assert_eq!(
output,
"[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 65536 }, { changes: [{ update: { id: thing:one } }], versionstamp: 131072 }]]\n\n"
.to_owned(),
"failed to send sql: {args}" );
let output = remove_debug_info(output).replace('\n', "");
let allowed = [
"[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 1 }, { changes: [{ update: { id: thing:one } }], versionstamp: 2 }]]",
"[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 1 }, { changes: [{ update: { id: thing:one } }], versionstamp: 3 }]]",
"[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 2 }, { changes: [{ update: { id: thing:one } }], versionstamp: 3 }]]",
"[[{ changes: [{ define_table: { name: 'thing' } }], versionstamp: 2 }, { changes: [{ update: { id: thing:one } }], versionstamp: 4 }]]",
];
allowed
.into_iter()
.find(|case| {
let a = *case == output;
println!("Comparing\n{case}\n{output}\n{a}");
a
})
.ok_or(format!("Output didnt match an example output: {output}"))
.unwrap();
}
};