Allow transferring raw futures over cbor (#4504)
Co-authored-by: Micha de Vries <micha@devrie.sh>
This commit is contained in:
parent
4c3f4d0fd5
commit
95f9f8dacd
7 changed files with 128 additions and 9 deletions
|
@ -243,6 +243,13 @@ impl<'a> Executor<'a> {
|
||||||
} else {
|
} else {
|
||||||
Force::None
|
Force::None
|
||||||
}),
|
}),
|
||||||
|
"FUTURES" => {
|
||||||
|
if stm.what {
|
||||||
|
opt.with_futures(true)
|
||||||
|
} else {
|
||||||
|
opt.with_futures_never()
|
||||||
|
}
|
||||||
|
}
|
||||||
_ => break,
|
_ => break,
|
||||||
};
|
};
|
||||||
// Continue
|
// Continue
|
||||||
|
|
|
@ -41,7 +41,7 @@ pub struct Options {
|
||||||
/// Should we process field queries?
|
/// Should we process field queries?
|
||||||
pub import: bool,
|
pub import: bool,
|
||||||
/// Should we process function futures?
|
/// Should we process function futures?
|
||||||
pub futures: bool,
|
pub futures: Futures,
|
||||||
/// Should we process variable field projections?
|
/// Should we process variable field projections?
|
||||||
pub projections: bool,
|
pub projections: bool,
|
||||||
/// The channel over which we send notifications
|
/// The channel over which we send notifications
|
||||||
|
@ -59,6 +59,13 @@ pub enum Force {
|
||||||
Index(Arc<[DefineIndexStatement]>),
|
Index(Arc<[DefineIndexStatement]>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub enum Futures {
|
||||||
|
Disabled,
|
||||||
|
Enabled,
|
||||||
|
Never,
|
||||||
|
}
|
||||||
|
|
||||||
impl Default for Options {
|
impl Default for Options {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Options::new()
|
Options::new()
|
||||||
|
@ -78,7 +85,7 @@ impl Options {
|
||||||
force: Force::None,
|
force: Force::None,
|
||||||
strict: false,
|
strict: false,
|
||||||
import: false,
|
import: false,
|
||||||
futures: false,
|
futures: Futures::Disabled,
|
||||||
projections: false,
|
projections: false,
|
||||||
auth_enabled: true,
|
auth_enabled: true,
|
||||||
sender: None,
|
sender: None,
|
||||||
|
@ -189,7 +196,20 @@ impl Options {
|
||||||
|
|
||||||
/// Specify if we should process futures
|
/// Specify if we should process futures
|
||||||
pub fn with_futures(mut self, futures: bool) -> Self {
|
pub fn with_futures(mut self, futures: bool) -> Self {
|
||||||
self.futures = futures;
|
if matches!(self.futures, Futures::Never) {
|
||||||
|
return self;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.futures = match futures {
|
||||||
|
true => Futures::Enabled,
|
||||||
|
false => Futures::Disabled,
|
||||||
|
};
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Specify if we should never process futures
|
||||||
|
pub fn with_futures_never(mut self) -> Self {
|
||||||
|
self.futures = Futures::Never;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,6 +241,7 @@ impl Options {
|
||||||
ns: self.ns.clone(),
|
ns: self.ns.clone(),
|
||||||
db: self.db.clone(),
|
db: self.db.clone(),
|
||||||
force: self.force.clone(),
|
force: self.force.clone(),
|
||||||
|
futures: self.futures.clone(),
|
||||||
perms,
|
perms,
|
||||||
..*self
|
..*self
|
||||||
}
|
}
|
||||||
|
@ -233,6 +254,7 @@ impl Options {
|
||||||
auth: self.auth.clone(),
|
auth: self.auth.clone(),
|
||||||
ns: self.ns.clone(),
|
ns: self.ns.clone(),
|
||||||
db: self.db.clone(),
|
db: self.db.clone(),
|
||||||
|
futures: self.futures.clone(),
|
||||||
force,
|
force,
|
||||||
..*self
|
..*self
|
||||||
}
|
}
|
||||||
|
@ -246,6 +268,7 @@ impl Options {
|
||||||
ns: self.ns.clone(),
|
ns: self.ns.clone(),
|
||||||
db: self.db.clone(),
|
db: self.db.clone(),
|
||||||
force: self.force.clone(),
|
force: self.force.clone(),
|
||||||
|
futures: self.futures.clone(),
|
||||||
strict,
|
strict,
|
||||||
..*self
|
..*self
|
||||||
}
|
}
|
||||||
|
@ -259,6 +282,7 @@ impl Options {
|
||||||
ns: self.ns.clone(),
|
ns: self.ns.clone(),
|
||||||
db: self.db.clone(),
|
db: self.db.clone(),
|
||||||
force: self.force.clone(),
|
force: self.force.clone(),
|
||||||
|
futures: self.futures.clone(),
|
||||||
import,
|
import,
|
||||||
..*self
|
..*self
|
||||||
}
|
}
|
||||||
|
@ -272,7 +296,13 @@ impl Options {
|
||||||
ns: self.ns.clone(),
|
ns: self.ns.clone(),
|
||||||
db: self.db.clone(),
|
db: self.db.clone(),
|
||||||
force: self.force.clone(),
|
force: self.force.clone(),
|
||||||
futures,
|
futures: match self.futures {
|
||||||
|
Futures::Never => Futures::Never,
|
||||||
|
_ => match futures {
|
||||||
|
true => Futures::Enabled,
|
||||||
|
false => Futures::Disabled,
|
||||||
|
},
|
||||||
|
},
|
||||||
..*self
|
..*self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -285,6 +315,7 @@ impl Options {
|
||||||
ns: self.ns.clone(),
|
ns: self.ns.clone(),
|
||||||
db: self.db.clone(),
|
db: self.db.clone(),
|
||||||
force: self.force.clone(),
|
force: self.force.clone(),
|
||||||
|
futures: self.futures.clone(),
|
||||||
projections,
|
projections,
|
||||||
..*self
|
..*self
|
||||||
}
|
}
|
||||||
|
@ -297,6 +328,7 @@ impl Options {
|
||||||
ns: self.ns.clone(),
|
ns: self.ns.clone(),
|
||||||
db: self.db.clone(),
|
db: self.db.clone(),
|
||||||
force: self.force.clone(),
|
force: self.force.clone(),
|
||||||
|
futures: self.futures.clone(),
|
||||||
sender: Some(sender),
|
sender: Some(sender),
|
||||||
..*self
|
..*self
|
||||||
}
|
}
|
||||||
|
@ -326,6 +358,7 @@ impl Options {
|
||||||
ns: self.ns.clone(),
|
ns: self.ns.clone(),
|
||||||
db: self.db.clone(),
|
db: self.db.clone(),
|
||||||
force: self.force.clone(),
|
force: self.force.clone(),
|
||||||
|
futures: self.futures.clone(),
|
||||||
dive: self.dive - cost as u32,
|
dive: self.dive - cost as u32,
|
||||||
..*self
|
..*self
|
||||||
})
|
})
|
||||||
|
@ -511,4 +544,21 @@ mod tests {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
pub fn execute_futures() {
|
||||||
|
let mut opts = Options::default().with_futures(false);
|
||||||
|
|
||||||
|
// Futures should be disabled
|
||||||
|
assert!(matches!(opts.futures, Futures::Disabled));
|
||||||
|
|
||||||
|
// Allow setting to true
|
||||||
|
opts = opts.with_futures(true);
|
||||||
|
assert!(matches!(opts.futures, Futures::Enabled));
|
||||||
|
|
||||||
|
// Set to never and disallow setting to true
|
||||||
|
opts = opts.with_futures_never();
|
||||||
|
opts = opts.with_futures(true);
|
||||||
|
assert!(matches!(opts.futures, Futures::Never));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ use crate::sql::id::range::IdRange;
|
||||||
use crate::sql::Array;
|
use crate::sql::Array;
|
||||||
use crate::sql::Datetime;
|
use crate::sql::Datetime;
|
||||||
use crate::sql::Duration;
|
use crate::sql::Duration;
|
||||||
|
use crate::sql::Future;
|
||||||
use crate::sql::Geometry;
|
use crate::sql::Geometry;
|
||||||
use crate::sql::Id;
|
use crate::sql::Id;
|
||||||
use crate::sql::Number;
|
use crate::sql::Number;
|
||||||
|
@ -35,6 +36,7 @@ const TAG_STRING_DECIMAL: u64 = 10;
|
||||||
const TAG_CUSTOM_DATETIME: u64 = 12;
|
const TAG_CUSTOM_DATETIME: u64 = 12;
|
||||||
const TAG_STRING_DURATION: u64 = 13;
|
const TAG_STRING_DURATION: u64 = 13;
|
||||||
const TAG_CUSTOM_DURATION: u64 = 14;
|
const TAG_CUSTOM_DURATION: u64 = 14;
|
||||||
|
const TAG_FUTURE: u64 = 15;
|
||||||
|
|
||||||
// Ranges
|
// Ranges
|
||||||
const TAG_RANGE: u64 = 49;
|
const TAG_RANGE: u64 = 49;
|
||||||
|
@ -193,6 +195,14 @@ impl TryFrom<Cbor> for Value {
|
||||||
},
|
},
|
||||||
// A range
|
// A range
|
||||||
TAG_RANGE => Ok(Value::Range(Box::new(Range::try_from(*v)?))),
|
TAG_RANGE => Ok(Value::Range(Box::new(Range::try_from(*v)?))),
|
||||||
|
TAG_FUTURE => match *v {
|
||||||
|
Data::Text(v) => {
|
||||||
|
let block = crate::syn::block(format!("{{{v}}}").as_str())
|
||||||
|
.map_err(|_| "Failed to parse block")?;
|
||||||
|
Ok(Value::Future(Box::new(Future(block))))
|
||||||
|
}
|
||||||
|
_ => Err("Expected a CBOR text data type"),
|
||||||
|
},
|
||||||
TAG_GEOMETRY_POINT => match *v {
|
TAG_GEOMETRY_POINT => match *v {
|
||||||
Data::Array(mut v) if v.len() == 2 => {
|
Data::Array(mut v) if v.len() == 2 => {
|
||||||
let x = Value::try_from(Cbor(v.remove(0)))?;
|
let x = Value::try_from(Cbor(v.remove(0)))?;
|
||||||
|
@ -395,6 +405,10 @@ impl TryFrom<Value> for Cbor {
|
||||||
Value::Table(v) => Ok(Cbor(Data::Tag(TAG_TABLE, Box::new(Data::Text(v.0))))),
|
Value::Table(v) => Ok(Cbor(Data::Tag(TAG_TABLE, Box::new(Data::Text(v.0))))),
|
||||||
Value::Geometry(v) => Ok(Cbor(encode_geometry(v)?)),
|
Value::Geometry(v) => Ok(Cbor(encode_geometry(v)?)),
|
||||||
Value::Range(v) => Ok(Cbor(Data::try_from(*v)?)),
|
Value::Range(v) => Ok(Cbor(Data::try_from(*v)?)),
|
||||||
|
Value::Future(v) => {
|
||||||
|
let bin = Data::Text(format!("{}", (*v).0));
|
||||||
|
Ok(Cbor(Data::Tag(TAG_FUTURE, Box::new(bin))))
|
||||||
|
}
|
||||||
// We shouldn't reach here
|
// We shouldn't reach here
|
||||||
_ => Err("Found unsupported SurrealQL value being encoded into a CBOR value"),
|
_ => Err("Found unsupported SurrealQL value being encoded into a CBOR value"),
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
use crate::ctx::Context;
|
|
||||||
use crate::dbs::Options;
|
use crate::dbs::Options;
|
||||||
use crate::doc::CursorDoc;
|
use crate::doc::CursorDoc;
|
||||||
use crate::err::Error;
|
use crate::err::Error;
|
||||||
use crate::sql::block::Block;
|
use crate::sql::block::Block;
|
||||||
use crate::sql::value::Value;
|
use crate::sql::value::Value;
|
||||||
|
use crate::{ctx::Context, dbs::Futures};
|
||||||
use reblessive::tree::Stk;
|
use reblessive::tree::Stk;
|
||||||
use revision::revisioned;
|
use revision::revisioned;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
@ -35,8 +35,8 @@ impl Future {
|
||||||
) -> Result<Value, Error> {
|
) -> Result<Value, Error> {
|
||||||
// Process the future if enabled
|
// Process the future if enabled
|
||||||
match opt.futures {
|
match opt.futures {
|
||||||
true => stk.run(|stk| self.0.compute(stk, ctx, opt, doc)).await?.ok(),
|
Futures::Enabled => stk.run(|stk| self.0.compute(stk, ctx, opt, doc)).await?.ok(),
|
||||||
false => Ok(self.clone().into()),
|
_ => Ok(self.clone().into()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
err::Error,
|
err::Error,
|
||||||
sql::{Datetime, Duration, Idiom, Query, Range, Subquery, Thing, Value},
|
sql::{Block, Datetime, Duration, Idiom, Query, Range, Subquery, Thing, Value},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub mod common;
|
pub mod common;
|
||||||
|
@ -21,6 +21,7 @@ mod test;
|
||||||
|
|
||||||
use parser::Parser;
|
use parser::Parser;
|
||||||
use reblessive::Stack;
|
use reblessive::Stack;
|
||||||
|
use token::t;
|
||||||
|
|
||||||
/// Takes a string and returns if it could be a reserved keyword in certain contexts.
|
/// Takes a string and returns if it could be a reserved keyword in certain contexts.
|
||||||
pub fn could_be_reserved_keyword(s: &str) -> bool {
|
pub fn could_be_reserved_keyword(s: &str) -> bool {
|
||||||
|
@ -168,3 +169,32 @@ pub fn thing(input: &str) -> Result<Thing, Error> {
|
||||||
.map_err(|e| e.render_on(input))
|
.map_err(|e| e.render_on(input))
|
||||||
.map_err(Error::InvalidQuery)
|
.map_err(Error::InvalidQuery)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Parse a block, expects the value to be wrapped in `{}`.
|
||||||
|
pub fn block(input: &str) -> Result<Block, Error> {
|
||||||
|
debug!("parsing block, input = {input}");
|
||||||
|
|
||||||
|
let mut parser = Parser::new(input.as_bytes());
|
||||||
|
let mut stack = Stack::new();
|
||||||
|
|
||||||
|
match parser.peek_kind() {
|
||||||
|
t!("{") => {
|
||||||
|
let start = parser.pop_peek().span;
|
||||||
|
stack
|
||||||
|
.enter(|stk| parser.parse_block(stk, start))
|
||||||
|
.finish()
|
||||||
|
.map_err(|e| e.render_on(input))
|
||||||
|
.map_err(Error::InvalidQuery)
|
||||||
|
}
|
||||||
|
found => Err(Error::InvalidQuery(
|
||||||
|
crate::syn::parser::ParseError::new(
|
||||||
|
crate::syn::parser::ParseErrorKind::Unexpected {
|
||||||
|
expected: "{",
|
||||||
|
found,
|
||||||
|
},
|
||||||
|
parser.last_span(),
|
||||||
|
)
|
||||||
|
.render_on(input),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -637,7 +637,7 @@ impl Parser<'_> {
|
||||||
/// # Parser State
|
/// # Parser State
|
||||||
/// Expects the starting `{` to have already been eaten and its span to be handed to this
|
/// Expects the starting `{` to have already been eaten and its span to be handed to this
|
||||||
/// functions as the `start` parameter.
|
/// functions as the `start` parameter.
|
||||||
pub(super) async fn parse_block(&mut self, ctx: &mut Stk, start: Span) -> ParseResult<Block> {
|
pub async fn parse_block(&mut self, ctx: &mut Stk, start: Span) -> ParseResult<Block> {
|
||||||
let mut statements = Vec::new();
|
let mut statements = Vec::new();
|
||||||
loop {
|
loop {
|
||||||
while self.eat(t!(";")) {}
|
while self.eat(t!(";")) {}
|
||||||
|
|
|
@ -67,6 +67,24 @@ async fn future_function_arguments() -> Result<(), Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn future_disabled() -> Result<(), Error> {
|
||||||
|
let sql = "
|
||||||
|
OPTION FUTURES = false;
|
||||||
|
<future> { 123 };
|
||||||
|
";
|
||||||
|
let dbs = new_ds().await?;
|
||||||
|
let ses = Session::owner().with_ns("test").with_db("test");
|
||||||
|
let res = &mut dbs.execute(sql, &ses, None).await?;
|
||||||
|
assert_eq!(res.len(), 1);
|
||||||
|
//
|
||||||
|
let tmp = res.remove(0).result?;
|
||||||
|
let val = Value::parse("<future> { 123 }");
|
||||||
|
assert_eq!(tmp, val);
|
||||||
|
//
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn concurrency() -> Result<(), Error> {
|
async fn concurrency() -> Result<(), Error> {
|
||||||
// cargo test --package surrealdb --test future --features kv-mem --release -- concurrency --nocapture
|
// cargo test --package surrealdb --test future --features kv-mem --release -- concurrency --nocapture
|
||||||
|
|
Loading…
Reference in a new issue