Versioned storage (#4581)

Co-authored-by: Tobie Morgan Hitchcock <tobie@surrealdb.com>
This commit is contained in:
Micha de Vries 2024-08-23 09:27:21 +01:00 committed by GitHub
parent f2e598379b
commit 70584115ab
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 204 additions and 12 deletions

View file

@ -1093,6 +1093,14 @@ pub enum Error {
RangeTooBig {
max: usize,
},
/// There was an invalid storage version stored in the database
#[error("There was an invalid storage version stored in the database")]
InvalidStorageVersion,
/// There was an outdated storage version stored in the database
#[error("The data stored on disk is out-of-date with this version. Please follow the upgrade guides in the documentation")]
OutdatedStorageVersion,
}
impl From<Error> for String {

View file

@ -10,6 +10,8 @@ pub(crate) trait Categorise {
#[non_exhaustive]
#[allow(unused)]
pub enum Category {
/// crate::key::storage::version /sv
Version,
/// crate::key::root::all /
Root,
/// crate::key::root::access::ac /!ac{ac}
@ -154,6 +156,7 @@ pub enum Category {
impl Display for Category {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let name = match self {
Self::Version => "StorageVersion",
Self::Root => "Root",
Self::Access => "Access",
Self::AccessRoot => "AccessRoot",

View file

@ -1,5 +1,7 @@
//! How the keys are structured in the key value store
///
/// crate::key::version !v
///
/// crate::key::root::all /
/// crate::key::root::access::all /*{ac}
/// crate::key::root::access::ac /!ac{ac}
@ -74,3 +76,4 @@ pub(crate) mod node;
pub(crate) mod root;
pub(crate) mod table;
pub(crate) mod thing;
pub(crate) mod version;

View file

@ -0,0 +1,57 @@
//! Stores a record document
use crate::key::category::Categorise;
use crate::key::category::Category;
use derive::Key;
use serde::{Deserialize, Serialize};
use std::ops::Range;
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
#[non_exhaustive]
pub struct Version {
__: u8,
_a: u8,
}
pub fn new() -> Version {
Version::new()
}
pub fn proceeding() -> Range<Vec<u8>> {
vec![b'!', b'v', 0x00]..vec![0xff]
}
impl Categorise for Version {
fn categorise(&self) -> Category {
Category::Version
}
}
impl Version {
pub fn new() -> Self {
Self {
__: b'!',
_a: b'v',
}
}
}
impl Default for Version {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Version::new();
let enc = Version::encode(&val).unwrap();
assert_eq!(enc, b"!v");
let dec = Version::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

View file

@ -1,5 +1,6 @@
use super::tr::Transactor;
use super::tx::Transaction;
use super::version::Version;
use crate::cf;
use crate::ctx::MutableContext;
#[cfg(feature = "jwks")]
@ -508,13 +509,59 @@ impl Datastore {
// Initialise the cluster and run bootstrap utilities
#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn bootstrap(&self) -> Result<(), Error> {
// Insert this node in the cluster
self.insert_node(self.id).await?;
// Mark expired nodes as archived
self.expire_nodes().await?;
pub async fn check_version(&self) -> Result<Version, Error> {
// Start a new writeable transaction
let txn = self.transaction(Write, Pessimistic).await?.enclose();
// Create the key where the version is stored
let key = crate::key::version::new();
// Check if a version is already set in storage
let val = match catch!(txn, txn.get(key.clone(), None)) {
// There is a version set in the storage
Some(v) => {
// Attempt to decode the current stored version
let val = TryInto::<Version>::try_into(v);
// Check for errors, and cancel the transaction
match val {
// There was en error getting the version
Err(err) => {
// We didn't write anything, so just rollback
txn.cancel().await?;
// Return the error
return Err(err);
}
// We could decode the version correctly
Ok(val) => val,
}
}
// There is no version set in the storage
None => {
// Fetch any keys immediately following the version key
let rng = crate::key::version::proceeding();
let keys = catch!(txn, txn.keys(rng, 1));
// Check the storage if there are any other keys set
let val = if keys.is_empty() {
// There are no keys set in storage, so this is a new database
Version::latest()
} else {
// There were keys in storage, so this is an upgrade
Version::v1()
};
// Convert the version to binary
let bytes: Vec<u8> = val.into();
// Attempt to set the current version in storage
catch!(txn, txn.set(key, bytes));
// We set the version, so commit the transaction
catch!(txn, txn.commit());
// Return the current version
val
}
};
// Check we are running the latest version
if !val.is_latest() {
return Err(Error::OutdatedStorageVersion);
}
// Everything ok
Ok(())
Ok(val)
}
/// Setup the initial cluster access credentials
@ -546,6 +593,17 @@ impl Datastore {
}
}
// Initialise the cluster and run bootstrap utilities
#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn bootstrap(&self) -> Result<(), Error> {
// Insert this node in the cluster
self.insert_node(self.id).await?;
// Mark expired nodes as archived
self.expire_nodes().await?;
// Everything ok
Ok(())
}
// tick is called periodically to perform maintenance tasks.
// This is called every TICK_INTERVAL.
#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]

View file

@ -24,6 +24,7 @@ mod scanner;
mod stash;
mod tr;
mod tx;
mod version;
mod fdb;
mod indxdb;

58
core/src/kvs/version.rs Normal file
View file

@ -0,0 +1,58 @@
use crate::err::Error;
#[derive(Copy, Debug, Clone)]
pub struct Version(u16);
impl From<u16> for Version {
fn from(version: u16) -> Self {
Version(version)
}
}
impl From<Option<u16>> for Version {
fn from(v: Option<u16>) -> Self {
v.unwrap_or(0).into()
}
}
impl From<Version> for u16 {
fn from(v: Version) -> Self {
v.0
}
}
impl From<Version> for Vec<u8> {
fn from(v: Version) -> Self {
v.0.to_be_bytes().to_vec()
}
}
impl TryFrom<Vec<u8>> for Version {
type Error = Error;
fn try_from(v: Vec<u8>) -> Result<Self, Self::Error> {
let bin = v.try_into().map_err(|_| Error::InvalidStorageVersion)?;
let val = u16::from_be_bytes(bin).into();
Ok(val)
}
}
impl Version {
/// The latest version
pub const LATEST: u16 = 2;
/// The latest version
pub fn latest() -> Self {
Self(2)
}
/// SurrealDB version 1
pub fn v1() -> Self {
Self(1)
}
/// SurrealDB version 2
pub fn v2() -> Self {
Self(2)
}
/// Check if we are running the latest version
pub fn is_latest(&self) -> bool {
self.0 == Self::LATEST
}
}

View file

@ -199,9 +199,9 @@ impl Command {
} => {
let table = match what {
Some(w) => {
let mut tmp = CoreTable::default();
tmp.0 = w.clone();
CoreValue::from(tmp)
let mut table = CoreTable::default();
table.0.clone_from(&w);
CoreValue::from(table)
}
None => CoreValue::None,
};

View file

@ -62,10 +62,10 @@ where
}
Resource::RecordId(record) => {
let record = record.into_inner();
table.0 = record.tb.clone();
table.0.clone_from(&record.tb);
stmt.what = table.into();
let mut ident = Ident::default();
ident.0 = ID.to_owned();
ID.clone_into(&mut ident.0);
let mut idiom = Idiom::default();
idiom.0 = vec![Part::from(ident)];
let mut cond = Cond::default();
@ -81,7 +81,7 @@ where
Resource::Edge(_) => return Err(Error::LiveOnEdges.into()),
Resource::Range(range) => {
let range = range.into_inner();
table.0 = range.tb.clone();
table.0.clone_from(&range.tb);
stmt.what = table.into();
stmt.cond = range.to_cond();
}

View file

@ -254,6 +254,7 @@ impl Test {
})
}
#[allow(dead_code)]
pub async fn new(sql: &str) -> Result<Self, Error> {
Self::with_ds(new_ds().await?, sql).await
}
@ -261,6 +262,7 @@ impl Test {
/// Simulates restarting the Datastore
/// - Data are persistent (including memory store)
/// - Flushing caches (jwks, IndexStore, ...)
#[allow(dead_code)]
pub async fn restart(self, sql: &str) -> Result<Self, Error> {
Self::with_ds(self.ds.restart(), sql).await
}

View file

@ -239,6 +239,8 @@ pub async fn init(
.with_auth_enabled(!unauthenticated)
.with_temporary_directory(temporary_directory)
.with_capabilities(capabilities);
// Ensure the storage version is up-to-date to prevent corruption
dbs.check_version().await?;
// Setup initial server auth credentials
if let (Some(user), Some(pass)) = (opt.user.as_ref(), opt.pass.as_ref()) {
dbs.initialise_credentials(user, pass).await?;