Feature: On-disk temporary table (#3749)

Co-authored-by: Gerard Guillemas Martos <gerard.guillemas@surrealdb.com>
Emmanuel Keller 2024-03-28 16:29:55 +00:00 committed by GitHub
parent e201366602
commit c82bbc0820
26 changed files with 1272 additions and 236 deletions

View file

@ -32,7 +32,7 @@ on:
rust_version:
required: false
type: string
default: "1.77.0"
default: "1.77.1"
description: "The Rust version to use for building binaries"
onnx_version:
required: false
@ -203,7 +203,7 @@ jobs:
test:
name: Test
needs: [prepare-vars]
needs: [ prepare-vars ]
runs-on: ubuntu-latest-16-cores
steps:
- name: Install stable toolchain
@ -248,7 +248,7 @@ jobs:
lint:
name: Lint
needs: [prepare-vars]
needs: [ prepare-vars ]
runs-on: ubuntu-latest
steps:
- name: Checkout sources
@ -281,7 +281,7 @@ jobs:
docker-builder:
name: Prepare docker builder
runs-on: ubuntu-latest
needs: [prepare-vars]
needs: [ prepare-vars ]
outputs:
name: ${{ steps.image.outputs.name }}
tag: ${{ steps.image.outputs.tag }}
@ -333,7 +333,7 @@ jobs:
build:
name: Build ${{ matrix.arch }} binary
needs: [prepare-vars, docker-builder]
needs: [ prepare-vars, docker-builder ]
strategy:
fail-fast: false
matrix:
@ -394,7 +394,7 @@ jobs:
# Linux amd64
- arch: x86_64-unknown-linux-gnu
runner: ["self-hosted", "amd64", "builder"]
runner: [ "self-hosted", "amd64", "builder" ]
file: surreal-${{ needs.prepare-vars.outputs.name }}.linux-amd64
build-step: |
# Build
@ -430,7 +430,7 @@ jobs:
# Linux arm64
- arch: aarch64-unknown-linux-gnu
runner: ["self-hosted", "arm64", "builder"]
runner: [ "self-hosted", "arm64", "builder" ]
file: surreal-${{ needs.prepare-vars.outputs.name }}.linux-arm64
build-step: |
set -x
@ -541,7 +541,7 @@ jobs:
publish:
name: Publish crate and artifacts binaries
needs: [prepare-vars, test, lint, build]
needs: [ prepare-vars, test, lint, build ]
if: ${{ inputs.publish }}
environment: ${{ inputs.environment }}
runs-on: ubuntu-latest
@ -688,7 +688,7 @@ jobs:
docker:
name: Docker images
needs: [prepare-vars, publish]
needs: [ prepare-vars, publish ]
uses: ./.github/workflows/reusable_docker.yml
with:
git-ref: ${{ needs.prepare-vars.outputs.git-ref }}
@ -700,7 +700,7 @@ jobs:
package-macos:
name: Package and publish macOS universal binary
needs: [prepare-vars, publish]
needs: [ prepare-vars, publish ]
runs-on: macos-latest
env:
FILE: surreal-${{ needs.prepare-vars.outputs.name }}.darwin-universal
@ -740,7 +740,7 @@ jobs:
propagate:
name: Propagate binaries to all regions
if: ${{ inputs.publish }}
needs: [publish, package-macos]
needs: [ publish, package-macos ]
runs-on: ubuntu-latest
steps:
- name: Configure AWS

Cargo.lock (generated)
View file

@ -1875,6 +1875,19 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "ext-sort"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcf73e44617eab501beba39234441a194cf138629d3b6447f81f573e1c3d0a13"
dependencies = [
"log",
"rayon",
"rmp-serde",
"serde",
"tempfile",
]
[[package]]
name = "fail"
version = "0.4.0"
@ -5922,6 +5935,7 @@ dependencies = [
"dmp",
"echodb",
"env_logger",
"ext-sort",
"foundationdb",
"fst",
"futures",
@ -5974,6 +5988,7 @@ dependencies = [
"surrealkv",
"surrealml-core 0.1.1",
"temp-dir",
"tempfile",
"test-log",
"thiserror",
"time",
@ -6221,9 +6236,9 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.10.0"
version = "3.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67"
checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1"
dependencies = [
"cfg-if",
"fastrand 2.0.1",

View file

@ -27,7 +27,7 @@ args = ["clippy", "--all-targets", "--features", "storage-mem,storage-rocksdb,st
category = "CI - INTEGRATION TESTS"
command = "cargo"
env = { RUST_BACKTRACE = 1, RUSTFLAGS = "--cfg surrealdb_unstable", RUST_LOG = { value = "cli_integration=debug", condition = { env_not_set = ["RUST_LOG"] } } }
args = ["test", "--locked", "--no-default-features", "--features", "storage-mem,http,scripting,jwks,sql2", "--workspace", "--test", "cli_integration", "--", "cli_integration"]
args = ["test", "--locked", "--no-default-features", "--features", "storage-mem,storage-surrealkv,http,scripting,jwks,sql2", "--workspace", "--test", "cli_integration", "--", "cli_integration"]
[tasks.ci-http-integration]
category = "CI - INTEGRATION TESTS"

View file

@ -510,9 +510,6 @@ allow_unsafe = true
[pkg.futures-util]
allow_unsafe = true
allow_apis = [
"net",
]
[pkg.indexmap]
allow_unsafe = true
@ -1291,26 +1288,6 @@ allow_unsafe = true
[pkg.pprof]
allow_unsafe = true
[pkg.crossbeam-queue]
allow_unsafe = true
[pkg.lru]
allow_unsafe = true
allow_apis = [
"net",
]
[pkg.surrealkv]
allow_apis = [
"fs",
"net",
]
[pkg.vart]
allow_apis = [
"net",
]
# examples
[pkg.pear_codegen]
allow_proc_macro = true
@ -1320,3 +1297,8 @@ allow_proc_macro = true
[pkg.rocket_codegen]
allow_proc_macro = true
[pkg.ext-sort]
allow_apis = [
"fs",
]

View file

@ -26,18 +26,18 @@ resolver = "2"
default = ["kv-mem"]
kv-mem = ["dep:echodb", "tokio/time"]
kv-indxdb = ["dep:indxdb"]
kv-speedb = ["dep:speedb", "tokio/time"]
kv-rocksdb = ["dep:rocksdb", "tokio/time"]
kv-tikv = ["dep:tikv"]
kv-fdb-5_1 = ["foundationdb/fdb-5_1", "kv-fdb"]
kv-fdb-5_2 = ["foundationdb/fdb-5_2", "kv-fdb"]
kv-fdb-6_0 = ["foundationdb/fdb-6_0", "kv-fdb"]
kv-fdb-6_1 = ["foundationdb/fdb-6_1", "kv-fdb"]
kv-fdb-6_2 = ["foundationdb/fdb-6_2", "kv-fdb"]
kv-fdb-6_3 = ["foundationdb/fdb-6_3", "kv-fdb"]
kv-fdb-7_0 = ["foundationdb/fdb-7_0", "kv-fdb"]
kv-fdb-7_1 = ["foundationdb/fdb-7_1", "kv-fdb"]
kv-surrealkv = ["dep:surrealkv", "tokio/time"]
kv-speedb = ["dep:speedb", "tokio/time", "dep:tempfile", "dep:ext-sort"]
kv-rocksdb = ["dep:rocksdb", "tokio/time", "dep:tempfile", "dep:ext-sort"]
kv-tikv = ["dep:tikv", "dep:tempfile", "dep:ext-sort"]
kv-fdb-5_1 = ["foundationdb/fdb-5_1", "kv-fdb", "dep:tempfile", "dep:ext-sort"]
kv-fdb-5_2 = ["foundationdb/fdb-5_2", "kv-fdb", "dep:tempfile", "dep:ext-sort"]
kv-fdb-6_0 = ["foundationdb/fdb-6_0", "kv-fdb", "dep:tempfile", "dep:ext-sort"]
kv-fdb-6_1 = ["foundationdb/fdb-6_1", "kv-fdb", "dep:tempfile", "dep:ext-sort"]
kv-fdb-6_2 = ["foundationdb/fdb-6_2", "kv-fdb", "dep:tempfile", "dep:ext-sort"]
kv-fdb-6_3 = ["foundationdb/fdb-6_3", "kv-fdb", "dep:tempfile", "dep:ext-sort"]
kv-fdb-7_0 = ["foundationdb/fdb-7_0", "kv-fdb", "dep:tempfile", "dep:ext-sort"]
kv-fdb-7_1 = ["foundationdb/fdb-7_1", "kv-fdb", "dep:tempfile", "dep:ext-sort"]
kv-surrealkv = ["dep:surrealkv", "tokio/time", "dep:tempfile", "dep:ext-sort"]
scripting = ["dep:js"]
http = ["dep:reqwest"]
ml = ["dep:surrealml-core1", "dep:ndarray"]
@ -80,6 +80,7 @@ deunicode = "1.4.1"
dmp = "0.2.0"
echodb = { version = "0.4.0", optional = true }
executor = { version = "1.8.0", package = "async-executor" }
ext-sort = { version = "^0.1.4", optional = true }
foundationdb = { version = "0.8.0", default-features = false, features = [
"embedded-fdb-include",
], optional = true }
@ -137,6 +138,7 @@ speedb = { version = "0.0.4", features = ["lz4", "snappy"], optional = true }
storekey = "0.5.0"
surrealml-core1 = { version = "0.1.1", optional = true, package = "surrealml-core" }
surrealkv = { version = "0.1.3", optional = true }
tempfile = { version = "3.10.1", optional = true }
thiserror = "1.0.50"
tikv = { version = "0.2.0-surreal.2", default-features = false, package = "surrealdb-tikv-client", optional = true }
tracing = "0.1.40"

View file

@ -38,3 +38,16 @@ pub const PROCESSOR_BATCH_SIZE: u32 = 50;
/// Forward all signup/signin query errors to a client trying to authenticate to a scope. Do not use in production.
pub static INSECURE_FORWARD_SCOPE_ERRORS: Lazy<bool> =
lazy_env_parse!("SURREAL_INSECURE_FORWARD_SCOPE_ERRORS", bool, false);
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
/// Specifies the buffer limit for external sorting.
/// If the environment variable is not present or cannot be parsed, a default value of 50,000 is used.
pub static EXTERNAL_SORTING_BUFFER_LIMIT: Lazy<usize> =
lazy_env_parse!("SURREAL_EXTERNAL_SORTING_BUFFER_LIMIT", usize, 50_000);

View file

@ -12,7 +12,25 @@ use crate::sql::value::Value;
use channel::Sender;
use std::borrow::Cow;
use std::collections::HashMap;
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
use std::env;
use std::fmt::{self, Debug};
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@ -53,6 +71,26 @@ pub struct Context<'a> {
index_stores: IndexStores,
// Capabilities
capabilities: Arc<Capabilities>,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
// Is the datastore in memory? (KV-MEM, WASM)
is_memory: bool,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
// The temporary directory
temporary_directory: Arc<PathBuf>,
}
impl<'a> Default for Context<'a> {
@ -77,6 +115,24 @@ impl<'a> Context<'a> {
time_out: Option<Duration>,
capabilities: Capabilities,
index_stores: IndexStores,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
is_memory: bool,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
temporary_directory: Arc<PathBuf>,
) -> Result<Context<'a>, Error> {
let mut ctx = Self {
values: HashMap::default(),
@ -89,6 +145,24 @@ impl<'a> Context<'a> {
iteration_stage: None,
capabilities: Arc::new(capabilities),
index_stores,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
is_memory,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
temporary_directory,
};
if let Some(timeout) = time_out {
ctx.add_timeout(timeout)?;
@ -108,6 +182,24 @@ impl<'a> Context<'a> {
iteration_stage: None,
capabilities: Arc::new(Capabilities::default()),
index_stores: IndexStores::default(),
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
is_memory: false,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
temporary_directory: Arc::new(env::temp_dir()),
}
}
@ -124,6 +216,24 @@ impl<'a> Context<'a> {
iteration_stage: parent.iteration_stage.clone(),
capabilities: parent.capabilities.clone(),
index_stores: parent.index_stores.clone(),
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
is_memory: parent.is_memory,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
temporary_directory: parent.temporary_directory.clone(),
}
}
@ -239,6 +349,32 @@ impl<'a> Context<'a> {
matches!(self.done(), Some(Reason::Timedout))
}
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
/// Return true if the underlying Datastore is KV-MEM (or WASM)
pub fn is_memory(&self) -> bool {
self.is_memory
}
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
/// Return the location of the temporary directory
pub fn temporary_directory(&self) -> &Path {
self.temporary_directory.as_ref()
}
/// Get a value from the context. If no value is stored under the
/// provided key, then this will return None.
pub fn value(&self, key: &str) -> Option<&Value> {
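Note: both new fields are cloned into every child context, and the temporary directory is held as an Arc<PathBuf>, so propagation is a pointer copy rather than a path copy. A miniature of the parent/child relationship (hypothetical Ctx type, not the real Context):

    use std::path::PathBuf;
    use std::sync::Arc;

    struct Ctx {
        temporary_directory: Arc<PathBuf>,
    }

    impl Ctx {
        // A child shares the same Arc, as in the child-context constructor above.
        fn new_child(&self) -> Ctx {
            Ctx {
                temporary_directory: self.temporary_directory.clone(),
            }
        }
    }

    fn main() {
        let root = Ctx {
            temporary_directory: Arc::new(std::env::temp_dir()),
        };
        let child = root.new_child();
        assert!(Arc::ptr_eq(&root.temporary_directory, &child.temporary_directory));
    }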

View file

@ -1,6 +1,6 @@
use crate::ctx::Context;
use crate::dbs::plan::Explanation;
use crate::dbs::store::StoreCollector;
use crate::dbs::store::MemoryCollector;
use crate::dbs::{Options, Statement, Transaction};
use crate::err::Error;
use crate::sql::function::OptimisedAggregate;
@ -110,8 +110,8 @@ impl GroupsCollector {
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<StoreCollector, Error> {
let mut results = StoreCollector::default();
) -> Result<MemoryCollector, Error> {
let mut results = MemoryCollector::default();
if let Some(fields) = stm.expr() {
let grp = mem::take(&mut self.grp);
// Loop over each grouped collection

View file

@ -18,7 +18,6 @@ use crate::sql::table::Table;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use async_recursion::async_recursion;
use std::cmp::Ordering;
use std::mem;
#[derive(Clone)]
@ -296,7 +295,18 @@ impl Iterator {
// Process the query START clause
self.setup_start(&cancel_ctx, opt, txn, stm).await?;
// Prepare the results with possible optimisations on groups
self.results = self.results.prepare(stm);
self.results = self.results.prepare(
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
ctx,
stm,
)?;
// Extract the expected behaviour depending on the presence of EXPLAIN with or without FULL
let mut plan = Plan::new(ctx, stm, &self.entries, &self.results);
if plan.do_iterate {
@ -317,10 +327,17 @@ impl Iterator {
}
// Process any SPLIT clause
self.output_split(ctx, opt, txn, stm).await?;
// Process any GROUP clause
self.results = self.results.group(ctx, opt, txn, stm).await?;
if let Results::Groups(g) = &mut self.results {
self.results = Results::Memory(g.output(ctx, opt, txn, stm).await?);
}
// Process any ORDER clause
self.output_order(ctx, opt, txn, stm).await?;
if let Some(orders) = stm.order() {
self.results.sort(orders);
}
// Process any START & LIMIT clause
self.results.start_limit(self.start.as_ref(), self.limit.as_ref());
@ -333,7 +350,7 @@ impl Iterator {
}
// Extract the output from the result
let mut results = self.results.take();
let mut results = self.results.take()?;
// Output the explanation if any
if let Some(e) = plan.explanation {
@ -387,7 +404,7 @@ impl Iterator {
// Loop over each split clause
for split in splits.iter() {
// Get the query result
let res = self.results.take();
let res = self.results.take()?;
// Loop over each value
for obj in &res {
// Get the value at the path
@ -418,44 +435,6 @@ impl Iterator {
}
Ok(())
}
#[inline]
async fn output_order(
&mut self,
_ctx: &Context<'_>,
_opt: &Options,
_txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(orders) = stm.order() {
// Sort the full result set
self.results.sort_by(|a, b| {
// Loop over each order clause
for order in orders.iter() {
// Reverse the ordering if DESC
let o = match order.random {
true => {
let a = rand::random::<f64>();
let b = rand::random::<f64>();
a.partial_cmp(&b)
}
false => match order.direction {
true => a.compare(b, order, order.collate, order.numeric),
false => b.compare(a, order, order.collate, order.numeric),
},
};
//
match o {
Some(Ordering::Greater) => return Ordering::Greater,
Some(Ordering::Equal) => continue,
Some(Ordering::Less) => return Ordering::Less,
None => continue,
}
}
Ordering::Equal
})
}
Ok(())
}
#[inline]
async fn output_fetch(
@ -467,11 +446,13 @@ impl Iterator {
) -> Result<(), Error> {
if let Some(fetchs) = stm.fetch() {
for fetch in fetchs.iter() {
let mut values = self.results.take()?;
// Loop over each result value
for obj in &mut self.results {
for obj in &mut values {
// Fetch the value at the path
obj.fetch(ctx, opt, txn, fetch).await?;
}
self.results = values.into();
}
}
Ok(())

View file

@ -1,33 +1,66 @@
use crate::ctx::Context;
use crate::dbs::group::GroupsCollector;
use crate::dbs::plan::Explanation;
use crate::dbs::store::StoreCollector;
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
use crate::dbs::store::file_store::FileCollector;
use crate::dbs::store::MemoryCollector;
use crate::dbs::{Options, Statement, Transaction};
use crate::err::Error;
use crate::sql::Value;
use std::cmp::Ordering;
use std::slice::IterMut;
use crate::sql::{Orders, Value};
pub(super) enum Results {
None,
Store(StoreCollector),
Memory(MemoryCollector),
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
File(Box<FileCollector>),
Groups(GroupsCollector),
}
impl Default for Results {
fn default() -> Self {
Self::None
}
}
impl Results {
pub(super) fn prepare(&mut self, stm: &Statement<'_>) -> Self {
pub(super) fn prepare(
&mut self,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
ctx: &Context<'_>,
stm: &Statement<'_>,
) -> Result<Self, Error> {
if stm.expr().is_some() && stm.group().is_some() {
Self::Groups(GroupsCollector::new(stm))
} else {
Self::Store(StoreCollector::default())
return Ok(Self::Groups(GroupsCollector::new(stm)));
}
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
if !ctx.is_memory() {
return Ok(Self::File(Box::new(FileCollector::new(ctx.temporary_directory())?)));
}
Ok(Self::Memory(Default::default()))
}
pub(super) async fn push(
&mut self,
ctx: &Context<'_>,
@ -37,89 +70,126 @@ impl Results {
val: Value,
) -> Result<(), Error> {
match self {
Results::None => {}
Results::Store(s) => {
Self::None => {}
Self::Memory(s) => {
s.push(val);
}
Results::Groups(g) => {
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
Self::File(e) => {
e.push(val)?;
}
Self::Groups(g) => {
g.push(ctx, opt, txn, stm, val).await?;
}
}
Ok(())
}
pub(super) fn sort_by<F>(&mut self, compare: F)
where
F: FnMut(&Value, &Value) -> Ordering,
{
if let Results::Store(s) = self {
s.sort_by(compare)
pub(super) fn sort(&mut self, orders: &Orders) {
match self {
Self::Memory(m) => m.sort(orders),
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
Self::File(f) => f.sort(orders),
_ => {}
}
}
pub(super) fn start_limit(&mut self, start: Option<&usize>, limit: Option<&usize>) {
if let Results::Store(s) = self {
if let Some(&start) = start {
s.start(start);
}
if let Some(&limit) = limit {
s.limit(limit);
}
match self {
Self::None => {}
Self::Memory(m) => m.start_limit(start, limit),
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
Self::File(f) => f.start_limit(start, limit),
Self::Groups(_) => {}
}
}
pub(super) fn len(&self) -> usize {
match self {
Results::None => 0,
Results::Store(s) => s.len(),
Results::Groups(g) => g.len(),
Self::None => 0,
Self::Memory(s) => s.len(),
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
Self::File(e) => e.len(),
Self::Groups(g) => g.len(),
}
}
pub(super) async fn group(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<Self, Error> {
pub(super) fn take(&mut self) -> Result<Vec<Value>, Error> {
Ok(match self {
Self::None => Self::None,
Self::Store(s) => Self::Store(s.take_store()),
Self::Groups(g) => Self::Store(g.output(ctx, opt, txn, stm).await?),
Self::Memory(m) => m.take_vec(),
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
Self::File(f) => f.take_vec()?,
_ => vec![],
})
}
pub(super) fn take(&mut self) -> Vec<Value> {
if let Self::Store(s) = self {
s.take_vec()
} else {
vec![]
}
}
pub(super) fn explain(&self, exp: &mut Explanation) {
match self {
Results::None => exp.add_collector("None", vec![]),
Results::Store(s) => {
Self::None => exp.add_collector("None", vec![]),
Self::Memory(s) => {
s.explain(exp);
}
Results::Groups(g) => {
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
Self::File(e) => {
e.explain(exp);
}
Self::Groups(g) => {
g.explain(exp);
}
}
}
}
impl<'a> IntoIterator for &'a mut Results {
type Item = &'a mut Value;
type IntoIter = IterMut<'a, Value>;
fn into_iter(self) -> Self::IntoIter {
if let Results::Store(s) = self {
s.into_iter()
} else {
[].iter_mut()
}
impl Default for Results {
fn default() -> Self {
Self::None
}
}
impl From<Vec<Value>> for Results {
fn from(value: Vec<Value>) -> Self {
Results::Memory(value.into())
}
}

View file

@ -1,53 +1,420 @@
use crate::dbs::plan::Explanation;
use crate::sql::value::Value;
use std::cmp::Ordering;
use crate::sql::Orders;
use std::mem;
#[derive(Default)]
// TODO Use surreal-kv once the number of record reach a given threshold
pub(super) struct StoreCollector(Vec<Value>);
pub(super) struct MemoryCollector(Vec<Value>);
impl StoreCollector {
impl MemoryCollector {
pub(super) fn push(&mut self, val: Value) {
self.0.push(val);
}
// When surreal-kv will be used, the key will be used to sort the records in surreal-kv
pub(super) fn sort_by<F>(&mut self, compare: F)
where
F: FnMut(&Value, &Value) -> Ordering,
{
self.0.sort_by(compare);
pub(super) fn sort(&mut self, orders: &Orders) {
self.0.sort_by(|a, b| orders.compare(a, b));
}
pub(super) fn len(&self) -> usize {
self.0.len()
}
pub(super) fn start(&mut self, start: usize) {
self.0 = mem::take(&mut self.0).into_iter().skip(start).collect();
}
pub(super) fn limit(&mut self, limit: usize) {
self.0 = mem::take(&mut self.0).into_iter().take(limit).collect();
pub(super) fn start_limit(&mut self, start: Option<&usize>, limit: Option<&usize>) {
match (start, limit) {
(Some(&start), Some(&limit)) => {
self.0 = mem::take(&mut self.0).into_iter().skip(start).take(limit).collect()
}
(Some(&start), None) => {
self.0 = mem::take(&mut self.0).into_iter().skip(start).collect()
}
(None, Some(&limit)) => {
self.0 = mem::take(&mut self.0).into_iter().take(limit).collect()
}
(None, None) => {}
}
}
pub(super) fn take_vec(&mut self) -> Vec<Value> {
mem::take(&mut self.0)
}
pub(super) fn take_store(&mut self) -> Self {
Self(self.take_vec())
}
pub(super) fn explain(&self, exp: &mut Explanation) {
exp.add_collector("Store", vec![]);
exp.add_collector("Memory", vec![]);
}
}
impl<'a> IntoIterator for &'a mut StoreCollector {
type Item = &'a mut Value;
type IntoIter = std::slice::IterMut<'a, Value>;
fn into_iter(self) -> Self::IntoIter {
self.0.iter_mut()
impl From<Vec<Value>> for MemoryCollector {
fn from(values: Vec<Value>) -> Self {
Self(values)
}
}
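Note: start_limit now applies START and LIMIT in one pass, equivalent to skip(start).take(limit) over the buffered values. The semantics on plain integers (hypothetical helper, not the real collector):

    fn start_limit(values: Vec<i32>, start: Option<usize>, limit: Option<usize>) -> Vec<i32> {
        values
            .into_iter()
            .skip(start.unwrap_or(0)) // START skips leading records
            .take(limit.unwrap_or(usize::MAX)) // LIMIT caps how many are kept
            .collect()
    }

    fn main() {
        let v = vec![1, 2, 3, 4, 5];
        assert_eq!(start_limit(v.clone(), Some(1), Some(2)), vec![2, 3]);
        assert_eq!(start_limit(v.clone(), None, Some(3)), vec![1, 2, 3]);
        assert_eq!(start_limit(v, Some(4), None), vec![5]);
    }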
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
pub(super) mod file_store {
use crate::cnf::EXTERNAL_SORTING_BUFFER_LIMIT;
use crate::dbs::plan::Explanation;
use crate::err::Error;
use crate::sql::{Orders, Value};
use ext_sort::{ExternalChunk, ExternalSorter, ExternalSorterBuilder, LimitedBufferBuilder};
use revision::Revisioned;
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Take, Write};
use std::path::{Path, PathBuf};
use std::{fs, io, mem};
use tempfile::{Builder, TempDir};
pub(in crate::dbs) struct FileCollector {
dir: TempDir,
len: usize,
writer: Option<FileWriter>,
reader: Option<FileReader>,
orders: Option<Orders>,
paging: FilePaging,
}
impl FileCollector {
const INDEX_FILE_NAME: &'static str = "ix";
const RECORDS_FILE_NAME: &'static str = "re";
const SORT_DIRECTORY_NAME: &'static str = "so";
const USIZE_SIZE: usize = mem::size_of::<usize>();
pub(in crate::dbs) fn new(temp_dir: &Path) -> Result<Self, Error> {
let dir = Builder::new().prefix("SURREAL").tempdir_in(temp_dir)?;
Ok(Self {
len: 0,
writer: Some(FileWriter::new(&dir)?),
reader: None,
orders: None,
paging: Default::default(),
dir,
})
}
pub(in crate::dbs) fn push(&mut self, value: Value) -> Result<(), Error> {
if let Some(writer) = &mut self.writer {
writer.push(value)?;
self.len += 1;
Ok(())
} else {
Err(Error::Internal("No FileWriter available.".to_string()))
}
}
fn check_reader(&mut self) -> Result<(), Error> {
if self.reader.is_none() {
if let Some(writer) = self.writer.take() {
writer.flush()?;
self.reader = Some(FileReader::new(self.len, &self.dir)?);
}
}
Ok(())
}
pub(in crate::dbs) fn sort(&mut self, orders: &Orders) {
self.orders = Some(orders.clone());
}
pub(in crate::dbs) fn len(&self) -> usize {
self.len
}
pub(in crate::dbs) fn start_limit(&mut self, start: Option<&usize>, limit: Option<&usize>) {
self.paging.start = start.cloned();
self.paging.limit = limit.cloned();
}
pub(in crate::dbs) fn take_vec(&mut self) -> Result<Vec<Value>, Error> {
self.check_reader()?;
if let Some(mut reader) = self.reader.take() {
if let Some((start, num)) = self.paging.get_start_num(reader.len) {
if let Some(orders) = self.orders.take() {
return self.sort_and_take_vec(reader, orders, start, num);
}
return reader.take_vec(start, num);
}
}
Ok(vec![])
}
fn sort_and_take_vec(
&mut self,
reader: FileReader,
orders: Orders,
start: usize,
num: usize,
) -> Result<Vec<Value>, Error> {
let sort_dir = self.dir.path().join(Self::SORT_DIRECTORY_NAME);
fs::create_dir(&sort_dir)?;
let sorter: ExternalSorter<Value, Error, LimitedBufferBuilder, ValueExternalChunk> =
ExternalSorterBuilder::new()
.with_tmp_dir(&sort_dir)
.with_buffer(LimitedBufferBuilder::new(*EXTERNAL_SORTING_BUFFER_LIMIT, true))
.build()?;
let sorted = sorter.sort_by(reader, |a, b| orders.compare(a, b))?;
let iter = sorted.map(Result::unwrap);
let r: Vec<Value> = iter.skip(start).take(num).collect();
Ok(r)
}
pub(in crate::dbs) fn explain(&self, exp: &mut Explanation) {
exp.add_collector("TempFiles", vec![]);
}
}
struct FileWriter {
index: BufWriter<File>,
records: BufWriter<File>,
offset: usize,
}
impl FileWriter {
fn new(dir: &TempDir) -> Result<Self, Error> {
let index = OpenOptions::new()
.create_new(true)
.append(true)
.open(dir.path().join(FileCollector::INDEX_FILE_NAME))?;
let records = OpenOptions::new()
.create_new(true)
.append(true)
.open(dir.path().join(FileCollector::RECORDS_FILE_NAME))?;
Ok(Self {
index: BufWriter::new(index),
records: BufWriter::new(records),
offset: 0,
})
}
fn write_usize<W: Write>(writer: &mut W, u: usize) -> Result<(), Error> {
let buf = u.to_be_bytes();
writer.write_all(&buf)?;
Ok(())
}
fn write_value<W: Write>(writer: &mut W, value: Value) -> Result<usize, Error> {
let mut val = Vec::new();
value.serialize_revisioned(&mut val)?;
// Write the size of the buffer in the index
Self::write_usize(writer, val.len())?;
// Write the buffer in the records
writer.write_all(&val)?;
Ok(val.len())
}
fn push(&mut self, value: Value) -> Result<(), Error> {
debug!("PUSH {}", self.offset);
// Serialize the value in a buffer
let len = Self::write_value(&mut self.records, value)?;
// Increment the offset of the next record
self.offset += len + FileCollector::USIZE_SIZE;
Self::write_usize(&mut self.index, self.offset)?;
Ok(())
}
fn flush(mut self) -> Result<(), Error> {
self.records.flush()?;
self.index.flush()?;
Ok(())
}
}
struct FileReader {
len: usize,
index: PathBuf,
records: PathBuf,
}
impl FileReader {
fn new(len: usize, dir: &TempDir) -> Result<Self, Error> {
let index = dir.path().join(FileCollector::INDEX_FILE_NAME);
let records = dir.path().join(FileCollector::RECORDS_FILE_NAME);
Ok(Self {
len,
index,
records,
})
}
fn read_value<R: Read>(reader: &mut R) -> Result<Value, Error> {
let len = FileReader::read_usize(reader)?;
let mut buf = vec![0u8; len];
if let Err(e) = reader.read_exact(&mut buf) {
return Err(Error::Io(e));
}
let val = Value::deserialize_revisioned(&mut buf.as_slice())?;
Ok(val)
}
fn read_usize<R: Read>(reader: &mut R) -> Result<usize, io::Error> {
let mut buf = vec![0u8; FileCollector::USIZE_SIZE];
reader.read_exact(&mut buf)?;
// Safe to call unwrap because we know the slice length matches the expected length
let u = usize::from_be_bytes(buf.try_into().unwrap());
Ok(u)
}
fn take_vec(&mut self, start: usize, num: usize) -> Result<Vec<Value>, Error> {
let mut iter = FileRecordsIterator::new(self.records.clone(), self.len);
if start > 0 {
// Get the start offset of the first record
let mut index = OpenOptions::new().read(true).open(&self.index)?;
index.seek(SeekFrom::Start(((start - 1) * FileCollector::USIZE_SIZE) as u64))?;
let start_offset = Self::read_usize(&mut index)?;
// Set records to the position of the first record
iter.seek(start_offset, start)?;
}
// Collect the records
let mut res = Vec::with_capacity(num);
for _ in 0..num {
debug!("READ");
if let Some(val) = iter.next() {
res.push(val?);
} else {
break;
}
}
Ok(res)
}
}
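Note: the collector keeps two append-only files: 're' stores each value as a big-endian usize length prefix followed by its revision-serialized bytes, and 'ix' stores the cumulative end offset of every record, so take_vec can resolve a START position with a single seek instead of scanning. The layout in miniature, with strings standing in for serialized Values and in-memory cursors for the temp files:

    use std::io::{Cursor, Read, Seek, SeekFrom, Write};

    const USIZE_SIZE: usize = std::mem::size_of::<usize>();

    fn main() -> std::io::Result<()> {
        let (mut records, mut index): (Vec<u8>, Vec<u8>) = (Vec::new(), Vec::new());
        let mut offset = 0usize;
        for rec in ["alpha", "bravo", "charlie"] {
            let bytes = rec.as_bytes();
            records.write_all(&bytes.len().to_be_bytes())?; // usize length prefix
            records.write_all(bytes)?; // record payload
            offset += USIZE_SIZE + bytes.len();
            index.write_all(&offset.to_be_bytes())?; // cumulative end offset
        }
        // Resolve START 1: the end offset of record 0 is the start of record 1.
        let start = 1usize;
        let mut ix = Cursor::new(index);
        ix.seek(SeekFrom::Start(((start - 1) * USIZE_SIZE) as u64))?;
        let mut buf = [0u8; USIZE_SIZE];
        ix.read_exact(&mut buf)?;
        let mut re = Cursor::new(records);
        re.seek(SeekFrom::Start(usize::from_be_bytes(buf) as u64))?;
        re.read_exact(&mut buf)?; // read the length prefix...
        let mut payload = vec![0u8; usize::from_be_bytes(buf)];
        re.read_exact(&mut payload)?; // ...then the payload
        assert_eq!(payload, b"bravo".to_vec());
        Ok(())
    }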
impl IntoIterator for FileReader {
type Item = Result<Value, Error>;
type IntoIter = FileRecordsIterator;
fn into_iter(self) -> Self::IntoIter {
FileRecordsIterator::new(self.records.clone(), self.len)
}
}
struct FileRecordsIterator {
path: PathBuf,
reader: Option<BufReader<File>>,
len: usize,
pos: usize,
}
impl FileRecordsIterator {
fn new(path: PathBuf, len: usize) -> Self {
Self {
path,
reader: None,
len,
pos: 0,
}
}
fn check_reader(&mut self) -> Result<(), Error> {
if self.reader.is_none() {
let f = OpenOptions::new().read(true).open(&self.path)?;
self.reader = Some(BufReader::new(f));
}
Ok(())
}
fn seek(&mut self, seek_pos: usize, pos: usize) -> Result<(), Error> {
self.check_reader()?;
if let Some(reader) = &mut self.reader {
debug!("SEEK {seek_pos}");
reader.seek(SeekFrom::Start(seek_pos as u64))?;
self.pos = pos;
}
Ok(())
}
}
impl Iterator for FileRecordsIterator {
type Item = Result<Value, Error>;
fn next(&mut self) -> Option<Self::Item> {
if self.pos == self.len {
return None;
}
if let Err(e) = self.check_reader() {
return Some(Err(e));
}
if let Some(reader) = &mut self.reader {
match FileReader::read_value(reader) {
Ok(val) => {
self.pos += 1;
Some(Ok(val))
}
Err(e) => Some(Err(e)),
}
} else {
None
}
}
}
#[derive(Default)]
struct FilePaging {
start: Option<usize>,
limit: Option<usize>,
}
impl FilePaging {
fn get_start_num(&self, len: usize) -> Option<(usize, usize)> {
let start = self.start.unwrap_or(0);
if start >= len {
return None;
}
let max = len - start;
let num = if let Some(limit) = self.limit {
limit.min(max)
} else {
max
};
debug!("FilePaging - START: {start} - NUM: {num}");
Some((start, num))
}
}
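Note: get_start_num clamps the requested window against the number of collected records: a START at or past the end yields None, and LIMIT is capped by what remains. The same arithmetic, extracted for checking:

    fn get_start_num(start: Option<usize>, limit: Option<usize>, len: usize) -> Option<(usize, usize)> {
        let start = start.unwrap_or(0);
        if start >= len {
            return None; // paging begins past the end: nothing to return
        }
        let max = len - start;
        let num = limit.map_or(max, |l| l.min(max));
        Some((start, num))
    }

    fn main() {
        assert_eq!(get_start_num(Some(3), Some(5), 10), Some((3, 5)));
        assert_eq!(get_start_num(Some(8), Some(5), 10), Some((8, 2))); // clamped
        assert_eq!(get_start_num(Some(12), None, 10), None);
    }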
struct ValueExternalChunk {
reader: Take<BufReader<File>>,
}
impl ExternalChunk<Value> for ValueExternalChunk {
type SerializationError = Error;
type DeserializationError = Error;
fn new(reader: Take<BufReader<File>>) -> Self {
Self {
reader,
}
}
fn dump(
chunk_writer: &mut BufWriter<File>,
items: impl IntoIterator<Item = Value>,
) -> Result<(), Self::SerializationError> {
for item in items {
FileWriter::write_value(chunk_writer, item)?;
}
Ok(())
}
}
impl Iterator for ValueExternalChunk {
type Item = Result<Value, Error>;
fn next(&mut self) -> Option<Self::Item> {
if self.reader.limit() == 0 {
None
} else {
match FileReader::read_value(&mut self.reader) {
Ok(val) => Some(Ok(val)),
Err(err) => Some(Err(err)),
}
}
}
}
}
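Note: ValueExternalChunk plugs SurrealDB's record encoding into ext-sort, which performs a classic external merge sort: buffer values up to EXTERNAL_SORTING_BUFFER_LIMIT, spill each full buffer as a sorted run under the 'so' subdirectory, then merge the runs back in order. A toy in-memory version of the algorithm, with Vecs standing in for the temporary chunk files:

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    fn external_sort(input: impl Iterator<Item = i32>, buffer_limit: usize) -> Vec<i32> {
        // Phase 1: cut the input into sorted runs of at most `buffer_limit` items.
        let mut runs: Vec<Vec<i32>> = Vec::new();
        let mut buf = Vec::with_capacity(buffer_limit);
        for v in input {
            buf.push(v);
            if buf.len() == buffer_limit {
                buf.sort();
                runs.push(std::mem::take(&mut buf)); // a temp file in the real code
            }
        }
        if !buf.is_empty() {
            buf.sort();
            runs.push(buf);
        }
        // Phase 2: k-way merge of the runs via a min-heap over the run heads.
        let mut heads: Vec<std::vec::IntoIter<i32>> =
            runs.into_iter().map(|r| r.into_iter()).collect();
        let mut heap = BinaryHeap::new();
        for (i, it) in heads.iter_mut().enumerate() {
            if let Some(v) = it.next() {
                heap.push(Reverse((v, i)));
            }
        }
        let mut out = Vec::new();
        while let Some(Reverse((v, i))) = heap.pop() {
            out.push(v);
            if let Some(n) = heads[i].next() {
                heap.push(Reverse((n, i)));
            }
        }
        out
    }

    fn main() {
        let sorted = external_sort([5, 3, 8, 1, 9, 2, 7].into_iter(), 3);
        assert_eq!(sorted, vec![1, 2, 3, 5, 7, 8, 9]);
    }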

View file

@ -11,6 +11,15 @@ use crate::syn::error::RenderedError as RenderedParserError;
use crate::vs::Error as VersionstampError;
use base64_lib::DecodeError as Base64Error;
use bincode::Error as BincodeError;
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
use ext_sort::SortError;
use fst::Error as FstError;
use jsonwebtoken::errors::Error as JWTError;
use object_store::Error as ObjectStoreError;
@ -1033,6 +1042,25 @@ impl From<reqwest::Error> for Error {
}
}
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
impl<S, D, I> From<SortError<S, D, I>> for Error
where
S: std::error::Error,
D: std::error::Error,
I: std::error::Error,
{
fn from(e: SortError<S, D, I>) -> Error {
Error::Internal(e.to_string())
}
}
impl Serialize for Error {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where

View file

@ -1,5 +1,23 @@
use std::collections::{BTreeMap, BTreeSet};
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
use std::env;
use std::fmt;
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
@ -92,6 +110,16 @@ pub struct Datastore {
#[cfg(feature = "jwks")]
// The JWKS object cache
jwks_cache: Arc<RwLock<JwksCache>>,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
// The temporary directory
temporary_directory: Arc<PathBuf>,
}
/// We always want to be circulating the live query information
@ -366,6 +394,15 @@ impl Datastore {
cf_watermarks: Arc::new(RwLock::new(BTreeMap::new())),
#[cfg(feature = "jwks")]
jwks_cache: Arc::new(RwLock::new(JwksCache::new())),
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
temporary_directory: Arc::new(env::temp_dir()),
})
}
@ -418,6 +455,19 @@ impl Datastore {
self
}
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
pub fn with_temporary_directory(mut self, path: Option<PathBuf>) -> Self {
self.temporary_directory = Arc::new(path.unwrap_or_else(env::temp_dir));
self
}
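Note: for embedded use the builder exposes the same knob; None falls back to std::env::temp_dir(). A hedged usage sketch (hypothetical paths, assuming a build with the kv-rocksdb feature):

    use std::path::PathBuf;
    use surrealdb::kvs::Datastore;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Disk-backed queries may spill temporary result files into the
        // configured directory instead of the system default.
        let ds = Datastore::new("rocksdb://data/example.db")
            .await?
            .with_temporary_directory(Some(PathBuf::from("/mnt/fast-tmp")));
        let _ = ds;
        Ok(())
    }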
/// Set the engine options for the datastore
pub fn with_engine_options(mut self, engine_options: EngineOptions) -> Self {
self.engine_options = engine_options;
@ -433,6 +483,22 @@ impl Datastore {
self.auth_enabled
}
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
pub(crate) fn is_memory(&self) -> bool {
#[cfg(feature = "kv-mem")]
if matches!(self.inner, Inner::Mem(_)) {
return true;
};
false
}
/// Is authentication level enabled for this Datastore?
/// TODO(gguillemas): Remove this method once the legacy authentication is deprecated in v2.0.0
pub fn is_auth_level_enabled(&self) -> bool {
@ -1377,6 +1443,24 @@ impl Datastore {
self.query_timeout,
self.capabilities.clone(),
self.index_stores.clone(),
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
self.is_memory(),
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
self.temporary_directory.clone(),
)?;
// Setup the notification channel
if let Some(channel) = &self.notification_channel {

View file

@ -1,7 +1,9 @@
use crate::sql::fmt::Fmt;
use crate::sql::idiom::Idiom;
use crate::sql::Value;
use revision::revisioned;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt;
use std::ops::Deref;
@ -10,6 +12,33 @@ use std::ops::Deref;
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Orders(pub Vec<Order>);
impl Orders {
pub(crate) fn compare(&self, a: &Value, b: &Value) -> Ordering {
for order in &self.0 {
// Reverse the ordering if DESC
let o = match order.random {
true => {
let a = rand::random::<f64>();
let b = rand::random::<f64>();
a.partial_cmp(&b)
}
false => match order.direction {
true => a.compare(b, order, order.collate, order.numeric),
false => b.compare(a, order, order.collate, order.numeric),
},
};
//
match o {
Some(Ordering::Greater) => return Ordering::Greater,
Some(Ordering::Equal) => continue,
Some(Ordering::Less) => return Ordering::Less,
None => continue,
}
}
Ordering::Equal
}
}
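Note: compare chains the ORDER BY clauses: the first clause that yields a non-equal ordering decides, DESC simply swaps the comparands, and ORDER BY RAND() draws two fresh random floats. The same pattern on plain tuples (hypothetical key extractors standing in for idiom paths):

    use std::cmp::Ordering;

    type Key = fn(&(i32, i32)) -> i32;

    // Each clause pairs a key extractor with an `ascending` flag, standing in
    // for the idiom path and direction of a real Order clause.
    fn compare(a: &(i32, i32), b: &(i32, i32), clauses: &[(Key, bool)]) -> Ordering {
        for (key, ascending) in clauses {
            let o = if *ascending {
                key(a).cmp(&key(b))
            } else {
                key(b).cmp(&key(a)) // DESC: swap the comparands
            };
            if o != Ordering::Equal {
                return o; // the first non-equal clause decides
            }
        }
        Ordering::Equal
    }

    fn main() {
        let mut rows = vec![(1, 9), (0, 5), (1, 2)];
        // ORDER BY first ASC, second DESC
        let clauses: &[(Key, bool)] = &[(|r| r.0, true), (|r| r.1, false)];
        rows.sort_by(|a, b| compare(a, b, clauses));
        assert_eq!(rows, vec![(0, 5), (1, 9), (1, 2)]);
    }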
impl Deref for Orders {
type Target = Vec<Order>;
fn deref(&self) -> &Self::Target {

View file

@ -146,6 +146,15 @@ pub(crate) fn router(
.with_query_timeout(address.config.query_timeout)
.with_transaction_timeout(address.config.transaction_timeout)
.with_capabilities(address.config.capabilities);
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
let kvs = kvs.with_temporary_directory(address.config.temporary_directory);
let kvs = Arc::new(kvs);
let mut vars = BTreeMap::new();

View file

@ -1,4 +1,13 @@
use crate::{dbs::Capabilities, iam::Level};
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
use std::path::PathBuf;
use std::time::Duration;
/// Configuration for server connection, including: strictness, notifications, query_timeout, transaction_timeout
@ -17,6 +26,15 @@ pub struct Config {
pub(crate) password: String,
pub(crate) tick_interval: Option<Duration>,
pub(crate) capabilities: Capabilities,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
pub(crate) temporary_directory: Option<PathBuf>,
}
impl Config {
@ -104,4 +122,16 @@ impl Config {
self.capabilities = capabilities;
self
}
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
pub fn temporary_directory(mut self, path: Option<PathBuf>) -> Self {
self.temporary_directory = path;
self
}
}
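Note: through the SDK the option rides along on Config when opening an embedded engine. A hedged sketch (hypothetical paths, assuming the surrealdb crate with the kv-rocksdb feature):

    use std::path::PathBuf;
    use surrealdb::engine::local::RocksDb;
    use surrealdb::opt::Config;
    use surrealdb::Surreal;

    #[tokio::main]
    async fn main() -> Result<(), surrealdb::Error> {
        let config = Config::new().temporary_directory(Some(PathBuf::from("/mnt/fast-tmp")));
        let db = Surreal::new::<RocksDb>(("data/example.db", config)).await?;
        db.use_ns("test").use_db("test").await?;
        Ok(())
    }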

View file

@ -1,6 +1,8 @@
// Tests common to all protocols and storage engines
use surrealdb::fflags::FFLAGS;
use surrealdb::sql::value;
use surrealdb::Response;
static PERMITS: Semaphore = Semaphore::const_new(1);
@ -588,6 +590,160 @@ async fn select_record_ranges() {
assert_eq!(convert(users), vec!["john"]);
}
#[test_log::test(tokio::test)]
async fn select_records_order_by_start_limit() {
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let sql = "
CREATE user:john SET name = 'John';
CREATE user:zoey SET name = 'Zoey';
CREATE user:amos SET name = 'Amos';
CREATE user:jane SET name = 'Jane';
";
db.query(sql).await.unwrap().check().unwrap();
let check_start_limit = |mut response: Response, expected: Vec<&str>| {
let users: Vec<RecordName> = response.take(0).unwrap();
let users: Vec<String> = users.into_iter().map(|user| user.name).collect();
assert_eq!(users, expected);
};
let response =
db.query("SELECT name FROM user ORDER BY name DESC START 1 LIMIT 2").await.unwrap();
check_start_limit(response, vec!["John", "Jane"]);
let response = db.query("SELECT name FROM user ORDER BY name DESC START 1").await.unwrap();
check_start_limit(response, vec!["John", "Jane", "Amos"]);
let response = db.query("SELECT name FROM user ORDER BY name DESC START 4").await.unwrap();
check_start_limit(response, vec![]);
let response = db.query("SELECT name FROM user ORDER BY name DESC LIMIT 2").await.unwrap();
check_start_limit(response, vec!["Zoey", "John"]);
let response = db.query("SELECT name FROM user ORDER BY name DESC LIMIT 10").await.unwrap();
check_start_limit(response, vec!["Zoey", "John", "Jane", "Amos"]);
}
#[test_log::test(tokio::test)]
async fn select_records_order_by() {
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let sql = "
CREATE user:john SET name = 'John';
CREATE user:zoey SET name = 'Zoey';
CREATE user:amos SET name = 'Amos';
CREATE user:jane SET name = 'Jane';
";
db.query(sql).await.unwrap().check().unwrap();
let sql = "SELECT name FROM user ORDER BY name DESC";
let mut response = db.query(sql).await.unwrap();
let users: Vec<RecordName> = response.take(0).unwrap();
let convert = |users: Vec<RecordName>| -> Vec<String> {
users.into_iter().map(|user| user.name).collect()
};
assert_eq!(convert(users), vec!["Zoey", "John", "Jane", "Amos"]);
}
#[test_log::test(tokio::test)]
async fn select_records_fetch() {
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let sql = "
CREATE tag:rs SET name = 'Rust';
CREATE tag:go SET name = 'Golang';
CREATE tag:js SET name = 'JavaScript';
CREATE person:tobie SET tags = [tag:rs, tag:go, tag:js];
CREATE person:jaime SET tags = [tag:js];
";
db.query(sql).await.unwrap().check().unwrap();
let check_fetch = |mut response: Response, expected: &str| {
let val: Value = response.take(0).unwrap();
let exp = value(expected).unwrap();
assert_eq!(format!("{val:#}"), format!("{exp:#}"));
};
let sql = "SELECT * FROM person LIMIT 1 FETCH tags;";
let response = db.query(sql).await.unwrap();
check_fetch(
response,
"[
{
id: person:jaime,
tags: [
{
id: tag:js,
name: 'JavaScript'
}
]
}
]",
);
let sql = "SELECT * FROM person START 1 LIMIT 1 FETCH tags;";
let response = db.query(sql).await.unwrap();
check_fetch(
response,
"[
{
id: person:tobie,
tags: [
{
id: tag:rs,
name: 'Rust'
},
{
id: tag:go,
name: 'Golang'
},
{
id: tag:js,
name: 'JavaScript'
}
]
}
]",
);
let sql = "SELECT * FROM person ORDER BY id FETCH tags;";
let response = db.query(sql).await.unwrap();
check_fetch(
response,
"[
{
id: person:jaime,
tags: [
{
id: tag:js,
name: 'JavaScript'
}
]
},
{
id: person:tobie,
tags: [
{
id: tag:rs,
name: 'Rust'
},
{
id: tag:go,
name: 'Golang'
},
{
id: tag:js,
name: 'JavaScript'
}
]
}
]",
);
}
#[test_log::test(tokio::test)]
async fn update_table() {
let (permit, db) = new_db().await;

View file

@ -39,7 +39,7 @@ async fn select_where_matches_using_index() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -89,7 +89,7 @@ async fn select_where_matches_without_using_index_iterator() -> Result<(), Error
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
},
@ -155,7 +155,7 @@ async fn select_where_matches_using_index_and_arrays(parallel: bool) -> Result<(
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -403,7 +403,7 @@ async fn select_where_matches_using_index_and_objects(parallel: bool) -> Result<
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -638,7 +638,7 @@ async fn select_where_matches_without_complex_query() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}

View file

@ -210,7 +210,7 @@ fn table_explain(fetch_count: usize) -> String {
}},
{{
detail: {{
type: 'Store'
type: 'Memory'
}},
operation: 'Collector'
}},
@ -241,7 +241,7 @@ fn table_explain_no_index(fetch_count: usize) -> String {
}},
{{
detail: {{
type: 'Store'
type: 'Memory'
}},
operation: 'Collector'
}},
@ -264,7 +264,7 @@ const THREE_TABLE_EXPLAIN: &str = "[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
},
@ -312,7 +312,7 @@ const THREE_MULTI_INDEX_EXPLAIN: &str = "[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
},
@ -338,7 +338,7 @@ const SINGLE_INDEX_FT_EXPLAIN: &str = "[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
},
@ -364,7 +364,7 @@ const SINGLE_INDEX_UNIQ_EXPLAIN: &str = "[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
},
@ -390,7 +390,7 @@ const SINGLE_INDEX_IDX_EXPLAIN: &str = "[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
},
@ -427,7 +427,7 @@ const TWO_MULTI_INDEX_EXPLAIN: &str = "[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
},
@ -464,7 +464,7 @@ async fn select_with_no_index_unary_operator() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -498,7 +498,7 @@ async fn select_unsupported_unary_operator() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -588,7 +588,7 @@ const EXPLAIN_FROM_TO: &str = r"[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -635,7 +635,7 @@ const EXPLAIN_FROM_INCL_TO: &str = r"[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -686,7 +686,7 @@ const EXPLAIN_FROM_TO_INCL: &str = r"[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -737,7 +737,7 @@ const EXPLAIN_FROM_INCL_TO_INCL: &str = r"[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -831,7 +831,7 @@ const EXPLAIN_LESS: &str = r"[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -872,7 +872,7 @@ const EXPLAIN_LESS_OR_EQUAL: &str = r"[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -917,7 +917,7 @@ const EXPLAIN_MORE: &str = r"[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -958,7 +958,7 @@ const EXPLAIN_MORE_OR_EQUAL: &str = r"[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -1014,7 +1014,7 @@ async fn select_with_idiom_param_value() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -1062,7 +1062,7 @@ const CONTAINS_TABLE_EXPLAIN: &str = r"[
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -1128,7 +1128,7 @@ async fn select_contains() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -1171,7 +1171,7 @@ async fn select_contains_all() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -1214,7 +1214,7 @@ async fn select_contains_any() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -1264,7 +1264,7 @@ async fn select_unique_contains() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -1317,7 +1317,7 @@ async fn select_with_datetime_value() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -1380,7 +1380,7 @@ async fn select_with_uuid_value() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -1441,7 +1441,7 @@ async fn select_with_in_operator() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -1519,7 +1519,7 @@ async fn select_with_in_operator_uniq_index() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}

View file

@ -220,7 +220,7 @@ async fn select_expression_value() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
},
@ -530,7 +530,7 @@ async fn select_where_field_is_thing_and_with_index() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -554,7 +554,7 @@ async fn select_where_field_is_thing_and_with_index() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
},
@ -618,7 +618,7 @@ async fn select_where_and_with_index() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -671,7 +671,7 @@ async fn select_where_and_with_unique_index() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -726,7 +726,7 @@ async fn select_where_and_with_fulltext_index() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
}
@ -780,7 +780,7 @@ async fn select_where_explain() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
},
@ -805,7 +805,7 @@ async fn select_where_explain() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
},

View file

@ -55,7 +55,7 @@ async fn select_where_mtree_knn() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
},
@ -208,7 +208,7 @@ async fn select_where_brut_force_knn() -> Result<(), Error> {
},
{
detail: {
type: 'Store'
type: 'Memory'
},
operation: 'Collector'
},

View file

@ -22,15 +22,35 @@ pub(crate) fn path_valid(v: &str) -> Result<String, String> {
}
}
pub(crate) fn file_exists(path: &str) -> Result<PathBuf, String> {
pub(crate) fn path_exists(path: &str) -> Result<PathBuf, String> {
let path = Path::new(path);
if !*path.try_exists().as_ref().map_err(ToString::to_string)? {
return Err(String::from("Ensure the file exists"));
return Err(String::from("Ensure the path exists"));
}
Ok(path.to_owned())
}
pub(crate) fn file_exists(path: &str) -> Result<PathBuf, String> {
let path = path_exists(path)?;
if !path.is_file() {
return Err(String::from("Ensure the path is a file"));
}
Ok(path.to_owned())
Ok(path)
}
#[cfg(any(
feature = "storage-surrealkv",
feature = "storage-rocksdb",
feature = "storage-fdb",
feature = "storage-tikv",
feature = "storage-speedb"
))]
pub(crate) fn dir_exists(path: &str) -> Result<PathBuf, String> {
let path = path_exists(path)?;
if !path.is_dir() {
return Err(String::from("Ensure the path is a directory"));
}
Ok(path)
}
pub(crate) fn endpoint_valid(v: &str) -> Result<String, String> {

View file

@ -1,6 +1,14 @@
use crate::cli::CF;
use crate::err::Error;
use clap::Args;
#[cfg(any(
feature = "storage-surrealkv",
feature = "storage-rocksdb",
feature = "storage-fdb",
feature = "storage-tikv",
feature = "storage-speedb"
))]
use std::path::PathBuf;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use surrealdb::dbs::capabilities::{Capabilities, FuncTarget, NetTarget, Targets};
@ -37,6 +45,17 @@ pub struct StartCommandDbsOptions {
#[command(flatten)]
#[command(next_help_heading = "Capabilities")]
caps: DbsCapabilities,
#[cfg(any(
feature = "storage-surrealkv",
feature = "storage-speedb",
feature = "storage-rocksdb",
feature = "storage-fdb",
feature = "storage-tikv",
))]
#[arg(help = "Sets the directory for storing temporary database files")]
#[arg(env = "SURREAL_TEMPORARY_DIRECTORY", long = "temporary-directory")]
#[arg(value_parser = super::cli::validator::dir_exists)]
temporary_directory: Option<PathBuf>,
}
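Note: with the flag wired in, a start command looks like, for example, surreal start --temporary-directory /mnt/fast-tmp rocksdb://data/example.db (hypothetical paths); the directory must already exist, per the dir_exists validator, and SURREAL_TEMPORARY_DIRECTORY is the environment equivalent.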
#[derive(Args, Debug)]
@ -214,6 +233,14 @@ pub async fn init(
// TODO(gguillemas): Remove this field once the legacy authentication is deprecated in v2.0.0
auth_level_enabled,
caps,
#[cfg(any(
feature = "storage-surrealkv",
feature = "storage-rocksdb",
feature = "storage-fdb",
feature = "storage-tikv",
feature = "storage-speedb"
))]
temporary_directory,
}: StartCommandDbsOptions,
) -> Result<(), Error> {
// Get local copy of options
@ -243,6 +270,7 @@ pub async fn init(
let caps = caps.into();
debug!("Server capabilities: {caps}");
#[allow(unused_mut)]
// Parse and setup the desired kv datastore
let mut dbs = Datastore::new(&opt.path)
.await?
@ -253,6 +281,14 @@ pub async fn init(
.with_auth_enabled(auth_enabled)
.with_auth_level_enabled(auth_level_enabled)
.with_capabilities(caps);
#[cfg(any(
feature = "storage-surrealkv",
feature = "storage-rocksdb",
feature = "storage-fdb",
feature = "storage-tikv",
feature = "storage-speedb"
))]
let mut dbs = dbs.with_temporary_directory(temporary_directory);
if let Some(engine_options) = opt.engine {
dbs = dbs.with_engine_options(engine_options);
}

View file

@ -1,4 +1,3 @@
# cargo-vet config file
[cargo-vet]
@ -695,6 +694,10 @@ criteria = "safe-to-deploy"
version = "0.5.0"
criteria = "safe-to-deploy"
[[exemptions.ext-sort]]
version = "0.1.4"
criteria = "safe-to-deploy"
[[exemptions.fail]]
version = "0.4.0"
criteria = "safe-to-deploy"

View file

@ -1129,6 +1129,69 @@ mod cli_integration {
server.finish().unwrap();
}
}
#[test(tokio::test)]
async fn test_temporary_directory() {
info!("* The path is a non-existing directory");
{
let path = format!("surrealkv:{}", tempfile::tempdir().unwrap().path().display());
let res = common::start_server(StartServerArguments {
path: Some(path),
args: "".to_owned(),
temporary_directory: Some("/tmp/TELL-ME-THIS-FILE-DOES-NOT-EXISTS".to_owned()),
..Default::default()
})
.await;
match res {
Ok((_, mut server)) => {
server.finish().unwrap();
panic!("Should not be ok!");
}
Err(e) => {
assert_eq!(e.to_string(), "server failed to start", "{:?}", e);
}
}
}
info!("* The path is a file");
{
let path = format!("surrealkv:{}", tempfile::tempdir().unwrap().path().display());
let temp_file = tempfile::NamedTempFile::new().unwrap();
let res = common::start_server(StartServerArguments {
path: Some(path),
args: "".to_owned(),
temporary_directory: Some(format!("{}", temp_file.path().display())),
..Default::default()
})
.await;
match res {
Ok((_, mut server)) => {
server.finish().unwrap();
panic!("Should not be ok!");
}
Err(e) => {
assert_eq!(e.to_string(), "server failed to start", "{:?}", e);
}
}
temp_file.close().unwrap();
}
info!("* The path is a valid directory");
{
let path = format!("surrealkv:{}", tempfile::tempdir().unwrap().path().display());
let temp_dir = tempfile::tempdir().unwrap();
let (_, mut server) = common::start_server(StartServerArguments {
path: Some(path),
args: "".to_owned(),
temporary_directory: Some(format!("{}", temp_dir.path().display())),
..Default::default()
})
.await
.unwrap();
temp_dir.close().unwrap();
server.finish().unwrap();
}
}
}
fn remove_debug_info(output: String) -> String {

View file

@ -1,7 +1,7 @@
use rand::{thread_rng, Rng};
use std::error::Error;
use std::fs::File;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::process::{Command, ExitStatus, Stdio};
use std::{env, fs};
use tokio::time;
@ -146,22 +146,26 @@ pub fn tmp_file(name: &str) -> String {
}
pub struct StartServerArguments {
pub path: Option<String>,
pub auth: bool,
pub tls: bool,
pub wait_is_ready: bool,
pub enable_auth_level: bool,
pub tick_interval: time::Duration,
pub temporary_directory: Option<String>,
pub args: String,
}
impl Default for StartServerArguments {
fn default() -> Self {
Self {
path: None,
auth: true,
tls: false,
wait_is_ready: true,
enable_auth_level: false,
tick_interval: time::Duration::new(1, 0),
temporary_directory: None,
args: "--allow-all".to_string(),
}
}
@ -189,16 +193,20 @@ pub async fn start_server_with_defaults() -> Result<(String, Child), Box<dyn Err
pub async fn start_server(
StartServerArguments {
path,
auth,
tls,
wait_is_ready,
enable_auth_level,
tick_interval,
temporary_directory,
args,
}: StartServerArguments,
) -> Result<(String, Child), Box<dyn Error>> {
let mut rng = thread_rng();
let path = path.unwrap_or("memory".to_string());
let mut extra_args = args.clone();
if tls {
// Test the crt/key args but the keys are self-signed so don't actually connect.
@ -225,11 +233,15 @@ pub async fn start_server(
extra_args.push_str(format!(" --tick-interval {sec}s").as_str());
}
if let Some(path) = temporary_directory {
extra_args.push_str(format!(" --temporary-directory {path}").as_str());
}
'retry: for _ in 0..3 {
let port: u16 = rng.gen_range(13000..24000);
let addr = format!("127.0.0.1:{port}");
let start_args = format!("start --bind {addr} memory --no-banner --log trace --user {USER} --pass {PASS} {extra_args}");
let start_args = format!("start --bind {addr} {path} --no-banner --log trace --user {USER} --pass {PASS} {extra_args}");
info!("starting server with args: {start_args}");