diff --git a/Makefile.ci.toml b/Makefile.ci.toml index bddd2b37..971cecd6 100644 --- a/Makefile.ci.toml +++ b/Makefile.ci.toml @@ -208,52 +208,23 @@ script = """ [tasks.start-tikv] category = "CI - SERVICES" -env = { SURREAL_LINK = "https://github.com/surrealdb/surrealdb/releases/download/v1.2.0/surreal-v1.2.0.linux-amd64.tgz" } script = """ #!/bin/bash -ex - - if [ ! -f "/tmp/test_surreal" ]; then - echo "Downloading surrealdb for startup test" - curl -L $SURREAL_LINK | tar -xzO > /tmp/test_surreal - chmod +x /tmp/test_surreal - fi - ${HOME}/.tiup/bin/tiup install pd tikv playground - - playground_attempts=0 - while [[ $playground_attempts -lt 5 ]]; do - nohup ${HOME}/.tiup/bin/tiup playground --mode tikv-slim --kv 1 --pd 1 --db 0 --ticdc 0 --tiflash 0 --without-monitor > /tmp/tiup.log & - - echo $! > /tmp/tiup.pid - - set +e - echo "Waiting for tiup playground to be ready..." - tries=0 - while [[ $tries -lt 10 ]]; do - if ! ${HOME}/.tiup/bin/tiup playground display >/dev/null; then - tries=$((tries + 1)); - sleep 5; - continue - fi - if echo "create __tikv_test_thing" | /tmp/test_surreal sql --hide-welcome --endpoint tikv://127.0.0.1:2379 > /dev/null; then - echo "TIKV started correctly"; - exit 0; - fi - sleep 5; - tries=$((tries + 1)) - done - set -e - - echo "ERROR: TiUP Playground is unhealthy! Try again..." - ${HOME}/.tiup/bin/tiup clean --all - - - playground_attempts=$((playground_attempts + 1)) - done - - echo "PANIC: Couldn't start tiup playground! Here are the logs for the last attempt:" + nohup ${HOME}/.tiup/bin/tiup playground --mode tikv-slim --kv 1 --pd 1 --db 0 --ticdc 0 --tiflash 0 --without-monitor > /tmp/tiup.log & + set +e + tries=0 + echo "Waiting for tiup playground to be ready..." + while [[ $tries -lt 10 ]]; do + if ! ${HOME}/.tiup/bin/tiup playground display >/dev/null; then + tries=$((tries + 1)); + sleep 5; + continue + fi + exit 0; + done + echo "PANIC: Couldn't start tiup playground! Here are the logs for the last attempt:" cat /tmp/tiup.log - exit 1 """ diff --git a/core/src/kvs/ds.rs b/core/src/kvs/ds.rs index ba06d195..3ddba18d 100644 --- a/core/src/kvs/ds.rs +++ b/core/src/kvs/ds.rs @@ -510,6 +510,18 @@ impl Datastore { // Initialise the cluster and run bootstrap utilities #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)] pub async fn check_version(&self) -> Result { + let version = self.get_version().await?; + // Check we are running the latest version + if !version.is_latest() { + return Err(Error::OutdatedStorageVersion); + } + // Everything ok + Ok(version) + } + + // Initialise the cluster and run bootstrap utilities + #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)] + pub async fn get_version(&self) -> Result { // Start a new writeable transaction let txn = self.transaction(Write, Pessimistic).await?.enclose(); // Create the key where the version is stored @@ -530,7 +542,12 @@ impl Datastore { return Err(err); } // We could decode the version correctly - Ok(val) => val, + Ok(val) => { + // We didn't write anything, so just rollback + txn.cancel().await?; + // Return the current version + val + } } } // There is no version set in the storage @@ -556,10 +573,6 @@ impl Datastore { val } }; - // Check we are running the latest version - if !val.is_latest() { - return Err(Error::OutdatedStorageVersion); - } // Everything ok Ok(val) } diff --git a/core/src/kvs/version.rs b/core/src/kvs/version.rs deleted file mode 100644 index d4982be2..00000000 --- a/core/src/kvs/version.rs +++ /dev/null @@ -1,58 +0,0 @@ -use crate::err::Error; - -#[derive(Copy, Debug, Clone)] -pub struct Version(u16); - -impl From for Version { - fn from(version: u16) -> Self { - Version(version) - } -} - -impl From> for Version { - fn from(v: Option) -> Self { - v.unwrap_or(0).into() - } -} - -impl From for u16 { - fn from(v: Version) -> Self { - v.0 - } -} - -impl From for Vec { - fn from(v: Version) -> Self { - v.0.to_be_bytes().to_vec() - } -} - -impl TryFrom> for Version { - type Error = Error; - fn try_from(v: Vec) -> Result { - let bin = v.try_into().map_err(|_| Error::InvalidStorageVersion)?; - let val = u16::from_be_bytes(bin).into(); - Ok(val) - } -} - -impl Version { - /// The latest version - pub const LATEST: u16 = 2; - /// The latest version - pub fn latest() -> Self { - Self(2) - } - /// SurrealDB version 1 - pub fn v1() -> Self { - Self(1) - } - /// SurrealDB version 2 - pub fn v2() -> Self { - Self(2) - } - /// Check if we are running the latest version - pub fn is_latest(&self) -> bool { - self.0 == Self::LATEST - } -} diff --git a/core/src/kvs/version/fixes/mod.rs b/core/src/kvs/version/fixes/mod.rs new file mode 100644 index 00000000..6578b6b8 --- /dev/null +++ b/core/src/kvs/version/fixes/mod.rs @@ -0,0 +1,5 @@ +mod v1_to_2_id_uuid; +mod v1_to_2_migrate_to_access; + +pub use v1_to_2_id_uuid::v1_to_2_id_uuid; +pub use v1_to_2_migrate_to_access::v1_to_2_migrate_to_access; diff --git a/core/src/kvs/version/fixes/v1_to_2_id_uuid.rs b/core/src/kvs/version/fixes/v1_to_2_id_uuid.rs new file mode 100644 index 00000000..a70210fa --- /dev/null +++ b/core/src/kvs/version/fixes/v1_to_2_id_uuid.rs @@ -0,0 +1,69 @@ +use std::sync::Arc; + +use crate::{err::Error, kvs::Transaction}; + +pub async fn v1_to_2_id_uuid(tx: Arc) -> Result<(), Error> { + for ns in tx.all_ns().await?.iter() { + let ns = ns.name.as_str(); + for db in tx.all_db(ns).await?.iter() { + let db = db.name.as_str(); + for tb in tx.all_tb(ns, db).await?.iter() { + let tb = tb.name.as_str(); + // mutable beg, as we update it each iteration to the last record id + a null byte + let mut beg = crate::key::thing::prefix(ns, db, tb); + let end = crate::key::thing::suffix(ns, db, tb); + // queue of record ids to fix + let mut queue: Vec> = Vec::new(); + + // Explanation for these numbers: + // + // Before the Id enum: /*{NS}\0*{DB}\0*{TB}\0* + // We are counting: ^^ ^ ^ ^ ^ ^ ^ + // + // Looking at the first four bytes for Id::Array (revision 1), we find: [0, 0, 0, 2] + // First 3 bytes can be discarded, that 2 is the enum entry which we need to fix. + // This totals to 11 bytes, plus the lengths of the bytes for namespace + database + tablename. + // + // For revision 2 of the Id enum, we added Uuid in index 2 (after number and string) + // This means that any entry which was previously 2 or higher, now needs to be 3 or higher. + // Resulting in a threshold of 2 (as a u8), used down below. + // + let pos = 11 + ns.as_bytes().len() + db.as_bytes().len() + tb.as_bytes().len(); + let threshold = 2_u8; + + 'scan: loop { + let keys = tx.keys(beg.clone()..end.clone(), 1000).await?; + if keys.is_empty() { + break 'scan; + } + + // We suffix the last id with a null byte, to prevent scanning it twice (which would result in an infinite loop) + beg.clone_from(keys.last().unwrap()); + beg.extend_from_slice(&[b'\0']); + + for key in keys.iter() { + // Check if the id is affected + if key.get(pos).is_some_and(|b| b >= &threshold) { + // This ID needs fixing, add to queue + queue.push(key.to_owned()); + } + } + } + + for key in queue.iter() { + // Bump the enum entry by 1 + let mut fixed = key.clone(); + // This is safe, because we previously obtained the byte from the original id + unsafe { *fixed.get_unchecked_mut(pos) += 1 }; + // Get the value for the old key. We can unwrap the option, as we know that the key exists in the KV store + let val = tx.get(key.clone().to_owned(), None).await?.unwrap(); + // Delete the old key + tx.del(key.to_owned()).await?; + // Set the fixed key + tx.set(fixed, val, None).await?; + } + } + } + } + Ok(()) +} diff --git a/core/src/kvs/version/fixes/v1_to_2_migrate_to_access.rs b/core/src/kvs/version/fixes/v1_to_2_migrate_to_access.rs new file mode 100644 index 00000000..b8b6d694 --- /dev/null +++ b/core/src/kvs/version/fixes/v1_to_2_migrate_to_access.rs @@ -0,0 +1,265 @@ +use std::sync::Arc; + +use crate::{ + err::Error, + kvs::Transaction, + sql::{ + access_type::{JwtAccessVerify, JwtAccessVerifyKey}, + statements::{ + define::{DefineScopeStatement, DefineTokenStatement}, + DefineAccessStatement, + }, + AccessType, Ident, + }, +}; + +pub async fn v1_to_2_migrate_to_access(tx: Arc) -> Result<(), Error> { + for ns in tx.all_ns().await?.iter() { + let ns = ns.name.as_str(); + migrate_ns_tokens(tx.clone(), ns).await?; + + for db in tx.all_db(ns).await?.iter() { + let db = db.name.as_str(); + migrate_db_tokens(tx.clone(), ns, db).await?; + + let scope_keys = collect_db_scope_keys(tx.clone(), ns, db).await?; + for key in scope_keys.iter() { + let ac = migrate_db_scope_key(tx.clone(), ns, db, key.to_owned()).await?; + migrate_sc_tokens(tx.clone(), ns, db, ac).await?; + } + } + } + Ok(()) +} + +async fn migrate_ns_tokens(tx: Arc, ns: &str) -> Result<(), Error> { + // Find all tokens on the namespace level + let mut beg = crate::key::namespace::all::new(ns).encode()?; + beg.extend_from_slice(&[b'!', b't', b'k', 0x00]); + let mut end = crate::key::namespace::all::new(ns).encode()?; + end.extend_from_slice(&[b'!', b't', b'k', 0xff]); + + // queue of tokens to migrate + let mut queue: Vec> = Vec::new(); + + // Scan the token definitions + 'scan: loop { + let mut keys = tx.keys(beg.clone()..end.clone(), 1000).await?; + if keys.is_empty() { + break 'scan; + } + + // We suffix the last id with a null byte, to prevent scanning it twice (which would result in an infinite loop) + beg.clone_from(keys.last().unwrap()); + beg.extend_from_slice(&[b'\0']); + + // Assign to queue + queue.append(&mut keys); + } + + // Migrate the tokens to accesses + for key in queue.iter() { + // Get the value for the old key. We can unwrap the option, as we know that the key exists in the KV store + let tk: DefineTokenStatement = tx.get(key.clone().to_owned(), None).await?.unwrap().into(); + // Convert into access + let ac: DefineAccessStatement = tk.into(); + + // Delete the old key + tx.del(key.to_owned()).await?; + + // Extract the name + let name = ac.name.clone(); + // Construct the new key + let key = crate::key::namespace::ac::new(ns, &name); + // Set the fixed key + tx.set(key, ac, None).await?; + } + + Ok(()) +} + +async fn migrate_db_tokens(tx: Arc, ns: &str, db: &str) -> Result<(), Error> { + // Find all tokens on the namespace level + let mut beg = crate::key::database::all::new(ns, db).encode()?; + beg.extend_from_slice(&[b'!', b't', b'k', 0x00]); + let mut end = crate::key::database::all::new(ns, db).encode()?; + end.extend_from_slice(&[b'!', b't', b'k', 0xff]); + + // queue of tokens to migrate + let mut queue: Vec> = Vec::new(); + + // Scan the token definitions + 'scan: loop { + let mut keys = tx.keys(beg.clone()..end.clone(), 1000).await?; + if keys.is_empty() { + break 'scan; + } + + // We suffix the last id with a null byte, to prevent scanning it twice (which would result in an infinite loop) + beg.clone_from(keys.last().unwrap()); + beg.extend_from_slice(&[b'\0']); + + // Assign to queue + queue.append(&mut keys); + } + + // Migrate the tokens to accesses + for key in queue.iter() { + // Get the value for the old key. We can unwrap the option, as we know that the key exists in the KV store + let tk: DefineTokenStatement = tx.get(key.clone().to_owned(), None).await?.unwrap().into(); + // Convert into access + let ac: DefineAccessStatement = tk.into(); + + // Delete the old key + tx.del(key.to_owned()).await?; + + // Extract the name + let name = ac.name.clone(); + // Construct the new key + let key = crate::key::database::ac::new(ns, db, &name); + // Set the fixed key + tx.set(key, ac, None).await?; + } + + Ok(()) +} + +async fn collect_db_scope_keys( + tx: Arc, + ns: &str, + db: &str, +) -> Result>, Error> { + // Find all tokens on the namespace level + let mut beg = crate::key::database::all::new(ns, db).encode()?; + beg.extend_from_slice(&[b'!', b's', b'c', 0x00]); + let mut end = crate::key::database::all::new(ns, db).encode()?; + end.extend_from_slice(&[b'!', b's', b'c', 0xff]); + + // queue of tokens to migrate + let mut queue: Vec> = Vec::new(); + + // Scan the token definitions + 'scan: loop { + let mut keys = tx.keys(beg.clone()..end.clone(), 1000).await?; + if keys.is_empty() { + break 'scan; + } + + // We suffix the last id with a null byte, to prevent scanning it twice (which would result in an infinite loop) + beg.clone_from(keys.last().unwrap()); + beg.extend_from_slice(&[b'\0']); + + // Assign to queue + queue.append(&mut keys); + } + + Ok(queue) +} + +async fn migrate_db_scope_key( + tx: Arc, + ns: &str, + db: &str, + key: Vec, +) -> Result { + // Get the value for the old key. We can unwrap the option, as we know that the key exists in the KV store + let sc: DefineScopeStatement = tx.get(key.clone().to_owned(), None).await?.unwrap().into(); + // Convert into access + let ac: DefineAccessStatement = sc.into(); + + // Delete the old key + tx.del(key.to_owned()).await?; + + // Extract the name + let name = ac.name.clone(); + // Construct the new key + let key = crate::key::database::ac::new(ns, db, &name); + // Set the fixed key + tx.set(key, ac.clone(), None).await?; + + Ok(ac) +} + +async fn migrate_sc_tokens( + tx: Arc, + ns: &str, + db: &str, + ac: DefineAccessStatement, +) -> Result<(), Error> { + let name = ac.name.clone(); + // Find all tokens on the namespace level + // 0xb1 = ± + // Inserting the string manually does not add a null byte at the end of the string. + // Hence, in the third `extend_from_slice`, we add the null byte manually, followed by the token key prefix + let mut beg = crate::key::database::all::new(ns, db).encode()?; + beg.extend_from_slice(&[0xb1]); + beg.extend_from_slice(name.as_bytes()); + beg.extend_from_slice(&[0x00, b'!', b't', b'k', 0x00]); + let mut end = crate::key::database::all::new(ns, db).encode()?; + end.extend_from_slice(&[0xb1]); + end.extend_from_slice(name.as_bytes()); + end.extend_from_slice(&[0x00, b'!', b't', b'k', 0xff]); + + // queue of tokens to migrate + let mut queue: Vec> = Vec::new(); + + // Scan the token definitions + 'scan: loop { + let mut keys = tx.keys(beg.clone()..end.clone(), 1000).await?; + if keys.is_empty() { + break 'scan; + } + + // We suffix the last id with a null byte, to prevent scanning it twice (which would result in an infinite loop) + beg.clone_from(keys.last().unwrap()); + beg.extend_from_slice(&[b'\0']); + + // Assign to queue + queue.append(&mut keys); + } + + println!("\n=================="); + println!("NS: `{ns}`, DB: `{db}`, SC: `{}`", ac.name); + println!("Can not automatically merge scope tokens scope into the new scope-converted access method."); + println!("Logging the merged access definitions individually, with their names joined together like `scope_token`."); + println!("The old tokens will be removed from the datastore, but no fixes will be applied. They need manual resolution."); + println!("==================\n"); + + // Log example merged accesses + for key in queue.iter() { + // Get the value for the old key. We can unwrap the option, as we know that the key exists in the KV store + let tk: DefineTokenStatement = tx.get(key.clone().to_owned(), None).await?.unwrap().into(); + + // Delete the old key + tx.del(key.to_owned()).await?; + + // Merge the access and token definitions + let mut merged = merge_ac_and_tk(ac.clone(), tk.clone()); + merged.name = Ident(format!("{}_{}", ac.name.0, tk.name.0)); + println!("{merged:#}\n"); + } + + println!("==================\n"); + + Ok(()) +} + +fn merge_ac_and_tk(ac: DefineAccessStatement, tk: DefineTokenStatement) -> DefineAccessStatement { + let mut ac = ac; + ac.kind = match ac.kind { + AccessType::Record(ak) => { + let mut ak = ak; + ak.jwt.verify = JwtAccessVerify::Key(JwtAccessVerifyKey { + alg: tk.kind, + key: tk.code, + }); + AccessType::Record(ak) + } + + // We can not reach this code, because the code which invokes this + // method only passes record accesses, which we previously constructed + // based on old scope definitions. + _ => unreachable!("Unexpected access kind"), + }; + ac +} diff --git a/core/src/kvs/version/mod.rs b/core/src/kvs/version/mod.rs new file mode 100644 index 00000000..26cdda1e --- /dev/null +++ b/core/src/kvs/version/mod.rs @@ -0,0 +1,109 @@ +use super::{Datastore, LockType, TransactionType}; +use crate::err::Error; +use std::sync::Arc; + +mod fixes; + +#[derive(Copy, Debug, Clone)] +pub struct Version(u16); + +impl From for Version { + fn from(version: u16) -> Self { + Version(version) + } +} + +impl From> for Version { + fn from(v: Option) -> Self { + v.unwrap_or(0).into() + } +} + +impl From for u16 { + fn from(v: Version) -> Self { + v.0 + } +} + +impl From for Vec { + fn from(v: Version) -> Self { + v.0.to_be_bytes().to_vec() + } +} + +impl TryFrom> for Version { + type Error = Error; + fn try_from(v: Vec) -> Result { + let bin = v.try_into().map_err(|_| Error::InvalidStorageVersion)?; + let val = u16::from_be_bytes(bin).into(); + Ok(val) + } +} + +impl Version { + /// The latest version + pub const LATEST: u16 = 2; + /// The latest version + pub fn latest() -> Self { + Self(2) + } + /// SurrealDB version 1 + pub fn v1() -> Self { + Self(1) + } + /// SurrealDB version 2 + pub fn v2() -> Self { + Self(2) + } + /// Check if we are running the latest version + pub fn is_latest(&self) -> bool { + self.0 == Self::LATEST + } + /// Fix + pub async fn fix(&self, ds: Arc) -> Result<(), Error> { + // We iterate through each version from the current to the latest + // and apply the fixes for each version. We update storage version + // and commit changes each iteration, to keep transactions as small + // as possible. + // + // We commit all fixes and the storage version update in one transaction, + // because we never want to leave storage in a broken state where half of + // the fixes are applied and the storage version is not updated. + // + for v in self.0..Version::LATEST { + // Create a new transaction + let tx = Arc::new(ds.transaction(TransactionType::Write, LockType::Pessimistic).await?); + + // Easy shortcut to apply a fix + macro_rules! apply_fix { + ($name:ident) => {{ + match fixes::$name(tx.clone()).await { + // Fail early and cancel transaction if the fix failed + Err(e) => { + tx.cancel().await?; + return Err(e); + } + _ => {} + }; + }}; + } + + // Apply fixes based on the current version + if v == 1 { + apply_fix!(v1_to_2_id_uuid); + apply_fix!(v1_to_2_migrate_to_access); + } + + // Obtain storage version key and value + let key = crate::key::version::new(); + let val: Vec = Version::from(v + 1).into(); + // Attempt to set the current version in storage + tx.set(key, val, None).await?; + + // Commit the transaction + tx.commit().await?; + } + + Ok(()) + } +} diff --git a/core/src/sql/statements/define/deprecated/scope.rs b/core/src/sql/statements/define/deprecated/scope.rs index 10fd5c08..63284737 100644 --- a/core/src/sql/statements/define/deprecated/scope.rs +++ b/core/src/sql/statements/define/deprecated/scope.rs @@ -1,4 +1,9 @@ -use crate::sql::{Duration, Ident, Strand, Value}; +use crate::sql::{ + access::AccessDuration, + access_type::{JwtAccessIssue, JwtAccessVerify, JwtAccessVerifyKey}, + statements::DefineAccessStatement, + AccessType, Algorithm, Base, Duration, Ident, JwtAccess, RecordAccess, Strand, Value, +}; use derive::Store; use revision::revisioned; use serde::{Deserialize, Serialize}; @@ -17,3 +22,35 @@ pub struct DefineScopeStatement { #[revision(start = 2)] pub if_not_exists: bool, } + +impl From for DefineAccessStatement { + fn from(sc: DefineScopeStatement) -> DefineAccessStatement { + DefineAccessStatement { + name: sc.name, + base: Base::Db, + comment: sc.comment, + if_not_exists: sc.if_not_exists, + kind: AccessType::Record(RecordAccess { + signup: sc.signup, + signin: sc.signin, + jwt: JwtAccess { + issue: Some(JwtAccessIssue { + alg: Algorithm::Hs512, + key: sc.code.clone(), + }), + verify: JwtAccessVerify::Key(JwtAccessVerifyKey { + alg: Algorithm::Hs512, + key: sc.code, + }), + }, + }), + // unused fields + authenticate: None, + duration: AccessDuration { + session: sc.session, + ..AccessDuration::default() + }, + overwrite: false, + } + } +} diff --git a/core/src/sql/statements/define/deprecated/token.rs b/core/src/sql/statements/define/deprecated/token.rs index 59e255cc..2773fa75 100644 --- a/core/src/sql/statements/define/deprecated/token.rs +++ b/core/src/sql/statements/define/deprecated/token.rs @@ -1,4 +1,9 @@ -use crate::sql::{Algorithm, Base, Ident, Strand}; +use crate::sql::{ + access::AccessDuration, + access_type::{JwtAccessVerify, JwtAccessVerifyKey}, + statements::DefineAccessStatement, + AccessType, Algorithm, Base, Ident, JwtAccess, Strand, +}; use derive::Store; use revision::revisioned; use serde::{Deserialize, Serialize}; @@ -16,3 +21,25 @@ pub struct DefineTokenStatement { #[revision(start = 2)] pub if_not_exists: bool, } + +impl From for DefineAccessStatement { + fn from(tk: DefineTokenStatement) -> DefineAccessStatement { + DefineAccessStatement { + name: tk.name, + base: tk.base, + comment: tk.comment, + if_not_exists: tk.if_not_exists, + kind: AccessType::Jwt(JwtAccess { + issue: None, + verify: JwtAccessVerify::Key(JwtAccessVerifyKey { + alg: tk.kind, + key: tk.code, + }), + }), + // unused fields + authenticate: None, + duration: AccessDuration::default(), + overwrite: false, + } + } +} diff --git a/core/src/sql/statements/define/mod.rs b/core/src/sql/statements/define/mod.rs index 75734a30..50644b5a 100644 --- a/core/src/sql/statements/define/mod.rs +++ b/core/src/sql/statements/define/mod.rs @@ -25,17 +25,16 @@ pub use param::DefineParamStatement; pub use table::DefineTableStatement; pub use user::DefineUserStatement; -use deprecated::scope::DefineScopeStatement; -use deprecated::token::DefineTokenStatement; +#[doc(hidden)] +pub use deprecated::scope::DefineScopeStatement; +#[doc(hidden)] +pub use deprecated::token::DefineTokenStatement; +use crate::ctx::Context; use crate::dbs::Options; use crate::doc::CursorDoc; use crate::err::Error; -use crate::sql::access::AccessDuration; -use crate::sql::access_type::{JwtAccessIssue, JwtAccessVerify, JwtAccessVerifyKey}; use crate::sql::value::Value; -use crate::sql::{Algorithm, Base, JwtAccess, RecordAccess}; -use crate::{ctx::Context, sql::AccessType}; use derive::Store; use reblessive::tree::Stk; use revision::revisioned; @@ -80,56 +79,14 @@ impl DefineStatement { fields: DefineTokenStatementFields, _revision: u16, ) -> Result { - Ok(DefineStatement::Access(DefineAccessStatement { - name: fields.0.name, - base: fields.0.base, - comment: fields.0.comment, - if_not_exists: fields.0.if_not_exists, - kind: AccessType::Jwt(JwtAccess { - issue: None, - verify: JwtAccessVerify::Key(JwtAccessVerifyKey { - alg: fields.0.kind, - key: fields.0.code, - }), - }), - // unused fields - authenticate: None, - duration: AccessDuration::default(), - overwrite: false, - })) + Ok(DefineStatement::Access(fields.0.into())) } fn convert_scope_to_access( fields: DefineScopeStatementFields, _revision: u16, ) -> Result { - Ok(DefineStatement::Access(DefineAccessStatement { - name: fields.0.name, - base: Base::Db, - comment: fields.0.comment, - if_not_exists: fields.0.if_not_exists, - kind: AccessType::Record(RecordAccess { - signup: fields.0.signup, - signin: fields.0.signin, - jwt: JwtAccess { - issue: Some(JwtAccessIssue { - alg: Algorithm::Hs512, - key: fields.0.code.clone(), - }), - verify: JwtAccessVerify::Key(JwtAccessVerifyKey { - alg: Algorithm::Hs512, - key: fields.0.code, - }), - }, - }), - // unused fields - authenticate: None, - duration: AccessDuration { - session: fields.0.session, - ..AccessDuration::default() - }, - overwrite: false, - })) + Ok(DefineStatement::Access(fields.0.into())) } } diff --git a/sdk/src/api/engine/local/native.rs b/sdk/src/api/engine/local/native.rs index 5c7d4be1..b95fe90a 100644 --- a/sdk/src/api/engine/local/native.rs +++ b/sdk/src/api/engine/local/native.rs @@ -73,6 +73,10 @@ pub(crate) async fn run_router( let kvs = match Datastore::new(endpoint).await { Ok(kvs) => { + if let Err(error) = kvs.check_version().await { + let _ = conn_tx.send(Err(error.into())).await; + return; + }; if let Err(error) = kvs.bootstrap().await { let _ = conn_tx.send(Err(error.into())).await; return; diff --git a/src/cli/fix.rs b/src/cli/fix.rs new file mode 100644 index 00000000..c4cc3e73 --- /dev/null +++ b/src/cli/fix.rs @@ -0,0 +1,43 @@ +use crate::cli::validator::parser::env_filter::CustomEnvFilter; +use crate::cli::validator::parser::env_filter::CustomEnvFilterParser; +use crate::dbs; +use crate::err::Error; +use clap::Args; +use surrealdb::engine::any::IntoEndpoint; + +#[derive(Args, Debug)] +pub struct FixCommandArguments { + #[arg(help = "Database path used for storing data")] + #[arg(env = "SURREAL_PATH", index = 1)] + #[arg(default_value = "memory")] + #[arg(value_parser = super::validator::path_valid)] + path: String, + #[arg(help = "The logging level for the database server")] + #[arg(env = "SURREAL_LOG", short = 'l', long = "log")] + #[arg(default_value = "info")] + #[arg(value_parser = CustomEnvFilterParser::new())] + log: CustomEnvFilter, +} + +pub async fn init( + FixCommandArguments { + path, + log, + }: FixCommandArguments, +) -> Result<(), Error> { + // Initialize opentelemetry and logging + crate::telemetry::builder().with_filter(log).init(); + // Start metrics subsystem + crate::telemetry::metrics::init().expect("failed to initialize metrics"); + // Clean the path + let endpoint = path.into_endpoint()?; + let path = if endpoint.path.is_empty() { + endpoint.url.to_string() + } else { + endpoint.path + }; + // Fix the datastore, if applicable + dbs::fix(path).await?; + // All ok + Ok(()) +} diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 4dcf652b..7af9553e 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -1,6 +1,7 @@ pub(crate) mod abstraction; mod config; mod export; +mod fix; mod import; mod isready; mod ml; @@ -22,6 +23,7 @@ use crate::env::RELEASE; use clap::{Parser, Subcommand}; pub use config::CF; use export::ExportCommandArguments; +use fix::FixCommandArguments; use import::ImportCommandArguments; use isready::IsReadyCommandArguments; use ml::MlCommand; @@ -89,6 +91,8 @@ enum Commands { IsReady(IsReadyCommandArguments), #[command(about = "Validate SurrealQL query files")] Validate(ValidateCommandArguments), + #[command(about = "Fix database storage issues")] + Fix(FixCommandArguments), } pub async fn init() -> ExitCode { @@ -132,6 +136,7 @@ pub async fn init() -> ExitCode { Commands::Ml(args) => ml::init(args).await, Commands::IsReady(args) => isready::init(args).await, Commands::Validate(args) => validate::init(args).await, + Commands::Fix(args) => fix::init(args).await, }; // Save the flamegraph and profile #[cfg(feature = "performance-profiler")] diff --git a/src/dbs/mod.rs b/src/dbs/mod.rs index 10bb69c6..0d216b30 100644 --- a/src/dbs/mod.rs +++ b/src/dbs/mod.rs @@ -2,6 +2,7 @@ use crate::cli::CF; use crate::err::Error; use clap::Args; use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use surrealdb::dbs::capabilities::{Capabilities, FuncTarget, NetTarget, Targets}; use surrealdb::kvs::Datastore; @@ -251,6 +252,19 @@ pub async fn init( Ok(dbs) } +pub async fn fix(path: String) -> Result<(), Error> { + // Parse and setup the desired kv datastore + let dbs = Arc::new(Datastore::new(&path).await?); + // Ensure the storage version is up-to-date to prevent corruption + let version = dbs.get_version().await?; + // Apply fixes + version.fix(dbs).await?; + // Log success + println!("Database storage version was updated successfully. Please carefully read back logs to see if any manual changes need to be applied"); + // All ok + Ok(()) +} + #[cfg(test)] mod tests { use std::str::FromStr;