From e30f70b9076318c9fe3299ea3d4a1d3ca505de44 Mon Sep 17 00:00:00 2001 From: Yusuke Kuoka Date: Wed, 28 Jun 2023 08:42:29 +0900 Subject: [PATCH] feat: Key encoding/decoding for change feeds (#2188) --- lib/src/key/cf.rs | 132 +++++++++++++++++++++++++++++++++++++ lib/src/key/dv.rs | 51 +++++++++++++++ lib/src/key/mod.rs | 3 + lib/src/lib.rs | 1 + lib/src/vs/conv.rs | 157 +++++++++++++++++++++++++++++++++++++++++++++ lib/src/vs/mod.rs | 10 +++ 6 files changed, 354 insertions(+) create mode 100644 lib/src/key/cf.rs create mode 100644 lib/src/key/dv.rs create mode 100644 lib/src/vs/conv.rs create mode 100644 lib/src/vs/mod.rs diff --git a/lib/src/key/cf.rs b/lib/src/key/cf.rs new file mode 100644 index 00000000..8697bdb2 --- /dev/null +++ b/lib/src/key/cf.rs @@ -0,0 +1,132 @@ +use derive::Key; +use serde::{Deserialize, Serialize}; + +use crate::vs; + +use std::str; + +// Cf stands for change feeds +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)] +pub struct Cf<'a> { + __: u8, + _a: u8, + pub ns: &'a str, + _b: u8, + pub db: &'a str, + _d: u8, + _e: u8, + _f: u8, + // vs is the versionstamp of the change feed entry that is encoded in big-endian. + // Use the to_u64_be function to convert it to a u128. + pub vs: [u8; 10], + _c: u8, + pub tb: &'a str, +} + +#[allow(unused)] +pub fn new<'a>(ns: &'a str, db: &'a str, ts: u64, tb: &'a str) -> Cf<'a> { + Cf::new(ns, db, vs::u64_to_versionstamp(ts), tb) +} + +#[allow(unused)] +pub fn versionstamped_key_prefix(ns: &str, db: &str) -> Vec { + let mut k = super::database::new(ns, db).encode().unwrap(); + k.extend_from_slice(&[b'!', b'c', b'f']); + k +} + +#[allow(unused)] +pub fn versionstamped_key_suffix(tb: &str) -> Vec { + let mut k: Vec = vec![]; + k.extend_from_slice(&[b'*']); + k.extend_from_slice(tb.as_bytes()); + // Without this, decoding fails with UnexpectedEOF errors + k.extend_from_slice(&[0x00]); + k +} + +/// Returns the prefix for the whole database change feeds since the +/// specified versionstamp. +#[allow(unused)] +pub fn ts_prefix(ns: &str, db: &str, vs: vs::Versionstamp) -> Vec { + let mut k = super::database::new(ns, db).encode().unwrap(); + k.extend_from_slice(&[b'!', b'c', b'f']); + k.extend_from_slice(&vs); + k +} + +/// Returns the prefix for the whole database change feeds +#[allow(unused)] +pub fn prefix(ns: &str, db: &str) -> Vec { + let mut k = super::database::new(ns, db).encode().unwrap(); + k.extend_from_slice(&[b'!', b'c', b'f', 0x00]); + k +} + +/// Returns the suffix for the whole database change feeds +#[allow(unused)] +pub fn suffix(ns: &str, db: &str) -> Vec { + let mut k = super::database::new(ns, db).encode().unwrap(); + k.extend_from_slice(&[b'!', b'c', b'f', 0xff]); + k +} + +impl<'a> Cf<'a> { + pub fn new(ns: &'a str, db: &'a str, vs: [u8; 10], tb: &'a str) -> Self { + Cf { + __: b'/', + _a: b'*', + ns, + _b: b'*', + db, + _d: b'!', + _e: b'c', + _f: b'f', + vs, + _c: b'*', + tb, + } + } +} + +#[cfg(test)] +mod tests { + use crate::vs::*; + use std::ascii::escape_default; + + #[test] + fn key() { + use super::*; + #[rustfmt::skip] + let val = Cf::new( + "test", + "test", + try_u128_to_versionstamp(12345).unwrap(), + "test", + ); + let enc = Cf::encode(&val).unwrap(); + println!("enc={}", show(&enc)); + let dec = Cf::decode(&enc).unwrap(); + assert_eq!(val, dec); + } + + #[test] + fn versionstamp_conversions() { + let a = u64_to_versionstamp(12345); + let b = try_to_u64_be(a).unwrap(); + assert_eq!(12345, b); + + let a = try_u128_to_versionstamp(12345).unwrap(); + let b = to_u128_be(a); + assert_eq!(12345, b); + } + + fn show(bs: &[u8]) -> String { + let mut visible = String::new(); + for &b in bs { + let part: Vec = escape_default(b).collect(); + visible.push_str(std::str::from_utf8(&part).unwrap()); + } + visible + } +} diff --git a/lib/src/key/dv.rs b/lib/src/key/dv.rs new file mode 100644 index 00000000..b52db232 --- /dev/null +++ b/lib/src/key/dv.rs @@ -0,0 +1,51 @@ +use derive::Key; +use serde::{Deserialize, Serialize}; + +// Dv stands for Database Versionstamp +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)] +pub struct Dv<'a> { + __: u8, + _a: u8, + pub ns: &'a str, + _b: u8, + pub db: &'a str, + _d: u8, + _e: u8, + _f: u8, +} + +#[allow(unused)] +pub fn new<'a>(ns: &'a str, db: &'a str) -> Dv<'a> { + Dv::new(ns, db) +} + +impl<'a> Dv<'a> { + pub fn new(ns: &'a str, db: &'a str) -> Self { + Dv { + __: b'/', + _a: b'*', + ns, + _b: b'*', + db, + _d: b'!', + _e: b't', + _f: b't', + } + } +} + +#[cfg(test)] +mod tests { + #[test] + fn key() { + use super::*; + #[rustfmt::skip] + let val = Dv::new( + "test", + "test", + ); + let enc = Dv::encode(&val).unwrap(); + let dec = Dv::decode(&enc).unwrap(); + assert_eq!(val, dec); + } +} diff --git a/lib/src/key/mod.rs b/lib/src/key/mod.rs index 9650f336..fc508778 100644 --- a/lib/src/key/mod.rs +++ b/lib/src/key/mod.rs @@ -16,6 +16,7 @@ /// /// Database /*{ns}*{db} /// AZ /*{ns}*{db}!az{az} +/// CF /*{ns}*{db}!cf{ts} /// DL /*{ns}*{db}!dl{us} /// DT /*{ns}*{db}!dt{tk} /// PA /*{ns}*{db}!pa{pa} @@ -62,11 +63,13 @@ pub mod bp; // Stores BTree nodes for postings pub mod bs; // Stores FullText index states pub mod bt; // Stores BTree nodes for terms pub mod bu; // Stores terms for term_ids +pub mod cf; // Stores change feeds pub mod cl; // Stores cluster membership information pub mod database; // Stores the key prefix for all keys under a database pub mod db; // Stores a DEFINE DATABASE config definition pub mod dl; // Stores a DEFINE LOGIN ON DATABASE config definition pub mod dt; // Stores a DEFINE LOGIN ON DATABASE config definition +pub mod dv; // Stores database versionstamps pub mod ev; // Stores a DEFINE EVENT config definition pub mod fc; // Stores a DEFINE FUNCTION config definition pub mod fd; // Stores a DEFINE FIELD config definition diff --git a/lib/src/lib.rs b/lib/src/lib.rs index fdf06186..947c5db8 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -112,6 +112,7 @@ mod doc; mod exe; mod fnc; mod key; +mod vs; pub mod sql; diff --git a/lib/src/vs/conv.rs b/lib/src/vs/conv.rs new file mode 100644 index 00000000..1207b9d2 --- /dev/null +++ b/lib/src/vs/conv.rs @@ -0,0 +1,157 @@ +use std::fmt; + +// u64_to_versionstamp converts a u64 to a 10-byte versionstamp +// assuming big-endian and the the last two bytes are zero. +pub fn u64_to_versionstamp(v: u64) -> [u8; 10] { + let mut buf = [0; 10]; + buf[0] = (v >> 56) as u8; + buf[1] = (v >> 48) as u8; + buf[2] = (v >> 40) as u8; + buf[3] = (v >> 32) as u8; + buf[4] = (v >> 24) as u8; + buf[5] = (v >> 16) as u8; + buf[6] = (v >> 8) as u8; + buf[7] = v as u8; + buf +} + +#[allow(unused)] +// u64_u16_to_versionstamp converts a u64 and a u16 to a 10-byte versionstamp +// assuming big-endian. +pub fn u64_u16_to_versionstamp(v: u64, v2: u16) -> [u8; 10] { + let mut buf = [0; 10]; + buf[0] = (v >> 56) as u8; + buf[1] = (v >> 48) as u8; + buf[2] = (v >> 40) as u8; + buf[3] = (v >> 32) as u8; + buf[4] = (v >> 24) as u8; + buf[5] = (v >> 16) as u8; + buf[6] = (v >> 8) as u8; + buf[7] = v as u8; + buf[8] = (v2 >> 8) as u8; + buf[9] = v2 as u8; + buf +} + +#[allow(unused)] +// u64_u16_to_versionstamp converts a u64 and a u16 to a 10-byte versionstamp +// assuming big-endian. +pub fn u16_u64_to_versionstamp(v: u16, v2: u64) -> [u8; 10] { + let mut buf = [0; 10]; + buf[0] = (v >> 8) as u8; + buf[1] = v as u8; + buf[2] = (v2 >> 56) as u8; + buf[3] = (v2 >> 48) as u8; + buf[4] = (v2 >> 40) as u8; + buf[5] = (v2 >> 32) as u8; + buf[6] = (v2 >> 24) as u8; + buf[7] = (v2 >> 16) as u8; + buf[8] = (v2 >> 8) as u8; + buf[9] = v2 as u8; + buf +} + +// u128_to_versionstamp converts a u128 to a 10-byte versionstamp +// assuming big-endian. +#[allow(unused)] +pub fn try_u128_to_versionstamp(v: u128) -> Result<[u8; 10], Error> { + if v >> 80 > 0 { + return Err(Error::InvalidVersionstamp); + } + + let mut buf = [0; 10]; + buf[0] = (v >> 72) as u8; + buf[1] = (v >> 64) as u8; + buf[2] = (v >> 56) as u8; + buf[3] = (v >> 48) as u8; + buf[4] = (v >> 40) as u8; + buf[5] = (v >> 32) as u8; + buf[6] = (v >> 24) as u8; + buf[7] = (v >> 16) as u8; + buf[8] = (v >> 8) as u8; + buf[9] = v as u8; + Ok(buf) +} + +// to_u128_be converts a 10-byte versionstamp to a u128 assuming big-endian. +// This is handy for human comparing versionstamps. +#[allow(unused)] +pub fn to_u128_be(vs: [u8; 10]) -> u128 { + let mut buf = [0; 16]; + let mut i = 0; + while i < 10 { + buf[i + 6] = vs[i]; + i += 1; + } + u128::from_be_bytes(buf) +} + +pub enum Error { + // InvalidVersionstamp is returned when a versionstamp has an unexpected length. + InvalidVersionstamp, +} + +impl fmt::Debug for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::InvalidVersionstamp => write!(f, "invalid versionstamp"), + } + } +} + +// to_u64_be converts a 10-byte versionstamp to a u64 assuming big-endian. +// Only the first 8 bytes are used. +#[allow(unused)] +pub fn try_to_u64_be(vs: [u8; 10]) -> Result { + let mut buf = [0; 8]; + let mut i = 0; + while i < 8 { + buf[i] = vs[i]; + i += 1; + } + if vs[8] != 0 || vs[9] != 0 { + return Err(Error::InvalidVersionstamp); + } + Ok(u64::from_be_bytes(buf)) +} + +// to_u128_le converts a 10-byte versionstamp to a u128 assuming little-endian. +// This is handy for producing human-readable versions of versionstamps. +#[allow(unused)] +pub fn to_u128_le(vs: [u8; 10]) -> u128 { + let mut buf = [0; 16]; + let mut i = 0; + while i < 10 { + buf[i] = vs[i]; + i += 1; + } + u128::from_be_bytes(buf) +} + +mod tests { + #[test] + fn try_to_u64_be() { + use super::*; + // Overflow + let v = [255, 255, 255, 255, 255, 255, 255, 255, 0, 1]; + let res = try_to_u64_be(v); + assert!(res.is_err()); + // No overflow + let v = [255, 255, 255, 255, 255, 255, 255, 255, 0, 0]; + let res = try_to_u64_be(v).unwrap(); + assert_eq!(res, u64::MAX); + } + + #[test] + fn try_u128_to_versionstamp() { + use super::*; + // Overflow + let v = u128::MAX; + let res = try_u128_to_versionstamp(v); + assert!(res.is_err()); + // No overflow + let v = u128::MAX >> 48; + let res = try_u128_to_versionstamp(v).unwrap(); + assert_eq!(res, [255, 255, 255, 255, 255, 255, 255, 255, 255, 255]); + } +} diff --git a/lib/src/vs/mod.rs b/lib/src/vs/mod.rs new file mode 100644 index 00000000..2925f468 --- /dev/null +++ b/lib/src/vs/mod.rs @@ -0,0 +1,10 @@ +//! vs is a module to handle Versionstamps. +//! This module is supplemental to the kvs::tx module and is not intended to be used directly +//! by applications. +//! This module might be migrated into the kvs or kvs::tx module in the future. + +pub type Versionstamp = [u8; 10]; + +pub(crate) mod conv; + +pub use self::conv::*;