feat: Key encoding/decoding for change feeds (#2188)

This commit is contained in:
Yusuke Kuoka 2023-06-28 08:42:29 +09:00 committed by GitHub
parent 8ae8770812
commit e30f70b907
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 354 additions and 0 deletions

132
lib/src/key/cf.rs Normal file
View file

@ -0,0 +1,132 @@
use derive::Key;
use serde::{Deserialize, Serialize};
use crate::vs;
use std::str;
// Cf stands for change feeds
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Cf<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_d: u8,
_e: u8,
_f: u8,
// vs is the versionstamp of the change feed entry that is encoded in big-endian.
// Use the to_u64_be function to convert it to a u128.
pub vs: [u8; 10],
_c: u8,
pub tb: &'a str,
}
#[allow(unused)]
pub fn new<'a>(ns: &'a str, db: &'a str, ts: u64, tb: &'a str) -> Cf<'a> {
Cf::new(ns, db, vs::u64_to_versionstamp(ts), tb)
}
#[allow(unused)]
pub fn versionstamped_key_prefix(ns: &str, db: &str) -> Vec<u8> {
let mut k = super::database::new(ns, db).encode().unwrap();
k.extend_from_slice(&[b'!', b'c', b'f']);
k
}
#[allow(unused)]
pub fn versionstamped_key_suffix(tb: &str) -> Vec<u8> {
let mut k: Vec<u8> = vec![];
k.extend_from_slice(&[b'*']);
k.extend_from_slice(tb.as_bytes());
// Without this, decoding fails with UnexpectedEOF errors
k.extend_from_slice(&[0x00]);
k
}
/// Returns the prefix for the whole database change feeds since the
/// specified versionstamp.
#[allow(unused)]
pub fn ts_prefix(ns: &str, db: &str, vs: vs::Versionstamp) -> Vec<u8> {
let mut k = super::database::new(ns, db).encode().unwrap();
k.extend_from_slice(&[b'!', b'c', b'f']);
k.extend_from_slice(&vs);
k
}
/// Returns the prefix for the whole database change feeds
#[allow(unused)]
pub fn prefix(ns: &str, db: &str) -> Vec<u8> {
let mut k = super::database::new(ns, db).encode().unwrap();
k.extend_from_slice(&[b'!', b'c', b'f', 0x00]);
k
}
/// Returns the suffix for the whole database change feeds
#[allow(unused)]
pub fn suffix(ns: &str, db: &str) -> Vec<u8> {
let mut k = super::database::new(ns, db).encode().unwrap();
k.extend_from_slice(&[b'!', b'c', b'f', 0xff]);
k
}
impl<'a> Cf<'a> {
pub fn new(ns: &'a str, db: &'a str, vs: [u8; 10], tb: &'a str) -> Self {
Cf {
__: b'/',
_a: b'*',
ns,
_b: b'*',
db,
_d: b'!',
_e: b'c',
_f: b'f',
vs,
_c: b'*',
tb,
}
}
}
#[cfg(test)]
mod tests {
use crate::vs::*;
use std::ascii::escape_default;
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Cf::new(
"test",
"test",
try_u128_to_versionstamp(12345).unwrap(),
"test",
);
let enc = Cf::encode(&val).unwrap();
println!("enc={}", show(&enc));
let dec = Cf::decode(&enc).unwrap();
assert_eq!(val, dec);
}
#[test]
fn versionstamp_conversions() {
let a = u64_to_versionstamp(12345);
let b = try_to_u64_be(a).unwrap();
assert_eq!(12345, b);
let a = try_u128_to_versionstamp(12345).unwrap();
let b = to_u128_be(a);
assert_eq!(12345, b);
}
fn show(bs: &[u8]) -> String {
let mut visible = String::new();
for &b in bs {
let part: Vec<u8> = escape_default(b).collect();
visible.push_str(std::str::from_utf8(&part).unwrap());
}
visible
}
}

51
lib/src/key/dv.rs Normal file
View file

@ -0,0 +1,51 @@
use derive::Key;
use serde::{Deserialize, Serialize};
// Dv stands for Database Versionstamp
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Dv<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_d: u8,
_e: u8,
_f: u8,
}
#[allow(unused)]
pub fn new<'a>(ns: &'a str, db: &'a str) -> Dv<'a> {
Dv::new(ns, db)
}
impl<'a> Dv<'a> {
pub fn new(ns: &'a str, db: &'a str) -> Self {
Dv {
__: b'/',
_a: b'*',
ns,
_b: b'*',
db,
_d: b'!',
_e: b't',
_f: b't',
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Dv::new(
"test",
"test",
);
let enc = Dv::encode(&val).unwrap();
let dec = Dv::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

View file

@ -16,6 +16,7 @@
/// ///
/// Database /*{ns}*{db} /// Database /*{ns}*{db}
/// AZ /*{ns}*{db}!az{az} /// AZ /*{ns}*{db}!az{az}
/// CF /*{ns}*{db}!cf{ts}
/// DL /*{ns}*{db}!dl{us} /// DL /*{ns}*{db}!dl{us}
/// DT /*{ns}*{db}!dt{tk} /// DT /*{ns}*{db}!dt{tk}
/// PA /*{ns}*{db}!pa{pa} /// PA /*{ns}*{db}!pa{pa}
@ -62,11 +63,13 @@ pub mod bp; // Stores BTree nodes for postings
pub mod bs; // Stores FullText index states pub mod bs; // Stores FullText index states
pub mod bt; // Stores BTree nodes for terms pub mod bt; // Stores BTree nodes for terms
pub mod bu; // Stores terms for term_ids pub mod bu; // Stores terms for term_ids
pub mod cf; // Stores change feeds
pub mod cl; // Stores cluster membership information pub mod cl; // Stores cluster membership information
pub mod database; // Stores the key prefix for all keys under a database pub mod database; // Stores the key prefix for all keys under a database
pub mod db; // Stores a DEFINE DATABASE config definition pub mod db; // Stores a DEFINE DATABASE config definition
pub mod dl; // Stores a DEFINE LOGIN ON DATABASE config definition pub mod dl; // Stores a DEFINE LOGIN ON DATABASE config definition
pub mod dt; // Stores a DEFINE LOGIN ON DATABASE config definition pub mod dt; // Stores a DEFINE LOGIN ON DATABASE config definition
pub mod dv; // Stores database versionstamps
pub mod ev; // Stores a DEFINE EVENT config definition pub mod ev; // Stores a DEFINE EVENT config definition
pub mod fc; // Stores a DEFINE FUNCTION config definition pub mod fc; // Stores a DEFINE FUNCTION config definition
pub mod fd; // Stores a DEFINE FIELD config definition pub mod fd; // Stores a DEFINE FIELD config definition

View file

@ -112,6 +112,7 @@ mod doc;
mod exe; mod exe;
mod fnc; mod fnc;
mod key; mod key;
mod vs;
pub mod sql; pub mod sql;

157
lib/src/vs/conv.rs Normal file
View file

@ -0,0 +1,157 @@
use std::fmt;
// u64_to_versionstamp converts a u64 to a 10-byte versionstamp
// assuming big-endian and the the last two bytes are zero.
pub fn u64_to_versionstamp(v: u64) -> [u8; 10] {
let mut buf = [0; 10];
buf[0] = (v >> 56) as u8;
buf[1] = (v >> 48) as u8;
buf[2] = (v >> 40) as u8;
buf[3] = (v >> 32) as u8;
buf[4] = (v >> 24) as u8;
buf[5] = (v >> 16) as u8;
buf[6] = (v >> 8) as u8;
buf[7] = v as u8;
buf
}
#[allow(unused)]
// u64_u16_to_versionstamp converts a u64 and a u16 to a 10-byte versionstamp
// assuming big-endian.
pub fn u64_u16_to_versionstamp(v: u64, v2: u16) -> [u8; 10] {
let mut buf = [0; 10];
buf[0] = (v >> 56) as u8;
buf[1] = (v >> 48) as u8;
buf[2] = (v >> 40) as u8;
buf[3] = (v >> 32) as u8;
buf[4] = (v >> 24) as u8;
buf[5] = (v >> 16) as u8;
buf[6] = (v >> 8) as u8;
buf[7] = v as u8;
buf[8] = (v2 >> 8) as u8;
buf[9] = v2 as u8;
buf
}
#[allow(unused)]
// u64_u16_to_versionstamp converts a u64 and a u16 to a 10-byte versionstamp
// assuming big-endian.
pub fn u16_u64_to_versionstamp(v: u16, v2: u64) -> [u8; 10] {
let mut buf = [0; 10];
buf[0] = (v >> 8) as u8;
buf[1] = v as u8;
buf[2] = (v2 >> 56) as u8;
buf[3] = (v2 >> 48) as u8;
buf[4] = (v2 >> 40) as u8;
buf[5] = (v2 >> 32) as u8;
buf[6] = (v2 >> 24) as u8;
buf[7] = (v2 >> 16) as u8;
buf[8] = (v2 >> 8) as u8;
buf[9] = v2 as u8;
buf
}
// u128_to_versionstamp converts a u128 to a 10-byte versionstamp
// assuming big-endian.
#[allow(unused)]
pub fn try_u128_to_versionstamp(v: u128) -> Result<[u8; 10], Error> {
if v >> 80 > 0 {
return Err(Error::InvalidVersionstamp);
}
let mut buf = [0; 10];
buf[0] = (v >> 72) as u8;
buf[1] = (v >> 64) as u8;
buf[2] = (v >> 56) as u8;
buf[3] = (v >> 48) as u8;
buf[4] = (v >> 40) as u8;
buf[5] = (v >> 32) as u8;
buf[6] = (v >> 24) as u8;
buf[7] = (v >> 16) as u8;
buf[8] = (v >> 8) as u8;
buf[9] = v as u8;
Ok(buf)
}
// to_u128_be converts a 10-byte versionstamp to a u128 assuming big-endian.
// This is handy for human comparing versionstamps.
#[allow(unused)]
pub fn to_u128_be(vs: [u8; 10]) -> u128 {
let mut buf = [0; 16];
let mut i = 0;
while i < 10 {
buf[i + 6] = vs[i];
i += 1;
}
u128::from_be_bytes(buf)
}
pub enum Error {
// InvalidVersionstamp is returned when a versionstamp has an unexpected length.
InvalidVersionstamp,
}
impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::InvalidVersionstamp => write!(f, "invalid versionstamp"),
}
}
}
// to_u64_be converts a 10-byte versionstamp to a u64 assuming big-endian.
// Only the first 8 bytes are used.
#[allow(unused)]
pub fn try_to_u64_be(vs: [u8; 10]) -> Result<u64, Error> {
let mut buf = [0; 8];
let mut i = 0;
while i < 8 {
buf[i] = vs[i];
i += 1;
}
if vs[8] != 0 || vs[9] != 0 {
return Err(Error::InvalidVersionstamp);
}
Ok(u64::from_be_bytes(buf))
}
// to_u128_le converts a 10-byte versionstamp to a u128 assuming little-endian.
// This is handy for producing human-readable versions of versionstamps.
#[allow(unused)]
pub fn to_u128_le(vs: [u8; 10]) -> u128 {
let mut buf = [0; 16];
let mut i = 0;
while i < 10 {
buf[i] = vs[i];
i += 1;
}
u128::from_be_bytes(buf)
}
mod tests {
#[test]
fn try_to_u64_be() {
use super::*;
// Overflow
let v = [255, 255, 255, 255, 255, 255, 255, 255, 0, 1];
let res = try_to_u64_be(v);
assert!(res.is_err());
// No overflow
let v = [255, 255, 255, 255, 255, 255, 255, 255, 0, 0];
let res = try_to_u64_be(v).unwrap();
assert_eq!(res, u64::MAX);
}
#[test]
fn try_u128_to_versionstamp() {
use super::*;
// Overflow
let v = u128::MAX;
let res = try_u128_to_versionstamp(v);
assert!(res.is_err());
// No overflow
let v = u128::MAX >> 48;
let res = try_u128_to_versionstamp(v).unwrap();
assert_eq!(res, [255, 255, 255, 255, 255, 255, 255, 255, 255, 255]);
}
}

10
lib/src/vs/mod.rs Normal file
View file

@ -0,0 +1,10 @@
//! vs is a module to handle Versionstamps.
//! This module is supplemental to the kvs::tx module and is not intended to be used directly
//! by applications.
//! This module might be migrated into the kvs or kvs::tx module in the future.
pub type Versionstamp = [u8; 10];
pub(crate) mod conv;
pub use self::conv::*;