Add SpeeDB storage engine implementation (#2076)
This commit is contained in:
parent
496d16242f
commit
3900bfa737
22 changed files with 675 additions and 10 deletions
28
.github/workflows/ci.yml
vendored
28
.github/workflows/ci.yml
vendored
|
@ -174,6 +174,34 @@ jobs:
|
|||
- name: Run cargo test
|
||||
run: cargo test --locked --package surrealdb --no-default-features --features kv-rocksdb --test api api_integration::file
|
||||
|
||||
rocksdb-engine:
|
||||
name: RocksDB engine
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
|
||||
- name: Install stable toolchain
|
||||
uses: dtolnay/rust-toolchain@stable
|
||||
|
||||
- name: Checkout sources
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Run cargo test
|
||||
run: cargo test --locked --package surrealdb --no-default-features --features kv-rocksdb --test api api_integration::rocksdb
|
||||
|
||||
speedb-engine:
|
||||
name: SpeeDB engine
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
|
||||
- name: Install stable toolchain
|
||||
uses: dtolnay/rust-toolchain@stable
|
||||
|
||||
- name: Checkout sources
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Run cargo test
|
||||
run: cargo test --locked --package surrealdb --no-default-features --features kv-speedb --test api api_integration::speedb
|
||||
|
||||
any-engine:
|
||||
name: Any engine
|
||||
runs-on: ubuntu-latest
|
||||
|
|
47
Cargo.lock
generated
47
Cargo.lock
generated
|
@ -682,6 +682,26 @@ dependencies = [
|
|||
"which",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bindgen"
|
||||
version = "0.64.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cexpr 0.6.0",
|
||||
"clang-sys",
|
||||
"lazy_static",
|
||||
"lazycell",
|
||||
"peeking_take_while",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"regex",
|
||||
"rustc-hash",
|
||||
"shlex 1.1.0",
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bindgen"
|
||||
version = "0.65.1"
|
||||
|
@ -2393,6 +2413,22 @@ dependencies = [
|
|||
"zstd-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libspeedb-sys"
|
||||
version = "0.0.2+2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2f7e94ee8f7d5494e100d1671955d6a752578589548da04031be4c8e9928fb69"
|
||||
dependencies = [
|
||||
"bindgen 0.64.0",
|
||||
"bzip2-sys",
|
||||
"cc",
|
||||
"glob",
|
||||
"libc",
|
||||
"libz-sys",
|
||||
"lz4-sys",
|
||||
"zstd-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libz-sys"
|
||||
version = "1.1.9"
|
||||
|
@ -4151,6 +4187,16 @@ dependencies = [
|
|||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "speedb"
|
||||
version = "0.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e7f0dc39279433ab6e2f63203b220c56b7f4042a773ba902f94755cc6ebf6a51"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"libspeedb-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spin"
|
||||
version = "0.5.2"
|
||||
|
@ -4314,6 +4360,7 @@ dependencies = [
|
|||
"sha-1",
|
||||
"sha2",
|
||||
"snap",
|
||||
"speedb",
|
||||
"storekey",
|
||||
"surrealdb-derive",
|
||||
"temp-dir",
|
||||
|
|
|
@ -10,6 +10,7 @@ authors = ["Tobie Morgan Hitchcock <tobie@surrealdb.com>"]
|
|||
default = ["storage-mem", "storage-rocksdb", "scripting", "http"]
|
||||
storage-mem = ["surrealdb/kv-mem"]
|
||||
storage-rocksdb = ["surrealdb/kv-rocksdb"]
|
||||
storage-speedb = ["surrealdb/kv-speedb"]
|
||||
storage-tikv = ["surrealdb/kv-tikv"]
|
||||
storage-fdb = ["surrealdb/kv-fdb-6_3"]
|
||||
scripting = ["surrealdb/scripting"]
|
||||
|
|
2
Makefile
2
Makefile
|
@ -12,7 +12,7 @@ setup:
|
|||
|
||||
.PHONY: docs
|
||||
docs:
|
||||
cargo doc --open --no-deps --package surrealdb --features rustls,native-tls,protocol-ws,protocol-http,kv-mem,kv-indxdb,kv-rocksdb,kv-tikv,http,scripting
|
||||
cargo doc --open --no-deps --package surrealdb --features rustls,native-tls,protocol-ws,protocol-http,kv-mem,kv-indxdb,kv-speedb,kv-rocksdb,kv-tikv,http,scripting
|
||||
|
||||
.PHONY: test
|
||||
test:
|
||||
|
|
|
@ -22,6 +22,7 @@ protocol-http = ["dep:reqwest", "dep:tokio-util"]
|
|||
protocol-ws = ["dep:tokio-tungstenite", "tokio/time"]
|
||||
kv-mem = ["dep:echodb", "tokio/time"]
|
||||
kv-indxdb = ["dep:indxdb"]
|
||||
kv-speedb = ["dep:speedb", "tokio/time"]
|
||||
kv-rocksdb = ["dep:rocksdb", "tokio/time"]
|
||||
kv-tikv = ["dep:tikv", "dep:tikv-client-proto"]
|
||||
kv-fdb-5_1 = ["foundationdb/fdb-5_1", "kv-fdb"]
|
||||
|
@ -99,6 +100,7 @@ serde = { version = "1.0.163", features = ["derive"] }
|
|||
serde_json = "1.0.96"
|
||||
sha-1 = "0.10.1"
|
||||
sha2 = "0.10.6"
|
||||
speedb = { version = "0.0.2", optional = true }
|
||||
storekey = "0.5.0"
|
||||
thiserror = "1.0.40"
|
||||
tikv = { version = "0.1.0", package = "tikv-client", optional = true }
|
||||
|
|
|
@ -95,6 +95,7 @@ use crate::api::opt::Endpoint;
|
|||
feature = "kv-mem",
|
||||
feature = "kv-tikv",
|
||||
feature = "kv-rocksdb",
|
||||
feature = "kv-speedb",
|
||||
feature = "kv-fdb",
|
||||
feature = "kv-indxdb",
|
||||
))]
|
||||
|
@ -172,6 +173,7 @@ where
|
|||
feature = "kv-mem",
|
||||
feature = "kv-tikv",
|
||||
feature = "kv-rocksdb",
|
||||
feature = "kv-speedb",
|
||||
feature = "kv-fdb",
|
||||
feature = "kv-indxdb",
|
||||
))]
|
||||
|
@ -181,6 +183,7 @@ where
|
|||
feature = "kv-mem",
|
||||
feature = "kv-tikv",
|
||||
feature = "kv-rocksdb",
|
||||
feature = "kv-speedb",
|
||||
feature = "kv-fdb",
|
||||
feature = "kv-indxdb",
|
||||
)))
|
||||
|
@ -201,6 +204,7 @@ where
|
|||
feature = "kv-mem",
|
||||
feature = "kv-tikv",
|
||||
feature = "kv-rocksdb",
|
||||
feature = "kv-speedb",
|
||||
feature = "kv-fdb",
|
||||
feature = "kv-indxdb",
|
||||
),
|
||||
|
@ -213,6 +217,7 @@ where
|
|||
feature = "kv-mem",
|
||||
feature = "kv-tikv",
|
||||
feature = "kv-rocksdb",
|
||||
feature = "kv-speedb",
|
||||
feature = "kv-fdb",
|
||||
feature = "kv-indxdb",
|
||||
),
|
||||
|
|
|
@ -103,6 +103,21 @@ impl Connection for Any {
|
|||
.into());
|
||||
}
|
||||
|
||||
"speedb" => {
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
{
|
||||
features.insert(ExtraFeatures::Backup);
|
||||
engine::local::native::router(address, conn_tx, route_rx);
|
||||
conn_rx.into_recv_async().await??
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "kv-speedb"))]
|
||||
return Err(DbError::Ds(
|
||||
"Cannot connect to the `speedb` storage engine as it is not enabled in this build of SurrealDB".to_owned(),
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
"tikv" => {
|
||||
#[cfg(feature = "kv-tikv")]
|
||||
{
|
||||
|
|
|
@ -109,6 +109,22 @@ impl Connection for Any {
|
|||
.into());
|
||||
}
|
||||
|
||||
"speedb" => {
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
{
|
||||
engine::local::wasm::router(address, conn_tx, route_rx);
|
||||
if let Err(error) = conn_rx.into_recv_async().await? {
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "kv-speedb"))]
|
||||
return Err(DbError::Ds(
|
||||
"Cannot connect to the `speedb` storage engine as it is not enabled in this build of SurrealDB".to_owned(),
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
"tikv" => {
|
||||
#[cfg(feature = "kv-tikv")]
|
||||
{
|
||||
|
|
|
@ -195,6 +195,41 @@ pub struct File;
|
|||
#[derive(Debug)]
|
||||
pub struct RocksDb;
|
||||
|
||||
/// SpeeDB database
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Instantiating a SpeeDB-backed instance
|
||||
///
|
||||
/// ```no_run
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() -> surrealdb::Result<()> {
|
||||
/// use surrealdb::Surreal;
|
||||
/// use surrealdb::engine::local::SpeeDb;
|
||||
///
|
||||
/// let db = Surreal::new::<SpeeDb>("temp.db").await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// Instantiating a SpeeDB-backed strict instance
|
||||
///
|
||||
/// ```no_run
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() -> surrealdb::Result<()> {
|
||||
/// use surrealdb::opt::Strict;
|
||||
/// use surrealdb::Surreal;
|
||||
/// use surrealdb::engine::local::SpeeDb;
|
||||
///
|
||||
/// let db = Surreal::new::<SpeeDb>(("temp.db", Strict)).await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "kv-speedb")))]
|
||||
#[derive(Debug)]
|
||||
pub struct SpeeDb;
|
||||
|
||||
/// IndxDB database
|
||||
///
|
||||
/// # Examples
|
||||
|
|
|
@ -91,7 +91,7 @@ pub(crate) fn router(
|
|||
let kvs = {
|
||||
let path = match url.scheme() {
|
||||
"mem" => "memory".to_owned(),
|
||||
"fdb" | "rocksdb" | "file" => match url.to_file_path() {
|
||||
"fdb" | "rocksdb" | "speedb" | "file" => match url.to_file_path() {
|
||||
Ok(path) => format!("{}://{}", url.scheme(), path.display()),
|
||||
Err(_) => {
|
||||
let error = Error::InvalidUrl(url.as_str().to_owned());
|
||||
|
|
|
@ -5,6 +5,7 @@ pub mod any;
|
|||
feature = "kv-mem",
|
||||
feature = "kv-tikv",
|
||||
feature = "kv-rocksdb",
|
||||
feature = "kv-speedb",
|
||||
feature = "kv-fdb",
|
||||
feature = "kv-indxdb",
|
||||
))]
|
||||
|
|
|
@ -11,6 +11,8 @@ mod indxdb;
|
|||
mod mem;
|
||||
#[cfg(feature = "kv-rocksdb")]
|
||||
mod rocksdb;
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
mod speedb;
|
||||
#[cfg(feature = "kv-tikv")]
|
||||
mod tikv;
|
||||
|
||||
|
|
54
lib/src/api/opt/endpoint/speedb.rs
Normal file
54
lib/src/api/opt/endpoint/speedb.rs
Normal file
|
@ -0,0 +1,54 @@
|
|||
use crate::api::engine::local::Db;
|
||||
use crate::api::engine::local::SpeeDb;
|
||||
use crate::api::err::Error;
|
||||
use crate::api::opt::Endpoint;
|
||||
use crate::api::opt::IntoEndpoint;
|
||||
use crate::api::opt::Strict;
|
||||
use crate::api::Result;
|
||||
use std::path::Path;
|
||||
use url::Url;
|
||||
|
||||
impl IntoEndpoint<SpeeDb> for &str {
|
||||
type Client = Db;
|
||||
|
||||
fn into_endpoint(self) -> Result<Endpoint> {
|
||||
let url = format!("speedb://{self}");
|
||||
Ok(Endpoint {
|
||||
endpoint: Url::parse(&url).map_err(|_| Error::InvalidUrl(url))?,
|
||||
strict: false,
|
||||
#[cfg(any(feature = "native-tls", feature = "rustls"))]
|
||||
tls_config: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoEndpoint<SpeeDb> for &Path {
|
||||
type Client = Db;
|
||||
|
||||
fn into_endpoint(self) -> Result<Endpoint> {
|
||||
let url = format!("speedb://{}", self.display());
|
||||
Ok(Endpoint {
|
||||
endpoint: Url::parse(&url).map_err(|_| Error::InvalidUrl(url))?,
|
||||
strict: false,
|
||||
#[cfg(any(feature = "native-tls", feature = "rustls"))]
|
||||
tls_config: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> IntoEndpoint<SpeeDb> for (T, Strict)
|
||||
where
|
||||
T: AsRef<Path>,
|
||||
{
|
||||
type Client = Db;
|
||||
|
||||
fn into_endpoint(self) -> Result<Endpoint> {
|
||||
let url = format!("speedb://{}", self.0.as_ref().display());
|
||||
Ok(Endpoint {
|
||||
endpoint: Url::parse(&url).map_err(|_| Error::InvalidUrl(url))?,
|
||||
strict: true,
|
||||
#[cfg(any(feature = "native-tls", feature = "rustls"))]
|
||||
tls_config: None,
|
||||
})
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@
|
|||
feature = "kv-mem",
|
||||
feature = "kv-tikv",
|
||||
feature = "kv-rocksdb",
|
||||
feature = "kv-speedb",
|
||||
feature = "kv-fdb",
|
||||
feature = "kv-indxdb",
|
||||
))]
|
||||
|
|
|
@ -483,6 +483,13 @@ impl From<tikv::Error> for Error {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
impl From<speedb::Error> for Error {
|
||||
fn from(e: speedb::Error) -> Error {
|
||||
Error::Tx(e.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "kv-rocksdb")]
|
||||
impl From<rocksdb::Error> for Error {
|
||||
fn from(e: rocksdb::Error) -> Error {
|
||||
|
|
|
@ -29,6 +29,8 @@ pub(super) enum Inner {
|
|||
Mem(super::mem::Datastore),
|
||||
#[cfg(feature = "kv-rocksdb")]
|
||||
RocksDB(super::rocksdb::Datastore),
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
SpeeDB(super::speedb::Datastore),
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
IndxDB(super::indxdb::Datastore),
|
||||
#[cfg(feature = "kv-tikv")]
|
||||
|
@ -45,8 +47,10 @@ impl fmt::Display for Datastore {
|
|||
Inner::Mem(_) => write!(f, "memory"),
|
||||
#[cfg(feature = "kv-rocksdb")]
|
||||
Inner::RocksDB(_) => write!(f, "rocksdb"),
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Inner::SpeeDB(_) => write!(f, "speedb"),
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Inner::IndxDB(_) => write!(f, "indexdb"),
|
||||
Inner::IndxDB(_) => write!(f, "indxdb"),
|
||||
#[cfg(feature = "kv-tikv")]
|
||||
Inner::TiKV(_) => write!(f, "tikv"),
|
||||
#[cfg(feature = "kv-fdb")]
|
||||
|
@ -107,7 +111,6 @@ impl Datastore {
|
|||
info!(target: LOG, "Started kvs store in {}", path);
|
||||
v
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "kv-mem"))]
|
||||
return Err(Error::Ds("Cannot connect to the `memory` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
|
||||
}
|
||||
|
@ -124,7 +127,6 @@ impl Datastore {
|
|||
info!(target: LOG, "Started kvs store at {}", path);
|
||||
v
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "kv-rocksdb"))]
|
||||
return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
|
||||
}
|
||||
|
@ -141,10 +143,25 @@ impl Datastore {
|
|||
info!(target: LOG, "Started kvs store at {}", path);
|
||||
v
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "kv-rocksdb"))]
|
||||
return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
|
||||
}
|
||||
// Parse and initiate an SpeeDB database
|
||||
s if s.starts_with("speedb:") => {
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
{
|
||||
info!(target: LOG, "Starting kvs store at {}", path);
|
||||
let s = s.trim_start_matches("speedb://");
|
||||
let s = s.trim_start_matches("speedb:");
|
||||
let v = super::speedb::Datastore::new(s).await.map(|v| Datastore {
|
||||
inner: Inner::SpeeDB(v),
|
||||
});
|
||||
info!(target: LOG, "Started kvs store at {}", path);
|
||||
v
|
||||
}
|
||||
#[cfg(not(feature = "kv-speedb"))]
|
||||
return Err(Error::Ds("Cannot connect to the `speedb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
|
||||
}
|
||||
// Parse and initiate an IndxDB database
|
||||
s if s.starts_with("indxdb:") => {
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
|
@ -158,7 +175,6 @@ impl Datastore {
|
|||
info!(target: LOG, "Started kvs store at {}", path);
|
||||
v
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "kv-indxdb"))]
|
||||
return Err(Error::Ds("Cannot connect to the `indxdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
|
||||
}
|
||||
|
@ -175,7 +191,6 @@ impl Datastore {
|
|||
info!(target: LOG, "Connected to kvs store at {}", path);
|
||||
v
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "kv-tikv"))]
|
||||
return Err(Error::Ds("Cannot connect to the `tikv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
|
||||
}
|
||||
|
@ -192,7 +207,6 @@ impl Datastore {
|
|||
info!(target: LOG, "Connected to kvs store at {}", path);
|
||||
v
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "kv-fdb"))]
|
||||
return Err(Error::Ds("Cannot connect to the `foundationdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
|
||||
}
|
||||
|
@ -231,6 +245,11 @@ impl Datastore {
|
|||
let tx = v.transaction(write, lock).await?;
|
||||
super::tx::Inner::RocksDB(tx)
|
||||
}
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Inner::SpeeDB(v) => {
|
||||
let tx = v.transaction(write, lock).await?;
|
||||
super::tx::Inner::SpeeDB(tx)
|
||||
}
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Inner::IndxDB(v) => {
|
||||
let tx = v.transaction(write, lock).await?;
|
||||
|
|
|
@ -5,6 +5,7 @@ mod indxdb;
|
|||
mod kv;
|
||||
mod mem;
|
||||
mod rocksdb;
|
||||
mod speedb;
|
||||
mod tikv;
|
||||
mod tx;
|
||||
|
||||
|
|
337
lib/src/kvs/speedb/mod.rs
Normal file
337
lib/src/kvs/speedb/mod.rs
Normal file
|
@ -0,0 +1,337 @@
|
|||
#![cfg(feature = "kv-speedb")]
|
||||
|
||||
use crate::err::Error;
|
||||
use crate::kvs::Key;
|
||||
use crate::kvs::Val;
|
||||
use futures::lock::Mutex;
|
||||
use speedb::{OptimisticTransactionDB, OptimisticTransactionOptions, ReadOptions, WriteOptions};
|
||||
use std::ops::Range;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Datastore {
|
||||
db: Pin<Arc<OptimisticTransactionDB>>,
|
||||
}
|
||||
|
||||
pub struct Transaction {
|
||||
// Is the transaction complete?
|
||||
ok: bool,
|
||||
// Is the transaction read+write?
|
||||
rw: bool,
|
||||
// The distributed datastore transaction
|
||||
tx: Arc<Mutex<Option<speedb::Transaction<'static, OptimisticTransactionDB>>>>,
|
||||
// The read options containing the Snapshot
|
||||
ro: ReadOptions,
|
||||
// the above, supposedly 'static, transaction actually points here, so keep the memory alive
|
||||
// note that this is dropped last, as it is declared last
|
||||
_db: Pin<Arc<OptimisticTransactionDB>>,
|
||||
}
|
||||
|
||||
impl Datastore {
|
||||
/// Open a new database
|
||||
pub async fn new(path: &str) -> Result<Datastore, Error> {
|
||||
Ok(Datastore {
|
||||
db: Arc::pin(OptimisticTransactionDB::open_default(path)?),
|
||||
})
|
||||
}
|
||||
/// Start a new transaction
|
||||
pub async fn transaction(&self, write: bool, _: bool) -> Result<Transaction, Error> {
|
||||
// Activate the snapshot options
|
||||
let mut to = OptimisticTransactionOptions::default();
|
||||
to.set_snapshot(true);
|
||||
// Create a new transaction
|
||||
let tx = self.db.transaction_opt(&WriteOptions::default(), &to);
|
||||
// The database reference must always outlive
|
||||
// the transaction. If it doesn't then this
|
||||
// is undefined behaviour. This unsafe block
|
||||
// ensures that the transaction reference is
|
||||
// static, but will cause a crash if the
|
||||
// datastore is dropped prematurely.
|
||||
let tx = unsafe {
|
||||
std::mem::transmute::<
|
||||
speedb::Transaction<'_, OptimisticTransactionDB>,
|
||||
speedb::Transaction<'static, OptimisticTransactionDB>,
|
||||
>(tx)
|
||||
};
|
||||
let mut ro = ReadOptions::default();
|
||||
ro.set_snapshot(&tx.snapshot());
|
||||
// Return the transaction
|
||||
Ok(Transaction {
|
||||
ok: false,
|
||||
rw: write,
|
||||
tx: Arc::new(Mutex::new(Some(tx))),
|
||||
ro,
|
||||
_db: self.db.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Transaction {
|
||||
/// Check if closed
|
||||
pub fn closed(&self) -> bool {
|
||||
self.ok
|
||||
}
|
||||
/// Cancel a transaction
|
||||
pub async fn cancel(&mut self) -> Result<(), Error> {
|
||||
// Check to see if transaction is closed
|
||||
if self.ok {
|
||||
return Err(Error::TxFinished);
|
||||
}
|
||||
// Mark this transaction as done
|
||||
self.ok = true;
|
||||
// Cancel this transaction
|
||||
match self.tx.lock().await.take() {
|
||||
Some(tx) => tx.rollback()?,
|
||||
None => unreachable!(),
|
||||
};
|
||||
// Continue
|
||||
Ok(())
|
||||
}
|
||||
/// Commit a transaction
|
||||
pub async fn commit(&mut self) -> Result<(), Error> {
|
||||
// Check to see if transaction is closed
|
||||
if self.ok {
|
||||
return Err(Error::TxFinished);
|
||||
}
|
||||
// Check to see if transaction is writable
|
||||
if !self.rw {
|
||||
return Err(Error::TxReadonly);
|
||||
}
|
||||
// Mark this transaction as done
|
||||
self.ok = true;
|
||||
// Cancel this transaction
|
||||
match self.tx.lock().await.take() {
|
||||
Some(tx) => tx.commit()?,
|
||||
None => unreachable!(),
|
||||
};
|
||||
// Continue
|
||||
Ok(())
|
||||
}
|
||||
/// Check if a key exists
|
||||
pub async fn exi<K>(&mut self, key: K) -> Result<bool, Error>
|
||||
where
|
||||
K: Into<Key>,
|
||||
{
|
||||
// Check to see if transaction is closed
|
||||
if self.ok {
|
||||
return Err(Error::TxFinished);
|
||||
}
|
||||
// Check the key
|
||||
let res = self.tx.lock().await.as_ref().unwrap().get_opt(key.into(), &self.ro)?.is_some();
|
||||
// Return result
|
||||
Ok(res)
|
||||
}
|
||||
/// Fetch a key from the database
|
||||
pub async fn get<K>(&mut self, key: K) -> Result<Option<Val>, Error>
|
||||
where
|
||||
K: Into<Key>,
|
||||
{
|
||||
// Check to see if transaction is closed
|
||||
if self.ok {
|
||||
return Err(Error::TxFinished);
|
||||
}
|
||||
// Get the key
|
||||
let res = self.tx.lock().await.as_ref().unwrap().get_opt(key.into(), &self.ro)?;
|
||||
// Return result
|
||||
Ok(res)
|
||||
}
|
||||
/// Insert or update a key in the database
|
||||
pub async fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
|
||||
where
|
||||
K: Into<Key>,
|
||||
V: Into<Val>,
|
||||
{
|
||||
// Check to see if transaction is closed
|
||||
if self.ok {
|
||||
return Err(Error::TxFinished);
|
||||
}
|
||||
// Check to see if transaction is writable
|
||||
if !self.rw {
|
||||
return Err(Error::TxReadonly);
|
||||
}
|
||||
// Set the key
|
||||
self.tx.lock().await.as_ref().unwrap().put(key.into(), val.into())?;
|
||||
// Return result
|
||||
Ok(())
|
||||
}
|
||||
/// Insert a key if it doesn't exist in the database
|
||||
pub async fn put<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
|
||||
where
|
||||
K: Into<Key>,
|
||||
V: Into<Val>,
|
||||
{
|
||||
// Check to see if transaction is closed
|
||||
if self.ok {
|
||||
return Err(Error::TxFinished);
|
||||
}
|
||||
// Check to see if transaction is writable
|
||||
if !self.rw {
|
||||
return Err(Error::TxReadonly);
|
||||
}
|
||||
// Get the transaction
|
||||
let tx = self.tx.lock().await;
|
||||
let tx = tx.as_ref().unwrap();
|
||||
// Get the arguments
|
||||
let key = key.into();
|
||||
let val = val.into();
|
||||
// Set the key if empty
|
||||
match tx.get_opt(&key, &self.ro)? {
|
||||
None => tx.put(key, val)?,
|
||||
_ => return Err(Error::TxKeyAlreadyExists),
|
||||
};
|
||||
// Return result
|
||||
Ok(())
|
||||
}
|
||||
/// Insert a key if it doesn't exist in the database
|
||||
pub async fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
|
||||
where
|
||||
K: Into<Key>,
|
||||
V: Into<Val>,
|
||||
{
|
||||
// Check to see if transaction is closed
|
||||
if self.ok {
|
||||
return Err(Error::TxFinished);
|
||||
}
|
||||
// Check to see if transaction is writable
|
||||
if !self.rw {
|
||||
return Err(Error::TxReadonly);
|
||||
}
|
||||
// Get the transaction
|
||||
let tx = self.tx.lock().await;
|
||||
let tx = tx.as_ref().unwrap();
|
||||
// Get the arguments
|
||||
let key = key.into();
|
||||
let val = val.into();
|
||||
let chk = chk.map(Into::into);
|
||||
// Set the key if valid
|
||||
match (tx.get_opt(&key, &self.ro)?, chk) {
|
||||
(Some(v), Some(w)) if v == w => tx.put(key, val)?,
|
||||
(None, None) => tx.put(key, val)?,
|
||||
_ => return Err(Error::TxConditionNotMet),
|
||||
};
|
||||
// Return result
|
||||
Ok(())
|
||||
}
|
||||
/// Delete a key
|
||||
pub async fn del<K>(&mut self, key: K) -> Result<(), Error>
|
||||
where
|
||||
K: Into<Key>,
|
||||
{
|
||||
// Check to see if transaction is closed
|
||||
if self.ok {
|
||||
return Err(Error::TxFinished);
|
||||
}
|
||||
// Check to see if transaction is writable
|
||||
if !self.rw {
|
||||
return Err(Error::TxReadonly);
|
||||
}
|
||||
// Remove the key
|
||||
self.tx.lock().await.as_ref().unwrap().delete(key.into())?;
|
||||
// Return result
|
||||
Ok(())
|
||||
}
|
||||
/// Delete a key
|
||||
pub async fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
|
||||
where
|
||||
K: Into<Key>,
|
||||
V: Into<Val>,
|
||||
{
|
||||
// Check to see if transaction is closed
|
||||
if self.ok {
|
||||
return Err(Error::TxFinished);
|
||||
}
|
||||
// Check to see if transaction is writable
|
||||
if !self.rw {
|
||||
return Err(Error::TxReadonly);
|
||||
}
|
||||
// Get the transaction
|
||||
let tx = self.tx.lock().await;
|
||||
let tx = tx.as_ref().unwrap();
|
||||
// Get the arguments
|
||||
let key = key.into();
|
||||
let chk = chk.map(Into::into);
|
||||
// Delete the key if valid
|
||||
match (tx.get_opt(&key, &self.ro)?, chk) {
|
||||
(Some(v), Some(w)) if v == w => tx.delete(key)?,
|
||||
(None, None) => tx.delete(key)?,
|
||||
_ => return Err(Error::TxConditionNotMet),
|
||||
};
|
||||
// Return result
|
||||
Ok(())
|
||||
}
|
||||
/// Retrieve a range of keys from the databases
|
||||
pub async fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
|
||||
where
|
||||
K: Into<Key>,
|
||||
{
|
||||
// Check to see if transaction is closed
|
||||
if self.ok {
|
||||
return Err(Error::TxFinished);
|
||||
}
|
||||
// Get the transaction
|
||||
let tx = self.tx.lock().await;
|
||||
let tx = tx.as_ref().unwrap();
|
||||
// Convert the range to bytes
|
||||
let rng: Range<Key> = Range {
|
||||
start: rng.start.into(),
|
||||
end: rng.end.into(),
|
||||
};
|
||||
// Create result set
|
||||
let mut res = vec![];
|
||||
// Set the key range
|
||||
let beg = rng.start.as_slice();
|
||||
let end = rng.end.as_slice();
|
||||
// Set the ReadOptions with the snapshot
|
||||
let mut ro = ReadOptions::default();
|
||||
ro.set_snapshot(&tx.snapshot());
|
||||
// Create the iterator
|
||||
let mut iter = tx.raw_iterator_opt(ro);
|
||||
// Seek to the start key
|
||||
iter.seek(&rng.start);
|
||||
// Scan the keys in the iterator
|
||||
while iter.valid() {
|
||||
// Check the scan limit
|
||||
if res.len() < limit as usize {
|
||||
// Get the key and value
|
||||
let (k, v) = (iter.key(), iter.value());
|
||||
// Check the key and value
|
||||
if let (Some(k), Some(v)) = (k, v) {
|
||||
if k >= beg && k < end {
|
||||
res.push((k.to_vec(), v.to_vec()));
|
||||
iter.next();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Exit
|
||||
break;
|
||||
}
|
||||
// Return result
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::kvs::tests::transaction::verify_transaction_isolation;
|
||||
use temp_dir::TempDir;
|
||||
|
||||
// https://github.com/surrealdb/surrealdb/issues/76
|
||||
#[tokio::test]
|
||||
async fn soundness() {
|
||||
let mut transaction = get_transaction().await;
|
||||
transaction.put("uh", "oh").await.unwrap();
|
||||
|
||||
async fn get_transaction() -> crate::kvs::Transaction {
|
||||
let datastore = crate::kvs::Datastore::new("speedb:/tmp/spee.db").await.unwrap();
|
||||
datastore.transaction(true, false).await.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
|
||||
async fn rocksdb_transaction() {
|
||||
let p = TempDir::new().unwrap().path().to_string_lossy().to_string();
|
||||
verify_transaction_isolation(&format!("file:{}", p)).await;
|
||||
}
|
||||
}
|
|
@ -80,6 +80,10 @@ pub(crate) mod transaction {
|
|||
Inner::RocksDB(ds) => Datastore {
|
||||
inner: Inner::RocksDB(ds.clone()),
|
||||
},
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Inner::SpeeDB(ds) => Datastore {
|
||||
inner: Inner::SpeeDB(ds.clone()),
|
||||
},
|
||||
#[cfg(feature = "kv-tikv")]
|
||||
Inner::TiKV(_) => Datastore::new(&self.ds_path).await.unwrap(),
|
||||
#[cfg(feature = "kv-fdb")]
|
||||
|
|
|
@ -48,6 +48,8 @@ pub(super) enum Inner {
|
|||
Mem(super::mem::Transaction),
|
||||
#[cfg(feature = "kv-rocksdb")]
|
||||
RocksDB(super::rocksdb::Transaction),
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
SpeeDB(super::speedb::Transaction),
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
IndxDB(super::indxdb::Transaction),
|
||||
#[cfg(feature = "kv-tikv")]
|
||||
|
@ -64,8 +66,10 @@ impl fmt::Display for Transaction {
|
|||
Inner::Mem(_) => write!(f, "memory"),
|
||||
#[cfg(feature = "kv-rocksdb")]
|
||||
Inner::RocksDB(_) => write!(f, "rocksdb"),
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Inner::SpeeDB(_) => write!(f, "speedb"),
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Inner::IndxDB(_) => write!(f, "indexdb"),
|
||||
Inner::IndxDB(_) => write!(f, "indxdb"),
|
||||
#[cfg(feature = "kv-tikv")]
|
||||
Inner::TiKV(_) => write!(f, "tikv"),
|
||||
#[cfg(feature = "kv-fdb")]
|
||||
|
@ -101,6 +105,11 @@ impl Transaction {
|
|||
inner: Inner::RocksDB(v),
|
||||
..
|
||||
} => v.closed(),
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Transaction {
|
||||
inner: Inner::SpeeDB(v),
|
||||
..
|
||||
} => v.closed(),
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Transaction {
|
||||
inner: Inner::IndxDB(v),
|
||||
|
@ -138,6 +147,11 @@ impl Transaction {
|
|||
inner: Inner::RocksDB(v),
|
||||
..
|
||||
} => v.cancel().await,
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Transaction {
|
||||
inner: Inner::SpeeDB(v),
|
||||
..
|
||||
} => v.cancel().await,
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Transaction {
|
||||
inner: Inner::IndxDB(v),
|
||||
|
@ -175,6 +189,11 @@ impl Transaction {
|
|||
inner: Inner::RocksDB(v),
|
||||
..
|
||||
} => v.commit().await,
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Transaction {
|
||||
inner: Inner::SpeeDB(v),
|
||||
..
|
||||
} => v.commit().await,
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Transaction {
|
||||
inner: Inner::IndxDB(v),
|
||||
|
@ -214,6 +233,11 @@ impl Transaction {
|
|||
inner: Inner::RocksDB(v),
|
||||
..
|
||||
} => v.del(key).await,
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Transaction {
|
||||
inner: Inner::SpeeDB(v),
|
||||
..
|
||||
} => v.del(key).await,
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Transaction {
|
||||
inner: Inner::IndxDB(v),
|
||||
|
@ -253,6 +277,11 @@ impl Transaction {
|
|||
inner: Inner::RocksDB(v),
|
||||
..
|
||||
} => v.exi(key).await,
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Transaction {
|
||||
inner: Inner::SpeeDB(v),
|
||||
..
|
||||
} => v.exi(key).await,
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Transaction {
|
||||
inner: Inner::IndxDB(v),
|
||||
|
@ -292,6 +321,11 @@ impl Transaction {
|
|||
inner: Inner::RocksDB(v),
|
||||
..
|
||||
} => v.get(key).await,
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Transaction {
|
||||
inner: Inner::SpeeDB(v),
|
||||
..
|
||||
} => v.get(key).await,
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Transaction {
|
||||
inner: Inner::IndxDB(v),
|
||||
|
@ -332,6 +366,11 @@ impl Transaction {
|
|||
inner: Inner::RocksDB(v),
|
||||
..
|
||||
} => v.set(key, val).await,
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Transaction {
|
||||
inner: Inner::SpeeDB(v),
|
||||
..
|
||||
} => v.set(key, val).await,
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Transaction {
|
||||
inner: Inner::IndxDB(v),
|
||||
|
@ -372,6 +411,11 @@ impl Transaction {
|
|||
inner: Inner::RocksDB(v),
|
||||
..
|
||||
} => v.put(key, val).await,
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Transaction {
|
||||
inner: Inner::SpeeDB(v),
|
||||
..
|
||||
} => v.put(key, val).await,
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Transaction {
|
||||
inner: Inner::IndxDB(v),
|
||||
|
@ -413,6 +457,11 @@ impl Transaction {
|
|||
inner: Inner::RocksDB(v),
|
||||
..
|
||||
} => v.scan(rng, limit).await,
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Transaction {
|
||||
inner: Inner::SpeeDB(v),
|
||||
..
|
||||
} => v.scan(rng, limit).await,
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Transaction {
|
||||
inner: Inner::IndxDB(v),
|
||||
|
@ -453,6 +502,11 @@ impl Transaction {
|
|||
inner: Inner::RocksDB(v),
|
||||
..
|
||||
} => v.putc(key, val, chk).await,
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Transaction {
|
||||
inner: Inner::SpeeDB(v),
|
||||
..
|
||||
} => v.putc(key, val, chk).await,
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Transaction {
|
||||
inner: Inner::IndxDB(v),
|
||||
|
@ -493,6 +547,11 @@ impl Transaction {
|
|||
inner: Inner::RocksDB(v),
|
||||
..
|
||||
} => v.delc(key, chk).await,
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
Transaction {
|
||||
inner: Inner::SpeeDB(v),
|
||||
..
|
||||
} => v.delc(key, chk).await,
|
||||
#[cfg(feature = "kv-indxdb")]
|
||||
Transaction {
|
||||
inner: Inner::IndxDB(v),
|
||||
|
|
|
@ -132,6 +132,36 @@ mod api_integration {
|
|||
include!("api/backup.rs");
|
||||
}
|
||||
|
||||
#[cfg(feature = "kv-rocksdb")]
|
||||
mod rocksdb {
|
||||
use super::*;
|
||||
use surrealdb::engine::local::Db;
|
||||
use surrealdb::engine::local::RocksDb;
|
||||
|
||||
async fn new_db() -> Surreal<Db> {
|
||||
let path = format!("/tmp/{}.db", Ulid::new());
|
||||
Surreal::new::<RocksDb>(path.as_str()).await.unwrap()
|
||||
}
|
||||
|
||||
include!("api/mod.rs");
|
||||
include!("api/backup.rs");
|
||||
}
|
||||
|
||||
#[cfg(feature = "kv-speedb")]
|
||||
mod speedb {
|
||||
use super::*;
|
||||
use surrealdb::engine::local::Db;
|
||||
use surrealdb::engine::local::SpeeDb;
|
||||
|
||||
async fn new_db() -> Surreal<Db> {
|
||||
let path = format!("/tmp/{}.db", Ulid::new());
|
||||
Surreal::new::<SpeeDb>(path.as_str()).await.unwrap()
|
||||
}
|
||||
|
||||
include!("api/mod.rs");
|
||||
include!("api/backup.rs");
|
||||
}
|
||||
|
||||
#[cfg(feature = "kv-tikv")]
|
||||
mod tikv {
|
||||
use super::*;
|
||||
|
|
|
@ -7,6 +7,7 @@ pub(crate) fn path_valid(v: &str) -> Result<String, String> {
|
|||
"memory" => Ok(v.to_string()),
|
||||
v if v.starts_with("file:") => Ok(v.to_string()),
|
||||
v if v.starts_with("rocksdb:") => Ok(v.to_string()),
|
||||
v if v.starts_with("speedb:") => Ok(v.to_string()),
|
||||
v if v.starts_with("tikv:") => Ok(v.to_string()),
|
||||
v if v.starts_with("fdb:") => Ok(v.to_string()),
|
||||
_ => Err(String::from("Provide a valid database path parameter")),
|
||||
|
|
Loading…
Reference in a new issue