Add Versionstamp generator (#3716)

Co-authored-by: Rushmore Mushambi <rushmore@webenchanter.com>
This commit is contained in:
Przemyslaw Hugh Kaznowski 2024-03-21 11:37:07 +00:00 committed by GitHub
parent ce8e2d4578
commit 2b4fb84511
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 245 additions and 345 deletions

View file

@ -44,6 +44,12 @@ pub mod rpc;
#[doc(hidden)] #[doc(hidden)]
pub mod syn; pub mod syn;
#[doc(hidden)]
pub mod test_helpers {
pub use crate::vs::conv::to_u128_be;
pub use crate::vs::generate_versionstamp_sequences;
}
#[doc(hidden)] #[doc(hidden)]
/// Channels for receiving a SurrealQL database export /// Channels for receiving a SurrealQL database export
pub mod channel { pub mod channel {

View file

@ -21,3 +21,91 @@ pub(crate) mod oracle;
pub use self::conv::*; pub use self::conv::*;
pub use self::oracle::*; pub use self::oracle::*;
/// Generate S-tuples of valid, sequenced versionstamps within range.
/// The limit is used, because these are combinatorics - without an upper bound, combinations aren't possible.
#[doc(hidden)]
pub fn generate_versionstamp_sequences(start: Versionstamp) -> VersionstampSequence {
VersionstampSequence {
next_state: Some(start),
}
}
#[doc(hidden)]
pub struct VersionstampSequence {
next_state: Option<Versionstamp>,
}
#[doc(hidden)]
impl Iterator for VersionstampSequence {
type Item = Versionstamp;
fn next(&mut self) -> Option<Self::Item> {
self.next_state?;
let returned_state = self.next_state.unwrap();
// Now calculate next
let mut next_state = self.next_state.unwrap();
let index_to_increase =
next_state.iter().enumerate().rev().skip(2).find(|(_, &x)| x < 255u8).take();
if index_to_increase.is_none() {
self.next_state = None;
return Some(returned_state);
}
let (index_to_increase, _) = index_to_increase.unwrap();
next_state[index_to_increase] += 1;
for next_state_byte in
next_state.iter_mut().take(returned_state.len() - 2).skip(index_to_increase + 1)
{
*next_state_byte = 0;
}
self.next_state = Some(next_state);
Some(returned_state)
}
}
#[cfg(test)]
mod test {
use crate::vs::{to_u128_be, Versionstamp};
#[test]
pub fn generate_one_vs() {
let vs = super::generate_versionstamp_sequences([0; 10]).take(1).collect::<Vec<_>>();
assert_eq!(vs.len(), 1, "Should be 1, but was {:?}", vs);
assert_eq!(vs[0], [0; 10]);
}
#[test]
pub fn generate_two_vs_in_sequence() {
let vs =
super::generate_versionstamp_sequences([0, 0, 0, 0, 0, 0, 0, 1, 0, 0]).flat_map(|vs| {
let skip_because_first_is_equal = 1;
super::generate_versionstamp_sequences(vs)
.skip(skip_because_first_is_equal)
.map(move |vs2| (vs, vs2))
});
let versionstamps = vs.take(4).collect::<Vec<(Versionstamp, Versionstamp)>>();
assert_eq!(
versionstamps.len(),
4,
"We expect the combinations to be 2x2 matrix, but was {:?}",
versionstamps
);
let acceptable_values = [65536u128, 131072, 196608, 262144, 327680, 393216];
for (first, second) in versionstamps {
assert!(first < second, "First: {:?}, Second: {:?}", first, second);
let first = to_u128_be(first);
let second = to_u128_be(second);
assert!(acceptable_values.contains(&first));
assert!(acceptable_values.contains(&second));
}
}
#[test]
pub fn iteration_stops_past_end() {
let mut iter = super::generate_versionstamp_sequences([255; 10]);
assert!(iter.next().is_some());
assert!(iter.next().is_none());
}
}

View file

@ -11,6 +11,7 @@ use surrealdb::kvs::Datastore;
use surrealdb::kvs::LockType::Optimistic; use surrealdb::kvs::LockType::Optimistic;
use surrealdb::kvs::TransactionType::Write; use surrealdb::kvs::TransactionType::Write;
use surrealdb::sql::Value; use surrealdb::sql::Value;
use surrealdb_core2::test_helpers::{generate_versionstamp_sequences, to_u128_be};
mod helpers; mod helpers;
@ -64,212 +65,47 @@ async fn database_change_feeds() -> Result<(), Error> {
let tmp = res.remove(0).result; let tmp = res.remove(0).result;
assert!(tmp.is_ok()); assert!(tmp.is_ok());
// Two timestamps
let variance = 4;
let first_timestamp = generate_versionstamp_sequences([0; 10]).take(variance);
let second_timestamp = first_timestamp.flat_map(|vs1| {
generate_versionstamp_sequences(vs1).skip(1).take(variance).map(move |vs2| (vs1, vs2))
});
let potential_show_changes_values: Vec<Value> = match FFLAGS.change_feed_live_queries.enabled() let potential_show_changes_values: Vec<Value> = match FFLAGS.change_feed_live_queries.enabled()
{ {
true => vec![ true => second_timestamp
Value::parse( .map(|(vs1, vs2)| {
"[ let vs1 = to_u128_be(vs1);
{ let vs2 = to_u128_be(vs2);
versionstamp: 65536, Value::parse(
changes: [ format!(
{ r#"[
create: { {{ versionstamp: {}, changes: [ {{ create: {{ id: person:test, name: 'Name: Tobie' }} }} ] }},
id: person:test, {{ versionstamp: {}, changes: [ {{ delete: {{ id: person:test }} }} ] }}
name: 'Name: Tobie' ]"#,
} vs1, vs2
} )
] .as_str(),
}, )
{ })
versionstamp: 131072, .collect(),
changes: [ false => second_timestamp
{ .map(|(vs1, vs2)| {
delete: { let vs1 = to_u128_be(vs1);
id: person:test let vs2 = to_u128_be(vs2);
} Value::parse(
} format!(
] r#"[
} {{ versionstamp: {}, changes: [ {{ update: {{ id: person:test, name: 'Name: Tobie' }} }} ] }},
]", {{ versionstamp: {}, changes: [ {{ delete: {{ id: person:test }} }} ] }}
), ]"#,
Value::parse( vs1, vs2
"[ )
{ .as_str(),
versionstamp: 65536, )
changes: [ })
{ .collect(),
create: {
id: person:test,
name: 'Name: Tobie'
}
}
]
},
{
versionstamp: 196608,
changes: [
{
delete: {
id: person:test
}
}
]
}
]",
),
Value::parse(
"[
{
versionstamp: 131072,
changes: [
{
create: {
id: person:test,
name: 'Name: Tobie'
}
}
]
},
{
versionstamp: 196608,
changes: [
{
delete: {
id: person:test
}
}
]
}
]",
),
Value::parse(
"[
{
versionstamp: 131072,
changes: [
{
create: {
id: person:test,
name: 'Name: Tobie'
}
}
]
},
{
versionstamp: 262144,
changes: [
{
delete: {
id: person:test
}
}
]
}
]",
),
],
false => vec![
Value::parse(
"[
{
versionstamp: 65536,
changes: [
{
update: {
id: person:test,
name: 'Name: Tobie'
}
}
]
},
{
versionstamp: 131072,
changes: [
{
delete: {
id: person:test
}
}
]
}
]",
),
Value::parse(
"[
{
versionstamp: 65536,
changes: [
{
update: {
id: person:test,
name: 'Name: Tobie'
}
}
]
},
{
versionstamp: 196608,
changes: [
{
delete: {
id: person:test
}
}
]
}
]",
),
Value::parse(
"[
{
versionstamp: 131072,
changes: [
{
update: {
id: person:test,
name: 'Name: Tobie'
}
}
]
},
{
versionstamp: 196608,
changes: [
{
delete: {
id: person:test
}
}
]
}
]",
),
Value::parse(
"[
{
versionstamp: 131072,
changes: [
{
update: {
id: person:test,
name: 'Name: Tobie'
}
}
]
},
{
versionstamp: 262144,
changes: [
{
delete: {
id: person:test
}
}
]
}
]",
),
],
}; };
// Declare check that is repeatable // Declare check that is repeatable
@ -309,7 +145,15 @@ async fn database_change_feeds() -> Result<(), Error> {
.find(|x| *x == &tmp) .find(|x| *x == &tmp)
// We actually dont want to capture if its found // We actually dont want to capture if its found
.map(|_v| ()) .map(|_v| ())
.ok_or(format!("Expected SHOW CHANGES value not found:\n{}", tmp))?; .ok_or(format!(
"Expected SHOW CHANGES value not found:\n{}\nin:\n{}",
tmp,
cf_val_arr
.iter()
.map(|vs| vs.to_string())
.reduce(|left, right| format!("{}\n{}", left, right))
.unwrap()
))?;
Ok(()) Ok(())
} }
@ -351,7 +195,7 @@ async fn database_change_feeds() -> Result<(), Error> {
let res = &mut dbs.execute(sql, &ses, None).await?; let res = &mut dbs.execute(sql, &ses, None).await?;
let tmp = res.remove(0).result?; let tmp = res.remove(0).result?;
let val = Value::parse("[]"); let val = Value::parse("[]");
assert_eq!(tmp, val); assert_eq!(val, tmp);
// //
Ok(()) Ok(())
} }
@ -450,145 +294,98 @@ async fn table_change_feeds() -> Result<(), Error> {
let _tmp = res.remove(0).result?; let _tmp = res.remove(0).result?;
// SHOW CHANGES // SHOW CHANGES
let tmp = res.remove(0).result?; let tmp = res.remove(0).result?;
let val = match FFLAGS.change_feed_live_queries.enabled() { // If you want to write a macro, you are welcome to
true => Value::parse( let limit_variance = 3;
"[ let first = generate_versionstamp_sequences([0; 10]).take(limit_variance);
{ let second = first.flat_map(|vs1| {
versionstamp: 65536, generate_versionstamp_sequences(vs1).take(limit_variance).skip(1).map(move |vs2| (vs1, vs2))
changes: [ });
{ let third = second.flat_map(|(vs1, vs2)| {
define_table: { generate_versionstamp_sequences(vs2)
name: 'person' .take(limit_variance)
} .skip(1)
} .map(move |vs3| (vs1, vs2, vs3))
] });
}, let fourth = third.flat_map(|(vs1, vs2, vs3)| {
{ generate_versionstamp_sequences(vs3)
versionstamp: 131072, .take(limit_variance)
changes: [ .skip(1)
{ .map(move |vs4| (vs1, vs2, vs3, vs4))
create: { });
id: person:test, let fifth = fourth.flat_map(|(vs1, vs2, vs3, vs4)| {
name: 'Name: Tobie' generate_versionstamp_sequences(vs4)
} .take(limit_variance)
} .skip(1)
] .map(move |vs5| (vs1, vs2, vs3, vs4, vs5))
}, });
{ let sixth = fifth.flat_map(|(vs1, vs2, vs3, vs4, vs5)| {
versionstamp: 196608, generate_versionstamp_sequences(vs5)
changes: [ .take(limit_variance)
{ .skip(1)
update: { .map(move |vs6| (vs1, vs2, vs3, vs4, vs5, vs6))
id: person:test, });
name: 'Name: Jaime' let allowed_values: Vec<Value> = match FFLAGS.change_feed_live_queries.enabled() {
} true => sixth
} .map(|(vs1, vs2, vs3, vs4, vs5, vs6)| {
] let (vs1, vs2, vs3, vs4, vs5, vs6) = (
}, to_u128_be(vs1),
{ to_u128_be(vs2),
versionstamp: 262144, to_u128_be(vs3),
changes: [ to_u128_be(vs4),
{ to_u128_be(vs5),
update: { to_u128_be(vs6),
id: person:test, );
name: 'Name: Tobie' Value::parse(
} format!(
} r#"[
] {{ versionstamp: {vs1}, changes: [ {{ define_table: {{ name: 'person' }} }} ] }},
}, {{ versionstamp: {vs2}, changes: [ {{ create: {{ id: person:test, name: 'Name: Tobie' }} }} ] }},
{ {{ versionstamp: {vs3}, changes: [ {{ update: {{ id: person:test, name: 'Name: Jaime' }} }} ] }},
versionstamp: 327680, {{ versionstamp: {vs4}, changes: [ {{ update: {{ id: person:test, name: 'Name: Tobie' }} }} ] }},
changes: [ {{ versionstamp: {vs5}, changes: [ {{ delete: {{ id: person:test }} }} ] }},
{ {{ versionstamp: {vs6}, changes: [ {{ create: {{ id: person:1000, name: 'Name: Yusuke' }} }} ] }}
delete: { ]"#,
id: person:test )
} .as_str(),
} )
] })
}, .collect(),
{ false => sixth
versionstamp: 393216, .map(|(vs1, vs2, vs3, vs4, vs5, vs6)| {
changes: [ let (vs1, vs2, vs3, vs4, vs5, vs6) = (
{ to_u128_be(vs1),
create: { to_u128_be(vs2),
id: person:1000, to_u128_be(vs3),
name: 'Name: Yusuke' to_u128_be(vs4),
} to_u128_be(vs5),
} to_u128_be(vs6),
] );
} Value::parse(
]", format!(
), r#"[
false => Value::parse( {{ versionstamp: {vs1}, changes: [ {{ define_table: {{ name: 'person' }} }} ] }},
"[ {{ versionstamp: {vs2}, changes: [ {{ update: {{ id: person:test, name: 'Name: Tobie' }} }} ] }},
{ {{ versionstamp: {vs3}, changes: [ {{ update: {{ id: person:test, name: 'Name: Jaime' }} }} ] }},
versionstamp: 65536, {{ versionstamp: {vs4}, changes: [ {{ update: {{ id: person:test, name: 'Name: Tobie' }} }} ] }},
changes: [ {{ versionstamp: {vs5}, changes: [ {{ delete: {{ id: person:test }} }} ] }},
{ {{ versionstamp: {vs6}, changes: [ {{ update: {{ id: person:1000, name: 'Name: Yusuke' }} }} ] }}
define_table: { ]"#
name: 'person' )
} .as_str(),
} )
] })
}, .collect(),
{
versionstamp: 131072,
changes: [
{
update: {
id: person:test,
name: 'Name: Tobie'
}
}
]
},
{
versionstamp: 196608,
changes: [
{
update: {
id: person:test,
name: 'Name: Jaime'
}
}
]
},
{
versionstamp: 262144,
changes: [
{
update: {
id: person:test,
name: 'Name: Tobie'
}
}
]
},
{
versionstamp: 327680,
changes: [
{
delete: {
id: person:test
}
}
]
},
{
versionstamp: 393216,
changes: [
{
update: {
id: person:1000,
name: 'Name: Yusuke'
}
}
]
}
]",
),
}; };
assert_eq!(tmp, val); assert!(
allowed_values.contains(&tmp),
"tmp:\n{}\nchecked:\n{}",
tmp,
allowed_values
.iter()
.map(|v| v.to_string())
.reduce(|a, b| format!("{}\n{}", a, b))
.unwrap()
);
// Retain for 1h // Retain for 1h
let sql = " let sql = "
SHOW CHANGES FOR TABLE person SINCE 0; SHOW CHANGES FOR TABLE person SINCE 0;
@ -596,7 +393,16 @@ async fn table_change_feeds() -> Result<(), Error> {
dbs.tick_at(end_ts + 3599).await?; dbs.tick_at(end_ts + 3599).await?;
let res = &mut dbs.execute(sql, &ses, None).await?; let res = &mut dbs.execute(sql, &ses, None).await?;
let tmp = res.remove(0).result?; let tmp = res.remove(0).result?;
assert_eq!(tmp, val); assert!(
allowed_values.contains(&tmp),
"tmp:\n{}\nchecked:\n{}",
tmp,
allowed_values
.iter()
.map(|v| v.to_string())
.reduce(|a, b| format!("{}\n{}", a, b))
.unwrap()
);
// GC after 1hs // GC after 1hs
dbs.tick_at(end_ts + 3600).await?; dbs.tick_at(end_ts + 3600).await?;
let res = &mut dbs.execute(sql, &ses, None).await?; let res = &mut dbs.execute(sql, &ses, None).await?;