Fix bug with timeouts not always being processed (#4721)
This commit is contained in:
parent
30118f445c
commit
ba18cc2d79
12 changed files with 189 additions and 69 deletions
|
@ -207,5 +207,6 @@ license-file = ["LICENSE", "4"]
|
|||
[lints.rust]
|
||||
unexpected_cfgs = { level = "warn", check-cfg = [
|
||||
'cfg(docker)',
|
||||
'cfg(storage)',
|
||||
'cfg(surrealdb_unstable)',
|
||||
] }
|
||||
|
|
|
@ -198,7 +198,10 @@ tokio-tungstenite = { version = "0.21.0", optional = true }
|
|||
uuid = { version = "1.10.0", features = ["serde", "v4", "v7"] }
|
||||
|
||||
[lints.rust]
|
||||
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(surrealdb_unstable)'] }
|
||||
unexpected_cfgs = { level = "warn", check-cfg = [
|
||||
'cfg(storage)',
|
||||
'cfg(surrealdb_unstable)',
|
||||
] }
|
||||
|
||||
[lib]
|
||||
name = "surrealdb_core"
|
||||
|
|
|
@ -364,42 +364,17 @@ impl<'a> Executor<'a> {
|
|||
true => Err(Error::TxFailure),
|
||||
// The transaction began successfully
|
||||
false => {
|
||||
// Create a new context for this statement
|
||||
let mut ctx = MutableContext::new(&ctx);
|
||||
// Set the transaction on the context
|
||||
ctx.set_transaction(self.txn());
|
||||
let c = ctx.freeze();
|
||||
// Process the statement
|
||||
let res = match stm.timeout() {
|
||||
// There is a timeout clause
|
||||
Some(timeout) => {
|
||||
// Set statement timeout or propagate the error
|
||||
if let Err(err) = ctx.add_timeout(timeout) {
|
||||
Err(err)
|
||||
} else {
|
||||
ctx.set_transaction(self.txn());
|
||||
let c = ctx.freeze();
|
||||
// Process the statement
|
||||
let res = stack
|
||||
.enter(|stk| stm.compute(stk, &c, &opt, None))
|
||||
.finish()
|
||||
.await;
|
||||
ctx = MutableContext::unfreeze(c)?;
|
||||
// Catch statement timeout
|
||||
match ctx.is_timedout() {
|
||||
true => Err(Error::QueryTimedout),
|
||||
false => res,
|
||||
}
|
||||
}
|
||||
}
|
||||
// There is no timeout clause
|
||||
None => {
|
||||
ctx.set_transaction(self.txn());
|
||||
let c = ctx.freeze();
|
||||
let r = stack
|
||||
.enter(|stk| stm.compute(stk, &c, &opt, None))
|
||||
.finish()
|
||||
.await;
|
||||
ctx = MutableContext::unfreeze(c)?;
|
||||
r
|
||||
}
|
||||
};
|
||||
let res = stack
|
||||
.enter(|stk| stm.compute(stk, &c, &opt, None))
|
||||
.finish()
|
||||
.await;
|
||||
ctx = MutableContext::unfreeze(c)?;
|
||||
// Check if this is a RETURN statement
|
||||
let can_return =
|
||||
matches!(stm, Statement::Output(_) | Statement::Value(_));
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::ctx::Context;
|
||||
use crate::ctx::{Context, MutableContext};
|
||||
use crate::dbs::{Iterator, Options, Statement};
|
||||
use crate::doc::CursorDoc;
|
||||
use crate::err::Error;
|
||||
|
@ -55,10 +55,19 @@ impl CreateStatement {
|
|||
let version = self.version.as_ref().map(|v| v.to_u64());
|
||||
// Ensure futures are stored
|
||||
let opt = &opt.new_with_futures(false).with_version(version);
|
||||
// Check if there is a timeout
|
||||
let ctx = match self.timeout.as_ref() {
|
||||
Some(timeout) => {
|
||||
let mut ctx = MutableContext::new(ctx);
|
||||
ctx.add_timeout(*timeout.0)?;
|
||||
ctx.freeze()
|
||||
}
|
||||
None => ctx.clone(),
|
||||
};
|
||||
// Loop over the create targets
|
||||
for w in self.what.0.iter() {
|
||||
let v = w.compute(stk, ctx, opt, doc).await?;
|
||||
i.prepare(stk, ctx, opt, &stm, v).await.map_err(|e| match e {
|
||||
let v = w.compute(stk, &ctx, opt, doc).await?;
|
||||
i.prepare(stk, &ctx, opt, &stm, v).await.map_err(|e| match e {
|
||||
Error::InvalidStatementTarget {
|
||||
value: v,
|
||||
} => Error::CreateStatement {
|
||||
|
@ -67,8 +76,14 @@ impl CreateStatement {
|
|||
e => e,
|
||||
})?;
|
||||
}
|
||||
// Process the statement
|
||||
let res = i.output(stk, &ctx, opt, &stm).await?;
|
||||
// Catch statement timeout
|
||||
if ctx.is_timedout() {
|
||||
return Err(Error::QueryTimedout);
|
||||
}
|
||||
// Output the results
|
||||
match i.output(stk, ctx, opt, &stm).await? {
|
||||
match res {
|
||||
// This is a single record result
|
||||
Value::Array(mut a) if self.only => match a.len() {
|
||||
// There was exactly one result
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::ctx::Context;
|
||||
use crate::ctx::{Context, MutableContext};
|
||||
use crate::dbs::{Iterator, Options, Statement};
|
||||
use crate::doc::CursorDoc;
|
||||
use crate::err::Error;
|
||||
|
@ -44,10 +44,19 @@ impl DeleteStatement {
|
|||
let stm = Statement::from(self);
|
||||
// Ensure futures are stored
|
||||
let opt = &opt.new_with_futures(false).with_projections(false);
|
||||
// Check if there is a timeout
|
||||
let ctx = match self.timeout.as_ref() {
|
||||
Some(timeout) => {
|
||||
let mut ctx = MutableContext::new(ctx);
|
||||
ctx.add_timeout(*timeout.0)?;
|
||||
ctx.freeze()
|
||||
}
|
||||
None => ctx.clone(),
|
||||
};
|
||||
// Loop over the delete targets
|
||||
for w in self.what.0.iter() {
|
||||
let v = w.compute(stk, ctx, opt, doc).await?;
|
||||
i.prepare(stk, ctx, opt, &stm, v).await.map_err(|e| match e {
|
||||
let v = w.compute(stk, &ctx, opt, doc).await?;
|
||||
i.prepare(stk, &ctx, opt, &stm, v).await.map_err(|e| match e {
|
||||
Error::InvalidStatementTarget {
|
||||
value: v,
|
||||
} => Error::DeleteStatement {
|
||||
|
@ -56,8 +65,14 @@ impl DeleteStatement {
|
|||
e => e,
|
||||
})?;
|
||||
}
|
||||
// Process the statement
|
||||
let res = i.output(stk, &ctx, opt, &stm).await?;
|
||||
// Catch statement timeout
|
||||
if ctx.is_timedout() {
|
||||
return Err(Error::QueryTimedout);
|
||||
}
|
||||
// Output the results
|
||||
match i.output(stk, ctx, opt, &stm).await? {
|
||||
match res {
|
||||
// This is a single record result
|
||||
Value::Array(mut a) if self.only => match a.len() {
|
||||
// There was exactly one result
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::ctx::Context;
|
||||
use crate::ctx::{Context, MutableContext};
|
||||
use crate::dbs::{Iterable, Iterator, Options, Statement};
|
||||
use crate::doc::CursorDoc;
|
||||
use crate::err::Error;
|
||||
|
@ -50,10 +50,19 @@ impl InsertStatement {
|
|||
let version = self.version.as_ref().map(|v| v.to_u64());
|
||||
// Ensure futures are stored
|
||||
let opt = &opt.new_with_futures(false).with_projections(false).with_version(version);
|
||||
// Check if there is a timeout
|
||||
let ctx = match self.timeout.as_ref() {
|
||||
Some(timeout) => {
|
||||
let mut ctx = MutableContext::new(ctx);
|
||||
ctx.add_timeout(*timeout.0)?;
|
||||
ctx.freeze()
|
||||
}
|
||||
None => ctx.clone(),
|
||||
};
|
||||
// Parse the INTO expression
|
||||
let into = match &self.into {
|
||||
None => None,
|
||||
Some(into) => match into.compute(stk, ctx, opt, doc).await? {
|
||||
Some(into) => match into.compute(stk, &ctx, opt, doc).await? {
|
||||
Value::Table(into) => Some(into),
|
||||
v => {
|
||||
return Err(Error::InsertStatement {
|
||||
|
@ -71,8 +80,8 @@ impl InsertStatement {
|
|||
let mut o = Value::base();
|
||||
// Set each field from the expression
|
||||
for (k, v) in v.iter() {
|
||||
let v = v.compute(stk, ctx, opt, None).await?;
|
||||
o.set(stk, ctx, opt, k, v).await?;
|
||||
let v = v.compute(stk, &ctx, opt, None).await?;
|
||||
o.set(stk, &ctx, opt, k, v).await?;
|
||||
}
|
||||
// Specify the new table record id
|
||||
let id = gen_id(&o, &into)?;
|
||||
|
@ -82,7 +91,7 @@ impl InsertStatement {
|
|||
}
|
||||
// Check if this is a modern statement
|
||||
Data::SingleExpression(v) => {
|
||||
let v = v.compute(stk, ctx, opt, doc).await?;
|
||||
let v = v.compute(stk, &ctx, opt, doc).await?;
|
||||
match v {
|
||||
Value::Array(v) => {
|
||||
for v in v {
|
||||
|
@ -109,8 +118,14 @@ impl InsertStatement {
|
|||
}
|
||||
// Assign the statement
|
||||
let stm = Statement::from(self);
|
||||
// Process the statement
|
||||
let res = i.output(stk, &ctx, opt, &stm).await?;
|
||||
// Catch statement timeout
|
||||
if ctx.is_timedout() {
|
||||
return Err(Error::QueryTimedout);
|
||||
}
|
||||
// Output the results
|
||||
i.output(stk, ctx, opt, &stm).await
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::ctx::Context;
|
||||
use crate::ctx::{Context, MutableContext};
|
||||
use crate::dbs::{Iterable, Iterator, Options, Statement};
|
||||
use crate::doc::CursorDoc;
|
||||
use crate::err::Error;
|
||||
|
@ -45,10 +45,19 @@ impl RelateStatement {
|
|||
let mut i = Iterator::new();
|
||||
// Ensure futures are stored
|
||||
let opt = &opt.new_with_futures(false).with_projections(false);
|
||||
// Check if there is a timeout
|
||||
let ctx = match self.timeout.as_ref() {
|
||||
Some(timeout) => {
|
||||
let mut ctx = MutableContext::new(ctx);
|
||||
ctx.add_timeout(*timeout.0)?;
|
||||
ctx.freeze()
|
||||
}
|
||||
None => ctx.clone(),
|
||||
};
|
||||
// Loop over the from targets
|
||||
let from = {
|
||||
let mut out = Vec::new();
|
||||
match self.from.compute(stk, ctx, opt, doc).await? {
|
||||
match self.from.compute(stk, &ctx, opt, doc).await? {
|
||||
Value::Thing(v) => out.push(v),
|
||||
Value::Array(v) => {
|
||||
for v in v {
|
||||
|
@ -90,7 +99,7 @@ impl RelateStatement {
|
|||
// Loop over the with targets
|
||||
let with = {
|
||||
let mut out = Vec::new();
|
||||
match self.with.compute(stk, ctx, opt, doc).await? {
|
||||
match self.with.compute(stk, &ctx, opt, doc).await? {
|
||||
Value::Thing(v) => out.push(v),
|
||||
Value::Array(v) => {
|
||||
for v in v {
|
||||
|
@ -133,14 +142,14 @@ impl RelateStatement {
|
|||
for w in with.iter() {
|
||||
let f = f.clone();
|
||||
let w = w.clone();
|
||||
match &self.kind.compute(stk, ctx, opt, doc).await? {
|
||||
match &self.kind.compute(stk, &ctx, opt, doc).await? {
|
||||
// The relation has a specific record id
|
||||
Value::Thing(id) => i.ingest(Iterable::Relatable(f, id.to_owned(), w, None)),
|
||||
// The relation does not have a specific record id
|
||||
Value::Table(tb) => match &self.data {
|
||||
// There is a data clause so check for a record id
|
||||
Some(data) => {
|
||||
let id = match data.rid(stk, ctx, opt).await? {
|
||||
let id = match data.rid(stk, &ctx, opt).await? {
|
||||
Some(id) => id.generate(tb, false)?,
|
||||
None => tb.generate(),
|
||||
};
|
||||
|
@ -160,8 +169,14 @@ impl RelateStatement {
|
|||
}
|
||||
// Assign the statement
|
||||
let stm = Statement::from(self);
|
||||
// Process the statement
|
||||
let res = i.output(stk, &ctx, opt, &stm).await?;
|
||||
// Catch statement timeout
|
||||
if ctx.is_timedout() {
|
||||
return Err(Error::QueryTimedout);
|
||||
}
|
||||
// Output the results
|
||||
match i.output(stk, ctx, opt, &stm).await? {
|
||||
match res {
|
||||
// This is a single record result
|
||||
Value::Array(mut a) if self.only => match a.len() {
|
||||
// There was exactly one result
|
||||
|
|
|
@ -94,15 +94,24 @@ impl SelectStatement {
|
|||
if self.only && !limit_is_one_or_zero && self.what.0.len() > 1 {
|
||||
return Err(Error::SingleOnlyOutput);
|
||||
}
|
||||
// Check if there is a timeout
|
||||
let ctx = match self.timeout.as_ref() {
|
||||
Some(timeout) => {
|
||||
let mut ctx = MutableContext::new(ctx);
|
||||
ctx.add_timeout(*timeout.0)?;
|
||||
ctx.freeze()
|
||||
}
|
||||
None => ctx.clone(),
|
||||
};
|
||||
// Loop over the select targets
|
||||
for w in self.what.0.iter() {
|
||||
let v = w.compute(stk, ctx, &opt, doc).await?;
|
||||
let v = w.compute(stk, &ctx, &opt, doc).await?;
|
||||
match v {
|
||||
Value::Table(t) => {
|
||||
if self.only && !limit_is_one_or_zero {
|
||||
return Err(Error::SingleOnlyOutput);
|
||||
}
|
||||
planner.add_iterables(stk, ctx, t, &mut i).await?;
|
||||
planner.add_iterables(stk, &ctx, t, &mut i).await?;
|
||||
}
|
||||
Value::Thing(v) => match &v.id {
|
||||
Id::Range(r) => i.ingest(Iterable::TableRange(v.tb, *r.to_owned())),
|
||||
|
@ -132,7 +141,7 @@ impl SelectStatement {
|
|||
for v in v {
|
||||
match v {
|
||||
Value::Table(t) => {
|
||||
planner.add_iterables(stk, ctx, t, &mut i).await?;
|
||||
planner.add_iterables(stk, &ctx, t, &mut i).await?;
|
||||
}
|
||||
Value::Thing(v) => i.ingest(Iterable::Thing(v)),
|
||||
Value::Edges(v) => i.ingest(Iterable::Edges(*v)),
|
||||
|
@ -149,14 +158,20 @@ impl SelectStatement {
|
|||
};
|
||||
}
|
||||
// Create a new context
|
||||
let mut ctx = MutableContext::new(ctx);
|
||||
let mut ctx = MutableContext::new(&ctx);
|
||||
// Add query executors if any
|
||||
if planner.has_executors() {
|
||||
ctx.set_query_planner(planner);
|
||||
}
|
||||
let ctx = ctx.freeze();
|
||||
// Process the statement
|
||||
let res = i.output(stk, &ctx, &opt, &stm).await?;
|
||||
// Catch statement timeout
|
||||
if ctx.is_timedout() {
|
||||
return Err(Error::QueryTimedout);
|
||||
}
|
||||
// Output the results
|
||||
match i.output(stk, &ctx, &opt, &stm).await? {
|
||||
match res {
|
||||
// This is a single record result
|
||||
Value::Array(mut a) if self.only => match a.len() {
|
||||
// There were no results
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::ctx::Context;
|
||||
use crate::ctx::{Context, MutableContext};
|
||||
use crate::dbs::{Iterator, Options, Statement};
|
||||
use crate::doc::CursorDoc;
|
||||
use crate::err::Error;
|
||||
|
@ -45,10 +45,19 @@ impl UpdateStatement {
|
|||
let stm = Statement::from(self);
|
||||
// Ensure futures are stored
|
||||
let opt = &opt.new_with_futures(false).with_projections(false);
|
||||
// Check if there is a timeout
|
||||
let ctx = match self.timeout.as_ref() {
|
||||
Some(timeout) => {
|
||||
let mut ctx = MutableContext::new(ctx);
|
||||
ctx.add_timeout(*timeout.0)?;
|
||||
ctx.freeze()
|
||||
}
|
||||
None => ctx.clone(),
|
||||
};
|
||||
// Loop over the update targets
|
||||
for w in self.what.0.iter() {
|
||||
let v = w.compute(stk, ctx, opt, doc).await?;
|
||||
i.prepare(stk, ctx, opt, &stm, v).await.map_err(|e| match e {
|
||||
let v = w.compute(stk, &ctx, opt, doc).await?;
|
||||
i.prepare(stk, &ctx, opt, &stm, v).await.map_err(|e| match e {
|
||||
Error::InvalidStatementTarget {
|
||||
value: v,
|
||||
} => Error::UpdateStatement {
|
||||
|
@ -57,8 +66,14 @@ impl UpdateStatement {
|
|||
e => e,
|
||||
})?;
|
||||
}
|
||||
// Process the statement
|
||||
let res = i.output(stk, &ctx, opt, &stm).await?;
|
||||
// Catch statement timeout
|
||||
if ctx.is_timedout() {
|
||||
return Err(Error::QueryTimedout);
|
||||
}
|
||||
// Output the results
|
||||
match i.output(stk, ctx, opt, &stm).await? {
|
||||
match res {
|
||||
// This is a single record result
|
||||
Value::Array(mut a) if self.only => match a.len() {
|
||||
// There was exactly one result
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::ctx::Context;
|
||||
use crate::ctx::{Context, MutableContext};
|
||||
use crate::dbs::{Iterator, Options, Statement};
|
||||
use crate::doc::CursorDoc;
|
||||
use crate::err::Error;
|
||||
|
@ -44,10 +44,19 @@ impl UpsertStatement {
|
|||
let stm = Statement::from(self);
|
||||
// Ensure futures are stored
|
||||
let opt = &opt.new_with_futures(false).with_projections(false);
|
||||
// Check if there is a timeout
|
||||
let ctx = match self.timeout.as_ref() {
|
||||
Some(timeout) => {
|
||||
let mut ctx = MutableContext::new(ctx);
|
||||
ctx.add_timeout(*timeout.0)?;
|
||||
ctx.freeze()
|
||||
}
|
||||
None => ctx.clone(),
|
||||
};
|
||||
// Loop over the upsert targets
|
||||
for w in self.what.0.iter() {
|
||||
let v = w.compute(stk, ctx, opt, doc).await?;
|
||||
i.prepare(stk, ctx, opt, &stm, v).await.map_err(|e| match e {
|
||||
let v = w.compute(stk, &ctx, opt, doc).await?;
|
||||
i.prepare(stk, &ctx, opt, &stm, v).await.map_err(|e| match e {
|
||||
Error::InvalidStatementTarget {
|
||||
value: v,
|
||||
} => Error::UpsertStatement {
|
||||
|
@ -56,8 +65,14 @@ impl UpsertStatement {
|
|||
e => e,
|
||||
})?;
|
||||
}
|
||||
// Process the statement
|
||||
let res = i.output(stk, &ctx, opt, &stm).await?;
|
||||
// Catch statement timeout
|
||||
if ctx.is_timedout() {
|
||||
return Err(Error::QueryTimedout);
|
||||
}
|
||||
// Output the results
|
||||
match i.output(stk, ctx, opt, &stm).await? {
|
||||
match res {
|
||||
// This is a single record result
|
||||
Value::Array(mut a) if self.only => match a.len() {
|
||||
// There was exactly one result
|
||||
|
|
|
@ -157,7 +157,10 @@ tokio-tungstenite = { version = "0.23.1", optional = true, features = ["url"] }
|
|||
uuid = { version = "1.10.0", features = ["serde", "v4", "v7"] }
|
||||
|
||||
[lints.rust]
|
||||
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(surrealdb_unstable)'] }
|
||||
unexpected_cfgs = { level = "warn", check-cfg = [
|
||||
'cfg(storage)',
|
||||
'cfg(surrealdb_unstable)',
|
||||
] }
|
||||
|
||||
[lib]
|
||||
name = "surrealdb" # Needed for the nightly crate as we will be renaming it
|
||||
|
|
33
sdk/tests/timeout.rs
Normal file
33
sdk/tests/timeout.rs
Normal file
|
@ -0,0 +1,33 @@
|
|||
mod helpers;
|
||||
mod parse;
|
||||
use helpers::Test;
|
||||
use surrealdb::err::Error;
|
||||
|
||||
#[tokio::test]
|
||||
async fn statement_timeouts() -> Result<(), Error> {
|
||||
let sql = "
|
||||
CREATE ONLY person:ok CONTENT { test: true };
|
||||
CREATE person:test CONTENT { test: true } TIMEOUT 0s;
|
||||
UPSERT person:test CONTENT { test: true } TIMEOUT 0s;
|
||||
UPDATE person:test CONTENT { test: true } TIMEOUT 0s;
|
||||
INSERT { id: person:test, test: true } TIMEOUT 0s;
|
||||
RELATE person:test->know->person:ok TIMEOUT 0s;
|
||||
LET $temp = SELECT * FROM person TIMEOUT 0s;
|
||||
SELECT * FROM person TIMEOUT 0s;
|
||||
DELETE person:test TIMEOUT 0s;
|
||||
";
|
||||
let error = "The query was not executed because it exceeded the timeout";
|
||||
Test::new(sql)
|
||||
.await?
|
||||
.expect_val("{ id: person:ok, test: true }")?
|
||||
.expect_error(error)?
|
||||
.expect_error(error)?
|
||||
.expect_error(error)?
|
||||
.expect_error(error)?
|
||||
.expect_error(error)?
|
||||
.expect_error(error)?
|
||||
.expect_error(error)?
|
||||
.expect_error(error)?;
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in a new issue