Add more tests for all key-value storage engines (#2124)

Tobie Morgan Hitchcock 2023-06-10 21:30:37 +01:00 committed by GitHub
parent 93904eb24b
commit a6e1bacee0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 626 additions and 235 deletions

32
.github/CODEOWNERS vendored Normal file
View file

@@ -0,0 +1,32 @@
# Default owners for the repository
* @tobiemh
# Configuration for GitHub
/.github/ @tobiemh
# Code for fuzzing configuration
/lib/fuzz/ @finnbear
# Code and tests for the Rust API
/lib/CARGO.md @rushmorem
/lib/README.md @rushmorem
/lib/examples/ @rushmorem
/lib/src/api/ @rushmorem
/lib/src/tests/api/ @rushmorem
/lib/src/tests/api.rs @rushmorem
# Code and tests for indexing
/lib/src/idx/ @emmanuel-keller
# Code and tests for scripting
/lib/src/fnc/script/ @DelSkayn
# Code and tests for the command-line
/src/cli/ @rushmorem @finnbear
/tests/cli.rs @finnbear
# Code and tests for opentelemetry
/src/o11y/ @sgirones
# Tests related to the key-value store
/lib/src/kvs/tests/ @tobiemh @phughk

View file

@@ -83,6 +83,25 @@ jobs:
- name: Run cargo clippy
run: cargo clippy --no-deps -- -W warnings
cli:
name: Test command line
runs-on: ubuntu-20.04
steps:
- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable
- name: Checkout sources
uses: actions/checkout@v3
- name: Install dependencies
run: |
sudo apt-get -y update
sudo apt-get -y install protobuf-compiler libprotobuf-dev
- name: Run cargo test
run: cargo test --locked --no-default-features --features storage-mem --workspace --test cli
test:
name: Test workspace
runs-on: ubuntu-20.04
@@ -100,7 +119,7 @@ jobs:
sudo apt-get -y install protobuf-compiler libprotobuf-dev
- name: Run cargo test
run: cargo test --locked --no-default-features --features storage-mem,scripting,http --workspace -- --skip api_integration --include-ignored
run: cargo test --locked --no-default-features --features storage-mem,scripting,http --workspace -- --skip api_integration --skip cli
ws-engine:
name: WebSocket engine
@@ -158,7 +177,9 @@ jobs:
uses: actions/checkout@v3
- name: Run cargo test
run: cargo test --locked --package surrealdb --no-default-features --features kv-mem --test api api_integration::mem
run: |
cargo test --locked --package surrealdb --no-default-features --features kv-mem --lib kvs
cargo test --locked --package surrealdb --no-default-features --features kv-mem --test api api_integration::mem
file-engine:
name: File engine
@@ -172,7 +193,9 @@ jobs:
uses: actions/checkout@v3
- name: Run cargo test
run: cargo test --locked --package surrealdb --no-default-features --features kv-rocksdb --test api api_integration::file
run: |
cargo test --locked --package surrealdb --no-default-features --features kv-rocksdb --lib kvs
cargo test --locked --package surrealdb --no-default-features --features kv-rocksdb --test api api_integration::file
rocksdb-engine:
name: RocksDB engine
@@ -186,7 +209,9 @@ jobs:
uses: actions/checkout@v3
- name: Run cargo test
run: cargo test --locked --package surrealdb --no-default-features --features kv-rocksdb --test api api_integration::rocksdb
run: |
cargo test --locked --package surrealdb --no-default-features --features kv-rocksdb --lib kvs
cargo test --locked --package surrealdb --no-default-features --features kv-rocksdb --test api api_integration::rocksdb
speedb-engine:
name: SpeeDB engine
@@ -200,7 +225,37 @@ jobs:
uses: actions/checkout@v3
- name: Run cargo test
run: cargo test --locked --package surrealdb --no-default-features --features kv-speedb --test api api_integration::speedb
run: |
cargo test --locked --package surrealdb --no-default-features --features kv-speedb --lib kvs
cargo test --locked --package surrealdb --no-default-features --features kv-speedb --test api api_integration::speedb
tikv-engine:
name: TiKV engine
runs-on: ubuntu-20.04
steps:
- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable
- name: Checkout sources
uses: actions/checkout@v3
- name: Install dependencies
run: |
sudo apt-get -y update
sudo apt-get -y install protobuf-compiler libprotobuf-dev
- name: Install TiKV
run: |
curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh
~/.tiup/bin/tiup install tikv pd
~/.tiup/bin/tiup -v
- name: Run cargo test
run: |
(&>/dev/null ~/.tiup/bin/tiup playground --mode tikv-slim --kv 3 --without-monitor &)
cargo test --locked --package surrealdb --no-default-features --features kv-tikv --lib kvs
cargo test --locked --package surrealdb --no-default-features --features kv-tikv --test api api_integration::tikv
any-engine:
name: Any engine

View file

@@ -30,7 +30,7 @@ jobs:
- name: Run cargo test
run: |
(&>/dev/null cargo run --no-default-features -F storage-mem -- start --log trace --user root --pass root memory &)
cargo test --workspace --features protocol-ws,protocol-http,kv-rocksdb
cargo test --workspace --features protocol-ws,protocol-http,kv-mem,kv-rocksdb
lint:
name: Lint

View file

@@ -30,7 +30,7 @@ jobs:
- name: Run cargo test
run: |
(&>/dev/null cargo run --no-default-features -F storage-mem -- start --log trace --user root --pass root memory &)
cargo test --workspace --features protocol-ws,protocol-http,kv-rocksdb
cargo test --workspace --features protocol-ws,protocol-http,kv-mem,kv-rocksdb
lint:
name: Lint

27
Cargo.lock generated
View file

@@ -4240,6 +4240,31 @@ dependencies = [
"serde",
]
[[package]]
name = "serial_test"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e56dd856803e253c8f298af3f4d7eb0ae5e23a737252cd90bb4f3b435033b2d"
dependencies = [
"dashmap",
"futures 0.3.28",
"lazy_static",
"log",
"parking_lot 0.12.1",
"serial_test_derive",
]
[[package]]
name = "serial_test_derive"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.18",
]
[[package]]
name = "sha-1"
version = "0.10.1"
@@ -4455,6 +4480,7 @@ dependencies = [
"serde",
"serde_cbor",
"serde_json",
"serial_test",
"surrealdb",
"temp-env",
"tempfile",
@@ -4526,6 +4552,7 @@ dependencies = [
"semver",
"serde",
"serde_json",
"serial_test",
"sha-1",
"sha2",
"snap",

View file

@@ -73,6 +73,7 @@ nix = "0.26.2"
rcgen = "0.10.0"
tonic = "0.8.3"
opentelemetry-proto = {version = "0.1.0", features = ["gen-tonic", "traces", "build-server"] }
serial_test = "2.0.0"
tokio-stream = { version = "0.1", features = ["net"] }
temp-env = "0.3.4"

View file

@@ -117,6 +117,7 @@ criterion = "0.4"
env_logger = "0.10.0"
test-log = "0.2.11"
pprof = { version = "0.11.1", features = [ "flamegraph", "criterion" ] }
serial_test = "2.0.0"
temp-dir = "0.1.11"
time = { version = "0.3.21", features = ["serde"] }
tokio = { version = "1.28.1", features = ["macros", "sync", "rt-multi-thread"] }

View file

@@ -311,27 +311,3 @@ impl Transaction {
Ok(res)
}
}
#[cfg(test)]
mod tests {
use crate::kvs::tests::tx_isolation::transaction::verify_transaction_isolation;
use temp_dir::TempDir;
// https://github.com/surrealdb/surrealdb/issues/76
#[tokio::test]
async fn soundness() {
let mut transaction = get_transaction().await;
transaction.put("uh", "oh").await.unwrap();
async fn get_transaction() -> crate::kvs::Transaction {
let datastore = crate::kvs::Datastore::new("rocksdb:/tmp/rocks.db").await.unwrap();
datastore.transaction(true, false).await.unwrap()
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn rocksdb_transaction() {
let p = TempDir::new().unwrap().path().to_string_lossy().to_string();
verify_transaction_isolation(&format!("file:{}", p)).await;
}
}

View file

@@ -311,27 +311,3 @@ impl Transaction {
Ok(res)
}
}
#[cfg(test)]
mod tests {
use crate::kvs::tests::transaction::verify_transaction_isolation;
use temp_dir::TempDir;
// https://github.com/surrealdb/surrealdb/issues/76
#[tokio::test]
async fn soundness() {
let mut transaction = get_transaction().await;
transaction.put("uh", "oh").await.unwrap();
async fn get_transaction() -> crate::kvs::Transaction {
let datastore = crate::kvs::Datastore::new("speedb:/tmp/spee.db").await.unwrap();
datastore.transaction(true, false).await.unwrap()
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn rocksdb_transaction() {
let p = TempDir::new().unwrap().path().to_string_lossy().to_string();
verify_transaction_isolation(&format!("file:{}", p)).await;
}
}

View file

@@ -1,2 +1,119 @@
mod tb;
pub(crate) mod tx_isolation;
#[cfg(feature = "kv-mem")]
mod mem {
use crate::kvs::Datastore;
use crate::kvs::Transaction;
use serial_test::serial;
async fn new_ds() -> Datastore {
Datastore::new("memory").await.unwrap()
}
async fn new_tx(write: bool, lock: bool) -> Transaction {
new_ds().await.transaction(write, lock).await.unwrap()
}
include!("raw.rs");
include!("snapshot.rs");
include!("multireader.rs");
}
#[cfg(feature = "kv-rocksdb")]
mod rocksdb {
use crate::kvs::Datastore;
use crate::kvs::Transaction;
use serial_test::serial;
use temp_dir::TempDir;
async fn new_ds() -> Datastore {
let path = TempDir::new().unwrap().path().to_string_lossy().to_string();
Datastore::new(format!("rocksdb:{path}").as_str()).await.unwrap()
}
async fn new_tx(write: bool, lock: bool) -> Transaction {
new_ds().await.transaction(write, lock).await.unwrap()
}
include!("raw.rs");
include!("snapshot.rs");
include!("multireader.rs");
include!("multiwriter.rs");
}
#[cfg(feature = "kv-speedb")]
mod speedb {
use crate::kvs::Datastore;
use crate::kvs::Transaction;
use serial_test::serial;
use temp_dir::TempDir;
async fn new_ds() -> Datastore {
let path = TempDir::new().unwrap().path().to_string_lossy().to_string();
Datastore::new(format!("speedb:{path}").as_str()).await.unwrap()
}
async fn new_tx(write: bool, lock: bool) -> Transaction {
new_ds().await.transaction(write, lock).await.unwrap()
}
include!("raw.rs");
include!("snapshot.rs");
include!("multireader.rs");
include!("multiwriter.rs");
}
#[cfg(feature = "kv-tikv")]
mod tikv {
use crate::kvs::Datastore;
use crate::kvs::Transaction;
use serial_test::serial;
async fn new_ds() -> Datastore {
let ds = Datastore::new("tikv:127.0.0.1:2379").await.unwrap();
// Clear any previous test entries
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.delp(vec![], u32::MAX).await.is_ok());
tx.commit().await.unwrap();
// Return the datastore
ds
}
async fn new_tx(write: bool, lock: bool) -> Transaction {
new_ds().await.transaction(write, lock).await.unwrap()
}
include!("raw.rs");
include!("snapshot.rs");
include!("multireader.rs");
include!("multiwriter.rs");
}
#[cfg(feature = "kv-fdb")]
mod fdb {
use crate::kvs::Datastore;
use crate::kvs::Transaction;
use serial_test::serial;
async fn new_ds() -> Datastore {
let ds = Datastore::new("/etc/foundationdb/fdb.cluster").await.unwrap();
// Clear any previous test entries
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.delp(vec![], u32::MAX).await.is_ok());
tx.commit().await.unwrap();
// Return the datastore
ds
}
async fn new_tx(write: bool, lock: bool) -> Transaction {
new_ds().await.transaction(write, lock).await.unwrap()
}
include!("raw.rs");
include!("snapshot.rs");
include!("multireader.rs");
include!("multiwriter.rs");
}
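The modules above all follow the same pattern: each engine defines its own new_ds() and new_tx() constructors, then pulls the shared test bodies in raw.rs, snapshot.rs, multireader.rs and multiwriter.rs into its scope with include!, so every test compiles and runs once per enabled backend. As a rough illustration only (this file is hypothetical and not part of the commit), a shared test written for this pattern would look like the sketch below, resolving new_ds() against whichever engine module includes it:
// Hypothetical shared test file (illustration only, not part of this commit).
// It deliberately has no `mod` or `use` items of its own; each engine module
// pulls it in with include!(...), so new_ds() and #[serial] resolve per backend.
#[tokio::test]
#[serial]
async fn example_roundtrip() {
// Create a datastore for whichever engine module included this file
let ds = new_ds().await;
// Write a key in a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
tx.set("example", "some text").await.unwrap();
tx.commit().await.unwrap();
// Read the key back in a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("example").await.unwrap().unwrap();
assert_eq!(val, b"some text");
tx.cancel().await.unwrap();
}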

View file

@@ -0,0 +1,26 @@
#[tokio::test]
#[serial]
async fn multireader() {
// Create a new datastore
let ds = new_ds().await;
// Insert an initial key
let mut tx = ds.transaction(true, false).await.unwrap();
tx.set("test", "some text").await.unwrap();
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx1 = ds.transaction(false, false).await.unwrap();
let val = tx1.get("test").await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Create a readonly transaction
let mut tx2 = ds.transaction(false, false).await.unwrap();
let val = tx2.get("test").await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Create a readonly transaction
let mut tx3 = ds.transaction(false, false).await.unwrap();
let val = tx3.get("test").await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Cancel all three readonly transactions
tx1.cancel().await.unwrap();
tx2.cancel().await.unwrap();
tx3.cancel().await.unwrap();
}

View file

@@ -0,0 +1,72 @@
#[tokio::test]
#[serial]
async fn multiwriter_same_key() {
// Create a new datastore
let ds = new_ds().await;
// Insert an initial key
let mut tx = ds.transaction(true, false).await.unwrap();
tx.set("test", "some text").await.unwrap();
tx.commit().await.unwrap();
// Create a writeable transaction
let mut tx1 = ds.transaction(true, false).await.unwrap();
tx1.set("test", "other text").await.unwrap();
// Create a writeable transaction
let mut tx2 = ds.transaction(true, false).await.unwrap();
tx2.set("test", "other text").await.unwrap();
// Create a writeable transaction
let mut tx3 = ds.transaction(true, false).await.unwrap();
tx3.set("test", "other text").await.unwrap();
// Attempt to commit all three writeable transactions
assert!(tx1.commit().await.is_ok());
assert!(tx2.commit().await.is_err());
assert!(tx3.commit().await.is_err());
// Check that the key was updated ok
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap().unwrap();
assert_eq!(val, b"other text");
tx.cancel().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
tx.set("test", "original text").await.unwrap();
tx.commit().await.unwrap();
// Check that the key was updated ok
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap().unwrap();
assert_eq!(val, b"original text");
tx.cancel().await.unwrap();
}
#[tokio::test]
#[serial]
async fn multiwriter_different_keys() {
// Create a new datastore
let ds = new_ds().await;
// Insert an initial key
let mut tx = ds.transaction(true, false).await.unwrap();
tx.set("test", "some text").await.unwrap();
tx.commit().await.unwrap();
// Create a writeable transaction
let mut tx1 = ds.transaction(true, false).await.unwrap();
tx1.set("test1", "other text 1").await.unwrap();
// Create a writeable transaction
let mut tx2 = ds.transaction(true, false).await.unwrap();
tx2.set("test2", "other text 2").await.unwrap();
// Create a writeable transaction
let mut tx3 = ds.transaction(true, false).await.unwrap();
tx3.set("test3", "other text 3").await.unwrap();
// Commit all three writeable transactions
tx1.commit().await.unwrap();
tx2.commit().await.unwrap();
tx3.commit().await.unwrap();
// Check that the keys were updated ok
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap().unwrap();
assert_eq!(val, b"some text");
let val = tx.get("test1").await.unwrap().unwrap();
assert_eq!(val, b"other text 1");
let val = tx.get("test2").await.unwrap().unwrap();
assert_eq!(val, b"other text 2");
let val = tx.get("test3").await.unwrap().unwrap();
assert_eq!(val, b"other text 3");
tx.cancel().await.unwrap();
}

224
lib/src/kvs/tests/raw.rs Normal file
View file

@@ -0,0 +1,224 @@
#[tokio::test]
#[serial]
async fn initialise() {
let mut tx = new_tx(true, false).await;
assert!(tx.put("test", "ok").await.is_ok());
tx.commit().await.unwrap();
}
#[tokio::test]
#[serial]
async fn exi() {
// Create a new datastore
let ds = new_ds().await;
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "ok").await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.exi("test").await.unwrap();
assert_eq!(val, true);
let val = tx.exi("none").await.unwrap();
assert_eq!(val, false);
tx.cancel().await.unwrap();
}
#[tokio::test]
#[serial]
async fn get() {
// Create a new datastore
let ds = new_ds().await;
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "ok").await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap();
assert!(matches!(val.as_deref(), Some(b"ok")));
let val = tx.get("none").await.unwrap();
assert!(matches!(val.as_deref(), None));
tx.cancel().await.unwrap();
}
#[tokio::test]
#[serial]
async fn set() {
// Create a new datastore
let ds = new_ds().await;
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.set("test", "one").await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap();
assert!(matches!(val.as_deref(), Some(b"one")));
tx.cancel().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.set("test", "two").await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap();
assert!(matches!(val.as_deref(), Some(b"two")));
tx.cancel().await.unwrap();
}
#[tokio::test]
#[serial]
async fn put() {
// Create a new datastore
let ds = new_ds().await;
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "one").await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap();
assert!(matches!(val.as_deref(), Some(b"one")));
tx.cancel().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "two").await.is_err());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap();
assert!(matches!(val.as_deref(), Some(b"one")));
tx.cancel().await.unwrap();
}
#[tokio::test]
#[serial]
async fn del() {
// Create a new datastore
let ds = new_ds().await;
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "one").await.is_ok());
tx.commit().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.del("test").await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap();
assert!(matches!(val.as_deref(), None));
tx.cancel().await.unwrap();
}
#[tokio::test]
#[serial]
async fn putc() {
// Create a new datastore
let ds = new_ds().await;
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "one").await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap();
assert!(matches!(val.as_deref(), Some(b"one")));
tx.cancel().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.putc("test", "two", Some("one")).await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap();
assert!(matches!(val.as_deref(), Some(b"two")));
tx.cancel().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.putc("test", "tre", Some("one")).await.is_err());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap();
assert!(matches!(val.as_deref(), Some(b"two")));
tx.cancel().await.unwrap();
}
#[tokio::test]
#[serial]
async fn delc() {
// Create a new datastore
let ds = new_ds().await;
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "one").await.is_ok());
tx.commit().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.delc("test", Some("two")).await.is_err());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap();
assert!(matches!(val.as_deref(), Some(b"one")));
tx.cancel().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.delc("test", Some("one")).await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap();
assert!(matches!(val.as_deref(), None));
tx.cancel().await.unwrap();
}
#[tokio::test]
#[serial]
async fn scan() {
// Create a new datastore
let ds = new_ds().await;
// Create a writeable transaction
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test1", "1").await.is_ok());
assert!(tx.put("test2", "2").await.is_ok());
assert!(tx.put("test3", "3").await.is_ok());
assert!(tx.put("test4", "4").await.is_ok());
assert!(tx.put("test5", "5").await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.scan("test1".."test9", u32::MAX).await.unwrap();
assert_eq!(val.len(), 5);
assert_eq!(val[0].0, b"test1");
assert_eq!(val[0].1, b"1");
assert_eq!(val[1].0, b"test2");
assert_eq!(val[1].1, b"2");
assert_eq!(val[2].0, b"test3");
assert_eq!(val[2].1, b"3");
assert_eq!(val[3].0, b"test4");
assert_eq!(val[3].1, b"4");
assert_eq!(val[4].0, b"test5");
assert_eq!(val[4].1, b"5");
tx.cancel().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.scan("test2".."test4", u32::MAX).await.unwrap();
assert_eq!(val.len(), 2);
assert_eq!(val[0].0, b"test2");
assert_eq!(val[0].1, b"2");
assert_eq!(val[1].0, b"test3");
assert_eq!(val[1].1, b"3");
tx.cancel().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.scan("test1".."test9", 2).await.unwrap();
assert_eq!(val.len(), 2);
assert_eq!(val[0].0, b"test1");
assert_eq!(val[0].1, b"1");
assert_eq!(val[1].0, b"test2");
assert_eq!(val[1].1, b"2");
tx.cancel().await.unwrap();
}

View file

@@ -0,0 +1,43 @@
#[tokio::test]
#[serial]
async fn snapshot() {
// Create a new datastore
let ds = new_ds().await;
// Insert an initial key
let mut tx = ds.transaction(true, false).await.unwrap();
tx.set("test", "some text").await.unwrap();
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx1 = ds.transaction(false, false).await.unwrap();
// Check that the key was inserted ok
let val = tx1.get("test").await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Create a new writeable transaction
let mut txw = ds.transaction(true, false).await.unwrap();
// Update the test key content
txw.set("test", "other text").await.unwrap();
// Create a readonly transaction
let mut tx2 = ds.transaction(false, false).await.unwrap();
let val = tx2.get("test").await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Create a readonly transaction
let mut tx3 = ds.transaction(false, false).await.unwrap();
let val = tx3.get("test").await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Update the test key content
txw.set("test", "extra text").await.unwrap();
// Check the key from the original transaction
let val = tx1.get("test").await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Cancel all three readonly transactions
tx1.cancel().await.unwrap();
tx2.cancel().await.unwrap();
tx3.cancel().await.unwrap();
// Commit the writable transaction
txw.commit().await.unwrap();
// Check that the key was updated ok
let mut tx = ds.transaction(false, false).await.unwrap();
let val = tx.get("test").await.unwrap().unwrap();
assert_eq!(val, b"extra text");
tx.cancel().await.unwrap();
}

View file

@@ -1,160 +0,0 @@
#[cfg(any(feature = "kv-tikv", feature = "kv-rocksdb", feature = "kv-speedb", feature = "kv-fdb"))]
pub(crate) mod transaction {
use crate::dbs::{Response, Session};
use crate::kvs::ds::Inner;
use crate::kvs::Datastore;
use crate::sql::json;
use log::debug;
use std::sync::{Arc, Barrier};
use std::time::SystemTime;
use ulid::Ulid;
// The first transaction increments value by 1.
// This transaction uses sleep to be sure it runs longer than transaction2.
async fn transaction_isolation_1(client: TestClient, barrier: Arc<Barrier>) {
debug!("1 barrier");
barrier.wait();
debug!("1 execute");
client
.execute(
r#"
BEGIN;
/* 00:00 read the initial value */
CREATE rec:1 SET value=(SELECT value FROM rec:0);
SELECT * FROM sleep("2s");
/* 00:02 before txn2's commit */
CREATE rec:2 SET value=(SELECT value FROM rec:0);
SELECT * FROM sleep("2s");
/* 00:04 after tnx2's commit; */
CREATE rec:3 SET value=(SELECT value FROM rec:0);
COMMIT;"#,
)
.await;
debug!("1 ends");
}
// The second transaction increments value by 2.
async fn transaction_isolation_2(client: TestClient, barrier: Arc<Barrier>) {
debug!("2 barrier");
barrier.wait();
debug!("2 execute");
client
.execute(
r#"
BEGIN;
SLEEP 1s;
/* 00:01 before txn1 check the value */
UPDATE rec:0 SET value=1;
SLEEP 2s;
/* 00:03 before txn1 check the value the second time */
COMMIT;"#,
)
.await;
debug!("2 ends");
}
struct TestClient {
ds_path: String,
ds: Datastore,
ses: Session,
}
impl TestClient {
async fn new(db: String, ds_path: String) -> Self {
let ds = Datastore::new(&ds_path).await.unwrap();
let ses = Session::for_kv().with_ns("test").with_db(&db);
Self {
ds_path,
ds,
ses,
}
}
async fn execute(&self, txt: &str) -> Vec<Response> {
self.ds.execute(txt, &self.ses, None, false).await.unwrap()
}
async fn clone(&self) -> Self {
let ds = match &self.ds.inner {
#[cfg(feature = "kv-rocksdb")]
Inner::RocksDB(ds) => Datastore {
inner: Inner::RocksDB(ds.clone()),
},
#[cfg(feature = "kv-speedb")]
Inner::SpeeDB(ds) => Datastore {
inner: Inner::SpeeDB(ds.clone()),
},
#[cfg(feature = "kv-tikv")]
Inner::TiKV(_) => Datastore::new(&self.ds_path).await.unwrap(),
#[cfg(feature = "kv-fdb")]
Inner::FDB(_) => Datastore::new(&self.ds_path).await.unwrap(),
_ => panic!("Datastore not supported"),
};
Self {
ds_path: self.ds_path.clone(),
ds,
ses: self.ses.clone(),
}
}
}
fn assert_eq_value(mut res: Vec<Response>, expected: &str) {
let value = json(expected).unwrap();
assert_eq!(res.remove(0).result.unwrap(), value)
}
/// This test checks if the repeatable read isolation level is being properly enforced
// https://github.com/surrealdb/surrealdb/issues/1620
pub(crate) async fn verify_transaction_isolation(ds_path: &str) {
let db = Ulid::new().to_string();
let client = TestClient::new(db, ds_path.to_string()).await;
// Create a document with initial values.
client.execute("CREATE rec:0 SET value=0").await;
// The barrier is used to synchronise both transactions.
let barrier = Arc::new(Barrier::new(3));
// The two queries are run in parallel.
let f1 = tokio::spawn(transaction_isolation_1(client.clone().await, barrier.clone()));
let f2 = tokio::spawn(transaction_isolation_2(client.clone().await, barrier.clone()));
// Unlock the execution of both transactions.
let time = SystemTime::now();
barrier.wait();
// Wait for both transaction's execution.
let (res1, res2) = tokio::join!(f1, f2);
if time.elapsed().unwrap().as_secs() > 6 {
panic!(
"The test should not take more than 6 seconds.\
It probably means that the two transactions has not been run in parallel."
)
}
// Check that both transaction ran successfully.
res1.unwrap();
res2.unwrap();
// `rec:0.value` should be 1, set by txn2.
assert_eq_value(client.execute("SELECT value FROM rec:0").await, r#"[{"value": 1}]"#);
// `rec:1.value should be 0, the initial value of rec:0.value
assert_eq_value(
client.execute("SELECT value FROM rec:1").await,
r#"[{"value": {"value": 0}}]"#,
);
// `rec:2.value should be 0, the initial value of rec:0.value
assert_eq_value(
client.execute("SELECT value FROM rec:2").await,
r#"[{"value": {"value": 0}}]"#,
);
// `rec:3.value should be 0, the initial value of rec:0.value
assert_eq_value(
client.execute("SELECT value FROM rec:3").await,
r#"[{"value": {"value": 0}}]"#,
);
}
}

View file

@@ -70,7 +70,9 @@ impl Transaction {
// Mark this transaction as done
self.ok = true;
// Cancel this transaction
self.tx.rollback().await?;
if self.rw {
self.tx.rollback().await?;
}
// Continue
Ok(())
}
@@ -152,8 +154,15 @@ impl Transaction {
if !self.rw {
return Err(Error::TxReadonly);
}
// Set the key
self.tx.insert(key.into(), val.into()).await?;
// Get the key
let key = key.into();
// Get the val
let val = val.into();
// Set the key if empty
match self.tx.key_exists(key.clone()).await? {
false => self.tx.put(key, val).await?,
_ => return Err(Error::TxKeyAlreadyExists),
};
// Return result
Ok(())
}
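With this change, put on TiKV gains the same first-write-wins semantics that the shared put test in raw.rs asserts for every engine: it only succeeds when the key does not yet exist, otherwise it returns Error::TxKeyAlreadyExists. A minimal sketch of that caller-visible behaviour (illustration only, not part of the commit; assumes the kv-tikv feature and a local TiKV at 127.0.0.1:2379 as configured in the CI job above):
// Sketch only (not part of this commit): put succeeds for a new key and
// fails for an existing one, mirroring the shared put test in raw.rs.
// Assumes kv-tikv and a reachable TiKV cluster at 127.0.0.1:2379.
#[tokio::test]
async fn tikv_put_requires_new_key() {
let ds = crate::kvs::Datastore::new("tikv:127.0.0.1:2379").await.unwrap();
// First write: the key does not exist yet, so put succeeds
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "one").await.is_ok());
tx.commit().await.unwrap();
// Second write: the key now exists, so put returns an error
let mut tx = ds.transaction(true, false).await.unwrap();
assert!(tx.put("test", "two").await.is_err());
tx.commit().await.unwrap();
}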
@@ -252,14 +261,3 @@ impl Transaction {
Ok(res)
}
}
#[cfg(test)]
mod tests {
use crate::kvs::tests::transaction::verify_transaction_isolation;
use test_log::test;
#[test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
async fn tikv_transaction() {
verify_transaction_isolation("tikv://127.0.0.1:2379").await;
}
}

View file

@@ -20,7 +20,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use surrealdb::channel;
use surrealdb::channel::Sender;
use surrealdb::dbs::Response;
use surrealdb::dbs::Session;
use surrealdb::opt::auth::Root;
use surrealdb::sql::Array;

View file

@@ -3,7 +3,6 @@ use crate::err::Error;
use crate::iam::verify::basic;
use crate::iam::BASIC;
use crate::net::client_ip;
use std::net::SocketAddr;
use surrealdb::dbs::Session;
use surrealdb::iam::verify::token;
use surrealdb::iam::TOKEN;

View file

@@ -2,6 +2,7 @@ mod cli_integration {
// cargo test --package surreal --bin surreal --no-default-features --features storage-mem --test cli -- cli_integration --nocapture
use rand::{thread_rng, Rng};
use serial_test::serial;
use std::fs;
use std::path::Path;
use std::process::{Command, Stdio};
@@ -76,27 +77,31 @@ mod cli_integration {
}
#[test]
#[serial]
fn version() {
assert!(run("version").output().is_ok());
}
#[test]
#[serial]
fn help() {
assert!(run("help").output().is_ok());
}
#[test]
#[serial]
fn nonexistent_subcommand() {
assert!(run("nonexistent").output().is_err());
}
#[test]
#[serial]
fn nonexistent_option() {
assert!(run("version --turbo").output().is_err());
}
#[test]
#[ignore = "only runs in CI"]
#[serial]
fn start() {
let mut rng = thread_rng();
@@ -255,7 +260,7 @@ mod cli_integration {
}
#[test]
#[ignore = "only runs in CI"]
#[serial]
fn start_tls() {
let mut rng = thread_rng();