Add SurrealKV as an experimental backend store (#3451)

This commit is contained in:
Farhan 2024-02-16 02:17:10 +05:30 committed by GitHub
parent 0c93e0b6d4
commit 485a224bcd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 948 additions and 21 deletions

View file

@ -675,3 +675,36 @@ jobs:
df -h
ps auxf
cat /tmp/surrealdb.log || true
surrealkv-engine:
name: SurrealKV engine
runs-on: ubuntu-latest
steps:
- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable
with:
toolchain: 1.75.0
- name: Checkout sources
uses: actions/checkout@v4
- name: Setup cache
uses: Swatinem/rust-cache@v2
with:
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install cargo-make
run: cargo install --debug --locked cargo-make
- name: Test surrealkv engine
run: cargo make ci-api-integration-surrealkv
- name: Debug info
if: always()
run: |
set -x
free -m
df -h
ps auxf
cat /tmp/surrealdb.log || true

82
Cargo.lock generated
View file

@ -438,6 +438,19 @@ dependencies = [
"futures-core",
]
[[package]]
name = "async-channel"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c"
dependencies = [
"concurrent-queue",
"event-listener 4.0.3",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-compression"
version = "0.4.6"
@ -1409,6 +1422,19 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216"
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.11"
@ -1437,6 +1463,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.19"
@ -2458,7 +2493,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad"
dependencies = [
"anyhow",
"async-channel",
"async-channel 1.9.0",
"base64 0.13.1",
"futures-lite 1.13.0",
"http 0.2.11",
@ -3018,6 +3053,15 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "lru"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22"
dependencies = [
"hashbrown 0.14.3",
]
[[package]]
name = "lz4-sys"
version = "1.9.4"
@ -5311,7 +5355,7 @@ dependencies = [
name = "surrealdb"
version = "1.3.0"
dependencies = [
"async-channel",
"async-channel 1.9.0",
"bincode",
"chrono",
"criterion",
@ -5367,7 +5411,7 @@ dependencies = [
"any_ascii",
"arbitrary",
"argon2",
"async-channel",
"async-channel 1.9.0",
"async-executor",
"async-recursion 1.0.5",
"base64 0.21.7",
@ -5429,6 +5473,7 @@ dependencies = [
"surrealdb-derive",
"surrealdb-jsonwebtoken",
"surrealdb-tikv-client",
"surrealkv",
"surrealml-core 0.0.7",
"surrealml-core 0.0.8",
"temp-dir",
@ -5506,6 +5551,28 @@ dependencies = [
"tonic 0.9.2",
]
[[package]]
name = "surrealkv"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efb36f58183b82fe47f0d2a3d2565ec3f6192ff9638482a8084a91f9f06b409d"
dependencies = [
"async-channel 2.1.1",
"bytes",
"chrono",
"crc32fast",
"crossbeam",
"crossbeam-channel",
"futures",
"hashbrown 0.14.3",
"lru",
"parking_lot",
"quick_cache",
"sha2",
"tokio",
"vart",
]
[[package]]
name = "surrealml-core"
version = "0.0.7"
@ -6369,6 +6436,15 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vart"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e273ebe2c2eaad64b95aaeccbae6b95d0ba235564b062a13b8ef134c5cae9f6a"
dependencies = [
"hashbrown 0.14.3",
]
[[package]]
name = "vcpkg"
version = "0.2.15"

View file

@ -14,6 +14,7 @@ storage-rocksdb = ["surrealdb/kv-rocksdb"]
storage-speedb = ["surrealdb/kv-speedb"]
storage-tikv = ["surrealdb/kv-tikv"]
storage-fdb = ["surrealdb/kv-fdb-7_1"]
storage-surrealkv = ["surrealdb/kv-surrealkv"]
scripting = ["surrealdb/scripting"]
http = ["surrealdb/http"]
http-compression = []

View file

@ -161,6 +161,12 @@ run_task = { name = ["test-kvs", "test-api-integration"], fork = true, parallel
category = "CI - INTEGRATION TESTS"
run_task = { name = ["start-tikv", "ci-api-integration-tikv-tests", "stop-tikv"], fork = true }
[tasks.ci-api-integration-surrealkv]
category = "CI - INTEGRATION TESTS"
env = { _TEST_API_ENGINE = "surrealkv", _TEST_FEATURES = "kv-surrealkv", RUSTFLAGS = "--cfg surrealdb_unstable" }
run_task = { name = ["test-kvs", "test-api-integration"], fork = true, parallel = false }
#
# Services
#
@ -279,7 +285,7 @@ BENCH_WORKER_THREADS = { value = "1", condition = { env_not_set = ["BENCH_WORKER
BENCH_NUM_OPS = { value = "1000", condition = { env_not_set = ["BENCH_NUM_OPS"] } }
BENCH_DURATION = { value = "30", condition = { env_not_set = ["BENCH_DURATION"] } }
BENCH_SAMPLE_SIZE = { value = "10", condition = { env_not_set = ["BENCH_SAMPLE_SIZE"] } }
BENCH_FEATURES = { value = "protocol-ws,kv-mem,kv-rocksdb,kv-fdb-7_1", condition = { env_not_set = ["BENCH_FEATURES"] } }
BENCH_FEATURES = { value = "protocol-ws,kv-mem,kv-rocksdb,kv-fdb-7_1,kv-surrealkv", condition = { env_not_set = ["BENCH_FEATURES"] } }
RUSTFLAGS = "--cfg surrealdb_unstable"
[tasks.bench-target]
@ -323,3 +329,8 @@ run_task = { name = ["bench-target"] }
category = "CI - BENCHMARK - SurrealDB Target"
env = { BENCH_DATASTORE_TARGET = "sdk-ws" }
run_task = { name = ["bench-target"] }
[tasks.bench-lib-surrealkv]
category = "CI - BENCHMARK - SurrealDB Target"
env = { BENCH_DATASTORE_TARGET = "lib-surrealkv" }
run_task = { name = ["bench-target"] }

View file

@ -510,6 +510,9 @@ allow_unsafe = true
[pkg.futures-util]
allow_unsafe = true
allow_apis = [
"net",
]
[pkg.indexmap]
allow_unsafe = true
@ -1160,6 +1163,7 @@ allow_apis = [
allow_unsafe = true
allow_apis = [
"net",
"fs",
]
[pkg.assert_fs]
@ -1279,3 +1283,23 @@ allow_unsafe = true
[pkg.pprof]
allow_unsafe = true
[pkg.crossbeam-queue]
allow_unsafe = true
[pkg.lru]
allow_unsafe = true
allow_apis = [
"net",
]
[pkg.surrealkv]
allow_apis = [
"fs",
"net",
]
[pkg.vart]
allow_apis = [
"net",
]

View file

@ -37,6 +37,7 @@ kv-fdb-6_2 = ["foundationdb/fdb-6_2", "kv-fdb"]
kv-fdb-6_3 = ["foundationdb/fdb-6_3", "kv-fdb"]
kv-fdb-7_0 = ["foundationdb/fdb-7_0", "kv-fdb"]
kv-fdb-7_1 = ["foundationdb/fdb-7_1", "kv-fdb"]
kv-surrealkv = ["dep:surrealkv", "tokio/time"]
scripting = ["dep:js"]
http = ["dep:reqwest"]
ml = ["dep:surrealml-core1", "dep:ndarray"]
@ -138,6 +139,7 @@ speedb = { version = "0.0.4", features = ["lz4", "snappy"], optional = true }
storekey = "0.5.0"
surrealml-core1 = { version = "0.0.7", optional = true, package = "surrealml-core" }
surrealml-core2 = { version = "0.0.8", optional = true, package = "surrealml-core" }
surrealkv = { version = "0.1.1", optional = true }
thiserror = "1.0.50"
tikv = { version = "0.2.0-surreal.2", default-features = false, package = "surrealdb-tikv-client", optional = true }
tracing = "0.1.40"

View file

@ -859,6 +859,13 @@ impl From<rocksdb::Error> for Error {
}
}
#[cfg(feature = "kv-surrealkv")]
impl From<surrealkv::Error> for Error {
fn from(e: surrealkv::Error) -> Error {
Error::Tx(e.to_string())
}
}
impl From<channel::RecvError> for Error {
fn from(e: channel::RecvError) -> Error {
Error::Channel(e.to_string())

View file

@ -102,6 +102,8 @@ pub(super) enum Inner {
TiKV(super::tikv::Datastore),
#[cfg(feature = "kv-fdb")]
FoundationDB(super::fdb::Datastore),
#[cfg(feature = "kv-surrealkv")]
SurrealKV(super::surrealkv::Datastore),
}
impl fmt::Display for Datastore {
@ -120,6 +122,8 @@ impl fmt::Display for Datastore {
Inner::TiKV(_) => write!(f, "tikv"),
#[cfg(feature = "kv-fdb")]
Inner::FoundationDB(_) => write!(f, "fdb"),
#[cfg(feature = "kv-surrealkv")]
Inner::SurrealKV(_) => write!(f, "surrealkv"),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -192,7 +196,8 @@ impl Datastore {
feature = "kv-speedb",
feature = "kv-indxdb",
feature = "kv-tikv",
feature = "kv-fdb"
feature = "kv-fdb",
feature = "kv-surrealkv"
)))]
let _ = (clock_override, default_clock);
@ -306,6 +311,22 @@ impl Datastore {
}
#[cfg(not(feature = "kv-fdb"))]
return Err(Error::Ds("Cannot connect to the `foundationdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
// Parse and initiate a SurrealKV database
s if s.starts_with("surrealkv:") => {
#[cfg(feature = "kv-surrealkv")]
{
info!("Starting kvs store at {}", path);
let s = s.trim_start_matches("surrealkv://");
let s = s.trim_start_matches("surrealkv:");
let v = super::surrealkv::Datastore::new(s).await.map(Inner::SurrealKV);
info!("Started to kvs store at {}", path);
let default_clock = Arc::new(SizedClock::System(SystemClock::new()));
let clock = clock_override.unwrap_or(default_clock);
Ok((v, clock))
}
#[cfg(not(feature = "kv-surrealkv"))]
return Err(Error::Ds("Cannot connect to the `surrealkv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
// The datastore path is not valid
_ => {
@ -1033,6 +1054,11 @@ impl Datastore {
let tx = v.transaction(write, lock).await?;
super::tx::Inner::FoundationDB(tx)
}
#[cfg(feature = "kv-surrealkv")]
Inner::SurrealKV(v) => {
let tx = v.transaction(write, lock).await?;
super::tx::Inner::SurrealKV(tx)
}
#[allow(unreachable_patterns)]
_ => unreachable!(),
};

View file

@ -20,6 +20,7 @@ mod kv;
mod mem;
mod rocksdb;
mod speedb;
mod surrealkv;
mod tikv;
mod tx;
@ -32,7 +33,8 @@ pub(crate) mod lq_structs;
feature = "kv-speedb",
feature = "kv-indxdb",
feature = "kv-tikv",
feature = "kv-fdb"
feature = "kv-fdb",
feature = "kv-surrealkv"
))]
mod tests;

View file

@ -0,0 +1,396 @@
#![cfg(feature = "kv-surrealkv")]
use crate::err::Error;
use crate::key::error::KeyCategory;
use crate::kvs::Check;
use crate::kvs::Key;
use crate::kvs::Val;
use crate::vs::{try_to_u64_be, u64_to_versionstamp, Versionstamp};
use std::ops::Range;
use surrealkv::Options;
use surrealkv::Store;
use surrealkv::Transaction as Tx;
pub struct Datastore {
db: Store,
}
pub struct Transaction {
/// Is the transaction complete?
done: bool,
/// Is the transaction writeable?
write: bool,
/// Should we check unhandled transactions?
check: Check,
/// The underlying datastore transaction
inner: Tx,
}
impl Drop for Transaction {
fn drop(&mut self) {
if !self.done && self.write {
// Check if already panicking
if std::thread::panicking() {
return;
}
// Handle the behaviour
match self.check {
Check::None => {
trace!("A transaction was dropped without being committed or cancelled");
}
Check::Warn => {
warn!("A transaction was dropped without being committed or cancelled");
}
Check::Panic => {
#[cfg(debug_assertions)]
{
let backtrace = std::backtrace::Backtrace::force_capture();
if let std::backtrace::BacktraceStatus::Captured = backtrace.status() {
println!("{}", backtrace);
}
}
panic!("A transaction was dropped without being committed or cancelled");
}
}
}
}
}
impl Datastore {
/// Open a new database
pub(crate) async fn new(path: &str) -> Result<Datastore, Error> {
let mut opts = Options::new();
opts.dir = path.to_string().into();
match Store::new(opts) {
Ok(db) => Ok(Datastore {
db,
}),
Err(e) => Err(Error::Ds(e.to_string())),
}
}
/// Start a new transaction
pub(crate) async fn transaction(&self, write: bool, _: bool) -> Result<Transaction, Error> {
// Specify the check level
#[cfg(not(debug_assertions))]
let check = Check::Warn;
#[cfg(debug_assertions)]
let check = Check::Panic;
// Create a new transaction
match self.db.begin() {
Ok(inner) => Ok(Transaction {
done: false,
check,
write,
inner,
}),
Err(e) => Err(Error::Tx(e.to_string())),
}
}
}
impl Transaction {
/// Sets the behavior of the transaction if it's not closed.
pub(crate) fn set_check_level(&mut self, check: Check) {
self.check = check;
}
/// Checks if the transaction is closed.
pub(crate) fn is_closed(&self) -> bool {
self.done
}
/// Cancels the transaction.
pub(crate) async fn cancel(&mut self) -> Result<(), Error> {
// If the transaction is already closed, return an error.
if self.is_closed() {
return Err(Error::TxFinished);
}
// Mark the transaction as done.
self.done = true;
// Rollback the transaction.
self.inner.rollback();
Ok(())
}
/// Commits the transaction.
pub(crate) async fn commit(&mut self) -> Result<(), Error> {
// If the transaction is already closed or is read-only, return an error.
if self.is_closed() {
return Err(Error::TxFinished);
} else if !self.write {
return Err(Error::TxReadonly);
}
// Mark the transaction as done.
self.done = true;
// Commit the transaction.
self.inner.commit().await.map_err(|e| Error::from(e))
}
/// Checks if a key exists in the database.
pub(crate) async fn exists<K>(&mut self, key: K) -> Result<bool, Error>
where
K: Into<Key>,
{
// If the transaction is already closed, return an error.
if self.is_closed() {
return Err(Error::TxFinished);
}
// Check if the key exists in the database.
self.inner
.get(key.into().as_slice())
.map(|opt| opt.is_some())
.map_err(|e| Error::Tx(format!("Unable to get kv from SurrealKV: {}", e)))
}
/// Fetches a value from the database by key.
pub(crate) async fn get<K>(&mut self, key: K) -> Result<Option<Val>, Error>
where
K: Into<Key>,
{
// If the transaction is already closed, return an error.
if self.is_closed() {
return Err(Error::TxFinished);
}
// Fetch the value from the database.
let res = self.inner.get(key.into().as_slice())?;
Ok(res)
}
/// Obtains a new change timestamp for a key.
/// This timestamp is replaced with the current timestamp when the transaction is committed.
/// This method should be called when composing the change feed entries for this transaction,
/// which should be done immediately before the transaction commit.
/// This is to minimize the delay or conflict of other transactions.
#[allow(unused)]
pub(crate) async fn get_timestamp<K>(&mut self, key: K) -> Result<Versionstamp, Error>
where
K: Into<Key>,
{
// If the transaction is already closed, return an error.
if self.is_closed() {
return Err(Error::TxFinished);
}
// Convert the key into a vector.
let key_vec = key.into();
let k = key_vec.as_slice();
// Get the previous value of the key.
let prev = self.inner.get(k)?;
// Calculate the new version.
let ver = match prev {
Some(prev) => {
let slice = prev.as_slice();
let res: Result<[u8; 10], Error> = match slice.try_into() {
Ok(ba) => Ok(ba),
Err(e) => Err(Error::Ds(e.to_string())),
};
let array = res?;
let prev: u64 = try_to_u64_be(array)?;
prev + 1
}
None => 1,
};
// Convert the version to a versionstamp.
let verbytes = u64_to_versionstamp(ver);
// Set the new versionstamp.
self.inner.set(k, verbytes.as_slice())?;
// Return the versionstamp.
Ok(verbytes)
}
/// Obtains a new key that is suffixed with the change timestamp.
pub(crate) async fn get_versionstamped_key<K>(
&mut self,
ts_key: K,
prefix: K,
suffix: K,
) -> Result<Vec<u8>, Error>
where
K: Into<Key>,
{
// If the transaction is already closed or is read-only, return an error.
if self.is_closed() {
return Err(Error::TxFinished);
} else if !self.write {
return Err(Error::TxReadonly);
}
// Get the timestamp.
let ts = self.get_timestamp(ts_key).await?;
// Create the new key.
let mut k: Vec<u8> = prefix.into();
k.append(&mut ts.to_vec());
k.append(&mut suffix.into());
// Return the new key.
Ok(k)
}
/// Inserts or updates a key in the database.
pub(crate) async fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// If the transaction is already closed or is read-only, return an error.
if self.is_closed() {
return Err(Error::TxFinished);
} else if !self.write {
return Err(Error::TxReadonly);
}
// Set the key.
self.inner.set(key.into().as_slice(), &val.into()).map_err(|e| Error::from(e))
}
/// Inserts a key-value pair into the database if the key doesn't already exist.
pub(crate) async fn put<K, V>(
&mut self,
category: KeyCategory,
key: K,
val: V,
) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Ensure the transaction is open and writable.
if self.done {
return Err(Error::TxFinished);
}
if !self.write {
return Err(Error::TxReadonly);
}
// Check if the key already exists.
let key: Vec<u8> = key.into();
if self.exists(key.clone().as_slice()).await? {
return Err(Error::TxKeyAlreadyExistsCategory(category));
}
// Insert the key-value pair.
self.inner.set(&key, &val.into()).map_err(|e| Error::from(e))
}
/// Inserts a key-value pair into the database if the key doesn't already exist,
/// or if the existing value matches the provided check value.
pub(crate) async fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Ensure the transaction is open and writable.
if self.done {
return Err(Error::TxFinished);
}
if !self.write {
return Err(Error::TxReadonly);
}
// Convert the check value.
let chk = chk.map(Into::into);
// Insert the key-value pair if the key doesn't exist or the existing value matches the check value.
let key_slice = key.into();
let val_vec = val.into();
let res = self.inner.get(key_slice.as_slice())?;
match (res, chk) {
(Some(v), Some(w)) if v == w => self.inner.set(key_slice.as_slice(), &val_vec)?,
(None, None) => self.inner.set(key_slice.as_slice(), &val_vec)?,
_ => return Err(Error::TxConditionNotMet),
};
Ok(())
}
/// Deletes a key from the database.
pub(crate) async fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: Into<Key>,
{
// Ensure the transaction is open and writable.
if self.done {
return Err(Error::TxFinished);
}
if !self.write {
return Err(Error::TxReadonly);
}
// Delete the key.
let key_slice = key.into();
self.inner.delete(key_slice.as_slice()).map_err(|e| Error::from(e))
}
/// Deletes a key from the database if the existing value matches the provided check value.
pub(crate) async fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
where
K: Into<Key>,
V: Into<Val>,
{
// Ensure the transaction is open and writable.
if self.done {
return Err(Error::TxFinished);
}
if !self.write {
return Err(Error::TxReadonly);
}
// Convert the check value.
let chk: Option<Val> = chk.map(Into::into);
// Delete the key if the existing value matches the check value.
let key_slice = key.into();
let res = self.inner.get(key_slice.as_slice())?;
match (res, chk) {
(Some(v), Some(w)) if v == w => self.inner.delete(key_slice.as_slice())?,
(None, None) => self.inner.delete(key_slice.as_slice())?,
_ => return Err(Error::TxConditionNotMet),
};
Ok(())
}
/// Retrieves a range of key-value pairs from the database.
pub(crate) async fn scan<K>(
&mut self,
rng: Range<K>,
limit: u32,
) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key>,
{
// Ensure the transaction is open.
if self.done {
return Err(Error::TxFinished);
}
// Convert the range to byte slices.
let start_range = rng.start.into();
let end_range = rng.end.into();
// Retrieve the key-value pairs.
let res =
self.inner.scan(start_range.as_slice()..end_range.as_slice(), Some(limit as usize))?;
let res = res.into_iter().map(|kv| (Key::from(kv.0), kv.1)).collect();
Ok(res)
}
}

View file

@ -13,6 +13,8 @@ pub(crate) enum Kvs {
Tikv,
#[allow(dead_code)]
Fdb,
#[allow(dead_code)]
SurrealKV,
}
// This type is unsused when no store is enabled.
@ -250,3 +252,52 @@ mod fdb {
include!("tblq.rs");
include!("tbnt.rs");
}
#[cfg(feature = "kv-surrealkv")]
mod surrealkv {
use crate::kvs::tests::{ClockType, Kvs};
use crate::kvs::Datastore;
use crate::kvs::LockType;
use crate::kvs::Transaction;
use crate::kvs::TransactionType;
use serial_test::serial;
use temp_dir::TempDir;
async fn new_ds(node_id: Uuid, clock_override: ClockType) -> (Datastore, Kvs) {
let path = TempDir::new().unwrap().path().to_string_lossy().to_string();
(
Datastore::new_full(format!("surrealkv:{path}").as_str(), Some(clock_override))
.await
.unwrap()
.with_node_id(sql::Uuid::from(node_id)),
Kvs::SurrealKV,
)
}
async fn new_tx(write: TransactionType, lock: LockType) -> Transaction {
// Shared node id for one-off transactions
// We should delete this, node IDs should be known.
let new_tx_uuid = Uuid::parse_str("22358e5e-87bd-4040-8c63-01db896191ab").unwrap();
let clock = Arc::new(SizedClock::Fake(FakeClock::new(Timestamp::default())));
let (ds, _) = new_ds(new_tx_uuid, clock).await;
ds.transaction(write, lock).await.unwrap()
}
include!("raw.rs");
include!("cluster_init.rs");
include!("hb.rs");
include!("helper.rs");
include!("lq.rs");
include!("nq.rs");
include!("snapshot.rs");
include!("tb.rs");
include!("multireader.rs");
include!("multiwriter_different_keys.rs");
include!("multiwriter_same_keys_allow.rs");
include!("timestamp_to_versionstamp.rs");
include!("nd.rs");
include!("ndlq.rs");
include!("tblq.rs");
include!("tbnt.rs");
}

View file

@ -19,6 +19,7 @@ async fn exi() {
let mut tx = ds.transaction(Write, Optimistic).await.unwrap();
assert!(tx.put(Unknown, "test", "ok").await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap();
let val = tx.exi("test").await.unwrap();

View file

@ -102,6 +102,8 @@ pub(super) enum Inner {
TiKV(super::tikv::Transaction),
#[cfg(feature = "kv-fdb")]
FoundationDB(super::fdb::Transaction),
#[cfg(feature = "kv-surrealkv")]
SurrealKV(super::surrealkv::Transaction),
}
#[derive(Copy, Clone)]
pub enum TransactionType {
@ -139,6 +141,8 @@ impl fmt::Display for Transaction {
Inner::TiKV(_) => write!(f, "tikv"),
#[cfg(feature = "kv-fdb")]
Inner::FoundationDB(_) => write!(f, "fdb"),
#[cfg(feature = "kv-surrealkv")]
Inner::SurrealKV(_) => write!(f, "surrealkv"),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -213,6 +217,11 @@ impl Transaction {
inner: Inner::FoundationDB(v),
..
} => v.closed(),
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.is_closed(),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -255,6 +264,11 @@ impl Transaction {
inner: Inner::FoundationDB(v),
..
} => v.cancel().await,
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.cancel().await,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -297,6 +311,11 @@ impl Transaction {
inner: Inner::FoundationDB(v),
..
} => v.commit().await,
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.commit().await,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -341,6 +360,11 @@ impl Transaction {
inner: Inner::FoundationDB(v),
..
} => v.del(key).await,
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.del(key).await,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -385,6 +409,11 @@ impl Transaction {
inner: Inner::FoundationDB(v),
..
} => v.exi(key).await,
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.exists(key).await,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -429,6 +458,11 @@ impl Transaction {
inner: Inner::FoundationDB(v),
..
} => v.get(key).await,
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.get(key).await,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -474,6 +508,11 @@ impl Transaction {
inner: Inner::FoundationDB(v),
..
} => v.set(key, val).await,
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.set(key, val).await,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -522,6 +561,11 @@ impl Transaction {
inner: Inner::SpeeDB(v),
..
} => v.get_timestamp(key).await,
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.get_timestamp(key).await,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -611,6 +655,14 @@ impl Transaction {
let k = v.get_versionstamped_key(ts_key, prefix, suffix).await?;
v.set(k, val).await
}
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => {
let k = v.get_versionstamped_key(ts_key, prefix, suffix).await?;
v.set(k, val).await
}
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -654,6 +706,11 @@ impl Transaction {
inner: Inner::FoundationDB(v),
..
} => v.put(category, key, val).await,
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.put(category, key, val).await,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -700,6 +757,11 @@ impl Transaction {
inner: Inner::FoundationDB(v),
..
} => v.scan(rng, limit).await,
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.scan(rng, limit).await,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -751,6 +813,11 @@ impl Transaction {
inner: Inner::FoundationDB(v),
..
} => v.scan(range, batch_limit).await,
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.scan(range, batch_limit).await,
#[allow(unreachable_patterns)]
_ => Err(Error::MissingStorageEngine),
};
@ -818,6 +885,11 @@ impl Transaction {
inner: Inner::FoundationDB(v),
..
} => v.putc(key, val, chk).await,
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.putc(key, val, chk).await,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -863,6 +935,11 @@ impl Transaction {
inner: Inner::FoundationDB(v),
..
} => v.delc(key, chk).await,
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.delc(key, chk).await,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -2761,6 +2838,11 @@ impl Transaction {
inner: Inner::FoundationDB(ref mut v),
..
} => v.check_level(check),
#[cfg(feature = "kv-surrealkv")]
Transaction {
inner: Inner::SurrealKV(v),
..
} => v.set_check_level(check),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}

View file

@ -39,6 +39,7 @@ kv-fdb-6_2 = ["surrealdb-core/kv-fdb-6_2", "kv-fdb"]
kv-fdb-6_3 = ["surrealdb-core/kv-fdb-6_3", "kv-fdb"]
kv-fdb-7_0 = ["surrealdb-core/kv-fdb-7_0", "kv-fdb"]
kv-fdb-7_1 = ["surrealdb-core/kv-fdb-7_1", "kv-fdb"]
kv-surrealkv = ["surrealdb-core/kv-surrealkv", "tokio/time"]
scripting = ["surrealdb-core/scripting"]
http = ["surrealdb-core/http"]
native-tls = [

View file

@ -1,7 +1,7 @@
use criterion::{Criterion, Throughput};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
#[cfg(any(feature = "kv-rocksdb", feature = "kv-fdb"))]
#[cfg(any(feature = "kv-rocksdb", feature = "kv-fdb", feature = "kv-surrealkv"))]
use surrealdb::dbs::Session;
use surrealdb::kvs::Datastore;
@ -40,6 +40,23 @@ pub(super) async fn init(target: &str) {
.expect("Unable to connect to FDB cluster");
let _ = DB.set(Arc::new(ds));
}
#[cfg(feature = "kv-surrealkv")]
"lib-surrealkv" => {
let path = format!(
"surrealkv://lib-surrealkv-{}.db",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
);
println!("\n### Using path: {} ###\n", path);
let ds = Datastore::new(&path).await.unwrap();
ds.execute("INFO FOR DB", &Session::owner().with_ns("ns").with_db("db"), None)
.await
.expect("Unable to execute the query");
let _ = DB.set(Arc::new(ds));
}
_ => panic!("Unknown target: {}", target),
}
}

View file

@ -139,6 +139,22 @@ impl Connection for Any {
);
}
EndpointKind::SurrealKV => {
#[cfg(feature = "kv-surrealkv")]
{
features.insert(ExtraFeatures::Backup);
features.insert(ExtraFeatures::LiveQueries);
engine::local::native::router(address, conn_tx, route_rx);
conn_rx.into_recv_async().await??
}
#[cfg(not(feature = "kv-surrealkv"))]
return Err(DbError::Ds(
"Cannot connect to the `surrealkv` storage engine as it is not enabled in this build of SurrealDB".to_owned(),
)
.into());
}
EndpointKind::Http | EndpointKind::Https => {
#[cfg(feature = "protocol-http")]
{

View file

@ -120,6 +120,21 @@ impl Connection for Any {
.into());
}
EndpointKind::SurrealKV => {
#[cfg(feature = "kv-surrealkv")]
{
features.insert(ExtraFeatures::LiveQueries);
engine::local::wasm::router(address, conn_tx, route_rx);
conn_rx.into_recv_async().await??;
}
#[cfg(not(feature = "kv-surrealkv"))]
return Err(DbError::Ds(
"Cannot connect to the `surrealkv` storage engine as it is not enabled in this build of SurrealDB".to_owned(),
)
.into());
}
EndpointKind::TiKv => {
#[cfg(feature = "kv-tikv")]
{

View file

@ -372,6 +372,11 @@ pub struct TiKv;
#[derive(Debug)]
pub struct FDb;
#[cfg(feature = "kv-surrealkv")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-surrealkv")))]
#[derive(Debug)]
pub struct SurrealKV;
/// An embedded database
#[derive(Debug, Clone)]
pub struct Db {

View file

@ -8,6 +8,7 @@ pub mod any;
feature = "kv-speedb",
feature = "kv-fdb",
feature = "kv-indxdb",
feature = "kv-surrealkv",
))]
pub mod local;
#[cfg(any(feature = "protocol-http", feature = "protocol-ws"))]

View file

@ -13,6 +13,8 @@ mod mem;
mod rocksdb;
#[cfg(feature = "kv-speedb")]
mod speedb;
#[cfg(feature = "kv-surrealkv")]
mod surrealkv;
#[cfg(feature = "kv-tikv")]
mod tikv;
@ -131,6 +133,7 @@ pub enum EndpointKind {
SpeeDb,
TiKv,
Unsupported(String),
SurrealKV,
}
impl From<&str> for EndpointKind {
@ -148,6 +151,7 @@ impl From<&str> for EndpointKind {
"rocksdb" => Self::RocksDb,
"speedb" => Self::SpeeDb,
"tikv" => Self::TiKv,
"surrealkv" => Self::SurrealKV,
_ => Self::Unsupported(s.to_owned()),
}
}

View file

@ -0,0 +1,40 @@
use crate::api::engine::local::Db;
use crate::api::engine::local::SurrealKV;
use crate::api::opt::Config;
use crate::api::opt::Endpoint;
use crate::api::opt::IntoEndpoint;
use crate::api::Result;
use std::path::Path;
use std::path::PathBuf;
use url::Url;
macro_rules! endpoints {
($($name:ty),*) => {
$(
impl IntoEndpoint<SurrealKV> for $name {
type Client = Db;
fn into_endpoint(self) -> Result<Endpoint> {
let protocol = "surrealkv://";
let url = Url::parse(protocol)
.unwrap_or_else(|_| unreachable!("`{protocol}` should be static and valid"));
let mut endpoint = Endpoint::new(url);
endpoint.path = super::path_to_string(protocol, self);
Ok(endpoint)
}
}
impl IntoEndpoint<SurrealKV> for ($name, Config) {
type Client = Db;
fn into_endpoint(self) -> Result<Endpoint> {
let mut endpoint = IntoEndpoint::<SurrealKV>::into_endpoint(self.0)?;
endpoint.config = self.1;
Ok(endpoint)
}
}
)*
}
}
endpoints!(&str, &String, String, &Path, PathBuf);

View file

@ -118,6 +118,11 @@ compile_error!(
"`sql2` is currently unstable. You need to enable the `surrealdb_unstable` flag to use it."
);
#[cfg(all(not(surrealdb_unstable), feature = "kv-surrealkv"))]
compile_error!(
"`kv-surrealkv` is currently unstable. You need to enable the `surrealdb_unstable` flag to use it."
);
#[macro_use]
extern crate tracing;

View file

@ -392,6 +392,41 @@ mod api_integration {
include!("api/backup.rs");
}
#[cfg(feature = "kv-surrealkv")]
mod surrealkv {
use super::*;
use surrealdb::engine::local::Db;
use surrealdb::engine::local::SurrealKV;
async fn new_db() -> (SemaphorePermit<'static>, Surreal<Db>) {
let permit = PERMITS.acquire().await.unwrap();
let path = format!("/tmp/{}.db", Ulid::new());
let root = Root {
username: ROOT_USER,
password: ROOT_PASS,
};
let config = Config::new()
.user(root)
.tick_interval(TICK_INTERVAL)
.capabilities(Capabilities::all());
let db = Surreal::new::<SurrealKV>((path, config)).await.unwrap();
db.signin(root).await.unwrap();
(permit, db)
}
#[test_log::test(tokio::test)]
async fn any_engine_can_connect() {
let path = format!("{}.db", Ulid::new());
surrealdb::engine::any::connect(format!("surrealkv://{path}")).await.unwrap();
surrealdb::engine::any::connect(format!("surrealkv:///tmp/{path}")).await.unwrap();
tokio::fs::remove_dir_all(path).await.unwrap();
}
include!("api/mod.rs");
include!("api/live.rs");
include!("api/backup.rs");
}
#[cfg(feature = "protocol-http")]
mod any {
use super::*;

View file

@ -81,6 +81,12 @@ user-id = 3987 # Rushmore Mushambi (rushmorem)
start = "2023-08-29"
end = "2025-01-24"
[[trusted.surrealkv]]
criteria = "safe-to-deploy"
user-id = 145457 # Tobie Morgan Hitchcock (tobiemh)
start = "2024-02-08"
end = "2025-02-10"
[[trusted.surrealml-core]]
criteria = "safe-to-deploy"
user-id = 145457 # Tobie Morgan Hitchcock (tobiemh)
@ -92,3 +98,9 @@ criteria = "safe-to-deploy"
user-id = 145457 # Tobie Morgan Hitchcock (tobiemh)
start = "2022-02-17"
end = "2025-01-24"
[[trusted.vart]]
criteria = "safe-to-deploy"
user-id = 145457 # Tobie Morgan Hitchcock (tobiemh)
start = "2024-01-16"
end = "2025-02-10"

View file

@ -155,6 +155,10 @@ criteria = "safe-to-run"
version = "1.9.0"
criteria = "safe-to-deploy"
[[exemptions.async-channel]]
version = "2.1.1"
criteria = "safe-to-deploy"
[[exemptions.async-compression]]
version = "0.4.6"
criteria = "safe-to-deploy"
@ -435,17 +439,25 @@ criteria = "safe-to-run"
version = "1.1.2"
criteria = "safe-to-deploy"
[[exemptions.crossbeam]]
version = "0.8.4"
criteria = "safe-to-deploy"
[[exemptions.crossbeam-channel]]
version = "0.5.11"
criteria = "safe-to-deploy"
[[exemptions.crossbeam-deque]]
version = "0.8.5"
criteria = "safe-to-run"
criteria = "safe-to-deploy"
[[exemptions.crossbeam-epoch]]
version = "0.9.18"
criteria = "safe-to-run"
criteria = "safe-to-deploy"
[[exemptions.crossbeam-queue]]
version = "0.3.11"
criteria = "safe-to-deploy"
[[exemptions.crossbeam-utils]]
version = "0.8.19"
@ -903,6 +915,10 @@ criteria = "safe-to-deploy"
version = "0.4.11"
criteria = "safe-to-deploy"
[[exemptions.lru]]
version = "0.12.2"
criteria = "safe-to-deploy"
[[exemptions.lz4-sys]]
version = "1.9.4"
criteria = "safe-to-deploy"
@ -1003,18 +1019,6 @@ criteria = "safe-to-deploy"
version = "0.4.4"
criteria = "safe-to-run"
[[exemptions.num-integer]]
version = "0.1.46"
criteria = "safe-to-deploy"
[[exemptions.num-iter]]
version = "0.1.44"
criteria = "safe-to-deploy"
[[exemptions.num-traits]]
version = "0.2.18"
criteria = "safe-to-deploy"
[[exemptions.num_cpus]]
version = "1.16.0"
criteria = "safe-to-deploy"

View file

@ -142,6 +142,13 @@ user-id = 3987
user-login = "rushmorem"
user-name = "Rushmore Mushambi"
[[publisher.surrealkv]]
version = "0.1.1"
when = "2024-02-10"
user-id = 145457
user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
[[publisher.surrealml-core]]
version = "0.0.7"
when = "2024-01-22"
@ -184,6 +191,13 @@ user-id = 1139
user-login = "Manishearth"
user-name = "Manish Goregaokar"
[[publisher.vart]]
version = "0.1.1"
when = "2024-02-10"
user-id = 145457
user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
[[audits.bytecode-alliance.wildcard-audits.arbitrary]]
who = "Nick Fitzgerald <fitzgen@gmail.com>"
criteria = "safe-to-deploy"
@ -779,6 +793,31 @@ who = "David Cook <dcook@divviup.org>"
criteria = "safe-to-deploy"
delta = "0.4.3 -> 0.4.4"
[[audits.isrg.audits.num-integer]]
who = "David Cook <dcook@divviup.org>"
criteria = "safe-to-deploy"
delta = "0.1.45 -> 0.1.46"
[[audits.isrg.audits.num-iter]]
who = "David Cook <dcook@divviup.org>"
criteria = "safe-to-deploy"
delta = "0.1.43 -> 0.1.44"
[[audits.isrg.audits.num-traits]]
who = "David Cook <dcook@divviup.org>"
criteria = "safe-to-deploy"
delta = "0.2.15 -> 0.2.16"
[[audits.isrg.audits.num-traits]]
who = "Ameer Ghani <inahga@divviup.org>"
criteria = "safe-to-deploy"
delta = "0.2.16 -> 0.2.17"
[[audits.isrg.audits.num-traits]]
who = "David Cook <dcook@divviup.org>"
criteria = "safe-to-deploy"
delta = "0.2.17 -> 0.2.18"
[[audits.isrg.audits.rand_chacha]]
who = "David Cook <dcook@divviup.org>"
criteria = "safe-to-deploy"
@ -1156,6 +1195,27 @@ version = "0.4.3"
notes = "All code written or reviewed by Josh Stone."
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"
[[audits.mozilla.audits.num-integer]]
who = "Josh Stone <jistone@redhat.com>"
criteria = "safe-to-deploy"
version = "0.1.45"
notes = "All code written or reviewed by Josh Stone."
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"
[[audits.mozilla.audits.num-iter]]
who = "Josh Stone <jistone@redhat.com>"
criteria = "safe-to-deploy"
version = "0.1.43"
notes = "All code written or reviewed by Josh Stone."
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"
[[audits.mozilla.audits.num-traits]]
who = "Josh Stone <jistone@redhat.com>"
criteria = "safe-to-deploy"
version = "0.2.15"
notes = "All code written or reviewed by Josh Stone."
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"
[[audits.mozilla.audits.peeking_take_while]]
who = "Bobby Holley <bobbyholley@gmail.com>"
criteria = "safe-to-deploy"