From e00c456389a2a64e9e642221c7f20588b0539c98 Mon Sep 17 00:00:00 2001 From: Przemyslaw Hugh Kaznowski Date: Thu, 21 Mar 2024 10:15:40 +0000 Subject: [PATCH] Revert CF timestamp format (#3690) --- core/src/cf/mutations.rs | 8 +- core/src/kvs/mem/mod.rs | 4 +- core/src/vs/conv.rs | 2 + core/src/vs/mod.rs | 8 ++ lib/tests/changefeeds.rs | 215 +++++++++++++++++++++++++++++++++------ 5 files changed, 202 insertions(+), 35 deletions(-) diff --git a/core/src/cf/mutations.rs b/core/src/cf/mutations.rs index 58d59d0e..c6ce5432 100644 --- a/core/src/cf/mutations.rs +++ b/core/src/cf/mutations.rs @@ -4,7 +4,7 @@ use crate::sql::statements::DefineTableStatement; use crate::sql::thing::Thing; use crate::sql::value::Value; use crate::sql::Operation; -use crate::vs::versionstamp_to_u64; +use crate::vs::to_u128_be; use derive::Store; use revision::revisioned; use serde::{Deserialize, Serialize}; @@ -127,7 +127,7 @@ impl DatabaseMutation { impl ChangeSet { pub fn into_value(self) -> Value { let mut m = BTreeMap::::new(); - let vs = versionstamp_to_u64(&self.0); + let vs = to_u128_be(self.0); m.insert("versionstamp".to_string(), Value::from(vs)); m.insert("changes".to_string(), self.1.into_value()); let so: Object = m.into(); @@ -221,7 +221,7 @@ mod tests { let s = serde_json::to_string(&v).unwrap(); assert_eq!( s, - r#"{"changes":[{"update":{"id":"mytb:tobie","note":"surreal"}},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":1}"# + r#"{"changes":[{"update":{"id":"mytb:tobie","note":"surreal"}},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":65536}"# ); } @@ -276,7 +276,7 @@ mod tests { let s = serde_json::to_string(&v).unwrap(); assert_eq!( s, - r#"{"changes":[{"current":{"id":"mytb:tobie","note":"surreal"},"update":[{"op":"add","path":"/`/note`","value":"surreal"}]},{"current":{"id":"mytb:tobie2","note":"surreal"},"update":[{"op":"remove","path":"/`/temp`"}]},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":1}"# + r#"{"changes":[{"current":{"id":"mytb:tobie","note":"surreal"},"update":[{"op":"add","path":"/`/note`","value":"surreal"}]},{"current":{"id":"mytb:tobie2","note":"surreal"},"update":[{"op":"remove","path":"/`/temp`"}]},{"delete":{"id":"mytb:tobie"}},{"define_table":{"name":"mytb"}}],"versionstamp":65536}"# ); } } diff --git a/core/src/kvs/mem/mod.rs b/core/src/kvs/mem/mod.rs index ae7483d8..2d80e5b6 100644 --- a/core/src/kvs/mem/mod.rs +++ b/core/src/kvs/mem/mod.rs @@ -6,7 +6,7 @@ use crate::key::debug::sprint_key; use crate::kvs::Check; use crate::kvs::Key; use crate::kvs::Val; -use crate::vs::{u64_to_versionstamp, versionstamp_to_u64, Versionstamp}; +use crate::vs::{try_to_u64_be, u64_to_versionstamp, Versionstamp}; use std::ops::Range; pub struct Datastore { @@ -182,7 +182,7 @@ impl Transaction { Err(e) => Err(Error::Ds(e.to_string())), }; let array = res?; - let prev = versionstamp_to_u64(&array); + let prev = try_to_u64_be(array)?; prev + 1 } None => 1, diff --git a/core/src/vs/conv.rs b/core/src/vs/conv.rs index 43b7e2ef..a581e40c 100644 --- a/core/src/vs/conv.rs +++ b/core/src/vs/conv.rs @@ -76,6 +76,8 @@ pub fn try_u128_to_versionstamp(v: u128) -> Result<[u8; 10], Error> { } /// Take the most significant, time-based bytes and ignores the last 2 bytes +/// +/// You probably want `to_u128_be` instead #[doc(hidden)] pub fn versionstamp_to_u64(vs: &Versionstamp) -> u64 { u64::from_be_bytes(vs[..8].try_into().unwrap()) diff --git a/core/src/vs/mod.rs b/core/src/vs/mod.rs index ddfc94c2..390d13aa 100644 --- a/core/src/vs/mod.rs +++ b/core/src/vs/mod.rs @@ -6,6 +6,14 @@ /// Versionstamp is a 10-byte array used to identify a specific version of a key. /// The first 8 bytes are significant (the u64), and the remaining 2 bytes are not significant, but used for extra precision. /// To convert to and from this module, see the conv module in this same directory. +/// +/// You're going to want these +/// 65536 +/// 131072 +/// 196608 +/// 262144 +/// 327680 +/// 393216 pub type Versionstamp = [u8; 10]; pub(crate) mod conv; diff --git a/lib/tests/changefeeds.rs b/lib/tests/changefeeds.rs index f92d1439..e087e2b0 100644 --- a/lib/tests/changefeeds.rs +++ b/lib/tests/changefeeds.rs @@ -64,11 +64,13 @@ async fn database_change_feeds() -> Result<(), Error> { let tmp = res.remove(0).result; assert!(tmp.is_ok()); - let cf_val_arr = match FFLAGS.change_feed_live_queries.enabled() { - true => Value::parse( - "[ + let potential_show_changes_values: Vec = match FFLAGS.change_feed_live_queries.enabled() + { + true => vec![ + Value::parse( + "[ { - versionstamp: 2, + versionstamp: 65536, changes: [ { create: { @@ -79,7 +81,7 @@ async fn database_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 3, + versionstamp: 131072, changes: [ { delete: { @@ -89,11 +91,88 @@ async fn database_change_feeds() -> Result<(), Error> { ] } ]", - ), - false => Value::parse( - "[ + ), + Value::parse( + "[ { - versionstamp: 2, + versionstamp: 65536, + changes: [ + { + create: { + id: person:test, + name: 'Name: Tobie' + } + } + ] + }, + { + versionstamp: 196608, + changes: [ + { + delete: { + id: person:test + } + } + ] + } + ]", + ), + Value::parse( + "[ + { + versionstamp: 131072, + changes: [ + { + create: { + id: person:test, + name: 'Name: Tobie' + } + } + ] + }, + { + versionstamp: 196608, + changes: [ + { + delete: { + id: person:test + } + } + ] + } + ]", + ), + Value::parse( + "[ + { + versionstamp: 131072, + changes: [ + { + create: { + id: person:test, + name: 'Name: Tobie' + } + } + ] + }, + { + versionstamp: 262144, + changes: [ + { + delete: { + id: person:test + } + } + ] + } + ]", + ), + ], + false => vec![ + Value::parse( + "[ + { + versionstamp: 65536, changes: [ { update: { @@ -104,7 +183,7 @@ async fn database_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 3, + versionstamp: 131072, changes: [ { delete: { @@ -114,7 +193,83 @@ async fn database_change_feeds() -> Result<(), Error> { ] } ]", - ), + ), + Value::parse( + "[ + { + versionstamp: 65536, + changes: [ + { + update: { + id: person:test, + name: 'Name: Tobie' + } + } + ] + }, + { + versionstamp: 196608, + changes: [ + { + delete: { + id: person:test + } + } + ] + } + ]", + ), + Value::parse( + "[ + { + versionstamp: 131072, + changes: [ + { + update: { + id: person:test, + name: 'Name: Tobie' + } + } + ] + }, + { + versionstamp: 196608, + changes: [ + { + delete: { + id: person:test + } + } + ] + } + ]", + ), + Value::parse( + "[ + { + versionstamp: 131072, + changes: [ + { + update: { + id: person:test, + name: 'Name: Tobie' + } + } + ] + }, + { + versionstamp: 262144, + changes: [ + { + delete: { + id: person:test + } + } + ] + } + ]", + ), + ], }; // Declare check that is repeatable @@ -122,7 +277,7 @@ async fn database_change_feeds() -> Result<(), Error> { dbs: &Datastore, sql2: &str, ses: &Session, - cf_val_arr: &Value, + cf_val_arr: &Vec, ) -> Result<(), String> { let res = &mut dbs.execute(sql2, ses, None).await?; assert_eq!(res.len(), 3); @@ -149,17 +304,19 @@ async fn database_change_feeds() -> Result<(), Error> { .ok_or(format!("Expected DELETE value:\nleft: {}\nright: {}", tmp, val))?; // SHOW CHANGES let tmp = res.remove(0).result?; - Some(&tmp) - .filter(|x| *x == cf_val_arr) + cf_val_arr + .iter() + .find(|x| *x == &tmp) + // We actually dont want to capture if its found .map(|_v| ()) - .ok_or(format!("Expected SHOW CHANGES value:\nleft: {}\nright: {}", tmp, cf_val_arr))?; + .ok_or(format!("Expected SHOW CHANGES value not found:\n{}", tmp))?; Ok(()) } // Check the validation with repeats let limit = 1; for i in 0..limit { - let test_result = check_test(&dbs, sql2, &ses, &cf_val_arr).await; + let test_result = check_test(&dbs, sql2, &ses, &potential_show_changes_values).await; match test_result { Ok(_) => break, Err(e) => { @@ -185,7 +342,7 @@ async fn database_change_feeds() -> Result<(), Error> { let res = &mut dbs.execute(sql, &ses, None).await?; let tmp = res.remove(0).result?; - assert_eq!(tmp, cf_val_arr); + assert!(potential_show_changes_values.contains(&tmp)); // GC after 1hs let one_hour_in_secs = 3600; current_time += one_hour_in_secs; @@ -297,7 +454,7 @@ async fn table_change_feeds() -> Result<(), Error> { true => Value::parse( "[ { - versionstamp: 1, + versionstamp: 65536, changes: [ { define_table: { @@ -307,7 +464,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 2, + versionstamp: 131072, changes: [ { create: { @@ -318,7 +475,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 3, + versionstamp: 196608, changes: [ { update: { @@ -329,7 +486,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 4, + versionstamp: 262144, changes: [ { update: { @@ -340,7 +497,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 5, + versionstamp: 327680, changes: [ { delete: { @@ -350,7 +507,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 6, + versionstamp: 393216, changes: [ { create: { @@ -365,7 +522,7 @@ async fn table_change_feeds() -> Result<(), Error> { false => Value::parse( "[ { - versionstamp: 1, + versionstamp: 65536, changes: [ { define_table: { @@ -375,7 +532,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 2, + versionstamp: 131072, changes: [ { update: { @@ -386,7 +543,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 3, + versionstamp: 196608, changes: [ { update: { @@ -397,7 +554,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 4, + versionstamp: 262144, changes: [ { update: { @@ -408,7 +565,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 5, + versionstamp: 327680, changes: [ { delete: { @@ -418,7 +575,7 @@ async fn table_change_feeds() -> Result<(), Error> { ] }, { - versionstamp: 6, + versionstamp: 393216, changes: [ { update: {