From 70584115abf1173e5a50ca91e17ee007c2470e6d Mon Sep 17 00:00:00 2001 From: Micha de Vries Date: Fri, 23 Aug 2024 09:27:21 +0100 Subject: [PATCH] Versioned storage (#4581) Co-authored-by: Tobie Morgan Hitchcock --- core/src/err/mod.rs | 8 +++++ core/src/key/category.rs | 3 ++ core/src/key/mod.rs | 3 ++ core/src/key/version/mod.rs | 57 ++++++++++++++++++++++++++++++ core/src/kvs/ds.rs | 70 +++++++++++++++++++++++++++++++++---- core/src/kvs/mod.rs | 1 + core/src/kvs/version.rs | 58 ++++++++++++++++++++++++++++++ sdk/src/api/conn/cmd.rs | 6 ++-- sdk/src/api/method/live.rs | 6 ++-- sdk/tests/helpers.rs | 2 ++ src/dbs/mod.rs | 2 ++ 11 files changed, 204 insertions(+), 12 deletions(-) create mode 100644 core/src/key/version/mod.rs create mode 100644 core/src/kvs/version.rs diff --git a/core/src/err/mod.rs b/core/src/err/mod.rs index 5d28fdf5..74a581d2 100644 --- a/core/src/err/mod.rs +++ b/core/src/err/mod.rs @@ -1093,6 +1093,14 @@ pub enum Error { RangeTooBig { max: usize, }, + + /// There was an invalid storage version stored in the database + #[error("There was an invalid storage version stored in the database")] + InvalidStorageVersion, + + /// There was an outdated storage version stored in the database + #[error("The data stored on disk is out-of-date with this version. Please follow the upgrade guides in the documentation")] + OutdatedStorageVersion, } impl From for String { diff --git a/core/src/key/category.rs b/core/src/key/category.rs index 2c34442d..39616605 100644 --- a/core/src/key/category.rs +++ b/core/src/key/category.rs @@ -10,6 +10,8 @@ pub(crate) trait Categorise { #[non_exhaustive] #[allow(unused)] pub enum Category { + /// crate::key::storage::version /sv + Version, /// crate::key::root::all / Root, /// crate::key::root::access::ac /!ac{ac} @@ -154,6 +156,7 @@ pub enum Category { impl Display for Category { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let name = match self { + Self::Version => "StorageVersion", Self::Root => "Root", Self::Access => "Access", Self::AccessRoot => "AccessRoot", diff --git a/core/src/key/mod.rs b/core/src/key/mod.rs index e0322090..8feec5da 100644 --- a/core/src/key/mod.rs +++ b/core/src/key/mod.rs @@ -1,5 +1,7 @@ //! How the keys are structured in the key value store /// +/// crate::key::version !v +/// /// crate::key::root::all / /// crate::key::root::access::all /*{ac} /// crate::key::root::access::ac /!ac{ac} @@ -74,3 +76,4 @@ pub(crate) mod node; pub(crate) mod root; pub(crate) mod table; pub(crate) mod thing; +pub(crate) mod version; diff --git a/core/src/key/version/mod.rs b/core/src/key/version/mod.rs new file mode 100644 index 00000000..30690ab0 --- /dev/null +++ b/core/src/key/version/mod.rs @@ -0,0 +1,57 @@ +//! Stores a record document +use crate::key::category::Categorise; +use crate::key::category::Category; +use derive::Key; +use serde::{Deserialize, Serialize}; +use std::ops::Range; + +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)] +#[non_exhaustive] +pub struct Version { + __: u8, + _a: u8, +} + +pub fn new() -> Version { + Version::new() +} + +pub fn proceeding() -> Range> { + vec![b'!', b'v', 0x00]..vec![0xff] +} + +impl Categorise for Version { + fn categorise(&self) -> Category { + Category::Version + } +} + +impl Version { + pub fn new() -> Self { + Self { + __: b'!', + _a: b'v', + } + } +} + +impl Default for Version { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + #[test] + fn key() { + use super::*; + #[rustfmt::skip] + let val = Version::new(); + let enc = Version::encode(&val).unwrap(); + assert_eq!(enc, b"!v"); + + let dec = Version::decode(&enc).unwrap(); + assert_eq!(val, dec); + } +} diff --git a/core/src/kvs/ds.rs b/core/src/kvs/ds.rs index ba0101c1..ab225eda 100644 --- a/core/src/kvs/ds.rs +++ b/core/src/kvs/ds.rs @@ -1,5 +1,6 @@ use super::tr::Transactor; use super::tx::Transaction; +use super::version::Version; use crate::cf; use crate::ctx::MutableContext; #[cfg(feature = "jwks")] @@ -508,13 +509,59 @@ impl Datastore { // Initialise the cluster and run bootstrap utilities #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)] - pub async fn bootstrap(&self) -> Result<(), Error> { - // Insert this node in the cluster - self.insert_node(self.id).await?; - // Mark expired nodes as archived - self.expire_nodes().await?; + pub async fn check_version(&self) -> Result { + // Start a new writeable transaction + let txn = self.transaction(Write, Pessimistic).await?.enclose(); + // Create the key where the version is stored + let key = crate::key::version::new(); + // Check if a version is already set in storage + let val = match catch!(txn, txn.get(key.clone(), None)) { + // There is a version set in the storage + Some(v) => { + // Attempt to decode the current stored version + let val = TryInto::::try_into(v); + // Check for errors, and cancel the transaction + match val { + // There was en error getting the version + Err(err) => { + // We didn't write anything, so just rollback + txn.cancel().await?; + // Return the error + return Err(err); + } + // We could decode the version correctly + Ok(val) => val, + } + } + // There is no version set in the storage + None => { + // Fetch any keys immediately following the version key + let rng = crate::key::version::proceeding(); + let keys = catch!(txn, txn.keys(rng, 1)); + // Check the storage if there are any other keys set + let val = if keys.is_empty() { + // There are no keys set in storage, so this is a new database + Version::latest() + } else { + // There were keys in storage, so this is an upgrade + Version::v1() + }; + // Convert the version to binary + let bytes: Vec = val.into(); + // Attempt to set the current version in storage + catch!(txn, txn.set(key, bytes)); + // We set the version, so commit the transaction + catch!(txn, txn.commit()); + // Return the current version + val + } + }; + // Check we are running the latest version + if !val.is_latest() { + return Err(Error::OutdatedStorageVersion); + } // Everything ok - Ok(()) + Ok(val) } /// Setup the initial cluster access credentials @@ -546,6 +593,17 @@ impl Datastore { } } + // Initialise the cluster and run bootstrap utilities + #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)] + pub async fn bootstrap(&self) -> Result<(), Error> { + // Insert this node in the cluster + self.insert_node(self.id).await?; + // Mark expired nodes as archived + self.expire_nodes().await?; + // Everything ok + Ok(()) + } + // tick is called periodically to perform maintenance tasks. // This is called every TICK_INTERVAL. #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip(self))] diff --git a/core/src/kvs/mod.rs b/core/src/kvs/mod.rs index 81f90351..10d11a4e 100644 --- a/core/src/kvs/mod.rs +++ b/core/src/kvs/mod.rs @@ -24,6 +24,7 @@ mod scanner; mod stash; mod tr; mod tx; +mod version; mod fdb; mod indxdb; diff --git a/core/src/kvs/version.rs b/core/src/kvs/version.rs new file mode 100644 index 00000000..d4982be2 --- /dev/null +++ b/core/src/kvs/version.rs @@ -0,0 +1,58 @@ +use crate::err::Error; + +#[derive(Copy, Debug, Clone)] +pub struct Version(u16); + +impl From for Version { + fn from(version: u16) -> Self { + Version(version) + } +} + +impl From> for Version { + fn from(v: Option) -> Self { + v.unwrap_or(0).into() + } +} + +impl From for u16 { + fn from(v: Version) -> Self { + v.0 + } +} + +impl From for Vec { + fn from(v: Version) -> Self { + v.0.to_be_bytes().to_vec() + } +} + +impl TryFrom> for Version { + type Error = Error; + fn try_from(v: Vec) -> Result { + let bin = v.try_into().map_err(|_| Error::InvalidStorageVersion)?; + let val = u16::from_be_bytes(bin).into(); + Ok(val) + } +} + +impl Version { + /// The latest version + pub const LATEST: u16 = 2; + /// The latest version + pub fn latest() -> Self { + Self(2) + } + /// SurrealDB version 1 + pub fn v1() -> Self { + Self(1) + } + /// SurrealDB version 2 + pub fn v2() -> Self { + Self(2) + } + /// Check if we are running the latest version + pub fn is_latest(&self) -> bool { + self.0 == Self::LATEST + } +} diff --git a/sdk/src/api/conn/cmd.rs b/sdk/src/api/conn/cmd.rs index 045d07ba..ec55ae63 100644 --- a/sdk/src/api/conn/cmd.rs +++ b/sdk/src/api/conn/cmd.rs @@ -199,9 +199,9 @@ impl Command { } => { let table = match what { Some(w) => { - let mut tmp = CoreTable::default(); - tmp.0 = w.clone(); - CoreValue::from(tmp) + let mut table = CoreTable::default(); + table.0.clone_from(&w); + CoreValue::from(table) } None => CoreValue::None, }; diff --git a/sdk/src/api/method/live.rs b/sdk/src/api/method/live.rs index 032710ce..f55316d5 100644 --- a/sdk/src/api/method/live.rs +++ b/sdk/src/api/method/live.rs @@ -62,10 +62,10 @@ where } Resource::RecordId(record) => { let record = record.into_inner(); - table.0 = record.tb.clone(); + table.0.clone_from(&record.tb); stmt.what = table.into(); let mut ident = Ident::default(); - ident.0 = ID.to_owned(); + ID.clone_into(&mut ident.0); let mut idiom = Idiom::default(); idiom.0 = vec![Part::from(ident)]; let mut cond = Cond::default(); @@ -81,7 +81,7 @@ where Resource::Edge(_) => return Err(Error::LiveOnEdges.into()), Resource::Range(range) => { let range = range.into_inner(); - table.0 = range.tb.clone(); + table.0.clone_from(&range.tb); stmt.what = table.into(); stmt.cond = range.to_cond(); } diff --git a/sdk/tests/helpers.rs b/sdk/tests/helpers.rs index 7cc96772..2534078e 100644 --- a/sdk/tests/helpers.rs +++ b/sdk/tests/helpers.rs @@ -254,6 +254,7 @@ impl Test { }) } + #[allow(dead_code)] pub async fn new(sql: &str) -> Result { Self::with_ds(new_ds().await?, sql).await } @@ -261,6 +262,7 @@ impl Test { /// Simulates restarting the Datastore /// - Data are persistent (including memory store) /// - Flushing caches (jwks, IndexStore, ...) + #[allow(dead_code)] pub async fn restart(self, sql: &str) -> Result { Self::with_ds(self.ds.restart(), sql).await } diff --git a/src/dbs/mod.rs b/src/dbs/mod.rs index cc36f77a..10bb69c6 100644 --- a/src/dbs/mod.rs +++ b/src/dbs/mod.rs @@ -239,6 +239,8 @@ pub async fn init( .with_auth_enabled(!unauthenticated) .with_temporary_directory(temporary_directory) .with_capabilities(capabilities); + // Ensure the storage version is up-to-date to prevent corruption + dbs.check_version().await?; // Setup initial server auth credentials if let (Some(user), Some(pass)) = (opt.user.as_ref(), opt.pass.as_ref()) { dbs.initialise_credentials(user, pass).await?;