surreal fix
command (#4605)
Co-authored-by: Tobie Morgan Hitchcock <tobie@surrealdb.com>
This commit is contained in:
parent
050f7c577b
commit
876e55918f
14 changed files with 618 additions and 157 deletions
|
@ -208,52 +208,23 @@ script = """
|
|||
|
||||
[tasks.start-tikv]
|
||||
category = "CI - SERVICES"
|
||||
env = { SURREAL_LINK = "https://github.com/surrealdb/surrealdb/releases/download/v1.2.0/surreal-v1.2.0.linux-amd64.tgz" }
|
||||
script = """
|
||||
#!/bin/bash -ex
|
||||
|
||||
if [ ! -f "/tmp/test_surreal" ]; then
|
||||
echo "Downloading surrealdb for startup test"
|
||||
curl -L $SURREAL_LINK | tar -xzO > /tmp/test_surreal
|
||||
chmod +x /tmp/test_surreal
|
||||
fi
|
||||
|
||||
${HOME}/.tiup/bin/tiup install pd tikv playground
|
||||
|
||||
playground_attempts=0
|
||||
while [[ $playground_attempts -lt 5 ]]; do
|
||||
nohup ${HOME}/.tiup/bin/tiup playground --mode tikv-slim --kv 1 --pd 1 --db 0 --ticdc 0 --tiflash 0 --without-monitor > /tmp/tiup.log &
|
||||
|
||||
echo $! > /tmp/tiup.pid
|
||||
|
||||
set +e
|
||||
echo "Waiting for tiup playground to be ready..."
|
||||
tries=0
|
||||
echo "Waiting for tiup playground to be ready..."
|
||||
while [[ $tries -lt 10 ]]; do
|
||||
if ! ${HOME}/.tiup/bin/tiup playground display >/dev/null; then
|
||||
tries=$((tries + 1));
|
||||
sleep 5;
|
||||
continue
|
||||
fi
|
||||
if echo "create __tikv_test_thing" | /tmp/test_surreal sql --hide-welcome --endpoint tikv://127.0.0.1:2379 > /dev/null; then
|
||||
echo "TIKV started correctly";
|
||||
exit 0;
|
||||
fi
|
||||
sleep 5;
|
||||
tries=$((tries + 1))
|
||||
done
|
||||
set -e
|
||||
|
||||
echo "ERROR: TiUP Playground is unhealthy! Try again..."
|
||||
${HOME}/.tiup/bin/tiup clean --all
|
||||
|
||||
|
||||
playground_attempts=$((playground_attempts + 1))
|
||||
done
|
||||
|
||||
echo "PANIC: Couldn't start tiup playground! Here are the logs for the last attempt:"
|
||||
cat /tmp/tiup.log
|
||||
|
||||
exit 1
|
||||
"""
|
||||
|
||||
|
|
|
@ -510,6 +510,18 @@ impl Datastore {
|
|||
// Initialise the cluster and run bootstrap utilities
|
||||
#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
|
||||
pub async fn check_version(&self) -> Result<Version, Error> {
|
||||
let version = self.get_version().await?;
|
||||
// Check we are running the latest version
|
||||
if !version.is_latest() {
|
||||
return Err(Error::OutdatedStorageVersion);
|
||||
}
|
||||
// Everything ok
|
||||
Ok(version)
|
||||
}
|
||||
|
||||
// Initialise the cluster and run bootstrap utilities
|
||||
#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
|
||||
pub async fn get_version(&self) -> Result<Version, Error> {
|
||||
// Start a new writeable transaction
|
||||
let txn = self.transaction(Write, Pessimistic).await?.enclose();
|
||||
// Create the key where the version is stored
|
||||
|
@ -530,7 +542,12 @@ impl Datastore {
|
|||
return Err(err);
|
||||
}
|
||||
// We could decode the version correctly
|
||||
Ok(val) => val,
|
||||
Ok(val) => {
|
||||
// We didn't write anything, so just rollback
|
||||
txn.cancel().await?;
|
||||
// Return the current version
|
||||
val
|
||||
}
|
||||
}
|
||||
}
|
||||
// There is no version set in the storage
|
||||
|
@ -556,10 +573,6 @@ impl Datastore {
|
|||
val
|
||||
}
|
||||
};
|
||||
// Check we are running the latest version
|
||||
if !val.is_latest() {
|
||||
return Err(Error::OutdatedStorageVersion);
|
||||
}
|
||||
// Everything ok
|
||||
Ok(val)
|
||||
}
|
||||
|
|
|
@ -1,58 +0,0 @@
|
|||
use crate::err::Error;
|
||||
|
||||
#[derive(Copy, Debug, Clone)]
|
||||
pub struct Version(u16);
|
||||
|
||||
impl From<u16> for Version {
|
||||
fn from(version: u16) -> Self {
|
||||
Version(version)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Option<u16>> for Version {
|
||||
fn from(v: Option<u16>) -> Self {
|
||||
v.unwrap_or(0).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Version> for u16 {
|
||||
fn from(v: Version) -> Self {
|
||||
v.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Version> for Vec<u8> {
|
||||
fn from(v: Version) -> Self {
|
||||
v.0.to_be_bytes().to_vec()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Vec<u8>> for Version {
|
||||
type Error = Error;
|
||||
fn try_from(v: Vec<u8>) -> Result<Self, Self::Error> {
|
||||
let bin = v.try_into().map_err(|_| Error::InvalidStorageVersion)?;
|
||||
let val = u16::from_be_bytes(bin).into();
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
impl Version {
|
||||
/// The latest version
|
||||
pub const LATEST: u16 = 2;
|
||||
/// The latest version
|
||||
pub fn latest() -> Self {
|
||||
Self(2)
|
||||
}
|
||||
/// SurrealDB version 1
|
||||
pub fn v1() -> Self {
|
||||
Self(1)
|
||||
}
|
||||
/// SurrealDB version 2
|
||||
pub fn v2() -> Self {
|
||||
Self(2)
|
||||
}
|
||||
/// Check if we are running the latest version
|
||||
pub fn is_latest(&self) -> bool {
|
||||
self.0 == Self::LATEST
|
||||
}
|
||||
}
|
5
core/src/kvs/version/fixes/mod.rs
Normal file
5
core/src/kvs/version/fixes/mod.rs
Normal file
|
@ -0,0 +1,5 @@
|
|||
mod v1_to_2_id_uuid;
|
||||
mod v1_to_2_migrate_to_access;
|
||||
|
||||
pub use v1_to_2_id_uuid::v1_to_2_id_uuid;
|
||||
pub use v1_to_2_migrate_to_access::v1_to_2_migrate_to_access;
|
69
core/src/kvs/version/fixes/v1_to_2_id_uuid.rs
Normal file
69
core/src/kvs/version/fixes/v1_to_2_id_uuid.rs
Normal file
|
@ -0,0 +1,69 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use crate::{err::Error, kvs::Transaction};
|
||||
|
||||
pub async fn v1_to_2_id_uuid(tx: Arc<Transaction>) -> Result<(), Error> {
|
||||
for ns in tx.all_ns().await?.iter() {
|
||||
let ns = ns.name.as_str();
|
||||
for db in tx.all_db(ns).await?.iter() {
|
||||
let db = db.name.as_str();
|
||||
for tb in tx.all_tb(ns, db).await?.iter() {
|
||||
let tb = tb.name.as_str();
|
||||
// mutable beg, as we update it each iteration to the last record id + a null byte
|
||||
let mut beg = crate::key::thing::prefix(ns, db, tb);
|
||||
let end = crate::key::thing::suffix(ns, db, tb);
|
||||
// queue of record ids to fix
|
||||
let mut queue: Vec<Vec<u8>> = Vec::new();
|
||||
|
||||
// Explanation for these numbers:
|
||||
//
|
||||
// Before the Id enum: /*{NS}\0*{DB}\0*{TB}\0*
|
||||
// We are counting: ^^ ^ ^ ^ ^ ^ ^
|
||||
//
|
||||
// Looking at the first four bytes for Id::Array (revision 1), we find: [0, 0, 0, 2]
|
||||
// First 3 bytes can be discarded, that 2 is the enum entry which we need to fix.
|
||||
// This totals to 11 bytes, plus the lengths of the bytes for namespace + database + tablename.
|
||||
//
|
||||
// For revision 2 of the Id enum, we added Uuid in index 2 (after number and string)
|
||||
// This means that any entry which was previously 2 or higher, now needs to be 3 or higher.
|
||||
// Resulting in a threshold of 2 (as a u8), used down below.
|
||||
//
|
||||
let pos = 11 + ns.as_bytes().len() + db.as_bytes().len() + tb.as_bytes().len();
|
||||
let threshold = 2_u8;
|
||||
|
||||
'scan: loop {
|
||||
let keys = tx.keys(beg.clone()..end.clone(), 1000).await?;
|
||||
if keys.is_empty() {
|
||||
break 'scan;
|
||||
}
|
||||
|
||||
// We suffix the last id with a null byte, to prevent scanning it twice (which would result in an infinite loop)
|
||||
beg.clone_from(keys.last().unwrap());
|
||||
beg.extend_from_slice(&[b'\0']);
|
||||
|
||||
for key in keys.iter() {
|
||||
// Check if the id is affected
|
||||
if key.get(pos).is_some_and(|b| b >= &threshold) {
|
||||
// This ID needs fixing, add to queue
|
||||
queue.push(key.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for key in queue.iter() {
|
||||
// Bump the enum entry by 1
|
||||
let mut fixed = key.clone();
|
||||
// This is safe, because we previously obtained the byte from the original id
|
||||
unsafe { *fixed.get_unchecked_mut(pos) += 1 };
|
||||
// Get the value for the old key. We can unwrap the option, as we know that the key exists in the KV store
|
||||
let val = tx.get(key.clone().to_owned(), None).await?.unwrap();
|
||||
// Delete the old key
|
||||
tx.del(key.to_owned()).await?;
|
||||
// Set the fixed key
|
||||
tx.set(fixed, val, None).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
265
core/src/kvs/version/fixes/v1_to_2_migrate_to_access.rs
Normal file
265
core/src/kvs/version/fixes/v1_to_2_migrate_to_access.rs
Normal file
|
@ -0,0 +1,265 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use crate::{
|
||||
err::Error,
|
||||
kvs::Transaction,
|
||||
sql::{
|
||||
access_type::{JwtAccessVerify, JwtAccessVerifyKey},
|
||||
statements::{
|
||||
define::{DefineScopeStatement, DefineTokenStatement},
|
||||
DefineAccessStatement,
|
||||
},
|
||||
AccessType, Ident,
|
||||
},
|
||||
};
|
||||
|
||||
pub async fn v1_to_2_migrate_to_access(tx: Arc<Transaction>) -> Result<(), Error> {
|
||||
for ns in tx.all_ns().await?.iter() {
|
||||
let ns = ns.name.as_str();
|
||||
migrate_ns_tokens(tx.clone(), ns).await?;
|
||||
|
||||
for db in tx.all_db(ns).await?.iter() {
|
||||
let db = db.name.as_str();
|
||||
migrate_db_tokens(tx.clone(), ns, db).await?;
|
||||
|
||||
let scope_keys = collect_db_scope_keys(tx.clone(), ns, db).await?;
|
||||
for key in scope_keys.iter() {
|
||||
let ac = migrate_db_scope_key(tx.clone(), ns, db, key.to_owned()).await?;
|
||||
migrate_sc_tokens(tx.clone(), ns, db, ac).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn migrate_ns_tokens(tx: Arc<Transaction>, ns: &str) -> Result<(), Error> {
|
||||
// Find all tokens on the namespace level
|
||||
let mut beg = crate::key::namespace::all::new(ns).encode()?;
|
||||
beg.extend_from_slice(&[b'!', b't', b'k', 0x00]);
|
||||
let mut end = crate::key::namespace::all::new(ns).encode()?;
|
||||
end.extend_from_slice(&[b'!', b't', b'k', 0xff]);
|
||||
|
||||
// queue of tokens to migrate
|
||||
let mut queue: Vec<Vec<u8>> = Vec::new();
|
||||
|
||||
// Scan the token definitions
|
||||
'scan: loop {
|
||||
let mut keys = tx.keys(beg.clone()..end.clone(), 1000).await?;
|
||||
if keys.is_empty() {
|
||||
break 'scan;
|
||||
}
|
||||
|
||||
// We suffix the last id with a null byte, to prevent scanning it twice (which would result in an infinite loop)
|
||||
beg.clone_from(keys.last().unwrap());
|
||||
beg.extend_from_slice(&[b'\0']);
|
||||
|
||||
// Assign to queue
|
||||
queue.append(&mut keys);
|
||||
}
|
||||
|
||||
// Migrate the tokens to accesses
|
||||
for key in queue.iter() {
|
||||
// Get the value for the old key. We can unwrap the option, as we know that the key exists in the KV store
|
||||
let tk: DefineTokenStatement = tx.get(key.clone().to_owned(), None).await?.unwrap().into();
|
||||
// Convert into access
|
||||
let ac: DefineAccessStatement = tk.into();
|
||||
|
||||
// Delete the old key
|
||||
tx.del(key.to_owned()).await?;
|
||||
|
||||
// Extract the name
|
||||
let name = ac.name.clone();
|
||||
// Construct the new key
|
||||
let key = crate::key::namespace::ac::new(ns, &name);
|
||||
// Set the fixed key
|
||||
tx.set(key, ac, None).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn migrate_db_tokens(tx: Arc<Transaction>, ns: &str, db: &str) -> Result<(), Error> {
|
||||
// Find all tokens on the namespace level
|
||||
let mut beg = crate::key::database::all::new(ns, db).encode()?;
|
||||
beg.extend_from_slice(&[b'!', b't', b'k', 0x00]);
|
||||
let mut end = crate::key::database::all::new(ns, db).encode()?;
|
||||
end.extend_from_slice(&[b'!', b't', b'k', 0xff]);
|
||||
|
||||
// queue of tokens to migrate
|
||||
let mut queue: Vec<Vec<u8>> = Vec::new();
|
||||
|
||||
// Scan the token definitions
|
||||
'scan: loop {
|
||||
let mut keys = tx.keys(beg.clone()..end.clone(), 1000).await?;
|
||||
if keys.is_empty() {
|
||||
break 'scan;
|
||||
}
|
||||
|
||||
// We suffix the last id with a null byte, to prevent scanning it twice (which would result in an infinite loop)
|
||||
beg.clone_from(keys.last().unwrap());
|
||||
beg.extend_from_slice(&[b'\0']);
|
||||
|
||||
// Assign to queue
|
||||
queue.append(&mut keys);
|
||||
}
|
||||
|
||||
// Migrate the tokens to accesses
|
||||
for key in queue.iter() {
|
||||
// Get the value for the old key. We can unwrap the option, as we know that the key exists in the KV store
|
||||
let tk: DefineTokenStatement = tx.get(key.clone().to_owned(), None).await?.unwrap().into();
|
||||
// Convert into access
|
||||
let ac: DefineAccessStatement = tk.into();
|
||||
|
||||
// Delete the old key
|
||||
tx.del(key.to_owned()).await?;
|
||||
|
||||
// Extract the name
|
||||
let name = ac.name.clone();
|
||||
// Construct the new key
|
||||
let key = crate::key::database::ac::new(ns, db, &name);
|
||||
// Set the fixed key
|
||||
tx.set(key, ac, None).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn collect_db_scope_keys(
|
||||
tx: Arc<Transaction>,
|
||||
ns: &str,
|
||||
db: &str,
|
||||
) -> Result<Vec<Vec<u8>>, Error> {
|
||||
// Find all tokens on the namespace level
|
||||
let mut beg = crate::key::database::all::new(ns, db).encode()?;
|
||||
beg.extend_from_slice(&[b'!', b's', b'c', 0x00]);
|
||||
let mut end = crate::key::database::all::new(ns, db).encode()?;
|
||||
end.extend_from_slice(&[b'!', b's', b'c', 0xff]);
|
||||
|
||||
// queue of tokens to migrate
|
||||
let mut queue: Vec<Vec<u8>> = Vec::new();
|
||||
|
||||
// Scan the token definitions
|
||||
'scan: loop {
|
||||
let mut keys = tx.keys(beg.clone()..end.clone(), 1000).await?;
|
||||
if keys.is_empty() {
|
||||
break 'scan;
|
||||
}
|
||||
|
||||
// We suffix the last id with a null byte, to prevent scanning it twice (which would result in an infinite loop)
|
||||
beg.clone_from(keys.last().unwrap());
|
||||
beg.extend_from_slice(&[b'\0']);
|
||||
|
||||
// Assign to queue
|
||||
queue.append(&mut keys);
|
||||
}
|
||||
|
||||
Ok(queue)
|
||||
}
|
||||
|
||||
async fn migrate_db_scope_key(
|
||||
tx: Arc<Transaction>,
|
||||
ns: &str,
|
||||
db: &str,
|
||||
key: Vec<u8>,
|
||||
) -> Result<DefineAccessStatement, Error> {
|
||||
// Get the value for the old key. We can unwrap the option, as we know that the key exists in the KV store
|
||||
let sc: DefineScopeStatement = tx.get(key.clone().to_owned(), None).await?.unwrap().into();
|
||||
// Convert into access
|
||||
let ac: DefineAccessStatement = sc.into();
|
||||
|
||||
// Delete the old key
|
||||
tx.del(key.to_owned()).await?;
|
||||
|
||||
// Extract the name
|
||||
let name = ac.name.clone();
|
||||
// Construct the new key
|
||||
let key = crate::key::database::ac::new(ns, db, &name);
|
||||
// Set the fixed key
|
||||
tx.set(key, ac.clone(), None).await?;
|
||||
|
||||
Ok(ac)
|
||||
}
|
||||
|
||||
async fn migrate_sc_tokens(
|
||||
tx: Arc<Transaction>,
|
||||
ns: &str,
|
||||
db: &str,
|
||||
ac: DefineAccessStatement,
|
||||
) -> Result<(), Error> {
|
||||
let name = ac.name.clone();
|
||||
// Find all tokens on the namespace level
|
||||
// 0xb1 = ±
|
||||
// Inserting the string manually does not add a null byte at the end of the string.
|
||||
// Hence, in the third `extend_from_slice`, we add the null byte manually, followed by the token key prefix
|
||||
let mut beg = crate::key::database::all::new(ns, db).encode()?;
|
||||
beg.extend_from_slice(&[0xb1]);
|
||||
beg.extend_from_slice(name.as_bytes());
|
||||
beg.extend_from_slice(&[0x00, b'!', b't', b'k', 0x00]);
|
||||
let mut end = crate::key::database::all::new(ns, db).encode()?;
|
||||
end.extend_from_slice(&[0xb1]);
|
||||
end.extend_from_slice(name.as_bytes());
|
||||
end.extend_from_slice(&[0x00, b'!', b't', b'k', 0xff]);
|
||||
|
||||
// queue of tokens to migrate
|
||||
let mut queue: Vec<Vec<u8>> = Vec::new();
|
||||
|
||||
// Scan the token definitions
|
||||
'scan: loop {
|
||||
let mut keys = tx.keys(beg.clone()..end.clone(), 1000).await?;
|
||||
if keys.is_empty() {
|
||||
break 'scan;
|
||||
}
|
||||
|
||||
// We suffix the last id with a null byte, to prevent scanning it twice (which would result in an infinite loop)
|
||||
beg.clone_from(keys.last().unwrap());
|
||||
beg.extend_from_slice(&[b'\0']);
|
||||
|
||||
// Assign to queue
|
||||
queue.append(&mut keys);
|
||||
}
|
||||
|
||||
println!("\n==================");
|
||||
println!("NS: `{ns}`, DB: `{db}`, SC: `{}`", ac.name);
|
||||
println!("Can not automatically merge scope tokens scope into the new scope-converted access method.");
|
||||
println!("Logging the merged access definitions individually, with their names joined together like `scope_token`.");
|
||||
println!("The old tokens will be removed from the datastore, but no fixes will be applied. They need manual resolution.");
|
||||
println!("==================\n");
|
||||
|
||||
// Log example merged accesses
|
||||
for key in queue.iter() {
|
||||
// Get the value for the old key. We can unwrap the option, as we know that the key exists in the KV store
|
||||
let tk: DefineTokenStatement = tx.get(key.clone().to_owned(), None).await?.unwrap().into();
|
||||
|
||||
// Delete the old key
|
||||
tx.del(key.to_owned()).await?;
|
||||
|
||||
// Merge the access and token definitions
|
||||
let mut merged = merge_ac_and_tk(ac.clone(), tk.clone());
|
||||
merged.name = Ident(format!("{}_{}", ac.name.0, tk.name.0));
|
||||
println!("{merged:#}\n");
|
||||
}
|
||||
|
||||
println!("==================\n");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn merge_ac_and_tk(ac: DefineAccessStatement, tk: DefineTokenStatement) -> DefineAccessStatement {
|
||||
let mut ac = ac;
|
||||
ac.kind = match ac.kind {
|
||||
AccessType::Record(ak) => {
|
||||
let mut ak = ak;
|
||||
ak.jwt.verify = JwtAccessVerify::Key(JwtAccessVerifyKey {
|
||||
alg: tk.kind,
|
||||
key: tk.code,
|
||||
});
|
||||
AccessType::Record(ak)
|
||||
}
|
||||
|
||||
// We can not reach this code, because the code which invokes this
|
||||
// method only passes record accesses, which we previously constructed
|
||||
// based on old scope definitions.
|
||||
_ => unreachable!("Unexpected access kind"),
|
||||
};
|
||||
ac
|
||||
}
|
109
core/src/kvs/version/mod.rs
Normal file
109
core/src/kvs/version/mod.rs
Normal file
|
@ -0,0 +1,109 @@
|
|||
use super::{Datastore, LockType, TransactionType};
|
||||
use crate::err::Error;
|
||||
use std::sync::Arc;
|
||||
|
||||
mod fixes;
|
||||
|
||||
#[derive(Copy, Debug, Clone)]
|
||||
pub struct Version(u16);
|
||||
|
||||
impl From<u16> for Version {
|
||||
fn from(version: u16) -> Self {
|
||||
Version(version)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Option<u16>> for Version {
|
||||
fn from(v: Option<u16>) -> Self {
|
||||
v.unwrap_or(0).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Version> for u16 {
|
||||
fn from(v: Version) -> Self {
|
||||
v.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Version> for Vec<u8> {
|
||||
fn from(v: Version) -> Self {
|
||||
v.0.to_be_bytes().to_vec()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Vec<u8>> for Version {
|
||||
type Error = Error;
|
||||
fn try_from(v: Vec<u8>) -> Result<Self, Self::Error> {
|
||||
let bin = v.try_into().map_err(|_| Error::InvalidStorageVersion)?;
|
||||
let val = u16::from_be_bytes(bin).into();
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
impl Version {
|
||||
/// The latest version
|
||||
pub const LATEST: u16 = 2;
|
||||
/// The latest version
|
||||
pub fn latest() -> Self {
|
||||
Self(2)
|
||||
}
|
||||
/// SurrealDB version 1
|
||||
pub fn v1() -> Self {
|
||||
Self(1)
|
||||
}
|
||||
/// SurrealDB version 2
|
||||
pub fn v2() -> Self {
|
||||
Self(2)
|
||||
}
|
||||
/// Check if we are running the latest version
|
||||
pub fn is_latest(&self) -> bool {
|
||||
self.0 == Self::LATEST
|
||||
}
|
||||
/// Fix
|
||||
pub async fn fix(&self, ds: Arc<Datastore>) -> Result<(), Error> {
|
||||
// We iterate through each version from the current to the latest
|
||||
// and apply the fixes for each version. We update storage version
|
||||
// and commit changes each iteration, to keep transactions as small
|
||||
// as possible.
|
||||
//
|
||||
// We commit all fixes and the storage version update in one transaction,
|
||||
// because we never want to leave storage in a broken state where half of
|
||||
// the fixes are applied and the storage version is not updated.
|
||||
//
|
||||
for v in self.0..Version::LATEST {
|
||||
// Create a new transaction
|
||||
let tx = Arc::new(ds.transaction(TransactionType::Write, LockType::Pessimistic).await?);
|
||||
|
||||
// Easy shortcut to apply a fix
|
||||
macro_rules! apply_fix {
|
||||
($name:ident) => {{
|
||||
match fixes::$name(tx.clone()).await {
|
||||
// Fail early and cancel transaction if the fix failed
|
||||
Err(e) => {
|
||||
tx.cancel().await?;
|
||||
return Err(e);
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
}};
|
||||
}
|
||||
|
||||
// Apply fixes based on the current version
|
||||
if v == 1 {
|
||||
apply_fix!(v1_to_2_id_uuid);
|
||||
apply_fix!(v1_to_2_migrate_to_access);
|
||||
}
|
||||
|
||||
// Obtain storage version key and value
|
||||
let key = crate::key::version::new();
|
||||
let val: Vec<u8> = Version::from(v + 1).into();
|
||||
// Attempt to set the current version in storage
|
||||
tx.set(key, val, None).await?;
|
||||
|
||||
// Commit the transaction
|
||||
tx.commit().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -1,4 +1,9 @@
|
|||
use crate::sql::{Duration, Ident, Strand, Value};
|
||||
use crate::sql::{
|
||||
access::AccessDuration,
|
||||
access_type::{JwtAccessIssue, JwtAccessVerify, JwtAccessVerifyKey},
|
||||
statements::DefineAccessStatement,
|
||||
AccessType, Algorithm, Base, Duration, Ident, JwtAccess, RecordAccess, Strand, Value,
|
||||
};
|
||||
use derive::Store;
|
||||
use revision::revisioned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -17,3 +22,35 @@ pub struct DefineScopeStatement {
|
|||
#[revision(start = 2)]
|
||||
pub if_not_exists: bool,
|
||||
}
|
||||
|
||||
impl From<DefineScopeStatement> for DefineAccessStatement {
|
||||
fn from(sc: DefineScopeStatement) -> DefineAccessStatement {
|
||||
DefineAccessStatement {
|
||||
name: sc.name,
|
||||
base: Base::Db,
|
||||
comment: sc.comment,
|
||||
if_not_exists: sc.if_not_exists,
|
||||
kind: AccessType::Record(RecordAccess {
|
||||
signup: sc.signup,
|
||||
signin: sc.signin,
|
||||
jwt: JwtAccess {
|
||||
issue: Some(JwtAccessIssue {
|
||||
alg: Algorithm::Hs512,
|
||||
key: sc.code.clone(),
|
||||
}),
|
||||
verify: JwtAccessVerify::Key(JwtAccessVerifyKey {
|
||||
alg: Algorithm::Hs512,
|
||||
key: sc.code,
|
||||
}),
|
||||
},
|
||||
}),
|
||||
// unused fields
|
||||
authenticate: None,
|
||||
duration: AccessDuration {
|
||||
session: sc.session,
|
||||
..AccessDuration::default()
|
||||
},
|
||||
overwrite: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,9 @@
|
|||
use crate::sql::{Algorithm, Base, Ident, Strand};
|
||||
use crate::sql::{
|
||||
access::AccessDuration,
|
||||
access_type::{JwtAccessVerify, JwtAccessVerifyKey},
|
||||
statements::DefineAccessStatement,
|
||||
AccessType, Algorithm, Base, Ident, JwtAccess, Strand,
|
||||
};
|
||||
use derive::Store;
|
||||
use revision::revisioned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -16,3 +21,25 @@ pub struct DefineTokenStatement {
|
|||
#[revision(start = 2)]
|
||||
pub if_not_exists: bool,
|
||||
}
|
||||
|
||||
impl From<DefineTokenStatement> for DefineAccessStatement {
|
||||
fn from(tk: DefineTokenStatement) -> DefineAccessStatement {
|
||||
DefineAccessStatement {
|
||||
name: tk.name,
|
||||
base: tk.base,
|
||||
comment: tk.comment,
|
||||
if_not_exists: tk.if_not_exists,
|
||||
kind: AccessType::Jwt(JwtAccess {
|
||||
issue: None,
|
||||
verify: JwtAccessVerify::Key(JwtAccessVerifyKey {
|
||||
alg: tk.kind,
|
||||
key: tk.code,
|
||||
}),
|
||||
}),
|
||||
// unused fields
|
||||
authenticate: None,
|
||||
duration: AccessDuration::default(),
|
||||
overwrite: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,17 +25,16 @@ pub use param::DefineParamStatement;
|
|||
pub use table::DefineTableStatement;
|
||||
pub use user::DefineUserStatement;
|
||||
|
||||
use deprecated::scope::DefineScopeStatement;
|
||||
use deprecated::token::DefineTokenStatement;
|
||||
#[doc(hidden)]
|
||||
pub use deprecated::scope::DefineScopeStatement;
|
||||
#[doc(hidden)]
|
||||
pub use deprecated::token::DefineTokenStatement;
|
||||
|
||||
use crate::ctx::Context;
|
||||
use crate::dbs::Options;
|
||||
use crate::doc::CursorDoc;
|
||||
use crate::err::Error;
|
||||
use crate::sql::access::AccessDuration;
|
||||
use crate::sql::access_type::{JwtAccessIssue, JwtAccessVerify, JwtAccessVerifyKey};
|
||||
use crate::sql::value::Value;
|
||||
use crate::sql::{Algorithm, Base, JwtAccess, RecordAccess};
|
||||
use crate::{ctx::Context, sql::AccessType};
|
||||
use derive::Store;
|
||||
use reblessive::tree::Stk;
|
||||
use revision::revisioned;
|
||||
|
@ -80,56 +79,14 @@ impl DefineStatement {
|
|||
fields: DefineTokenStatementFields,
|
||||
_revision: u16,
|
||||
) -> Result<Self, revision::Error> {
|
||||
Ok(DefineStatement::Access(DefineAccessStatement {
|
||||
name: fields.0.name,
|
||||
base: fields.0.base,
|
||||
comment: fields.0.comment,
|
||||
if_not_exists: fields.0.if_not_exists,
|
||||
kind: AccessType::Jwt(JwtAccess {
|
||||
issue: None,
|
||||
verify: JwtAccessVerify::Key(JwtAccessVerifyKey {
|
||||
alg: fields.0.kind,
|
||||
key: fields.0.code,
|
||||
}),
|
||||
}),
|
||||
// unused fields
|
||||
authenticate: None,
|
||||
duration: AccessDuration::default(),
|
||||
overwrite: false,
|
||||
}))
|
||||
Ok(DefineStatement::Access(fields.0.into()))
|
||||
}
|
||||
|
||||
fn convert_scope_to_access(
|
||||
fields: DefineScopeStatementFields,
|
||||
_revision: u16,
|
||||
) -> Result<Self, revision::Error> {
|
||||
Ok(DefineStatement::Access(DefineAccessStatement {
|
||||
name: fields.0.name,
|
||||
base: Base::Db,
|
||||
comment: fields.0.comment,
|
||||
if_not_exists: fields.0.if_not_exists,
|
||||
kind: AccessType::Record(RecordAccess {
|
||||
signup: fields.0.signup,
|
||||
signin: fields.0.signin,
|
||||
jwt: JwtAccess {
|
||||
issue: Some(JwtAccessIssue {
|
||||
alg: Algorithm::Hs512,
|
||||
key: fields.0.code.clone(),
|
||||
}),
|
||||
verify: JwtAccessVerify::Key(JwtAccessVerifyKey {
|
||||
alg: Algorithm::Hs512,
|
||||
key: fields.0.code,
|
||||
}),
|
||||
},
|
||||
}),
|
||||
// unused fields
|
||||
authenticate: None,
|
||||
duration: AccessDuration {
|
||||
session: fields.0.session,
|
||||
..AccessDuration::default()
|
||||
},
|
||||
overwrite: false,
|
||||
}))
|
||||
Ok(DefineStatement::Access(fields.0.into()))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -73,6 +73,10 @@ pub(crate) async fn run_router(
|
|||
|
||||
let kvs = match Datastore::new(endpoint).await {
|
||||
Ok(kvs) => {
|
||||
if let Err(error) = kvs.check_version().await {
|
||||
let _ = conn_tx.send(Err(error.into())).await;
|
||||
return;
|
||||
};
|
||||
if let Err(error) = kvs.bootstrap().await {
|
||||
let _ = conn_tx.send(Err(error.into())).await;
|
||||
return;
|
||||
|
|
43
src/cli/fix.rs
Normal file
43
src/cli/fix.rs
Normal file
|
@ -0,0 +1,43 @@
|
|||
use crate::cli::validator::parser::env_filter::CustomEnvFilter;
|
||||
use crate::cli::validator::parser::env_filter::CustomEnvFilterParser;
|
||||
use crate::dbs;
|
||||
use crate::err::Error;
|
||||
use clap::Args;
|
||||
use surrealdb::engine::any::IntoEndpoint;
|
||||
|
||||
#[derive(Args, Debug)]
|
||||
pub struct FixCommandArguments {
|
||||
#[arg(help = "Database path used for storing data")]
|
||||
#[arg(env = "SURREAL_PATH", index = 1)]
|
||||
#[arg(default_value = "memory")]
|
||||
#[arg(value_parser = super::validator::path_valid)]
|
||||
path: String,
|
||||
#[arg(help = "The logging level for the database server")]
|
||||
#[arg(env = "SURREAL_LOG", short = 'l', long = "log")]
|
||||
#[arg(default_value = "info")]
|
||||
#[arg(value_parser = CustomEnvFilterParser::new())]
|
||||
log: CustomEnvFilter,
|
||||
}
|
||||
|
||||
pub async fn init(
|
||||
FixCommandArguments {
|
||||
path,
|
||||
log,
|
||||
}: FixCommandArguments,
|
||||
) -> Result<(), Error> {
|
||||
// Initialize opentelemetry and logging
|
||||
crate::telemetry::builder().with_filter(log).init();
|
||||
// Start metrics subsystem
|
||||
crate::telemetry::metrics::init().expect("failed to initialize metrics");
|
||||
// Clean the path
|
||||
let endpoint = path.into_endpoint()?;
|
||||
let path = if endpoint.path.is_empty() {
|
||||
endpoint.url.to_string()
|
||||
} else {
|
||||
endpoint.path
|
||||
};
|
||||
// Fix the datastore, if applicable
|
||||
dbs::fix(path).await?;
|
||||
// All ok
|
||||
Ok(())
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
pub(crate) mod abstraction;
|
||||
mod config;
|
||||
mod export;
|
||||
mod fix;
|
||||
mod import;
|
||||
mod isready;
|
||||
mod ml;
|
||||
|
@ -22,6 +23,7 @@ use crate::env::RELEASE;
|
|||
use clap::{Parser, Subcommand};
|
||||
pub use config::CF;
|
||||
use export::ExportCommandArguments;
|
||||
use fix::FixCommandArguments;
|
||||
use import::ImportCommandArguments;
|
||||
use isready::IsReadyCommandArguments;
|
||||
use ml::MlCommand;
|
||||
|
@ -89,6 +91,8 @@ enum Commands {
|
|||
IsReady(IsReadyCommandArguments),
|
||||
#[command(about = "Validate SurrealQL query files")]
|
||||
Validate(ValidateCommandArguments),
|
||||
#[command(about = "Fix database storage issues")]
|
||||
Fix(FixCommandArguments),
|
||||
}
|
||||
|
||||
pub async fn init() -> ExitCode {
|
||||
|
@ -132,6 +136,7 @@ pub async fn init() -> ExitCode {
|
|||
Commands::Ml(args) => ml::init(args).await,
|
||||
Commands::IsReady(args) => isready::init(args).await,
|
||||
Commands::Validate(args) => validate::init(args).await,
|
||||
Commands::Fix(args) => fix::init(args).await,
|
||||
};
|
||||
// Save the flamegraph and profile
|
||||
#[cfg(feature = "performance-profiler")]
|
||||
|
|
|
@ -2,6 +2,7 @@ use crate::cli::CF;
|
|||
use crate::err::Error;
|
||||
use clap::Args;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use surrealdb::dbs::capabilities::{Capabilities, FuncTarget, NetTarget, Targets};
|
||||
use surrealdb::kvs::Datastore;
|
||||
|
@ -251,6 +252,19 @@ pub async fn init(
|
|||
Ok(dbs)
|
||||
}
|
||||
|
||||
pub async fn fix(path: String) -> Result<(), Error> {
|
||||
// Parse and setup the desired kv datastore
|
||||
let dbs = Arc::new(Datastore::new(&path).await?);
|
||||
// Ensure the storage version is up-to-date to prevent corruption
|
||||
let version = dbs.get_version().await?;
|
||||
// Apply fixes
|
||||
version.fix(dbs).await?;
|
||||
// Log success
|
||||
println!("Database storage version was updated successfully. Please carefully read back logs to see if any manual changes need to be applied");
|
||||
// All ok
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::str::FromStr;
|
||||
|
|
Loading…
Reference in a new issue