Improve fix logic, fix graph edges (#4715)
This commit is contained in:
parent
fcdc37227e
commit
b589f7fec1
5 changed files with 317 additions and 70 deletions
|
@ -187,6 +187,31 @@ impl<'a> Graph<'a> {
|
|||
fk: fk.id.to_owned(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_from_id(
|
||||
ns: &'a str,
|
||||
db: &'a str,
|
||||
tb: &'a str,
|
||||
id: Id,
|
||||
eg: Dir,
|
||||
ft: &'a str,
|
||||
fk: Id,
|
||||
) -> Self {
|
||||
Self {
|
||||
__: b'/',
|
||||
_a: b'*',
|
||||
ns,
|
||||
_b: b'*',
|
||||
db,
|
||||
_c: b'*',
|
||||
tb,
|
||||
_d: b'~',
|
||||
id,
|
||||
eg,
|
||||
ft,
|
||||
fk,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -66,7 +66,7 @@
|
|||
///
|
||||
/// crate::key::thing /*{ns}*{db}*{tb}*{id}
|
||||
///
|
||||
/// crate::key::graph /*{ns}*{db}*{tb}~{id}{eg}{fk}
|
||||
/// crate::key::graph /*{ns}*{db}*{tb}~{id}{eg}{ft}{fk}
|
||||
///
|
||||
pub(crate) mod category;
|
||||
pub(crate) mod change;
|
||||
|
|
|
@ -1,69 +0,0 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use crate::{err::Error, kvs::Transaction};
|
||||
|
||||
pub async fn v1_to_2_id_uuid(tx: Arc<Transaction>) -> Result<(), Error> {
|
||||
for ns in tx.all_ns().await?.iter() {
|
||||
let ns = ns.name.as_str();
|
||||
for db in tx.all_db(ns).await?.iter() {
|
||||
let db = db.name.as_str();
|
||||
for tb in tx.all_tb(ns, db).await?.iter() {
|
||||
let tb = tb.name.as_str();
|
||||
// mutable beg, as we update it each iteration to the last record id + a null byte
|
||||
let mut beg = crate::key::thing::prefix(ns, db, tb);
|
||||
let end = crate::key::thing::suffix(ns, db, tb);
|
||||
// queue of record ids to fix
|
||||
let mut queue: Vec<Vec<u8>> = Vec::new();
|
||||
|
||||
// Explanation for these numbers:
|
||||
//
|
||||
// Before the Id enum: /*{NS}\0*{DB}\0*{TB}\0*
|
||||
// We are counting: ^^ ^ ^ ^ ^ ^ ^
|
||||
//
|
||||
// Looking at the first four bytes for Id::Array (revision 1), we find: [0, 0, 0, 2]
|
||||
// First 3 bytes can be discarded, that 2 is the enum entry which we need to fix.
|
||||
// This totals to 11 bytes, plus the lengths of the bytes for namespace + database + tablename.
|
||||
//
|
||||
// For revision 2 of the Id enum, we added Uuid in index 2 (after number and string)
|
||||
// This means that any entry which was previously 2 or higher, now needs to be 3 or higher.
|
||||
// Resulting in a threshold of 2 (as a u8), used down below.
|
||||
//
|
||||
let pos = 11 + ns.as_bytes().len() + db.as_bytes().len() + tb.as_bytes().len();
|
||||
let threshold = 2_u8;
|
||||
|
||||
'scan: loop {
|
||||
let keys = tx.keys(beg.clone()..end.clone(), 1000).await?;
|
||||
if keys.is_empty() {
|
||||
break 'scan;
|
||||
}
|
||||
|
||||
// We suffix the last id with a null byte, to prevent scanning it twice (which would result in an infinite loop)
|
||||
beg.clone_from(keys.last().unwrap());
|
||||
beg.extend_from_slice(b"\0");
|
||||
|
||||
for key in keys.iter() {
|
||||
// Check if the id is affected
|
||||
if key.get(pos).is_some_and(|b| b >= &threshold) {
|
||||
// This ID needs fixing, add to queue
|
||||
queue.push(key.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for key in queue.iter() {
|
||||
// Bump the enum entry by 1
|
||||
let mut fixed = key.clone();
|
||||
// This is safe, because we previously obtained the byte from the original id
|
||||
unsafe { *fixed.get_unchecked_mut(pos) += 1 };
|
||||
// Get the value for the old key. We can unwrap the option, as we know that the key exists in the KV store
|
||||
let val = tx.get(key.clone().to_owned(), None).await?.unwrap();
|
||||
// Delete the old key
|
||||
tx.del(key.to_owned()).await?;
|
||||
// Set the fixed key
|
||||
tx.set(fixed, val, None).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
176
core/src/kvs/version/fixes/v1_to_2_id_uuid/deps.rs
Normal file
176
core/src/kvs/version/fixes/v1_to_2_id_uuid/deps.rs
Normal file
|
@ -0,0 +1,176 @@
|
|||
use crate::sql::{id::Gen, id::Id as NewId, Array, IdRange, Object};
|
||||
use derive::Key;
|
||||
use revision::revisioned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Legacy (revision 1) shape of the record id enum, kept so that keys written
/// with this layout can still be decoded during the migration.
///
/// This revision has no `Uuid` variant. The order of the variants below must
/// not be changed: `Array` and every variant after it are the ones reported
/// as affected by `Id::is_affected`.
#[revisioned(revision = 1)]
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Key, Hash)]
|
||||
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
|
||||
#[non_exhaustive]
|
||||
pub enum Id {
|
||||
Number(i64),
|
||||
String(String),
|
||||
Array(Array),
|
||||
Object(Object),
|
||||
Generate(Gen),
|
||||
Range(Box<IdRange>),
|
||||
}
|
||||
|
||||
impl Id {
|
||||
pub fn fix(&self) -> Option<NewId> {
|
||||
match self {
|
||||
Self::Number(_) => None,
|
||||
Self::String(_) => None,
|
||||
Self::Array(x) => Some(NewId::Array(x.to_owned())),
|
||||
Self::Object(x) => Some(NewId::Object(x.to_owned())),
|
||||
Self::Generate(x) => Some(NewId::Generate(x.to_owned())),
|
||||
Self::Range(x) => Some(NewId::Range(x.to_owned())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_affected(&self) -> bool {
|
||||
match self {
|
||||
Self::Number(_) => false,
|
||||
Self::String(_) => false,
|
||||
Self::Array(_) => true,
|
||||
Self::Object(_) => true,
|
||||
Self::Generate(_) => true,
|
||||
Self::Range(_) => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Id> for NewId {
|
||||
fn from(id: Id) -> Self {
|
||||
match id {
|
||||
Id::Number(x) => NewId::Number(x),
|
||||
Id::String(x) => NewId::String(x),
|
||||
Id::Array(x) => NewId::Array(x),
|
||||
Id::Object(x) => NewId::Object(x),
|
||||
Id::Generate(x) => NewId::Generate(x),
|
||||
Id::Range(x) => NewId::Range(x),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod key {
|
||||
use derive::Key;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::sql::{id::Id as NewId, Dir};
|
||||
|
||||
use super::Id;
|
||||
|
||||
/// Legacy layout of a graph edge key, using the legacy `Id` enum for both the
/// record id (`id`) and the foreign record id (`fk`).
///
/// The underscore-prefixed fields hold the fixed separator bytes written by
/// `Graph::new` (`/`, `*`, `*`, `*`, `~`). Field order must stay as declared.
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
|
||||
#[non_exhaustive]
|
||||
pub struct Graph<'a> {
|
||||
__: u8,
|
||||
_a: u8,
|
||||
pub ns: &'a str,
|
||||
_b: u8,
|
||||
pub db: &'a str,
|
||||
_c: u8,
|
||||
pub tb: &'a str,
|
||||
_d: u8,
|
||||
pub id: Id,
|
||||
pub eg: Dir,
|
||||
pub ft: &'a str,
|
||||
pub fk: Id,
|
||||
}
|
||||
|
||||
impl<'a> Graph<'a> {
|
||||
pub fn new(
|
||||
ns: &'a str,
|
||||
db: &'a str,
|
||||
tb: &'a str,
|
||||
id: Id,
|
||||
eg: Dir,
|
||||
ft: &'a str,
|
||||
fk: &'a Id,
|
||||
) -> Self {
|
||||
Self {
|
||||
__: b'/',
|
||||
_a: b'*',
|
||||
ns,
|
||||
_b: b'*',
|
||||
db,
|
||||
_c: b'*',
|
||||
tb,
|
||||
_d: b'~',
|
||||
id,
|
||||
eg,
|
||||
ft,
|
||||
fk: fk.to_owned(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fix(&self) -> Option<crate::key::graph::Graph> {
|
||||
let fixed = match (self.id.fix(), self.fk.fix()) {
|
||||
(None, None) => return None,
|
||||
(Some(id), None) => crate::key::graph::Graph::new_from_id(
|
||||
self.ns,
|
||||
self.db,
|
||||
self.tb,
|
||||
id,
|
||||
self.eg.to_owned(),
|
||||
self.ft,
|
||||
NewId::from(self.fk.to_owned()),
|
||||
),
|
||||
(None, Some(fk)) => crate::key::graph::Graph::new_from_id(
|
||||
self.ns,
|
||||
self.db,
|
||||
self.tb,
|
||||
self.id.to_owned().into(),
|
||||
self.eg.to_owned(),
|
||||
self.ft,
|
||||
fk,
|
||||
),
|
||||
(Some(id), Some(fk)) => crate::key::graph::Graph::new_from_id(
|
||||
self.ns,
|
||||
self.db,
|
||||
self.tb,
|
||||
id,
|
||||
self.eg.to_owned(),
|
||||
self.ft,
|
||||
fk,
|
||||
),
|
||||
};
|
||||
|
||||
Some(fixed)
|
||||
}
|
||||
}
|
||||
|
||||
/// Legacy layout of a record key, using the legacy `Id` enum for the record id.
///
/// The underscore-prefixed fields hold the fixed separator bytes written by
/// `Thing::new` (`/`, `*`, `*`, `*`, `*`). Field order must stay as declared.
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
|
||||
#[non_exhaustive]
|
||||
pub struct Thing<'a> {
|
||||
__: u8,
|
||||
_a: u8,
|
||||
pub ns: &'a str,
|
||||
_b: u8,
|
||||
pub db: &'a str,
|
||||
_c: u8,
|
||||
pub tb: &'a str,
|
||||
_d: u8,
|
||||
pub id: Id,
|
||||
}
|
||||
|
||||
impl<'a> Thing<'a> {
|
||||
pub fn new(ns: &'a str, db: &'a str, tb: &'a str, id: Id) -> Self {
|
||||
Self {
|
||||
__: b'/',
|
||||
_a: b'*',
|
||||
ns,
|
||||
_b: b'*',
|
||||
db,
|
||||
_c: b'*',
|
||||
tb,
|
||||
_d: b'*',
|
||||
id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fix(&self) -> Option<crate::key::thing::Thing> {
|
||||
self.id.fix().map(|id| crate::key::thing::new(self.ns, self.db, self.tb, &id))
|
||||
}
|
||||
}
|
||||
}
|
115
core/src/kvs/version/fixes/v1_to_2_id_uuid/mod.rs
Normal file
115
core/src/kvs/version/fixes/v1_to_2_id_uuid/mod.rs
Normal file
|
@ -0,0 +1,115 @@
|
|||
mod deps;
|
||||
use deps::*;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{err::Error, kvs::Transaction};
|
||||
|
||||
pub async fn v1_to_2_id_uuid(tx: Arc<Transaction>) -> Result<(), Error> {
|
||||
for ns in tx.all_ns().await?.iter() {
|
||||
let ns = ns.name.as_str();
|
||||
for db in tx.all_db(ns).await?.iter() {
|
||||
let db = db.name.as_str();
|
||||
for tb in tx.all_tb(ns, db).await?.iter() {
|
||||
let tb = tb.name.as_str();
|
||||
migrate_tb_records(tx.clone(), ns, db, tb).await?;
|
||||
migrate_tb_edges(tx.clone(), ns, db, tb).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn migrate_tb_records(
|
||||
tx: Arc<Transaction>,
|
||||
ns: &str,
|
||||
db: &str,
|
||||
tb: &str,
|
||||
) -> Result<(), Error> {
|
||||
// mutable beg, as we update it each iteration to the last record id + a null byte
|
||||
let mut beg = crate::key::thing::prefix(ns, db, tb);
|
||||
let end = crate::key::thing::suffix(ns, db, tb);
|
||||
|
||||
// We need to scan ALL keys and queue them first,
|
||||
// because if we fix them as we iterate, the pagination is off
|
||||
let mut queue: Vec<Vec<u8>> = Vec::new();
|
||||
|
||||
'scan: loop {
|
||||
let keys = tx.keys(beg.clone()..end.clone(), 1000).await?;
|
||||
if keys.is_empty() {
|
||||
break 'scan;
|
||||
}
|
||||
|
||||
// We suffix the last id with a null byte, to prevent scanning it twice (which would result in an infinite loop)
|
||||
beg.clone_from(keys.last().unwrap());
|
||||
beg.extend_from_slice(b"\0");
|
||||
|
||||
for enc in keys.into_iter() {
|
||||
let dec = key::Thing::decode(&enc).unwrap();
|
||||
// Check if the id is affected
|
||||
if dec.id.is_affected() {
|
||||
// This ID needs fixing, add to queue
|
||||
queue.push(enc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for enc in queue.iter() {
|
||||
let broken = key::Thing::decode(enc).unwrap();
|
||||
// Get a fixed id
|
||||
let fixed = broken.fix().unwrap();
|
||||
// Get the value for the old key. We can unwrap the option, as we know that the key exists in the KV store
|
||||
let val = tx.get(broken.clone().to_owned(), None).await?.unwrap();
|
||||
// Delete the old key
|
||||
tx.del(broken.to_owned()).await?;
|
||||
// Set the fixed key
|
||||
tx.set(fixed, val, None).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn migrate_tb_edges(tx: Arc<Transaction>, ns: &str, db: &str, tb: &str) -> Result<(), Error> {
|
||||
// mutable beg, as we update it each iteration to the last record id + a null byte
|
||||
let mut beg = crate::key::table::all::new(ns, db, tb).encode()?;
|
||||
beg.extend_from_slice(&[b'~', 0x00]);
|
||||
let mut end = crate::key::table::all::new(ns, db, tb).encode()?;
|
||||
end.extend_from_slice(&[b'~', 0xff]);
|
||||
|
||||
// We need to scan ALL keys and queue them first,
|
||||
// because if we fix them as we iterate, the pagination is off
|
||||
let mut queue: Vec<Vec<u8>> = Vec::new();
|
||||
|
||||
'scan: loop {
|
||||
let keys = tx.keys(beg.clone()..end.clone(), 1000).await?;
|
||||
if keys.is_empty() {
|
||||
break 'scan;
|
||||
}
|
||||
|
||||
// We suffix the last id with a null byte, to prevent scanning it twice (which would result in an infinite loop)
|
||||
beg.clone_from(keys.last().unwrap());
|
||||
beg.extend_from_slice(b"\0");
|
||||
|
||||
for enc in keys.into_iter() {
|
||||
let dec = key::Graph::decode(&enc).unwrap();
|
||||
// Check if the id is affected
|
||||
if dec.id.is_affected() || dec.fk.is_affected() {
|
||||
// This ID needs fixing, add to queue
|
||||
queue.push(enc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for enc in queue.iter() {
|
||||
let broken = key::Graph::decode(enc).unwrap();
|
||||
// Get a fixed id
|
||||
let fixed = broken.fix().unwrap();
|
||||
// Get the value for the old key. We can unwrap the option, as we know that the key exists in the KV store
|
||||
let val = tx.get(broken.clone().to_owned(), None).await?.unwrap();
|
||||
// Delete the old key
|
||||
tx.del(broken.to_owned()).await?;
|
||||
// Set the fixed key
|
||||
tx.set(fixed, val, None).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in a new issue