[Feat] Add TEMPFILES clause to the SELECT statement (#4084)

This commit is contained in:
Emmanuel Keller 2024-05-23 14:04:20 +01:00 committed by GitHub
parent 948666e33a
commit 23653e5fce
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 124 additions and 125 deletions

View file

@ -24,7 +24,7 @@ resolver = "2"
[features]
# Public features
default = ["kv-mem"]
kv-mem = ["dep:echodb", "tokio/time"]
kv-mem = ["dep:echodb", "tokio/time", "dep:tempfile", "dep:ext-sort"]
kv-indxdb = ["dep:indxdb"]
kv-speedb = ["dep:speedb", "tokio/time", "dep:tempfile", "dep:ext-sort"]
kv-rocksdb = ["dep:rocksdb", "tokio/time", "dep:tempfile", "dep:ext-sort"]

View file

@ -40,6 +40,7 @@ pub static INSECURE_FORWARD_RECORD_ACCESS_ERRORS: Lazy<bool> =
lazy_env_parse!("SURREAL_INSECURE_FORWARD_RECORD_ACCESS_ERRORS", bool, false);
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",

View file

@ -14,6 +14,7 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::{self, Debug};
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -64,16 +65,7 @@ pub struct Context<'a> {
// Capabilities
capabilities: Arc<Capabilities>,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
// Is the datastore in memory? (KV-MEM, WASM)
is_memory: bool,
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -108,15 +100,7 @@ impl<'a> Context<'a> {
capabilities: Capabilities,
index_stores: IndexStores,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
is_memory: bool,
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -138,15 +122,7 @@ impl<'a> Context<'a> {
capabilities: Arc::new(capabilities),
index_stores,
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
is_memory,
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -175,15 +151,7 @@ impl<'a> Context<'a> {
capabilities: Arc::new(Capabilities::default()),
index_stores: IndexStores::default(),
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
is_memory: false,
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -209,15 +177,7 @@ impl<'a> Context<'a> {
capabilities: parent.capabilities.clone(),
index_stores: parent.index_stores.clone(),
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
is_memory: parent.is_memory,
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -342,19 +302,7 @@ impl<'a> Context<'a> {
}
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
/// Return true if the underlying Datastore is KV-MEM (Or WASM)
pub fn is_memory(&self) -> bool {
self.is_memory
}
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",

View file

@ -299,6 +299,7 @@ impl Iterator {
// Prepare the results with possible optimisations on groups
self.results = self.results.prepare(
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",

View file

@ -2,6 +2,7 @@ use crate::ctx::Context;
use crate::dbs::group::GroupsCollector;
use crate::dbs::plan::Explanation;
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -20,6 +21,7 @@ pub(super) enum Results {
None,
Memory(MemoryCollector),
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -35,6 +37,7 @@ impl Results {
pub(super) fn prepare(
&mut self,
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -49,6 +52,7 @@ impl Results {
return Ok(Self::Groups(GroupsCollector::new(stm)));
}
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -56,7 +60,7 @@ impl Results {
feature = "kv-tikv",
feature = "kv-speedb"
))]
if !ctx.is_memory() {
if stm.tempfiles() {
if let Some(temp_dir) = ctx.temporary_directory() {
return Ok(Self::File(Box::new(FileCollector::new(temp_dir)?)));
}
@ -79,6 +83,7 @@ impl Results {
s.push(val);
}
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -100,6 +105,7 @@ impl Results {
match self {
Self::Memory(m) => m.sort(orders),
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -117,6 +123,7 @@ impl Results {
Self::None => {}
Self::Memory(m) => m.start_limit(start, limit),
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -134,6 +141,7 @@ impl Results {
Self::None => 0,
Self::Memory(s) => s.len(),
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -150,6 +158,7 @@ impl Results {
Ok(match self {
Self::Memory(m) => m.take_vec(),
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -169,6 +178,7 @@ impl Results {
s.explain(exp);
}
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",

View file

@ -207,7 +207,7 @@ impl<'a> Statement<'a> {
}
/// Returns any PARALLEL clause if specified
#[inline]
#[allow(dead_code)]
#[cfg(not(target_arch = "wasm32"))]
pub fn parallel(&self) -> bool {
match self {
Statement::Select(v) => v.parallel,
@ -219,6 +219,25 @@ impl<'a> Statement<'a> {
_ => false,
}
}
/// Reports whether the TEMPFILES clause was set on this statement.
///
/// Only a `SELECT` statement can carry the flag; every other statement
/// kind yields `false`.
#[inline]
#[cfg(any(
    feature = "kv-mem",
    feature = "kv-surrealkv",
    feature = "kv-file",
    feature = "kv-rocksdb",
    feature = "kv-fdb",
    feature = "kv-tikv",
    feature = "kv-speedb"
))]
pub fn tempfiles(&self) -> bool {
    if let Statement::Select(stm) = self {
        stm.tempfiles
    } else {
        false
    }
}
/// Returns any EXPLAIN clause if specified
#[inline]
pub fn explain(&self) -> Option<&Explain> {

View file

@ -50,6 +50,7 @@ impl From<Vec<Value>> for MemoryCollector {
}
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",

View file

@ -12,6 +12,7 @@ use crate::vs::Error as VersionstampError;
use base64::DecodeError as Base64Error;
use bincode::Error as BincodeError;
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -1070,6 +1071,7 @@ impl From<reqwest::Error> for Error {
}
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",

View file

@ -1,6 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -93,6 +94,7 @@ pub struct Datastore {
// The JWKS object cache
jwks_cache: Arc<RwLock<JwksCache>>,
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -376,6 +378,7 @@ impl Datastore {
#[cfg(feature = "jwks")]
jwks_cache: Arc::new(RwLock::new(JwksCache::new())),
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -438,6 +441,7 @@ impl Datastore {
}
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -465,22 +469,6 @@ impl Datastore {
self.auth_enabled
}
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
pub(crate) fn is_memory(&self) -> bool {
#[cfg(feature = "kv-mem")]
if matches!(self.inner, Inner::Mem(_)) {
return true;
};
false
}
/// Is authentication level enabled for this Datastore?
/// TODO(gguillemas): Remove this method once the legacy authentication is deprecated in v2.0.0
pub fn is_auth_level_enabled(&self) -> bool {
@ -1188,15 +1176,7 @@ impl Datastore {
self.capabilities.clone(),
self.index_stores.clone(),
#[cfg(any(
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
feature = "kv-speedb"
))]
self.is_memory(),
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",

View file

@ -13,7 +13,7 @@ use revision::revisioned;
use serde::{Deserialize, Serialize};
use std::fmt;
#[revisioned(revision = 2)]
#[revisioned(revision = 3)]
#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[non_exhaustive]
@ -35,6 +35,8 @@ pub struct SelectStatement {
pub timeout: Option<Timeout>,
pub parallel: bool,
pub explain: Option<Explain>,
#[revision(start = 3)]
pub tempfiles: bool,
}
impl SelectStatement {

View file

@ -66,6 +66,7 @@ pub struct SerializeSelectStatement {
timeout: Option<Timeout>,
parallel: Option<bool>,
explain: Option<Explain>,
tempfiles: Option<bool>,
}
impl serde::ser::SerializeStruct for SerializeSelectStatement {
@ -122,6 +123,9 @@ impl serde::ser::SerializeStruct for SerializeSelectStatement {
"parallel" => {
self.parallel = Some(value.serialize(ser::primitive::bool::Serializer.wrap())?);
}
"tempfiles" => {
self.tempfiles = Some(value.serialize(ser::primitive::bool::Serializer.wrap())?);
}
"explain" => {
self.explain = value.serialize(ser::explain::opt::Serializer.wrap())?;
}
@ -133,14 +137,15 @@ impl serde::ser::SerializeStruct for SerializeSelectStatement {
}
fn end(self) -> Result<Self::Ok, Error> {
match (self.expr, self.what, self.parallel) {
(Some(expr), Some(what), Some(parallel)) => Ok(SelectStatement {
match (self.expr, self.what, self.parallel, self.tempfiles) {
(Some(expr), Some(what), Some(parallel), Some(tempfiles)) => Ok(SelectStatement {
expr,
omit: self.omit,
only: self.only.is_some_and(|v| v),
what,
with: self.with,
parallel,
tempfiles,
explain: self.explain,
cond: self.cond,
split: self.split,

View file

@ -204,6 +204,7 @@ pub(crate) static KEYWORDS: phf::Map<UniCase<&'static str>, TokenKind> = phf_map
UniCase::ascii("STRUCTURE") => TokenKind::Keyword(Keyword::Structure),
UniCase::ascii("TABLE") => TokenKind::Keyword(Keyword::Table),
UniCase::ascii("TB") => TokenKind::Keyword(Keyword::Table),
UniCase::ascii("TEMPFILES") => TokenKind::Keyword(Keyword::TempFiles),
UniCase::ascii("TERMS_CACHE") => TokenKind::Keyword(Keyword::TermsCache),
UniCase::ascii("TERMS_ORDER") => TokenKind::Keyword(Keyword::TermsOrder),
UniCase::ascii("THEN") => TokenKind::Keyword(Keyword::Then),

View file

@ -58,6 +58,7 @@ impl Parser<'_> {
let version = self.try_parse_version()?;
let timeout = self.try_parse_timeout()?;
let parallel = self.eat(t!("PARALLEL"));
let tempfiles = self.eat(t!("TEMPFILES"));
let explain = self.eat(t!("EXPLAIN")).then(|| Explain(self.eat(t!("FULL"))));
Ok(SelectStatement {
@ -76,6 +77,7 @@ impl Parser<'_> {
version,
timeout,
parallel,
tempfiles,
explain,
})
}

View file

@ -1444,6 +1444,7 @@ SELECT bar as foo,[1,2],bar OMIT bar FROM ONLY a,1
version: Some(Version(Datetime(expected_datetime))),
timeout: None,
parallel: false,
tempfiles: false,
explain: Some(Explain(true)),
}),
);

View file

@ -498,6 +498,7 @@ fn statements() -> Vec<Statement> {
version: Some(Version(Datetime(expected_datetime))),
timeout: None,
parallel: false,
tempfiles: false,
explain: Some(Explain(true)),
}),
Statement::Set(SetStatement {

View file

@ -159,6 +159,7 @@ keyword! {
Start => "START",
Structure => "STRUCTURE",
Table => "TABLE",
TempFiles => "TEMPFILES",
TermsCache => "TERMS_CACHE",
TermsOrder => "TERMS_ORDER",
Then => "THEN",

View file

@ -147,6 +147,7 @@ pub(crate) fn router(
.with_capabilities(address.config.capabilities);
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",

View file

@ -1,5 +1,6 @@
use crate::{dbs::Capabilities, iam::Level};
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -27,6 +28,7 @@ pub struct Config {
pub(crate) tick_interval: Option<Duration>,
pub(crate) capabilities: Capabilities,
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",
@ -122,7 +124,9 @@ impl Config {
self.capabilities = capabilities;
self
}
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-file",
feature = "kv-rocksdb",

View file

@ -38,13 +38,6 @@ pub(crate) fn file_exists(path: &str) -> Result<PathBuf, String> {
Ok(path)
}
#[cfg(any(
feature = "storage-surrealkv",
feature = "storage-rocksdb",
feature = "storage-fdb",
feature = "storage-tikv",
feature = "storage-speedb"
))]
pub(crate) fn dir_exists(path: &str) -> Result<PathBuf, String> {
let path = path_exists(path)?;
if !path.is_dir() {

View file

@ -1,13 +1,6 @@
use crate::cli::CF;
use crate::err::Error;
use clap::Args;
#[cfg(any(
feature = "storage-surrealkv",
feature = "storage-rocksdb",
feature = "storage-fdb",
feature = "storage-tikv",
feature = "storage-speedb"
))]
use std::path::PathBuf;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
@ -45,13 +38,6 @@ pub struct StartCommandDbsOptions {
#[command(flatten)]
#[command(next_help_heading = "Capabilities")]
caps: DbsCapabilities,
#[cfg(any(
feature = "storage-surrealkv",
feature = "storage-speedb",
feature = "storage-rocksdb",
feature = "storage-fdb",
feature = "storage-tikv",
))]
#[arg(help = "Sets the directory for storing temporary database files")]
#[arg(env = "SURREAL_TEMPORARY_DIRECTORY", long = "temporary-directory")]
#[arg(value_parser = super::cli::validator::dir_exists)]
@ -233,13 +219,6 @@ pub async fn init(
// TODO(gguillemas): Remove this field once the legacy authentication is deprecated in v2.0.0
auth_level_enabled,
caps,
#[cfg(any(
feature = "storage-surrealkv",
feature = "storage-rocksdb",
feature = "storage-fdb",
feature = "storage-tikv",
feature = "storage-speedb"
))]
temporary_directory,
}: StartCommandDbsOptions,
) -> Result<(), Error> {
@ -280,13 +259,6 @@ pub async fn init(
.with_auth_level_enabled(auth_level_enabled)
.with_capabilities(caps);
#[cfg(any(
feature = "storage-surrealkv",
feature = "storage-rocksdb",
feature = "storage-fdb",
feature = "storage-tikv",
feature = "storage-speedb"
))]
let mut dbs = match temporary_directory {
Some(tmp_dir) => dbs.with_temporary_directory(tmp_dir),
_ => dbs,

View file

@ -191,6 +191,16 @@ pub async fn start_server_with_defaults() -> Result<(String, Child), Box<dyn Err
start_server(StartServerArguments::default()).await
}
/// Starts a server whose `temporary_directory` argument is set to `path`.
///
/// Every other start-up argument keeps its `Default` value. The outcome is
/// whatever `start_server` returns for those arguments.
pub async fn start_server_with_temporary_directory(
    path: &str,
) -> Result<(String, Child), Box<dyn Error>> {
    let arguments = StartServerArguments {
        temporary_directory: Some(String::from(path)),
        ..Default::default()
    };
    start_server(arguments).await
}
pub async fn start_server(
StartServerArguments {
path,

View file

@ -1,4 +1,5 @@
use super::common::{self, Format, Socket, DB, NS, PASS, USER};
use assert_fs::TempDir;
use serde_json::json;
use std::future::Future;
use std::pin::Pin;
@ -1539,3 +1540,46 @@ async fn relate_rpc() {
// Test passed
server.finish().unwrap();
}
#[test(tokio::test)]
async fn temporary_directory() {
    // Rows expected from both ordered selects, with or without TEMPFILES
    let sorted_rows = json!([{"id": "test:b" }, { "id": "test:a" }]);
    // Start the server with a dedicated temporary directory
    let scratch = TempDir::new().unwrap();
    let scratch_path = scratch.to_string_lossy();
    let (addr, mut server) =
        common::start_server_with_temporary_directory(&scratch_path).await.unwrap();
    // Open the WebSocket connection
    let mut socket = Socket::connect(&addr, SERVER, FORMAT).await.unwrap();
    // Sign in on this connection
    socket.send_message_signin(USER, PASS, None, None, None).await.unwrap();
    // Select the namespace and database
    socket.send_message_use(Some(NS), Some(DB)).await.unwrap();
    // Create two records to select from
    socket.send_message_query("CREATE test:a, test:b").await.unwrap();
    // Without TEMPFILES, EXPLAIN must report the memory collector
    let mut memory_plan =
        socket.send_message_query("SELECT * FROM test ORDER BY id DESC EXPLAIN").await.unwrap();
    assert_eq!(
        memory_plan.remove(0)["result"],
        json!([{"detail": { "table": "test" }, "operation": "Iterate Table" }, { "detail": { "type": "Memory" }, "operation": "Collector" }])
    );
    // ...and the select itself must return the rows in descending order
    let mut memory_rows =
        socket.send_message_query("SELECT * FROM test ORDER BY id DESC").await.unwrap();
    assert_eq!(memory_rows.remove(0)["result"], sorted_rows);
    // With TEMPFILES, EXPLAIN must report the file collector
    let mut tempfiles_plan = socket
        .send_message_query("SELECT * FROM test ORDER BY id DESC TEMPFILES EXPLAIN")
        .await
        .unwrap();
    assert_eq!(
        tempfiles_plan.remove(0)["result"],
        json!([{"detail": { "table": "test" }, "operation": "Iterate Table" }, { "detail": { "type": "TempFiles" }, "operation": "Collector" }])
    );
    // ...and the select itself must return the same rows
    let mut tempfiles_rows =
        socket.send_message_query("SELECT * FROM test ORDER BY id DESC TEMPFILES").await.unwrap();
    assert_eq!(tempfiles_rows.remove(0)["result"], sorted_rows);
    // Test passed: shut the server down
    server.finish().unwrap();
    // Remove the temporary directory
    scratch.close().unwrap();
}