Performance and behaviour optimisations (#4785)

This commit is contained in:
Tobie Morgan Hitchcock 2024-09-17 14:20:48 +01:00 committed by GitHub
parent 9f1d376716
commit 439ab99e15
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
58 changed files with 1540 additions and 1168 deletions

View file

@ -29,6 +29,10 @@ pub static MAX_OBJECT_PARSING_DEPTH: LazyLock<u32> =
pub static MAX_QUERY_PARSING_DEPTH: LazyLock<u32> =
lazy_env_parse!("SURREAL_MAX_QUERY_PARSING_DEPTH", u32, 20);
/// Specifies the number of computed regexes which can be cached in the engine.
pub static REGEX_CACHE_SIZE: LazyLock<usize> =
lazy_env_parse!("SURREAL_REGEX_CACHE_SIZE", usize, 1_000);
/// Specifies the number of items which can be cached within a single transaction.
pub static TRANSACTION_CACHE_SIZE: LazyLock<usize> =
lazy_env_parse!("SURREAL_TRANSACTION_CACHE_SIZE", usize, 10_000);

View file

@ -11,7 +11,10 @@ use crate::doc::Document;
use crate::err::Error;
use crate::idx::planner::iterators::{IteratorRecord, IteratorRef};
use crate::idx::planner::IterationStage;
use crate::sql::array::Array;
use crate::sql::edges::Edges;
use crate::sql::mock::Mock;
use crate::sql::object::Object;
use crate::sql::table::Table;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
@ -26,17 +29,65 @@ const TARGET: &str = "surrealdb::core::dbs";
#[derive(Clone)]
pub(crate) enum Iterable {
/// Any [Value] which does not exists in storage. This
/// could be the result of a query, an arbritrary
/// SurrealQL value, object, or array of values.
Value(Value),
Table(Table, bool), // true = keys only
Thing(Thing),
TableRange(String, IdRange, bool), // true = keys_only
Edges(Edges),
/// An iterable which does not actually fetch the record
/// data from storage. This is used in CREATE statements
/// where we attempt to write data without first checking
/// if the record exists, throwing an error on failure.
Defer(Thing),
/// An iterable whose Record ID needs to be generated
/// before processing. This is used in CREATE statements
/// when generating a new id, or generating an id based
/// on the id field which is specified within the data.
Yield(Table),
/// An iterable which needs to fetch the data of a
/// specific record before processing the document.
Thing(Thing),
/// An iterable which needs to fetch the related edges
/// of a record before processing each document.
Edges(Edges),
/// An iterable which needs to iterate over the records
/// in a table before processing each document. When the
/// 2nd argument is true, we iterate over keys only.
Table(Table, bool),
/// An iterable which fetches a specific range of records
/// from storage, used in range and time-series scenarios.
/// When the 2nd argument is true, we iterate over keys only.
Range(String, IdRange, bool),
/// An iterable which fetches a record from storage, and
/// which has the specific value to update the record with.
/// This is used in INSERT statements, where each value
/// passed in to the iterable is unique for each record.
Mergeable(Thing, Value),
/// An iterable which fetches a record from storage, and
/// which has the specific value to update the record with.
/// This is used in RELATE statements. The optional value
/// is used in INSERT RELATION statements, where each value
/// passed in to the iterable is unique for each record.
Relatable(Thing, Thing, Thing, Option<Value>),
/// An iterable which iterates over an index range for a
/// table, which then fetches the correesponding records
/// which are matched within the index.
Index(Table, IteratorRef),
}
#[derive(Debug)]
pub(crate) enum Operable {
Value(Arc<Value>),
Mergeable(Arc<Value>, Arc<Value>, bool),
Relatable(Thing, Arc<Value>, Thing, Option<Arc<Value>>, bool),
}
#[derive(Debug)]
pub(crate) enum Workable {
Normal,
Insert(Arc<Value>, bool),
Relate(Thing, Thing, Option<Arc<Value>>, bool),
}
#[derive(Debug)]
pub(crate) struct Processed {
pub(crate) rid: Option<Arc<Thing>>,
@ -44,18 +95,15 @@ pub(crate) struct Processed {
pub(crate) val: Operable,
}
#[derive(Debug)]
pub(crate) enum Operable {
Value(Arc<Value>),
Mergeable(Arc<Value>, Arc<Value>),
Relatable(Thing, Arc<Value>, Thing, Option<Arc<Value>>),
}
#[derive(Debug)]
pub(crate) enum Workable {
Normal,
Insert(Arc<Value>),
Relate(Thing, Thing, Option<Arc<Value>>),
impl Workable {
/// Check if this is the first iteration of an INSERT statement
pub(crate) fn is_insert_initial(&self) -> bool {
matches!(self, Self::Insert(_, false) | Self::Relate(_, _, _, false))
}
/// Check if this is an INSERT with a specific id field
pub(crate) fn is_insert_with_specific_id(&self) -> bool {
matches!(self, Self::Insert(v, _) if v.rid().is_some())
}
}
#[derive(Default)]
@ -97,182 +145,23 @@ impl Iterator {
}
/// Ingests an iterable for processing
pub fn ingest(&mut self, val: Iterable) {
pub(crate) fn ingest(&mut self, val: Iterable) {
self.entries.push(val)
}
/// Prepares a value for processing
pub async fn prepare(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
val: Value,
) -> Result<(), Error> {
pub(crate) fn prepare(&mut self, stm: &Statement<'_>, val: Value) -> Result<(), Error> {
// Match the values
match val {
Value::Table(v) => match stm.data() {
// There is a data clause so fetch a record id
Some(data) => match stm {
Statement::Create(_) => {
let id = match data.rid(stk, ctx, opt).await? {
// Generate a new id from the id field
Some(id) => id.generate(&v, false)?,
// Generate a new random table id
None => v.generate(),
};
self.ingest(Iterable::Thing(id))
}
_ => {
// Ingest the table for scanning
self.ingest(Iterable::Table(v, false))
}
},
// There is no data clause so create a record id
None => match stm {
Statement::Create(_) => {
// Generate a new random table id
self.ingest(Iterable::Thing(v.generate()))
}
_ => {
// Ingest the table for scanning
self.ingest(Iterable::Table(v, false))
}
},
Value::Mock(v) => self.prepare_mock(stm, v)?,
Value::Table(v) => self.prepare_table(stm, v)?,
Value::Edges(v) => self.prepare_edges(stm, *v)?,
Value::Object(v) => self.prepare_object(stm, v)?,
Value::Array(v) => self.prepare_array(stm, v)?,
Value::Thing(v) => match v.is_range() {
true => self.prepare_range(stm, v, false)?,
false => self.prepare_thing(stm, v)?,
},
Value::Thing(v) => {
// Check if there is a data clause
if let Some(data) = stm.data() {
// Check if there is an id field specified
if let Some(id) = data.rid(stk, ctx, opt).await? {
// Check to see the type of the id
match id {
// The id is a match, so don't error
Value::Thing(id) if id == v => (),
// The id does not match
id => {
return Err(Error::IdMismatch {
value: id.to_string(),
});
}
}
}
}
// Add the record to the iterator
match &v.id {
Id::Range(r) => {
match stm {
Statement::Create(_) => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
});
}
_ => {
self.ingest(Iterable::TableRange(v.tb, *r.to_owned(), false));
}
};
}
_ => {
match stm {
Statement::Create(_) => {
self.ingest(Iterable::Defer(v));
}
_ => {
self.ingest(Iterable::Thing(v));
}
};
}
}
}
Value::Mock(v) => {
// Check if there is a data clause
if let Some(data) = stm.data() {
// Check if there is an id field specified
if let Some(id) = data.rid(stk, ctx, opt).await? {
return Err(Error::IdMismatch {
value: id.to_string(),
});
}
}
// Add the records to the iterator
for v in v {
self.ingest(Iterable::Thing(v))
}
}
Value::Edges(v) => {
// Check if this is a create statement
if let Statement::Create(_) = stm {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
});
}
// Check if there is a data clause
if let Some(data) = stm.data() {
// Check if there is an id field specified
if let Some(id) = data.rid(stk, ctx, opt).await? {
return Err(Error::IdMismatch {
value: id.to_string(),
});
}
}
// Add the record to the iterator
self.ingest(Iterable::Edges(*v));
}
Value::Object(v) => {
// Check if there is a data clause
if let Some(data) = stm.data() {
// Check if there is an id field specified
if let Some(id) = data.rid(stk, ctx, opt).await? {
return Err(Error::IdMismatch {
value: id.to_string(),
});
}
}
// Check if the object has an id field
match v.rid() {
Some(id) => {
// Add the record to the iterator
self.ingest(Iterable::Thing(id))
}
None => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
});
}
}
}
Value::Array(v) => {
// Check if there is a data clause
if let Some(data) = stm.data() {
// Check if there is an id field specified
if let Some(id) = data.rid(stk, ctx, opt).await? {
return Err(Error::IdMismatch {
value: id.to_string(),
});
}
}
// Add the records to the iterator
for v in v {
match v {
Value::Thing(v) => self.ingest(Iterable::Thing(v)),
Value::Edges(v) => self.ingest(Iterable::Edges(*v)),
Value::Object(v) => match v.rid() {
Some(v) => self.ingest(Iterable::Thing(v)),
None => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
})
}
},
_ => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
})
}
}
}
}
v => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
@ -283,6 +172,120 @@ impl Iterator {
Ok(())
}
/// Prepares a value for processing
pub(crate) fn prepare_table(&mut self, stm: &Statement<'_>, v: Table) -> Result<(), Error> {
// Add the record to the iterator
match stm.is_create() {
true => self.ingest(Iterable::Yield(v)),
false => self.ingest(Iterable::Table(v, false)),
}
// All ingested ok
Ok(())
}
/// Prepares a value for processing
pub(crate) fn prepare_thing(&mut self, stm: &Statement<'_>, v: Thing) -> Result<(), Error> {
// Add the record to the iterator
match stm.is_deferable() {
true => self.ingest(Iterable::Defer(v)),
false => self.ingest(Iterable::Thing(v)),
}
// All ingested ok
Ok(())
}
/// Prepares a value for processing
pub(crate) fn prepare_mock(&mut self, stm: &Statement<'_>, v: Mock) -> Result<(), Error> {
// Add the records to the iterator
for v in v {
match stm.is_deferable() {
true => self.ingest(Iterable::Defer(v)),
false => self.ingest(Iterable::Thing(v)),
}
}
// All ingested ok
Ok(())
}
/// Prepares a value for processing
pub(crate) fn prepare_edges(&mut self, stm: &Statement<'_>, v: Edges) -> Result<(), Error> {
// Check if this is a create statement
if stm.is_create() {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
});
}
// Add the record to the iterator
self.ingest(Iterable::Edges(v));
// All ingested ok
Ok(())
}
/// Prepares a value for processing
pub(crate) fn prepare_range(
&mut self,
stm: &Statement<'_>,
v: Thing,
keys: bool,
) -> Result<(), Error> {
// Check if this is a create statement
if stm.is_create() {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
});
}
// Add the record to the iterator
if let (tb, Id::Range(v)) = (v.tb, v.id) {
self.ingest(Iterable::Range(tb, *v, keys));
}
// All ingested ok
Ok(())
}
/// Prepares a value for processing
pub(crate) fn prepare_object(&mut self, stm: &Statement<'_>, v: Object) -> Result<(), Error> {
// Add the record to the iterator
match v.rid() {
// This object has an 'id' field
Some(v) => match stm.is_deferable() {
true => self.ingest(Iterable::Defer(v)),
false => self.ingest(Iterable::Thing(v)),
},
// This object has no 'id' field
None => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
});
}
}
// All ingested ok
Ok(())
}
/// Prepares a value for processing
pub(crate) fn prepare_array(&mut self, stm: &Statement<'_>, v: Array) -> Result<(), Error> {
// Add the records to the iterator
for v in v {
match v {
Value::Mock(v) => self.prepare_mock(stm, v)?,
Value::Table(v) => self.prepare_table(stm, v)?,
Value::Edges(v) => self.prepare_edges(stm, *v)?,
Value::Object(v) => self.prepare_object(stm, v)?,
Value::Thing(v) => match v.is_range() {
true => self.prepare_range(stm, v, false)?,
false => self.prepare_thing(stm, v)?,
},
_ => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
})
}
}
}
// All ingested ok
Ok(())
}
/// Process the records and output
pub async fn output(
&mut self,

View file

@ -93,13 +93,8 @@ impl ExplainItem {
name: "Iterate Value".into(),
details: vec![("value", v.to_owned())],
},
Iterable::Table(t, keys_only) => Self {
name: if *keys_only {
"Iterate Table Keys"
} else {
"Iterate Table"
}
.into(),
Iterable::Yield(t) => Self {
name: "Iterate Yield".into(),
details: vec![("table", Value::from(t.0.to_owned()))],
},
Iterable::Thing(t) => Self {
@ -110,7 +105,20 @@ impl ExplainItem {
name: "Iterate Defer".into(),
details: vec![("thing", Value::Thing(t.to_owned()))],
},
Iterable::TableRange(tb, r, keys_only) => Self {
Iterable::Edges(e) => Self {
name: "Iterate Edges".into(),
details: vec![("from", Value::Thing(e.from.to_owned()))],
},
Iterable::Table(t, keys_only) => Self {
name: if *keys_only {
"Iterate Table Keys"
} else {
"Iterate Table"
}
.into(),
details: vec![("table", Value::from(t.0.to_owned()))],
},
Iterable::Range(tb, r, keys_only) => Self {
name: if *keys_only {
"Iterate Range Keys"
} else {
@ -119,10 +127,6 @@ impl ExplainItem {
.into(),
details: vec![("table", tb.to_owned().into()), ("range", r.to_owned().into())],
},
Iterable::Edges(e) => Self {
name: "Iterate Edges".into(),
details: vec![("from", Value::Thing(e.from.to_owned()))],
},
Iterable::Mergeable(t, v) => Self {
name: "Iterate Mergeable".into(),
details: vec![("thing", Value::Thing(t.to_owned())), ("value", v.to_owned())],

View file

@ -136,16 +136,17 @@ impl<'a> Processor<'a> {
if ctx.is_ok() {
match iterable {
Iterable::Value(v) => self.process_value(stk, ctx, opt, stm, v).await?,
Iterable::Yield(v) => self.process_yield(stk, ctx, opt, stm, v).await?,
Iterable::Thing(v) => self.process_thing(stk, ctx, opt, stm, v).await?,
Iterable::Defer(v) => self.process_defer(stk, ctx, opt, stm, v).await?,
Iterable::TableRange(tb, v, keys_only) => {
Iterable::Edges(e) => self.process_edges(stk, ctx, opt, stm, e).await?,
Iterable::Range(tb, v, keys_only) => {
if keys_only {
self.process_range_keys(stk, ctx, opt, stm, &tb, v).await?
} else {
self.process_range(stk, ctx, opt, stm, &tb, v).await?
}
}
Iterable::Edges(e) => self.process_edge(stk, ctx, opt, stm, e).await?,
Iterable::Table(v, keys_only) => {
let ctx = Self::check_query_planner_context(ctx, &v);
if keys_only {
@ -196,6 +197,36 @@ impl<'a> Processor<'a> {
self.process(stk, ctx, opt, stm, pro).await
}
async fn process_yield(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
v: Table,
) -> Result<(), Error> {
// Fetch the record id if specified
let v = match stm.data() {
// There is a data clause so fetch a record id
Some(data) => match data.rid(stk, ctx, opt).await? {
// Generate a new id from the id field
Some(id) => id.generate(&v, false)?,
// Generate a new random table id
None => v.generate(),
},
// There is no data clause so create a record id
None => v.generate(),
};
// Pass the value through
let pro = Processed {
rid: Some(v.into()),
ir: None,
val: Operable::Value(Value::None.into()),
};
// Process the document record
self.process(stk, ctx, opt, stm, pro).await
}
async fn process_defer(
&mut self,
stk: &mut Stk,
@ -259,21 +290,11 @@ impl<'a> Processor<'a> {
) -> Result<(), Error> {
// Check that the table exists
ctx.tx().check_ns_db_tb(opt.ns()?, opt.db()?, &v.tb, opt.strict).await?;
// Fetch the data from the store
let key = thing::new(opt.ns()?, opt.db()?, &v.tb, &v.id);
let val = ctx.tx().get(key, None).await?;
// Parse the data from the store
let x = match val {
Some(v) => Value::from(v),
None => Value::None,
};
// Create a new operable value
let val = Operable::Mergeable(x.into(), o.into());
// Process the document record
let pro = Processed {
rid: Some(v.into()),
ir: None,
val,
val: Operable::Mergeable(Value::None.into(), o.into(), false),
};
self.process(stk, ctx, opt, stm, pro).await?;
// Everything ok
@ -299,7 +320,7 @@ impl<'a> Processor<'a> {
None => Value::None,
};
// Create a new operable value
let val = Operable::Relatable(f, x.into(), w, o.map(|v| v.into()));
let val = Operable::Relatable(f, x.into(), w, o.map(|v| v.into()), false);
// Process the document record
let pro = Processed {
rid: Some(v.into()),
@ -503,7 +524,7 @@ impl<'a> Processor<'a> {
Ok(())
}
async fn process_edge(
async fn process_edges(
&mut self,
stk: &mut Stk,
ctx: &Context,

View file

@ -116,18 +116,27 @@ impl<'a> fmt::Display for Statement<'a> {
}
impl<'a> Statement<'a> {
/// Check the type of statement
#[inline]
/// Check if this is a SELECT statement
pub fn is_select(&self) -> bool {
matches!(self, Statement::Select(_))
}
/// Check the type of statement
#[inline]
/// Check if this is a CREATE statement
pub fn is_create(&self) -> bool {
matches!(self, Statement::Create(_))
}
/// Check if this is a DELETE statement
pub fn is_delete(&self) -> bool {
matches!(self, Statement::Delete(_))
}
/// Returns whether retrieval can be deferred
pub fn is_deferable(&self) -> bool {
matches!(self, Statement::Create(_) | Statement::Upsert(_))
}
/// Returns whether this requires savepoints
pub fn is_retryable(&self) -> bool {
matches!(self, Statement::Insert(_) if self.data().is_some())
}
/// Returns any query fields if specified
#[inline]
pub fn expr(&self) -> Option<&Fields> {
match self {
Statement::Select(v) => Some(&v.expr),
@ -136,15 +145,13 @@ impl<'a> Statement<'a> {
}
}
/// Returns any OMIT clause if specified
#[inline]
pub fn omit(&self) -> Option<&Idioms> {
match self {
Statement::Select(v) => v.omit.as_ref(),
_ => None,
}
}
/// Returns any SET clause if specified
#[inline]
/// Returns any SET, CONTENT, or MERGE clause if specified
pub fn data(&self) -> Option<&Data> {
match self {
Statement::Create(v) => v.data.as_ref(),
@ -156,7 +163,6 @@ impl<'a> Statement<'a> {
}
}
/// Returns any WHERE clause if specified
#[inline]
pub fn conds(&self) -> Option<&Cond> {
match self {
Statement::Live(v) => v.cond.as_ref(),
@ -168,7 +174,6 @@ impl<'a> Statement<'a> {
}
}
/// Returns any SPLIT clause if specified
#[inline]
pub fn split(&self) -> Option<&Splits> {
match self {
Statement::Select(v) => v.split.as_ref(),
@ -176,7 +181,6 @@ impl<'a> Statement<'a> {
}
}
/// Returns any GROUP clause if specified
#[inline]
pub fn group(&self) -> Option<&Groups> {
match self {
Statement::Select(v) => v.group.as_ref(),
@ -184,7 +188,6 @@ impl<'a> Statement<'a> {
}
}
/// Returns any ORDER clause if specified
#[inline]
pub fn order(&self) -> Option<&Orders> {
match self {
Statement::Select(v) => v.order.as_ref(),
@ -192,7 +195,6 @@ impl<'a> Statement<'a> {
}
}
/// Returns any FETCH clause if specified
#[inline]
pub fn fetch(&self) -> Option<&Fetchs> {
match self {
Statement::Select(v) => v.fetch.as_ref(),
@ -200,7 +202,6 @@ impl<'a> Statement<'a> {
}
}
/// Returns any START clause if specified
#[inline]
pub fn start(&self) -> Option<&Start> {
match self {
Statement::Select(v) => v.start.as_ref(),
@ -208,7 +209,6 @@ impl<'a> Statement<'a> {
}
}
/// Returns any LIMIT clause if specified
#[inline]
pub fn limit(&self) -> Option<&Limit> {
match self {
Statement::Select(v) => v.limit.as_ref(),
@ -216,7 +216,6 @@ impl<'a> Statement<'a> {
}
}
/// Returns any RETURN clause if specified
#[inline]
pub fn output(&self) -> Option<&Output> {
match self {
Statement::Create(v) => v.output.as_ref(),
@ -229,7 +228,6 @@ impl<'a> Statement<'a> {
}
}
/// Returns any PARALLEL clause if specified
#[inline]
#[cfg(not(target_arch = "wasm32"))]
pub fn parallel(&self) -> bool {
match self {
@ -243,9 +241,7 @@ impl<'a> Statement<'a> {
_ => false,
}
}
/// Returns any TEMPFILES clause if specified
#[inline]
#[cfg(storage)]
pub fn tempfiles(&self) -> bool {
match self {
@ -253,9 +249,7 @@ impl<'a> Statement<'a> {
_ => false,
}
}
/// Returns any EXPLAIN clause if specified
#[inline]
pub fn explain(&self) -> Option<&Explain> {
match self {
Statement::Select(v) => v.explain.as_ref(),

View file

@ -1,78 +0,0 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::doc::Document;
use crate::err::Error;
use crate::sql::permission::Permission;
use reblessive::tree::Stk;
impl Document {
pub async fn allow(
&self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Check if this record exists
if self.id.is_some() {
// Should we run permissions checks?
if opt.check_perms(stm.into())? {
// Check that record authentication matches session
if opt.auth.is_record() {
let ns = opt.ns()?;
if opt.auth.level().ns() != Some(ns) {
return Err(Error::NsNotAllowed {
ns: ns.into(),
});
}
let db = opt.db()?;
if opt.auth.level().db() != Some(db) {
return Err(Error::DbNotAllowed {
db: db.into(),
});
}
}
// Get the table
let tb = self.tb(ctx, opt).await?;
// Get the permission clause
let perms = if stm.is_delete() {
&tb.permissions.delete
} else if stm.is_select() {
&tb.permissions.select
} else if self.is_new() {
&tb.permissions.create
} else {
&tb.permissions.update
};
// Process the table permissions
match perms {
Permission::None => return Err(Error::Ignore),
Permission::Full => return Ok(()),
Permission::Specific(e) => {
// Disable permissions
let opt = &opt.new_with_perms(false);
// Process the PERMISSION clause
if !e
.compute(
stk,
ctx,
opt,
Some(match stm.is_delete() {
true => &self.initial,
false => &self.current,
}),
)
.await?
.is_truthy()
{
return Err(Error::Ignore);
}
}
}
}
}
// Carry on
Ok(())
}
}

View file

@ -6,11 +6,132 @@ use crate::doc::Document;
use crate::err::Error;
use crate::sql::data::Data;
use crate::sql::operator::Operator;
use crate::sql::paths::EDGE;
use crate::sql::paths::IN;
use crate::sql::paths::OUT;
use crate::sql::value::Value;
use reblessive::tree::Stk;
impl Document {
pub async fn alter(
/// Clears all of the content of this document.
/// This is used to empty the current content
/// of the document within a `DELETE` statement.
/// This function only clears the document in
/// memory, and does not store this on disk.
pub async fn clear_record_data(
&mut self,
_ctx: &Context,
_opt: &Options,
_stm: &Statement<'_>,
) -> Result<(), Error> {
self.current.doc.to_mut().clear()
}
/// Sets the default field data that should be
/// present on this document. For normal records
/// the `id` field is always specified, and for
/// relation records, the `in`, `out`, and the
/// hidden `edge` field are always present. This
/// ensures that any user modifications of these
/// fields are reset back to the original state.
pub async fn default_record_data(
&mut self,
_ctx: &Context,
_opt: &Options,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Get the record id
let rid = self.id()?;
// Set default field values
self.current.doc.to_mut().def(&rid);
// This is a RELATE statement, so reset fields
if let Workable::Relate(l, r, _, _) = &self.extras {
// Mark that this is an edge node
self.current.doc.to_mut().put(&*EDGE, Value::Bool(true));
// If this document existed before, check the `in` field
match (self.initial.doc.pick(&*IN), self.is_new()) {
// If the document id matches, then all good
(Value::Thing(id), false) if id.eq(l) => {
self.current.doc.to_mut().put(&*IN, l.clone().into());
}
// If the document is new then all good
(_, true) => {
self.current.doc.to_mut().put(&*IN, l.clone().into());
}
// Otherwise this is attempting to override the `in` field
(v, _) => {
return Err(Error::InOverride {
value: v.to_string(),
})
}
}
// If this document existed before, check the `out` field
match (self.initial.doc.pick(&*OUT), self.is_new()) {
// If the document id matches, then all good
(Value::Thing(id), false) if id.eq(r) => {
self.current.doc.to_mut().put(&*OUT, r.clone().into());
}
// If the document is new then all good
(_, true) => {
self.current.doc.to_mut().put(&*OUT, r.clone().into());
}
// Otherwise this is attempting to override the `in` field
(v, _) => {
return Err(Error::OutOverride {
value: v.to_string(),
})
}
}
}
// This is an UPDATE of a graph edge, so reset fields
if self.initial.doc.pick(&*EDGE).is_true() {
self.current.doc.to_mut().put(&*EDGE, Value::Bool(true));
self.current.doc.to_mut().put(&*IN, self.initial.doc.pick(&*IN));
self.current.doc.to_mut().put(&*OUT, self.initial.doc.pick(&*OUT));
}
// Carry on
Ok(())
}
/// Updates the current document using the data
/// passed in to each document. This is relevant
/// for INSERT and RELATE queries where each
/// document has its own data block. This
/// function also ensures that standard default
/// fields are set and reset before and after the
/// document data is modified.
pub async fn process_merge_data(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Get the record id
let rid = self.id()?;
// Set default field values
self.current.doc.to_mut().def(&rid);
// This is an INSERT statement
if let Workable::Insert(v, _) = &self.extras {
let v = v.compute(stk, ctx, opt, Some(&self.current)).await?;
self.current.doc.to_mut().merge(v)?;
}
// This is an INSERT RELATION statement
if let Workable::Relate(_, _, Some(v), _) = &self.extras {
let v = v.compute(stk, ctx, opt, Some(&self.current)).await?;
self.current.doc.to_mut().merge(v)?;
}
// Set default field values
self.current.doc.to_mut().def(&rid);
// Carry on
Ok(())
}
/// Updates the current document using the data
/// clause present on the statement. This can be
/// one of CONTENT, REPLACE, MERGE, PATCH, SET,
/// UNSET, or ON DUPLICATE KEY UPDATE. This
/// function also ensures that standard default
/// fields are set and reset before and after the
/// document data is modified.
pub async fn process_record_data(
&mut self,
stk: &mut Stk,
ctx: &Context,
@ -18,9 +139,9 @@ impl Document {
stm: &Statement<'_>,
) -> Result<(), Error> {
// Get the record id
let rid = self.id.as_ref().unwrap();
let rid = self.id()?;
// Set default field values
self.current.doc.to_mut().def(rid);
self.current.doc.to_mut().def(&rid);
// The statement has a data clause
if let Some(v) = stm.data() {
match v {
@ -74,14 +195,14 @@ impl Document {
// Duplicate context
let mut ctx = MutableContext::new(ctx);
// Add insertable value
if let Workable::Insert(value) = &self.extras {
if let Workable::Insert(value, _) = &self.extras {
ctx.add_value("input", value.clone());
}
if let Workable::Relate(_, _, Some(value)) = &self.extras {
if let Workable::Relate(_, _, Some(value), _) = &self.extras {
ctx.add_value("input", value.clone());
}
// Freeze the context
let ctx: Context = ctx.into();
let ctx = ctx.freeze();
// Process ON DUPLICATE KEY clause
for x in x.iter() {
let v = x.2.compute(stk, &ctx, opt, Some(&self.current)).await?;
@ -111,7 +232,7 @@ impl Document {
};
};
// Set default field values
self.current.doc.to_mut().def(rid);
self.current.doc.to_mut().def(&rid);
// Carry on
Ok(())
}

View file

@ -5,7 +5,7 @@ use crate::doc::Document;
use crate::err::Error;
impl Document {
pub async fn changefeeds(
pub async fn process_changefeeds(
&self,
ctx: &Context,
opt: &Options,

View file

@ -1,33 +1,302 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::doc::{CursorDoc, Document};
use crate::dbs::Workable;
use crate::doc::Document;
use crate::err::Error;
use crate::sql::Cond;
use crate::sql::paths::ID;
use crate::sql::paths::IN;
use crate::sql::paths::OUT;
use crate::sql::permission::Permission;
use crate::sql::value::Value;
use reblessive::tree::Stk;
impl Document {
pub async fn check(
/// Checks whether this operation is allowed on
/// the table for this document. When inserting
/// an edge or relation, we check that the table
/// type is `ANY` or `RELATION`. When inserting
/// a node or normal record, we check that the
/// table type is `ANY` or `NORMAL`.
pub async fn check_table_type(
&mut self,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Get the table for this document
let tb = self.tb(ctx, opt).await?;
// Determine the type of statement
match stm {
Statement::Create(_) => {
if !tb.allows_normal() {
return Err(Error::TableCheck {
thing: self.id()?.to_string(),
relation: false,
target_type: tb.kind.to_string(),
});
}
}
Statement::Upsert(_) => {
if !tb.allows_normal() {
return Err(Error::TableCheck {
thing: self.id()?.to_string(),
relation: false,
target_type: tb.kind.to_string(),
});
}
}
Statement::Relate(_) => {
if !tb.allows_relation() {
return Err(Error::TableCheck {
thing: self.id()?.to_string(),
relation: true,
target_type: tb.kind.to_string(),
});
}
}
Statement::Insert(_) => match self.extras {
Workable::Relate(_, _, _, _) => {
if !tb.allows_relation() {
return Err(Error::TableCheck {
thing: self.id()?.to_string(),
relation: true,
target_type: tb.kind.to_string(),
});
}
}
_ => {
if !tb.allows_normal() {
return Err(Error::TableCheck {
thing: self.id()?.to_string(),
relation: false,
target_type: tb.kind.to_string(),
});
}
}
},
_ => {}
}
// Carry on
Ok(())
}
/// Checks that a specifically selected record
/// actually exists in the underlying datastore.
/// If the user specifies a record directly
/// using a Record ID, and that record does not
/// exist, then this function will exit early.
pub async fn check_record_exists(
&self,
_ctx: &Context,
_opt: &Options,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Check if this record exists
if self.id.is_some() && self.current.doc.is_none() {
return Err(Error::Ignore);
}
// Carry on
Ok(())
}
/// Checks that a specifically selected record
/// actually exists in the underlying datastore.
/// If the user specifies a record directly
/// using a Record ID, and that record does not
/// exist, then this function will exit early.
pub async fn check_data_fields(
&self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
) -> Result<(), Error> {
Self::check_cond(stk, ctx, opt, stm.conds(), &self.current).await
// Get the record id
let rid = self.id()?;
// This is a CREATE, UPSERT, UPDATE statement
if let Workable::Normal = &self.extras {
// This is a CONTENT, MERGE or SET clause
if let Some(data) = stm.data() {
// Check if there is an id field specified
if let Some(field) = data.pick(stk, ctx, opt, &*ID).await? {
match field {
// The id is a match, so don't error
Value::Thing(v) if v.eq(&rid) => (),
// The id is a match, so don't error
v if rid.id.is(&v) => (),
// The in field does not match
v => match v.convert_to_record() {
// This is a value which matches the id
Ok(v) if v.eq(&rid) => (),
// The value is a record but doesn't match
Ok(v) => {
return Err(Error::IdMismatch {
value: v.to_string(),
})
}
// The in field does not match at all
Err(Error::ConvertTo {
from,
..
}) => {
return Err(Error::IdMismatch {
value: from.to_string(),
})
}
// Return any other error
Err(e) => return Err(e),
},
}
}
}
}
// This is a RELATE statement
if let Workable::Relate(l, r, v, _) = &self.extras {
// This is a RELATE statement
if let Some(data) = stm.data() {
// Check that the 'in' field matches
if let Some(field) = data.pick(stk, ctx, opt, &*IN).await? {
match field {
// The in field is a match, so don't error
Value::Thing(v) if v.eq(l) => (),
// The in is a match, so don't error
v if l.id.is(&v) => (),
// The in field does not match
v => match v.convert_to_record() {
// This is a value which matches the id
Ok(v) if v.eq(l) => (),
// The value is a record but doesn't match
Ok(v) => {
return Err(Error::InMismatch {
value: v.to_string(),
})
}
// The in field does not match at all
Err(Error::ConvertTo {
from,
..
}) => {
return Err(Error::InMismatch {
value: from.to_string(),
})
}
// Return any other error
Err(e) => return Err(e),
},
}
}
// Check that the 'out' field matches
if let Some(field) = data.pick(stk, ctx, opt, &*OUT).await? {
match field {
// The out field is a match, so don't error
Value::Thing(v) if v.eq(r) => (),
// The out is a match, so don't error
v if r.id.is(&v) => (),
// The in field does not match
v => match v.convert_to_record() {
// This is a value which matches the id
Ok(v) if v.eq(r) => (),
// The value is a record but doesn't match
Ok(v) => {
return Err(Error::OutMismatch {
value: v.to_string(),
})
}
// The in field does not match at all
Err(Error::ConvertTo {
from,
..
}) => {
return Err(Error::OutMismatch {
value: from.to_string(),
})
}
// Return any other error
Err(e) => return Err(e),
},
}
}
}
// This is a INSERT RELATION statement
if let Some(data) = v {
// Check that the 'in' field matches
match data.pick(&*IN).compute(stk, ctx, opt, Some(&self.current)).await? {
// The in field is a match, so don't error
Value::Thing(v) if v.eq(l) => (),
// The in is a match, so don't error
v if l.id.is(&v) => (),
// The in field does not match
v => match v.convert_to_record() {
// This is a value which matches the id
Ok(v) if v.eq(l) => (),
// The value is a record but doesn't match
Ok(v) => {
return Err(Error::InMismatch {
value: v.to_string(),
})
}
// The in field does not match at all
Err(Error::ConvertTo {
from,
..
}) => {
return Err(Error::InMismatch {
value: from.to_string(),
})
}
// Return any other error
Err(e) => return Err(e),
},
}
// Check that the 'out' field matches
match data.pick(&*OUT).compute(stk, ctx, opt, Some(&self.current)).await? {
// The out field is a match, so don't error
Value::Thing(v) if v.eq(r) => (),
// The out is a match, so don't error
v if l.id.is(&v) => (),
// The out field does not match
v => match v.convert_to_record() {
// This is a value which matches the id
Ok(v) if v.eq(l) => (),
// The value is a record but doesn't match
Ok(v) => {
return Err(Error::OutMismatch {
value: v.to_string(),
})
}
// The out field does not match at all
Err(Error::ConvertTo {
from,
..
}) => {
return Err(Error::OutMismatch {
value: from.to_string(),
})
}
// Return any other error
Err(e) => return Err(e),
},
}
}
}
// Carry on
Ok(())
}
pub(crate) async fn check_cond(
/// Checks that the `WHERE` condition on a query
/// matches before proceeding with processing
/// the document. This ensures that records from
/// a table, or from an index can be filtered out
/// before being included within the query output.
pub async fn check_where_condition(
&self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
cond: Option<&Cond>,
doc: &CursorDoc,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Check where condition
if let Some(cond) = cond {
if let Some(cond) = stm.conds() {
// Check if the expression is truthy
if !cond.compute(stk, ctx, opt, Some(doc)).await?.is_truthy() {
if !cond.compute(stk, ctx, opt, Some(&self.current)).await?.is_truthy() {
// Ignore this document
return Err(Error::Ignore);
}
@ -35,4 +304,114 @@ impl Document {
// Carry on
Ok(())
}
/// Checks the `PERMISSIONS` clause on the table
/// for this record, returning immediately if the
/// permissions are `NONE`. This function does not
/// check any custom advanced table permissions,
/// which should be checked at a later stage.
pub async fn check_permissions_quick(
&self,
_stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Check if this record exists
if self.id.is_some() {
// Should we run permissions checks?
if opt.check_perms(stm.into())? {
// Get the table for this document
let table = self.tb(ctx, opt).await?;
// Get the permissions for this table
let perms = if stm.is_delete() {
&table.permissions.delete
} else if stm.is_select() {
&table.permissions.select
} else if self.is_new() {
&table.permissions.create
} else {
&table.permissions.update
};
// Exit early if permissions are NONE
if perms.is_none() {
return Err(Error::Ignore);
}
}
}
// Carry on
Ok(())
}
/// Checks the `PERMISSIONS` clause on the table for
/// this record, processing all advanced permissions
/// clauses and evaluating the expression. This
/// function checks and evaluates `FULL`, `NONE`,
/// and specific permissions clauses on the table.
pub async fn check_permissions_table(
&self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Check if this record exists
if self.id.is_some() {
// Should we run permissions checks?
if opt.check_perms(stm.into())? {
// Check that record authentication matches session
if opt.auth.is_record() {
let ns = opt.ns()?;
if opt.auth.level().ns() != Some(ns) {
return Err(Error::NsNotAllowed {
ns: ns.into(),
});
}
let db = opt.db()?;
if opt.auth.level().db() != Some(db) {
return Err(Error::DbNotAllowed {
db: db.into(),
});
}
}
// Get the table
let table = self.tb(ctx, opt).await?;
// Get the permission clause
let perms = if stm.is_delete() {
&table.permissions.delete
} else if stm.is_select() {
&table.permissions.select
} else if self.is_new() {
&table.permissions.create
} else {
&table.permissions.update
};
// Process the table permissions
match perms {
Permission::None => return Err(Error::Ignore),
Permission::Full => return Ok(()),
Permission::Specific(e) => {
// Disable permissions
let opt = &opt.new_with_perms(false);
// Process the PERMISSION clause
if !e
.compute(
stk,
ctx,
opt,
Some(match stm.is_delete() {
true => &self.initial,
false => &self.current,
}),
)
.await?
.is_truthy()
{
return Err(Error::Ignore);
}
}
}
}
}
// Carry on
Ok(())
}
}

View file

@ -1,59 +0,0 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::doc::Document;
use crate::err::Error;
use crate::sql::idiom::Idiom;
use reblessive::tree::Stk;
impl Document {
pub async fn clean(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Get the table
let tb = self.tb(ctx, opt).await?;
// This table is schemafull
if tb.full {
// Create a vector to store the keys
let mut keys: Vec<Idiom> = vec![];
// Loop through all field statements
for fd in self.fd(ctx, opt).await?.iter() {
// Is this a schemaless field?
match fd.flex || fd.kind.as_ref().is_some_and(|k| k.is_literal_nested()) {
false => {
// Loop over this field in the document
for k in self.current.doc.as_ref().each(&fd.name).into_iter() {
keys.push(k);
}
}
true => {
// Loop over every field under this field in the document
for k in
self.current.doc.as_ref().every(Some(&fd.name), true, true).into_iter()
{
keys.push(k);
}
}
}
}
// Loop over every field in the document
for fd in self.current.doc.as_ref().every(None, true, true).iter() {
if !keys.contains(fd) {
match fd {
fd if fd.is_id() => continue,
fd if fd.is_in() => continue,
fd if fd.is_out() => continue,
fd if fd.is_meta() => continue,
fd => self.current.doc.to_mut().del(stk, ctx, opt, fd).await?,
}
}
}
}
// Carry on
Ok(())
}
}

View file

@ -29,13 +29,13 @@ impl Document {
// Setup a new workable
let ins = match pro.val {
Operable::Value(v) => (v, Workable::Normal),
Operable::Mergeable(v, o) => (v, Workable::Insert(o)),
Operable::Relatable(f, v, w, o) => (v, Workable::Relate(f, w, o)),
Operable::Mergeable(v, o, u) => (v, Workable::Insert(o, u)),
Operable::Relatable(f, v, w, o, u) => (v, Workable::Relate(f, w, o, u)),
};
// Setup a new document
let mut doc = Document::new(pro.rid, pro.ir, ins.0, ins.1);
// Optionally create a save point so we can roll back any upcoming changes
let is_save_point = if !stm.is_select() {
let is_save_point = if stm.is_retryable() {
ctx.tx().lock().await.new_save_point().await;
true
} else {
@ -75,16 +75,16 @@ impl Document {
ir: None,
val: match doc.extras {
Workable::Normal => Operable::Value(val),
Workable::Insert(o) => Operable::Mergeable(val, o),
Workable::Relate(f, w, o) => Operable::Relatable(f, val, w, o),
Workable::Insert(o, _) => Operable::Mergeable(val, o, true),
Workable::Relate(f, w, o, _) => Operable::Relatable(f, val, w, o, true),
},
};
// Go to top of loop
continue;
}
// This record didn't match conditions, so skip
Err(Error::Ignore) => Err(Error::Ignore),
// If any other error was received, then let's
// pass that error through and return an error
// Pass other errors through and return the error
Err(e) => {
// We roll back any change following the save point
if is_save_point {

View file

@ -14,31 +14,20 @@ impl Document {
opt: &Options,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check if table has current relation status
self.relation(ctx, opt, stm).await?;
// Alter record data
self.alter(stk, ctx, opt, stm).await?;
// Merge fields data
self.field(stk, ctx, opt, stm).await?;
// Reset fields data
self.reset(ctx, opt, stm).await?;
// Clean fields data
self.clean(stk, ctx, opt, stm).await?;
// Check if allowed
self.allow(stk, ctx, opt, stm).await?;
// Store record data
self.store(ctx, opt, stm).await?;
// Store index data
self.index(stk, ctx, opt, stm).await?;
// Run table queries
self.table(stk, ctx, opt, stm).await?;
// Run lives queries
self.lives(stk, ctx, opt, stm).await?;
// Run change feeds queries
self.changefeeds(ctx, opt, stm).await?;
// Run event queries
self.event(stk, ctx, opt, stm).await?;
// Yield document
self.check_permissions_quick(stk, ctx, opt, stm).await?;
self.check_table_type(ctx, opt, stm).await?;
self.check_data_fields(stk, ctx, opt, stm).await?;
self.process_record_data(stk, ctx, opt, stm).await?;
self.process_table_fields(stk, ctx, opt, stm).await?;
self.cleanup_table_fields(stk, ctx, opt, stm).await?;
self.default_record_data(ctx, opt, stm).await?;
self.check_permissions_table(stk, ctx, opt, stm).await?;
self.store_record_data(ctx, opt, stm).await?;
self.store_index_data(stk, ctx, opt, stm).await?;
self.process_table_views(stk, ctx, opt, stm).await?;
self.process_table_lives(stk, ctx, opt, stm).await?;
self.process_table_events(stk, ctx, opt, stm).await?;
self.process_changefeeds(ctx, opt, stm).await?;
self.pluck(stk, ctx, opt, stm).await
}
}

View file

@ -14,25 +14,17 @@ impl Document {
opt: &Options,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check where clause
self.check(stk, ctx, opt, stm).await?;
// Check if allowed
self.allow(stk, ctx, opt, stm).await?;
// Erase document
self.erase(ctx, opt, stm).await?;
// Purge index data
self.index(stk, ctx, opt, stm).await?;
// Purge record data
self.check_record_exists(ctx, opt, stm).await?;
self.check_permissions_quick(stk, ctx, opt, stm).await?;
self.check_where_condition(stk, ctx, opt, stm).await?;
self.check_permissions_table(stk, ctx, opt, stm).await?;
self.clear_record_data(ctx, opt, stm).await?;
self.store_index_data(stk, ctx, opt, stm).await?;
self.purge(stk, ctx, opt, stm).await?;
// Run table queries
self.table(stk, ctx, opt, stm).await?;
// Run lives queries
self.lives(stk, ctx, opt, stm).await?;
// Run change feeds queries
self.changefeeds(ctx, opt, stm).await?;
// Run event queries
self.event(stk, ctx, opt, stm).await?;
// Yield document
self.process_table_views(stk, ctx, opt, stm).await?;
self.process_table_lives(stk, ctx, opt, stm).await?;
self.process_table_events(stk, ctx, opt, stm).await?;
self.process_changefeeds(ctx, opt, stm).await?;
self.pluck(stk, ctx, opt, stm).await
}
}

View file

@ -79,7 +79,6 @@ impl CursorValue {
impl Deref for CursorValue {
type Target = Value;
fn deref(&self) -> &Self::Target {
self.as_ref()
}
@ -191,7 +190,15 @@ impl Document {
/// Check if document is being created
pub fn is_new(&self) -> bool {
self.initial.doc.as_ref().is_none() && self.current.doc.as_ref().is_some()
self.initial.doc.as_ref().is_none()
}
/// Retrieve the record id for this document
pub fn id(&self) -> Result<Arc<Thing>, Error> {
match self.id.as_ref() {
Some(id) => Ok(id.clone()),
_ => Err(fail!("Expected a document id to be present")),
}
}
/// Get the table for this document
@ -203,9 +210,9 @@ impl Document {
// Get transaction
let txn = ctx.tx();
// Get the record id
let rid = self.id.as_ref().unwrap();
let id = self.id()?;
// Get the table definition
let tb = txn.get_tb(opt.ns()?, opt.db()?, &rid.tb).await;
let tb = txn.get_tb(opt.ns()?, opt.db()?, &id.tb).await;
// Return the table or attempt to define it
match tb {
// The table doesn't exist
@ -215,7 +222,7 @@ impl Document {
// Allowed to run?
opt.is_allowed(Action::Edit, ResourceKind::Table, &Base::Db)?;
// We can create the table automatically
txn.ensure_ns_db_tb(opt.ns()?, opt.db()?, &rid.tb, opt.strict).await
txn.ensure_ns_db_tb(opt.ns()?, opt.db()?, &id.tb, opt.strict).await
}
// There was an error
Err(err) => Err(err),
@ -230,7 +237,7 @@ impl Document {
opt: &Options,
) -> Result<Arc<[DefineTableStatement]>, Error> {
// Get the record id
let id = self.id.as_ref().unwrap();
let id = self.id()?;
// Get the table definitions
ctx.tx().all_tb_views(opt.ns()?, opt.db()?, &id.tb).await
}
@ -241,7 +248,7 @@ impl Document {
opt: &Options,
) -> Result<Arc<[DefineEventStatement]>, Error> {
// Get the record id
let id = self.id.as_ref().unwrap();
let id = self.id()?;
// Get the event definitions
ctx.tx().all_tb_events(opt.ns()?, opt.db()?, &id.tb).await
}
@ -252,7 +259,7 @@ impl Document {
opt: &Options,
) -> Result<Arc<[DefineFieldStatement]>, Error> {
// Get the record id
let id = self.id.as_ref().unwrap();
let id = self.id()?;
// Get the field definitions
ctx.tx().all_tb_fields(opt.ns()?, opt.db()?, &id.tb, None).await
}
@ -263,14 +270,14 @@ impl Document {
opt: &Options,
) -> Result<Arc<[DefineIndexStatement]>, Error> {
// Get the record id
let id = self.id.as_ref().unwrap();
let id = self.id()?;
// Get the index definitions
ctx.tx().all_tb_indexes(opt.ns()?, opt.db()?, &id.tb).await
}
// Get the lives for this document
pub async fn lv(&self, ctx: &Context, opt: &Options) -> Result<Arc<[LiveStatement]>, Error> {
// Get the record id
let id = self.id.as_ref().unwrap();
let id = self.id()?;
// Get the table definition
ctx.tx().all_tb_lives(opt.ns()?, opt.db()?, &id.tb).await
}

View file

@ -13,7 +13,7 @@ use crate::sql::Relation;
use crate::sql::TableType;
impl Document {
pub async fn edges(
pub async fn store_edges_data(
&mut self,
ctx: &Context,
opt: &Options,
@ -30,9 +30,9 @@ impl Document {
// Lock the transaction
let mut txn = txn.lock().await;
// Get the record id
let rid = self.id.as_ref().unwrap();
let rid = self.id()?;
// Store the record edges
if let Workable::Relate(l, r, _) = &self.extras {
if let Workable::Relate(l, r, _, _) = &self.extras {
// For enforced relations, ensure that the edges exist
if matches!(
tb.kind,
@ -41,13 +41,14 @@ impl Document {
..
})
) {
// Check that the `in` record exists
let key = crate::key::thing::new(opt.ns()?, opt.db()?, &l.tb, &l.id);
if !txn.exists(key).await? {
return Err(Error::IdNotFound {
value: l.to_string(),
});
}
// Check that the `out` record exists
let key = crate::key::thing::new(opt.ns()?, opt.db()?, &r.tb, &r.id);
if !txn.exists(key).await? {
return Err(Error::IdNotFound {
@ -58,7 +59,7 @@ impl Document {
// Get temporary edge references
let (ref o, ref i) = (Dir::Out, Dir::In);
// Store the left pointer edge
let key = crate::key::graph::new(opt.ns()?, opt.db()?, &l.tb, &l.id, o, rid);
let key = crate::key::graph::new(opt.ns()?, opt.db()?, &l.tb, &l.id, o, &rid);
txn.set(key, vec![], None).await?;
// Store the left inner edge
let key = crate::key::graph::new(opt.ns()?, opt.db()?, &rid.tb, &rid.id, i, l);
@ -67,7 +68,7 @@ impl Document {
let key = crate::key::graph::new(opt.ns()?, opt.db()?, &rid.tb, &rid.id, o, r);
txn.set(key, vec![], None).await?;
// Store the right pointer edge
let key = crate::key::graph::new(opt.ns()?, opt.db()?, &r.tb, &r.id, i, rid);
let key = crate::key::graph::new(opt.ns()?, opt.db()?, &r.tb, &r.id, i, &rid);
txn.set(key, vec![], None).await?;
// Store the edges on the record
self.current.doc.to_mut().put(&*EDGE, Value::Bool(true));

View file

@ -1,25 +0,0 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::doc::Document;
use crate::err::Error;
impl Document {
pub async fn empty(
&self,
_ctx: &Context,
_opt: &Options,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Check if this record exists
if self.id.is_some() {
// There is no current record
if self.current.doc.as_ref().is_none() {
// Ignore this requested record
return Err(Error::Ignore);
}
}
// Carry on
Ok(())
}
}

View file

@ -1,16 +0,0 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::doc::Document;
use crate::err::Error;
impl Document {
pub async fn erase(
&mut self,
_ctx: &Context,
_opt: &Options,
_stm: &Statement<'_>,
) -> Result<(), Error> {
self.current.doc.to_mut().clear()
}
}

View file

@ -7,7 +7,12 @@ use crate::sql::value::Value;
use reblessive::tree::Stk;
impl Document {
pub async fn event(
/// Processes any DEFINE EVENT clauses which
/// have been defined for the table which this
/// record belongs to. This functions loops
/// through the events and processes them all
/// within the currently running transaction.
pub async fn process_table_events(
&mut self,
stk: &mut Stk,
ctx: &Context,

View file

@ -4,14 +4,80 @@ use crate::dbs::Statement;
use crate::doc::Document;
use crate::err::Error;
use crate::iam::Action;
use crate::sql::idiom::Idiom;
use crate::sql::kind::Kind;
use crate::sql::permission::Permission;
use crate::sql::value::Value;
use crate::sql::{Idiom, Kind};
use reblessive::tree::Stk;
use std::sync::Arc;
impl Document {
pub async fn field(
/// Ensures that any remaining fields on a
/// SCHEMAFULL table are cleaned up and removed.
/// If a field is defined as FLEX, then any
/// nested fields or array values are untouched.
pub async fn cleanup_table_fields(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Get the table
let tb = self.tb(ctx, opt).await?;
// This table is schemafull
if tb.full {
// Create a vector to store the keys
let mut keys: Vec<Idiom> = vec![];
// Loop through all field statements
for fd in self.fd(ctx, opt).await?.iter() {
// Is this a schemaless field?
match fd.flex || fd.kind.as_ref().is_some_and(|k| k.is_literal_nested()) {
false => {
// Loop over this field in the document
for k in self.current.doc.each(&fd.name).into_iter() {
keys.push(k);
}
}
true => {
// Loop over every field under this field in the document
for k in self.current.doc.every(Some(&fd.name), true, true).into_iter() {
keys.push(k);
}
}
}
}
// Loop over every field in the document
for fd in self.current.doc.every(None, true, true).iter() {
if !keys.contains(fd) {
match fd {
fd if fd.is_id() => continue,
fd if fd.is_in() => continue,
fd if fd.is_out() => continue,
fd if fd.is_meta() => continue,
fd => match opt.strict {
// If strict, then throw an error on an undefined field
true => {
return Err(Error::FieldUndefined {
table: tb.name.to_raw(),
field: fd.to_owned(),
})
}
// Otherwise, delete the field silently and don't error
false => self.current.doc.to_mut().del(stk, ctx, opt, fd).await?,
},
}
}
}
}
// Carry on
Ok(())
}
/// Processes `DEFINE FIELD` statements which
/// have been defined on the table for this
/// record. These fields are executed for
/// every matching field in the input document.
pub async fn process_table_fields(
&mut self,
stk: &mut Stk,
ctx: &Context,
@ -23,26 +89,30 @@ impl Document {
return Ok(());
}
// Get the record id
let rid = self.id.as_ref().unwrap();
let rid = self.id()?;
// Get the user applied input
let inp = self.initial.doc.as_ref().changed(self.current.doc.as_ref());
// If set, the loop will skip certain clauses as long
// as the field name starts with the set idiom
let mut skip: Option<Idiom> = None;
// When set, any matching embedded object fields
// which are prefixed with the specified idiom
// will be skipped, as the parent object is optional
let mut skip: Option<&Idiom> = None;
// Loop through all field statements
for fd in self.fd(ctx, opt).await?.iter() {
// Check if we should skip this field
let skipped = match skip {
Some(ref inner) => {
// We are skipping a parent field
Some(inner) => {
// Check if this field is a child field
let skipped = fd.name.starts_with(inner);
// Let's stop skipping fields if not
if !skipped {
skip = None;
}
// Specify whether we should skip
skipped
}
None => false,
};
// Loop over each field in document
for (k, mut val) in self.current.doc.as_ref().walk(&fd.name).into_iter() {
// Get the initial value
@ -50,12 +120,17 @@ impl Document {
// Get the input value
let inp = Arc::new(inp.pick(&k));
// Check for READONLY clause
if fd.readonly && !self.is_new() && val.ne(&old) {
return Err(Error::FieldReadonly {
field: fd.name.clone(),
thing: rid.to_string(),
});
if fd.readonly || fd.name.is_id() {
if !self.is_new() && val.ne(&old) {
return Err(Error::FieldReadonly {
field: fd.name.clone(),
thing: rid.to_string(),
});
} else if !self.is_new() {
continue;
}
}
// Skip this field?
if !skipped {
// Get the default value
let def = match &fd.default {
@ -67,14 +142,17 @@ impl Document {
};
// Check for a DEFAULT clause
if let Some(expr) = def {
// Only run value clause for new and empty fields
if self.is_new() && val.is_none() {
// Arc the current value
let now = Arc::new(val);
// Configure the context
let mut ctx = MutableContext::new(ctx);
let v = Arc::new(val);
ctx.add_value("input", inp.clone());
ctx.add_value("value", v.clone());
ctx.add_value("after", v);
ctx.add_value("before", old.clone());
ctx.add_value("input", inp.clone());
ctx.add_value("after", now.clone());
ctx.add_value("value", now);
// Freeze the new context
let ctx = ctx.freeze();
// Process the VALUE clause
val = expr.compute(stk, &ctx, opt, Some(&self.current)).await?;
@ -82,7 +160,17 @@ impl Document {
}
// Check for a TYPE clause
if let Some(kind) = &fd.kind {
val = val.coerce_to(kind).map_err(|e| match e {
// If this is the `id` field, it must be a record
let cast = match &fd.name {
name if name.is_id() => match kind.to_owned() {
Kind::Option(v) if v.is_record() => &*v.to_owned(),
Kind::Record(r) => &Kind::Record(r),
_ => &Kind::Record(vec![]),
},
_ => kind,
};
// Check the type of the field value
val = val.coerce_to(cast).map_err(|e| match e {
// There was a conversion error
Error::CoerceTo {
from,
@ -91,31 +179,60 @@ impl Document {
thing: rid.to_string(),
field: fd.name.clone(),
value: from.to_string(),
check: kind.to_string(),
check: cast.to_string(),
},
// There was a different error
e => e,
})?;
// If this is the `id` field, check the inner type
if fd.name.is_id() {
if let Value::Thing(id) = val.clone() {
let inner = Value::from(id.id);
inner.coerce_to(kind).map_err(|e| match e {
// There was a conversion error
Error::CoerceTo {
from,
..
} => Error::FieldCheck {
thing: rid.to_string(),
field: fd.name.clone(),
value: from.to_string(),
check: kind.to_string(),
},
// There was a different error
e => e,
})?;
}
}
}
// Check for a VALUE clause
if let Some(expr) = &fd.value {
// Only run value clause for mutable and new fields
if !fd.readonly || self.is_new() {
// Configure the context
let v = Arc::new(val);
let mut ctx = MutableContext::new(ctx);
ctx.add_value("input", inp.clone());
ctx.add_value("value", v.clone());
ctx.add_value("after", v);
ctx.add_value("before", old.clone());
let ctx = ctx.freeze();
// Process the VALUE clause
val = expr.compute(stk, &ctx, opt, Some(&self.current)).await?;
}
// Arc the current value
let now = Arc::new(val);
// Configure the context
let mut ctx = MutableContext::new(ctx);
ctx.add_value("before", old.clone());
ctx.add_value("input", inp.clone());
ctx.add_value("after", now.clone());
ctx.add_value("value", now);
// Freeze the new context
let ctx = ctx.freeze();
// Process the VALUE clause
val = expr.compute(stk, &ctx, opt, Some(&self.current)).await?;
}
// Check for a TYPE clause
if let Some(kind) = &fd.kind {
val = val.coerce_to(kind).map_err(|e| match e {
// If this is the `id` field, it must be a record
let cast = match &fd.name {
name if name.is_id() => match kind.to_owned() {
Kind::Option(v) if v.is_record() => &*v.to_owned(),
Kind::Record(r) => &Kind::Record(r),
_ => &Kind::Record(vec![]),
},
_ => kind,
};
// Check the type of the field value
val = val.coerce_to(cast).map_err(|e| match e {
// There was a conversion error
Error::CoerceTo {
from,
@ -124,11 +241,31 @@ impl Document {
thing: rid.to_string(),
field: fd.name.clone(),
value: from.to_string(),
check: kind.to_string(),
check: cast.to_string(),
},
// There was a different error
e => e,
})?;
// If this is the `id` field, check the inner type
if fd.name.is_id() {
if let Value::Thing(id) = val.clone() {
let inner = Value::from(id.id);
inner.coerce_to(kind).map_err(|e| match e {
// There was a conversion error
Error::CoerceTo {
from,
..
} => Error::FieldCheck {
thing: rid.to_string(),
field: fd.name.clone(),
value: from.to_string(),
check: kind.to_string(),
},
// There was a different error
e => e,
})?;
}
}
}
// Check for a ASSERT clause
if let Some(expr) = &fd.assert {
@ -139,13 +276,15 @@ impl Document {
(Value::None, Some(Kind::Option(_))) => (),
// Otherwise let's process the ASSERT clause
_ => {
// Arc the current value
let now = Arc::new(val.clone());
// Configure the context
let mut ctx = MutableContext::new(ctx);
let v = Arc::new(val.clone());
ctx.add_value("input", inp.clone());
ctx.add_value("value", v.clone());
ctx.add_value("after", v);
ctx.add_value("before", old.clone());
ctx.add_value("input", inp.clone());
ctx.add_value("after", now.clone());
ctx.add_value("value", now.clone());
// Freeze the new context
let ctx = ctx.freeze();
// Process the ASSERT clause
if !expr
@ -156,8 +295,8 @@ impl Document {
return Err(Error::FieldValue {
thing: rid.to_string(),
field: fd.name.clone(),
value: val.to_string(),
check: expr.to_string(),
value: now.to_string(),
});
}
}
@ -187,15 +326,17 @@ impl Document {
// we check the expression and
// revert the field if denied.
Permission::Specific(e) => {
// Arc the current value
let now = Arc::new(val.clone());
// Disable permissions
let opt = &opt.new_with_perms(false);
// Configure the context
let mut ctx = MutableContext::new(ctx);
let v = Arc::new(val.clone());
ctx.add_value("input", inp);
ctx.add_value("value", v.clone());
ctx.add_value("after", v);
ctx.add_value("before", old.clone());
ctx.add_value("input", inp.clone());
ctx.add_value("after", now.clone());
ctx.add_value("value", now.clone());
// Freeze the new context
let ctx = ctx.freeze();
// Process the PERMISSION clause
if !e.compute(stk, &ctx, opt, Some(&self.current)).await?.is_truthy() {
@ -204,13 +345,13 @@ impl Document {
}
}
}
// Skip this field?
if !skipped {
if matches!(val, Value::None) && matches!(fd.kind, Some(Kind::Option(_))) {
skip = Some(fd.name.to_owned());
// If the field is empty, mark child fields as skippable
if val.is_none() && fd.kind.as_ref().is_some_and(Kind::can_be_none) {
skip = Some(&fd.name);
}
// Set the value of the field
// Set the new value of the field, or delete it if empty
match val {
Value::None => self.current.doc.to_mut().del(stk, ctx, opt, &k).await?,
v => self.current.doc.to_mut().set(stk, ctx, opt, &k, v).await?,

View file

@ -17,7 +17,7 @@ use crate::sql::{Part, Thing, Value};
use reblessive::tree::Stk;
impl Document {
pub async fn index(
pub async fn store_index_data(
&self,
stk: &mut Stk,
ctx: &Context,
@ -44,7 +44,7 @@ impl Document {
return Ok(());
}
// Get the record id
let rid = self.id.as_ref().unwrap();
let rid = self.id()?;
// Loop through all index statements
for ix in ixs.iter() {
// Calculate old values
@ -55,7 +55,7 @@ impl Document {
// Update the index entries
if targeted_force || o != n {
Self::one_index(stk, ctx, opt, ix, o, n, rid).await?;
Self::one_index(stk, ctx, opt, ix, o, n, &rid).await?;
}
}
// Carry on

View file

@ -14,37 +14,72 @@ impl Document {
opt: &Options,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check whether current record exists
match self.current.doc.as_ref().is_some() {
// We attempted to INSERT a document with an ID,
// and this ID already exists in the database,
// so we need to update the record instead.
true => self.insert_update(stk, ctx, opt, stm).await,
// We attempted to INSERT a document with an ID,
// which does not exist in the database, or we
// are creating a new record with a new ID.
false => {
// First of all let's try to create the record
match self.insert_create(stk, ctx, opt, stm).await {
// We received an index exists error, so we
// ignore the error, and attempt to update the
// record using the ON DUPLICATE KEY clause
// with the Record ID received in the error
Err(Error::IndexExists {
// On the first iteration, this will always be
// false, as we do not first attempt to fetch the
// record from the storage engine. After attempting
// to create the record, if the record already exists
// then we will fetch the record from the storage
// engine, and will update the record subsequently
match self.extras.is_insert_initial() {
// We haven't yet checked if the record exists
// so let's assume that the record does not exist
// and attempt to create the record in the database
true => match self.insert_create(stk, ctx, opt, stm).await {
// We received an index exists error, so we
// ignore the error, and attempt to update the
// record using the ON DUPLICATE KEY UPDATE
// clause with the ID received in the error
Err(Error::IndexExists {
thing,
index,
value,
}) => match stm.is_retryable() {
// There is an ON DUPLICATE KEY UPDATE clause
true => match self.extras.is_insert_with_specific_id() {
// No specific Record ID has been specified, so retry
false => Err(Error::RetryWithId(thing)),
// A specific Record ID was specified, so error
true => Err(Error::IndexExists {
thing,
index,
value,
}),
},
// There is no ON DUPLICATE KEY UPDATE clause
false => Err(Error::IndexExists {
thing,
..
}) => Err(Error::RetryWithId(thing)),
// If any other error was received, then let's
// pass that error through and return an error
Err(e) => Err(e),
// Otherwise the record creation succeeded
Ok(v) => Ok(v),
}
}
index,
value,
}),
},
// We attempted to INSERT a document with an ID,
// and this ID already exists in the database,
// so we need to update the record instead using
// the ON DUPLICATE KEY UPDATE statement clause
Err(Error::RecordExists {
thing,
}) => match stm.is_retryable() {
// There is an ON DUPLICATE KEY UPDATE clause
true => Err(Error::RetryWithId(thing)),
// There is no ON DUPLICATE KEY UPDATE clause
false => Err(Error::RecordExists {
thing,
}),
},
// If any other error was received, then let's
// pass that error through and return an error
Err(e) => Err(e),
// Otherwise the record creation succeeded
Ok(v) => Ok(v),
},
// If we first attempted to create the record,
// but the record existed already, then we will
// fetch the record from the storage engine,
// and will update the record subsequently
false => self.insert_update(stk, ctx, opt, stm).await,
}
}
// Attempt to run an INSERT clause
#[inline(always)]
/// Attempt to run an INSERT clause
async fn insert_create(
&mut self,
stk: &mut Stk,
@ -52,37 +87,24 @@ impl Document {
opt: &Options,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check if table has correct relation status
self.relation(ctx, opt, stm).await?;
// Merge record data
self.merge(stk, ctx, opt, stm).await?;
// Store record edges
self.edges(ctx, opt, stm).await?;
// Merge fields data
self.field(stk, ctx, opt, stm).await?;
// Reset fields data
self.reset(ctx, opt, stm).await?;
// Clean fields data
self.clean(stk, ctx, opt, stm).await?;
// Check if allowed
self.allow(stk, ctx, opt, stm).await?;
// Store index data
self.index(stk, ctx, opt, stm).await?;
// Store record data
self.store(ctx, opt, stm).await?;
// Run table queries
self.table(stk, ctx, opt, stm).await?;
// Run lives queries
self.lives(stk, ctx, opt, stm).await?;
// Run change feeds queries
self.changefeeds(ctx, opt, stm).await?;
// Run event queries
self.event(stk, ctx, opt, stm).await?;
// Yield document
self.check_permissions_quick(stk, ctx, opt, stm).await?;
self.check_table_type(ctx, opt, stm).await?;
self.check_data_fields(stk, ctx, opt, stm).await?;
self.process_merge_data(stk, ctx, opt, stm).await?;
self.store_edges_data(ctx, opt, stm).await?;
self.process_table_fields(stk, ctx, opt, stm).await?;
self.cleanup_table_fields(stk, ctx, opt, stm).await?;
self.default_record_data(ctx, opt, stm).await?;
self.check_permissions_table(stk, ctx, opt, stm).await?;
self.store_record_data(ctx, opt, stm).await?;
self.store_index_data(stk, ctx, opt, stm).await?;
self.process_table_views(stk, ctx, opt, stm).await?;
self.process_table_lives(stk, ctx, opt, stm).await?;
self.process_table_events(stk, ctx, opt, stm).await?;
self.process_changefeeds(ctx, opt, stm).await?;
self.pluck(stk, ctx, opt, stm).await
}
// Attempt to run an UPDATE clause
#[inline(always)]
/// Attempt to run an UPDATE clause
async fn insert_update(
&mut self,
stk: &mut Stk,
@ -90,31 +112,21 @@ impl Document {
opt: &Options,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check if allowed
self.allow(stk, ctx, opt, stm).await?;
// Alter record data
self.alter(stk, ctx, opt, stm).await?;
// Merge fields data
self.field(stk, ctx, opt, stm).await?;
// Reset fields data
self.reset(ctx, opt, stm).await?;
// Clean fields data
self.clean(stk, ctx, opt, stm).await?;
// Check if allowed
self.allow(stk, ctx, opt, stm).await?;
// Store index data
self.index(stk, ctx, opt, stm).await?;
// Store record data
self.store(ctx, opt, stm).await?;
// Run table queries
self.table(stk, ctx, opt, stm).await?;
// Run lives queries
self.lives(stk, ctx, opt, stm).await?;
// Run change feeds queries
self.changefeeds(ctx, opt, stm).await?;
// Run event queries
self.event(stk, ctx, opt, stm).await?;
// Yield document
self.check_permissions_quick(stk, ctx, opt, stm).await?;
self.check_table_type(ctx, opt, stm).await?;
self.check_data_fields(stk, ctx, opt, stm).await?;
self.check_permissions_table(stk, ctx, opt, stm).await?;
self.process_record_data(stk, ctx, opt, stm).await?;
self.process_table_fields(stk, ctx, opt, stm).await?;
self.cleanup_table_fields(stk, ctx, opt, stm).await?;
self.default_record_data(ctx, opt, stm).await?;
self.check_permissions_table(stk, ctx, opt, stm).await?;
self.store_record_data(ctx, opt, stm).await?;
self.store_index_data(stk, ctx, opt, stm).await?;
self.process_table_views(stk, ctx, opt, stm).await?;
self.process_table_lives(stk, ctx, opt, stm).await?;
self.process_table_events(stk, ctx, opt, stm).await?;
self.process_changefeeds(ctx, opt, stm).await?;
self.pluck(stk, ctx, opt, stm).await
}
}

View file

@ -16,7 +16,12 @@ use reblessive::tree::Stk;
use std::sync::Arc;
impl Document {
pub async fn lives(
/// Processes any LIVE SELECT statements which
/// have been defined for the table which this
/// record belongs to. This functions loops
/// through the live queries and processes them
/// all within the currently running transaction.
pub async fn process_table_lives(
&mut self,
stk: &mut Stk,
ctx: &Context,

View file

@ -1,36 +0,0 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::dbs::Workable;
use crate::doc::Document;
use crate::err::Error;
use reblessive::tree::Stk;
impl Document {
pub async fn merge(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Get the record id
let rid = self.id.as_ref().unwrap();
// Set default field values
self.current.doc.to_mut().def(rid);
// This is an INSERT statement
if let Workable::Insert(v) = &self.extras {
let v = v.compute(stk, ctx, opt, Some(&self.current)).await?;
self.current.doc.to_mut().merge(v)?;
}
// This is an INSERT RELATION statement
if let Workable::Relate(_, _, Some(v)) = &self.extras {
let v = v.compute(stk, ctx, opt, Some(&self.current)).await?;
self.current.doc.to_mut().merge(v)?;
}
// Set default field values
self.current.doc.to_mut().def(rid);
// Carry on
Ok(())
}
}

View file

@ -20,22 +20,15 @@ mod select; // Processes a SELECT statement for this document
mod update; // Processes a UPDATE statement for this document
mod upsert; // Processes a UPSERT statement for this document
mod allow; // Checks whether the query can access this document
mod alter; // Modifies and updates the fields in this document
mod changefeeds; // Processes any change feeds relevant for this document
mod check; // Checks whether the WHERE clauses matches this document
mod clean; // Ensures records adhere to the table schema
mod edges; // Attempts to store the edge data for this document
mod empty; // Checks whether the specified document actually exists
mod erase; // Removes all content and field data for this document
mod event; // Processes any table events relevant for this document
mod field; // Processes any schema-defined fields for this document
mod index; // Attempts to store the index data for this document
mod lives; // Processes any live queries relevant for this document
mod merge; // Merges any field changes for an INSERT statement
mod pluck; // Pulls the projected expressions from the document
mod purge; // Deletes this document, and any edges or indexes
mod relation; // Checks whether the record is the right kind for the table
mod reset; // Resets internal fields which were set for this document
mod store; // Writes the document content to the storage engine
mod table; // Processes any foreign tables relevant for this document

View file

@ -23,13 +23,13 @@ impl Document {
// Setup a new workable
let ins = match pro.val {
Operable::Value(v) => (v, Workable::Normal),
Operable::Mergeable(v, o) => (v, Workable::Insert(o)),
Operable::Relatable(f, v, w, o) => (v, Workable::Relate(f, w, o)),
Operable::Mergeable(v, o, u) => (v, Workable::Insert(o, u)),
Operable::Relatable(f, v, w, o, u) => (v, Workable::Relate(f, w, o, u)),
};
// Setup a new document
let mut doc = Document::new(pro.rid, pro.ir, ins.0, ins.1);
// Optionally create a save point so we can roll back any upcoming changes
let is_save_point = if !stm.is_select() {
let is_save_point = if stm.is_retryable() {
ctx.tx().lock().await.new_save_point().await;
true
} else {
@ -69,16 +69,16 @@ impl Document {
ir: None,
val: match doc.extras {
Workable::Normal => Operable::Value(val),
Workable::Insert(o) => Operable::Mergeable(val, o),
Workable::Relate(f, w, o) => Operable::Relatable(f, val, w, o),
Workable::Insert(o, _) => Operable::Mergeable(val, o, true),
Workable::Relate(f, w, o, _) => Operable::Relatable(f, val, w, o, true),
},
};
// Go to top of loop
continue;
}
// This record didn't match conditions, so skip
Err(Error::Ignore) => Err(Error::Ignore),
// If any other error was received, then let's
// pass that error through and return an error
// Pass other errors through and return the error
Err(e) => {
// We roll back any change following the save point
if is_save_point {

View file

@ -15,7 +15,7 @@ impl Document {
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check if table has correct relation status
self.relation(ctx, opt, stm).await?;
self.check_table_type(ctx, opt, stm).await?;
// Check whether current record exists
match self.current.doc.as_ref().is_some() {
// We attempted to RELATE a document with an ID,
@ -28,8 +28,7 @@ impl Document {
false => self.relate_create(stk, ctx, opt, stm).await,
}
}
// Attempt to run an INSERT clause
#[inline(always)]
/// Attempt to run a RELATE clause
async fn relate_create(
&mut self,
stk: &mut Stk,
@ -37,35 +36,24 @@ impl Document {
opt: &Options,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Alter record data
self.alter(stk, ctx, opt, stm).await?;
// Store record edges
self.edges(ctx, opt, stm).await?;
// Merge fields data
self.field(stk, ctx, opt, stm).await?;
// Reset fields data
self.reset(ctx, opt, stm).await?;
// Clean fields data
self.clean(stk, ctx, opt, stm).await?;
// Check if allowed
self.allow(stk, ctx, opt, stm).await?;
// Store record data
self.store(ctx, opt, stm).await?;
// Store index data
self.index(stk, ctx, opt, stm).await?;
// Run table queries
self.table(stk, ctx, opt, stm).await?;
// Run lives queries
self.lives(stk, ctx, opt, stm).await?;
// Run change feeds queries
self.changefeeds(ctx, opt, stm).await?;
// Run event queries
self.event(stk, ctx, opt, stm).await?;
// Yield document
self.check_permissions_quick(stk, ctx, opt, stm).await?;
self.check_table_type(ctx, opt, stm).await?;
self.check_data_fields(stk, ctx, opt, stm).await?;
self.process_record_data(stk, ctx, opt, stm).await?;
self.store_edges_data(ctx, opt, stm).await?;
self.process_table_fields(stk, ctx, opt, stm).await?;
self.cleanup_table_fields(stk, ctx, opt, stm).await?;
self.default_record_data(ctx, opt, stm).await?;
self.check_permissions_table(stk, ctx, opt, stm).await?;
self.store_record_data(ctx, opt, stm).await?;
self.store_index_data(stk, ctx, opt, stm).await?;
self.process_table_views(stk, ctx, opt, stm).await?;
self.process_table_lives(stk, ctx, opt, stm).await?;
self.process_changefeeds(ctx, opt, stm).await?;
self.process_table_events(stk, ctx, opt, stm).await?;
self.pluck(stk, ctx, opt, stm).await
}
// Attempt to run an UPDATE clause
#[inline(always)]
/// Attempt to run an UPDATE clause
async fn relate_update(
&mut self,
stk: &mut Stk,
@ -73,33 +61,22 @@ impl Document {
opt: &Options,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check if allowed
self.allow(stk, ctx, opt, stm).await?;
// Store record edges
self.edges(ctx, opt, stm).await?;
// Alter record data
self.alter(stk, ctx, opt, stm).await?;
// Merge fields data
self.field(stk, ctx, opt, stm).await?;
// Reset fields data
self.reset(ctx, opt, stm).await?;
// Clean fields data
self.clean(stk, ctx, opt, stm).await?;
// Check if allowed
self.allow(stk, ctx, opt, stm).await?;
// Store record data
self.store(ctx, opt, stm).await?;
// Store index data
self.index(stk, ctx, opt, stm).await?;
// Run table queries
self.table(stk, ctx, opt, stm).await?;
// Run lives queries
self.lives(stk, ctx, opt, stm).await?;
// Run change feeds queries
self.changefeeds(ctx, opt, stm).await?;
// Run event queries
self.event(stk, ctx, opt, stm).await?;
// Yield document
self.check_permissions_quick(stk, ctx, opt, stm).await?;
self.check_table_type(ctx, opt, stm).await?;
self.check_data_fields(stk, ctx, opt, stm).await?;
self.check_permissions_table(stk, ctx, opt, stm).await?;
self.store_edges_data(ctx, opt, stm).await?;
self.process_record_data(stk, ctx, opt, stm).await?;
self.process_table_fields(stk, ctx, opt, stm).await?;
self.cleanup_table_fields(stk, ctx, opt, stm).await?;
self.default_record_data(ctx, opt, stm).await?;
self.check_permissions_table(stk, ctx, opt, stm).await?;
self.store_record_data(ctx, opt, stm).await?;
self.store_index_data(stk, ctx, opt, stm).await?;
self.process_table_views(stk, ctx, opt, stm).await?;
self.process_table_lives(stk, ctx, opt, stm).await?;
self.process_table_events(stk, ctx, opt, stm).await?;
self.process_changefeeds(ctx, opt, stm).await?;
self.pluck(stk, ctx, opt, stm).await
}
}

View file

@ -1,70 +0,0 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::{Statement, Workable};
use crate::doc::Document;
use crate::err::Error;
impl Document {
pub async fn relation(
&mut self,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
) -> Result<(), Error> {
let tb = self.tb(ctx, opt).await?;
let rid = self.id.as_ref().unwrap();
match stm {
Statement::Create(_) => {
if !tb.allows_normal() {
return Err(Error::TableCheck {
thing: rid.to_string(),
relation: false,
target_type: tb.kind.to_string(),
});
}
}
Statement::Upsert(_) => {
if !tb.allows_normal() {
return Err(Error::TableCheck {
thing: rid.to_string(),
relation: false,
target_type: tb.kind.to_string(),
});
}
}
Statement::Relate(_) => {
if !tb.allows_relation() {
return Err(Error::TableCheck {
thing: rid.to_string(),
relation: true,
target_type: tb.kind.to_string(),
});
}
}
Statement::Insert(_) => match self.extras {
Workable::Relate(_, _, _) => {
if !tb.allows_relation() {
return Err(Error::TableCheck {
thing: rid.to_string(),
relation: true,
target_type: tb.kind.to_string(),
});
}
}
_ => {
if !tb.allows_normal() {
return Err(Error::TableCheck {
thing: rid.to_string(),
relation: false,
target_type: tb.kind.to_string(),
});
}
}
},
_ => {}
}
// Carry on
Ok(())
}
}

View file

@ -1,38 +0,0 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::dbs::Workable;
use crate::doc::Document;
use crate::err::Error;
use crate::sql::paths::EDGE;
use crate::sql::paths::IN;
use crate::sql::paths::OUT;
use crate::sql::value::Value;
impl Document {
pub async fn reset(
&mut self,
_ctx: &Context,
_opt: &Options,
_stm: &Statement<'_>,
) -> Result<(), Error> {
// Get the record id
let rid = self.id.as_ref().unwrap();
// Set default field values
self.current.doc.to_mut().def(rid);
// This is a RELATE statement, so reset fields
if let Workable::Relate(l, r, _) = &self.extras {
self.current.doc.to_mut().put(&*EDGE, Value::Bool(true));
self.current.doc.to_mut().put(&*IN, l.clone().into());
self.current.doc.to_mut().put(&*OUT, r.clone().into());
}
// This is an UPDATE of a graph edge, so reset fields
if self.initial.doc.as_ref().pick(&*EDGE).is_true() {
self.current.doc.to_mut().put(&*EDGE, Value::Bool(true));
self.current.doc.to_mut().put(&*IN, self.initial.doc.as_ref().pick(&*IN));
self.current.doc.to_mut().put(&*OUT, self.initial.doc.as_ref().pick(&*OUT));
}
// Carry on
Ok(())
}
}

View file

@ -14,13 +14,10 @@ impl Document {
opt: &Options,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check if record exists
self.empty(ctx, opt, stm).await?;
// Check where clause
self.check(stk, ctx, opt, stm).await?;
// Check if allowed
self.allow(stk, ctx, opt, stm).await?;
// Yield document
self.check_record_exists(ctx, opt, stm).await?;
self.check_permissions_quick(stk, ctx, opt, stm).await?;
self.check_where_condition(stk, ctx, opt, stm).await?;
self.check_permissions_table(stk, ctx, opt, stm).await?;
self.pluck(stk, ctx, opt, stm).await
}
}

View file

@ -5,7 +5,7 @@ use crate::doc::Document;
use crate::err::Error;
impl Document {
pub async fn store(
pub async fn store_record_data(
&self,
ctx: &Context,
opt: &Options,
@ -22,25 +22,69 @@ impl Document {
// Get the transaction
let txn = ctx.tx();
// Get the record id
let rid = self.id.as_ref().unwrap();
let rid = self.id()?;
// Store the record data
let key = crate::key::thing::new(opt.ns()?, opt.db()?, &rid.tb, &rid.id);
// Match the statement type
match stm {
// This is a CREATE statement so try to insert the key
Statement::Create(_) => match txn.put(key, self, opt.version).await {
// The key already exists, so return an error
Err(Error::TxKeyAlreadyExists) => Err(Error::RecordExists {
thing: rid.to_string(),
}),
// Return any other received error
Err(e) => Err(e),
// Record creation worked fine
Ok(v) => Ok(v),
},
// INSERT can be versioned
Statement::Insert(_) => txn.set(key, self, opt.version).await,
// This is not a CREATE statement, so update the key
// This is a INSERT statement so try to insert the key.
// For INSERT statements we don't first check for the
// entry from the storage engine, so when we attempt
// to store the record value, we presume that the key
// does not exist. If the record value exists then we
// attempt to run the ON DUPLICATE KEY UPDATE clause but
// at this point the current document is not empty so we
// set and update the key, without checking if the key
// already exists in the storage engine.
Statement::Insert(_) if self.extras.is_insert_initial() => {
match txn.put(key, self, opt.version).await {
// The key already exists, so return an error
Err(Error::TxKeyAlreadyExists) => Err(Error::RecordExists {
thing: rid.as_ref().to_owned(),
}),
// Return any other received error
Err(e) => Err(e),
// Record creation worked fine
Ok(v) => Ok(v),
}
}
// This is a UPSERT statement so try to insert the key.
// For UPSERT statements we don't first check for the
// entry from the storage engine, so when we attempt
// to store the record value, we must ensure that the
// key does not exist. If the record value exists then we
// retry and attempt to update the record which exists.
Statement::Upsert(_) if self.is_new() => {
match txn.put(key, self, opt.version).await {
// The key already exists, so return an error
Err(Error::TxKeyAlreadyExists) => Err(Error::RecordExists {
thing: rid.as_ref().to_owned(),
}),
// Return any other received error
Err(e) => Err(e),
// Record creation worked fine
Ok(v) => Ok(v),
}
}
// This is a CREATE statement so try to insert the key.
// For CREATE statements we don't first check for the
// entry from the storage engine, so when we attempt
// to store the record value, we must ensure that the
// key does not exist. If it already exists, then we
// return an error, and the statement fails.
Statement::Create(_) => {
match txn.put(key, self, opt.version).await {
// The key already exists, so return an error
Err(Error::TxKeyAlreadyExists) => Err(Error::RecordExists {
thing: rid.as_ref().to_owned(),
}),
// Return any other received error
Err(e) => Err(e),
// Record creation worked fine
Ok(v) => Ok(v),
}
}
// Let's update the stored value for the specified key
_ => txn.set(key, self, None).await,
}?;
// Carry on

View file

@ -47,7 +47,12 @@ struct FieldDataContext<'a> {
}
impl Document {
pub async fn table(
/// Processes any DEFINE TABLE AS clauses which
/// have been defined for the table which this
/// record belongs to. This functions loops
/// through the tables and processes them all
/// within the currently running transaction.
pub async fn process_table_views(
&self,
stk: &mut Stk,
ctx: &Context,
@ -78,7 +83,7 @@ impl Document {
// Don't run permissions
let opt = &opt.new_with_perms(false);
// Get the record id
let rid = self.id.as_ref().unwrap();
let rid = self.id()?;
// Get the query action
let act = if stm.is_delete() {
Action::Delete

View file

@ -14,35 +14,22 @@ impl Document {
opt: &Options,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check if record exists
self.empty(ctx, opt, stm).await?;
// Check where clause
self.check(stk, ctx, opt, stm).await?;
// Check if allowed
self.allow(stk, ctx, opt, stm).await?;
// Alter record data
self.alter(stk, ctx, opt, stm).await?;
// Merge fields data
self.field(stk, ctx, opt, stm).await?;
// Reset fields data
self.reset(ctx, opt, stm).await?;
// Clean fields data
self.clean(stk, ctx, opt, stm).await?;
// Check if allowed
self.allow(stk, ctx, opt, stm).await?;
// Store record data
self.store(ctx, opt, stm).await?;
// Store index data
self.index(stk, ctx, opt, stm).await?;
// Run table queries
self.table(stk, ctx, opt, stm).await?;
// Run lives queries
self.lives(stk, ctx, opt, stm).await?;
// Run change feeds queries
self.changefeeds(ctx, opt, stm).await?;
// Run event queries
self.event(stk, ctx, opt, stm).await?;
// Yield document
self.check_record_exists(ctx, opt, stm).await?;
self.check_permissions_quick(stk, ctx, opt, stm).await?;
self.check_data_fields(stk, ctx, opt, stm).await?;
self.check_where_condition(stk, ctx, opt, stm).await?;
self.check_permissions_table(stk, ctx, opt, stm).await?;
self.process_record_data(stk, ctx, opt, stm).await?;
self.process_table_fields(stk, ctx, opt, stm).await?;
self.cleanup_table_fields(stk, ctx, opt, stm).await?;
self.default_record_data(ctx, opt, stm).await?;
self.check_permissions_table(stk, ctx, opt, stm).await?;
self.store_record_data(ctx, opt, stm).await?;
self.store_index_data(stk, ctx, opt, stm).await?;
self.process_table_views(stk, ctx, opt, stm).await?;
self.process_table_lives(stk, ctx, opt, stm).await?;
self.process_table_events(stk, ctx, opt, stm).await?;
self.process_changefeeds(ctx, opt, stm).await?;
self.pluck(stk, ctx, opt, stm).await
}
}

View file

@ -14,33 +14,44 @@ impl Document {
opt: &Options,
stm: &Statement<'_>,
) -> Result<Value, Error> {
// Check where clause
self.check(stk, ctx, opt, stm).await?;
// Check if allowed
self.allow(stk, ctx, opt, stm).await?;
// Alter record data
self.alter(stk, ctx, opt, stm).await?;
// Merge fields data
self.field(stk, ctx, opt, stm).await?;
// Reset fields data
self.reset(ctx, opt, stm).await?;
// Clean fields data
self.clean(stk, ctx, opt, stm).await?;
// Check if allowed
self.allow(stk, ctx, opt, stm).await?;
// Store record data
self.store(ctx, opt, stm).await?;
// Store index data
self.index(stk, ctx, opt, stm).await?;
// Run table queries
self.table(stk, ctx, opt, stm).await?;
// Run lives queries
self.lives(stk, ctx, opt, stm).await?;
// Run change feeds queries
self.changefeeds(ctx, opt, stm).await?;
// Run event queries
self.event(stk, ctx, opt, stm).await?;
// Yield document
match self.upsert_process(stk, ctx, opt, stm).await {
// We attempted to INSERT a document with an ID,
// and this ID already exists in the database,
// so we need to UPDATE the record instead.
Err(Error::RecordExists {
thing,
}) => Err(Error::RetryWithId(thing)),
// If any other error was received, then let's
// pass that error through and return an error
Err(e) => Err(e),
// Otherwise the record creation succeeded
Ok(v) => Ok(v),
}
}
/// Attempt to run an UPSERT clause
async fn upsert_process(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
) -> Result<Value, Error> {
self.check_permissions_quick(stk, ctx, opt, stm).await?;
self.check_table_type(ctx, opt, stm).await?;
self.check_data_fields(stk, ctx, opt, stm).await?;
self.check_where_condition(stk, ctx, opt, stm).await?;
self.check_permissions_table(stk, ctx, opt, stm).await?;
self.process_record_data(stk, ctx, opt, stm).await?;
self.process_table_fields(stk, ctx, opt, stm).await?;
self.cleanup_table_fields(stk, ctx, opt, stm).await?;
self.default_record_data(ctx, opt, stm).await?;
self.check_permissions_table(stk, ctx, opt, stm).await?;
self.store_record_data(ctx, opt, stm).await?;
self.store_index_data(stk, ctx, opt, stm).await?;
self.process_table_views(stk, ctx, opt, stm).await?;
self.process_table_lives(stk, ctx, opt, stm).await?;
self.process_table_events(stk, ctx, opt, stm).await?;
self.process_changefeeds(ctx, opt, stm).await?;
self.pluck(stk, ctx, opt, stm).await
}
}

View file

@ -353,12 +353,6 @@ pub enum Error {
value: String,
},
// The cluster node already exists
#[error("The node '{value}' already exists")]
ClAlreadyExists {
value: String,
},
// The cluster node does not exist
#[error("The node '{value}' does not exist")]
NdNotFound {
@ -566,7 +560,7 @@ pub enum Error {
/// A database entry for the specified record already exists
#[error("Database record `{thing}` already exists")]
RecordExists {
thing: String,
thing: Thing,
},
/// A database index entry for the specified record already exists
@ -620,10 +614,11 @@ pub enum Error {
field: Idiom,
},
/// Found a record id for the record but we are creating a specific record
#[error("Found {value} for the id field, but a specific record has been specified")]
IdMismatch {
value: String,
/// The specified field on a SCHEMAFUL table was not defined
#[error("Found field '{field}', but no such field exists for table '{table}'")]
FieldUndefined {
table: String,
field: Idiom,
},
/// Found a record id for the record but this is not a valid id
@ -632,6 +627,36 @@ pub enum Error {
value: String,
},
/// Found a record id for the record but we are creating a specific record
#[error("Found {value} for the `id` field, but a specific record has been specified")]
IdMismatch {
value: String,
},
/// Found a record id for the record but we are creating a specific record
#[error("Found {value} for the `in` field, but the value does not match the `in` record id")]
InMismatch {
value: String,
},
/// Found a record id for the record but we are creating a specific record
#[error("Found {value} for the `in` field, which does not match the existing field value")]
InOverride {
value: String,
},
/// Found a record id for the record but we are creating a specific record
#[error("Found {value} for the `out` field, but the value does not match the `out` record id")]
OutMismatch {
value: String,
},
/// Found a record id for the record but we are creating a specific record
#[error("Found {value} for the `out` field, which does not match the existing field value")]
OutOverride {
value: String,
},
/// Unable to coerce to a value to another value
#[error("Expected a {into} but found {from}")]
CoerceTo {
@ -847,6 +872,12 @@ pub enum Error {
#[error("The db is running without an available storage engine")]
MissingStorageEngine,
// The cluster node already exists
#[error("The node '{value}' already exists")]
ClAlreadyExists {
value: String,
},
/// The requested analyzer already exists
#[error("The analyzer '{value}' already exists")]
AzAlreadyExists {

View file

@ -27,7 +27,7 @@ impl SavedValue {
}
}
#[cfg(any(feature = "kv-surrealkv", feature = "kv-fdb", feature = "kv-tikv"))]
#[cfg(any(feature = "kv-fdb", feature = "kv-tikv"))]
pub(super) fn get_val(&self) -> Option<&Val> {
self.saved_val.as_ref()
}

View file

@ -94,7 +94,7 @@ impl<'a> Stream for Scanner<'a, (Key, Val)> {
}
// Get the last element of the results
let last = v.last().ok_or_else(|| {
Error::Unreachable("Last key/val can't be none".to_string())
fail!("Expected the last key-value pair to not be none")
})?;
// Start the next scan from the last result
self.range.start.clone_from(&last.0);
@ -161,7 +161,7 @@ impl<'a> Stream for Scanner<'a, Key> {
}
// Get the last element of the results
let last = v.last().ok_or_else(|| {
Error::Unreachable("Last key can't be none".to_string())
fail!("Expected the last key-value pair to not be none")
})?;
// Start the next scan from the last result
self.range.start.clone_from(last);

View file

@ -33,6 +33,10 @@ impl Cast {
}
impl Cast {
/// Check if we require a writeable transaction
pub(crate) fn writeable(&self) -> bool {
self.1.writeable()
}
/// Was marked recursively
pub(crate) async fn compute(
&self,

View file

@ -4,6 +4,8 @@ use crate::err::Error;
use crate::sql::fmt::Fmt;
use crate::sql::idiom::Idiom;
use crate::sql::operator::Operator;
use crate::sql::part::Part;
use crate::sql::paths::ID;
use crate::sql::value::Value;
use reblessive::tree::Stk;
use revision::revisioned;
@ -40,32 +42,42 @@ impl Data {
stk: &mut Stk,
ctx: &Context,
opt: &Options,
) -> Result<Option<Value>, Error> {
self.pick(stk, ctx, opt, &*ID).await
}
/// Fetch a field path value if one is specified
pub(crate) async fn pick(
&self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
path: &[Part],
) -> Result<Option<Value>, Error> {
match self {
Self::MergeExpression(v) => match v {
Value::Param(v) => Ok(v.compute(stk, ctx, opt, None).await?.rid().some()),
Value::Object(_) => Ok(v.rid().compute(stk, ctx, opt, None).await?.some()),
Value::Param(v) => Ok(v.compute(stk, ctx, opt, None).await?.pick(path).some()),
Value::Object(_) => Ok(v.pick(path).compute(stk, ctx, opt, None).await?.some()),
_ => Ok(None),
},
Self::ReplaceExpression(v) => match v {
Value::Param(v) => Ok(v.compute(stk, ctx, opt, None).await?.rid().some()),
Value::Object(_) => Ok(v.rid().compute(stk, ctx, opt, None).await?.some()),
Value::Param(v) => Ok(v.compute(stk, ctx, opt, None).await?.pick(path).some()),
Value::Object(_) => Ok(v.pick(path).compute(stk, ctx, opt, None).await?.some()),
_ => Ok(None),
},
Self::ContentExpression(v) => match v {
Value::Param(v) => Ok(v.compute(stk, ctx, opt, None).await?.rid().some()),
Value::Object(_) => Ok(v.rid().compute(stk, ctx, opt, None).await?.some()),
Value::Param(v) => Ok(v.compute(stk, ctx, opt, None).await?.pick(path).some()),
Value::Object(_) => Ok(v.pick(path).compute(stk, ctx, opt, None).await?.some()),
_ => Ok(None),
},
Self::SetExpression(v) => match v.iter().find(|f| f.0.is_id()) {
Self::SetExpression(v) => match v.iter().find(|f| f.0.is_field(path)) {
Some((_, _, v)) => {
// This SET expression has an 'id' field
// This SET expression has this field
Ok(v.compute(stk, ctx, opt, None).await?.some())
}
// This SET expression had no 'id' field
// This SET expression does not have this field
_ => Ok(None),
},
// Generate a random id for all other data clauses
// Return nothing
_ => Ok(None),
}
}

View file

@ -192,6 +192,18 @@ impl Id {
pub fn uuid() -> Self {
Self::Uuid(Uuid::new_v7())
}
/// Check if this Id matches a value
pub fn is(&self, val: &Value) -> bool {
match (self, val) {
(Self::Number(i), Value::Number(Number::Int(j))) if *i == *j => true,
(Self::String(i), Value::Strand(j)) if *i == j.0 => true,
(Self::Uuid(i), Value::Uuid(j)) if i == j => true,
(Self::Array(i), Value::Array(j)) if i == j => true,
(Self::Object(i), Value::Object(j)) if i == j => true,
(i, Value::Thing(t)) if i == &t.id => true,
_ => false,
}
}
/// Convert the Id to a raw String
pub fn to_raw(&self) -> String {
match self {

View file

@ -137,6 +137,10 @@ impl Idiom {
pub(crate) fn is_meta(&self) -> bool {
self.0.len() == 1 && self.0[0].eq(&META[0])
}
/// Check if this Idiom is an specific field
pub(crate) fn is_field(&self, other: &[Part]) -> bool {
self.as_ref().eq(other)
}
/// Check if this is an expression with multiple yields
pub(crate) fn is_multi_yield(&self) -> bool {
self.iter().any(Self::split_multi_yield)

View file

@ -47,12 +47,12 @@ impl Default for Kind {
}
impl Kind {
// Returns true if this type is an `any`
/// Returns true if this type is an `any`
pub(crate) fn is_any(&self) -> bool {
matches!(self, Kind::Any)
}
// Returns true if this type is a record
/// Returns true if this type is a record
pub(crate) fn is_record(&self) -> bool {
matches!(self, Kind::Record(_))
}
@ -144,7 +144,7 @@ impl Kind {
}
}
// return the kind of the contained value.
// Return the kind of the contained value.
//
// For example: for `array<number>` or `set<number>` this returns `number`.
// For `array<number> | set<float>` this returns `number | float`.

View file

@ -1,3 +1,4 @@
use crate::cnf::REGEX_CACHE_SIZE;
use quick_cache::sync::{Cache, GuardResult};
use revision::revisioned;
use serde::{
@ -8,9 +9,9 @@ use std::cmp::Ordering;
use std::fmt::Debug;
use std::fmt::{self, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::str;
use std::str::FromStr;
use std::sync::LazyLock;
use std::{env, str};
pub(crate) const TOKEN: &str = "$surrealdb::private::sql::Regex";
@ -27,12 +28,8 @@ impl Regex {
}
fn regex_new(str: &str) -> Result<regex::Regex, regex::Error> {
static REGEX_CACHE: LazyLock<Cache<String, regex::Regex>> = LazyLock::new(|| {
let cache_size: usize = env::var("SURREAL_REGEX_CACHE_SIZE")
.map_or(1000, |v| v.parse().unwrap_or(1000))
.max(10); // The minimum cache size is 10
Cache::new(cache_size)
});
static REGEX_CACHE: LazyLock<Cache<String, regex::Regex>> =
LazyLock::new(|| Cache::new(REGEX_CACHE_SIZE.max(10)));
match REGEX_CACHE.get_value_or_guard(str, None) {
GuardResult::Value(v) => Ok(v),
GuardResult::Guard(g) => {

View file

@ -23,7 +23,6 @@ use serde::{Deserialize, Serialize};
use std::{
fmt::{self, Display, Formatter, Write},
ops::Deref,
time::Duration,
};
#[revisioned(revision = 1)]
@ -101,19 +100,6 @@ pub enum Statement {
}
impl Statement {
/// Get the statement timeout duration, if any
pub fn timeout(&self) -> Option<Duration> {
match self {
Self::Create(v) => v.timeout.as_ref().map(|v| *v.0),
Self::Delete(v) => v.timeout.as_ref().map(|v| *v.0),
Self::Insert(v) => v.timeout.as_ref().map(|v| *v.0),
Self::Relate(v) => v.timeout.as_ref().map(|v| *v.0),
Self::Select(v) => v.timeout.as_ref().map(|v| *v.0),
Self::Upsert(v) => v.timeout.as_ref().map(|v| *v.0),
Self::Update(v) => v.timeout.as_ref().map(|v| *v.0),
_ => None,
}
}
/// Check if we require a writeable transaction
pub(crate) fn writeable(&self) -> bool {
match self {

View file

@ -67,7 +67,7 @@ impl CreateStatement {
// Loop over the create targets
for w in self.what.0.iter() {
let v = w.compute(stk, &ctx, opt, doc).await?;
i.prepare(stk, &ctx, opt, &stm, v).await.map_err(|e| match e {
i.prepare(&stm, v).map_err(|e| match e {
Error::InvalidStatementTarget {
value: v,
} => Error::CreateStatement {

View file

@ -56,7 +56,7 @@ impl DeleteStatement {
// Loop over the delete targets
for w in self.what.0.iter() {
let v = w.compute(stk, &ctx, opt, doc).await?;
i.prepare(stk, &ctx, opt, &stm, v).await.map_err(|e| match e {
i.prepare(&stm, v).map_err(|e| match e {
Error::InvalidStatementTarget {
value: v,
} => Error::DeleteStatement {

View file

@ -165,7 +165,7 @@ fn iterable(id: Thing, v: Value, relation: bool) -> Result<Iterable, Error> {
match relation {
false => Ok(Iterable::Mergeable(id, v)),
true => {
let _in = match v.pick(&*IN) {
let f = match v.pick(&*IN) {
Value::Thing(v) => v,
v => {
return Err(Error::InsertStatementIn {
@ -173,7 +173,7 @@ fn iterable(id: Thing, v: Value, relation: bool) -> Result<Iterable, Error> {
})
}
};
let out = match v.pick(&*OUT) {
let w = match v.pick(&*OUT) {
Value::Thing(v) => v,
v => {
return Err(Error::InsertStatementOut {
@ -181,7 +181,7 @@ fn iterable(id: Thing, v: Value, relation: bool) -> Result<Iterable, Error> {
})
}
};
Ok(Iterable::Relatable(_in, id, out, Some(v)))
Ok(Iterable::Relatable(f, id, w, Some(v)))
}
}
}

View file

@ -2,10 +2,10 @@ use crate::ctx::{Context, MutableContext};
use crate::dbs::{Iterable, Iterator, Options, Statement};
use crate::doc::CursorDoc;
use crate::err::Error;
use crate::idx::planner::QueryPlanner;
use crate::idx::planner::{QueryPlanner, QueryPlannerParams};
use crate::sql::{
Cond, Explain, Fetchs, Field, Fields, Groups, Id, Idioms, Limit, Orders, Splits, Start,
Timeout, Value, Values, Version, With,
Cond, Explain, Fetchs, Field, Fields, Groups, Idioms, Limit, Orders, Splits, Start, Timeout,
Value, Values, Version, With,
};
use derive::Store;
use reblessive::tree::Stk;
@ -100,56 +100,49 @@ impl SelectStatement {
};
// Get a query planner
let mut planner = QueryPlanner::new();
let params = self.into();
let params: QueryPlannerParams<'_> = self.into();
let keys = params.is_keys_only();
// Loop over the select targets
for w in self.what.0.iter() {
let v = w.compute(stk, &ctx, &opt, doc).await?;
match v {
Value::Thing(v) => match v.is_range() {
true => i.prepare_range(&stm, v, keys)?,
false => i.prepare_thing(&stm, v)?,
},
Value::Edges(v) => {
if self.only && !limit_is_one_or_zero {
return Err(Error::SingleOnlyOutput);
}
i.prepare_edges(&stm, *v)?;
}
Value::Mock(v) => {
if self.only && !limit_is_one_or_zero {
return Err(Error::SingleOnlyOutput);
}
i.prepare_mock(&stm, v)?;
}
Value::Table(t) => {
if self.only && !limit_is_one_or_zero {
return Err(Error::SingleOnlyOutput);
}
planner.add_iterables(stk, &ctx, &opt, t, &params, &mut i).await?;
}
Value::Thing(v) => match &v.id {
Id::Range(r) => {
i.ingest(Iterable::TableRange(v.tb, *r.to_owned(), params.is_keys_only()))
}
_ => i.ingest(Iterable::Thing(v)),
},
Value::Edges(v) => {
if self.only && !limit_is_one_or_zero {
return Err(Error::SingleOnlyOutput);
}
i.ingest(Iterable::Edges(*v))
}
Value::Mock(v) => {
if self.only && !limit_is_one_or_zero {
return Err(Error::SingleOnlyOutput);
}
for v in v {
i.ingest(Iterable::Thing(v));
}
}
Value::Array(v) => {
if self.only && !limit_is_one_or_zero {
return Err(Error::SingleOnlyOutput);
}
for v in v {
match v {
Value::Table(t) => {
planner.add_iterables(stk, &ctx, &opt, t, &params, &mut i).await?;
}
Value::Thing(v) => i.ingest(Iterable::Thing(v)),
Value::Edges(v) => i.ingest(Iterable::Edges(*v)),
Value::Mock(v) => {
for v in v {
i.ingest(Iterable::Thing(v));
}
}
Value::Mock(v) => i.prepare_mock(&stm, v)?,
Value::Edges(v) => i.prepare_edges(&stm, *v)?,
Value::Thing(v) => match v.is_range() {
true => i.prepare_range(&stm, v, keys)?,
false => i.prepare_thing(&stm, v)?,
},
_ => i.ingest(Iterable::Value(v)),
}
}

View file

@ -48,7 +48,6 @@ impl SetStatement {
}
// The user tried to set a protected variable
true => Err(Error::InvalidParam {
// Move the parameter name, as we no longer need it
name: self.name.to_owned(),
}),
}

View file

@ -57,7 +57,7 @@ impl UpdateStatement {
// Loop over the update targets
for w in self.what.0.iter() {
let v = w.compute(stk, &ctx, opt, doc).await?;
i.prepare(stk, &ctx, opt, &stm, v).await.map_err(|e| match e {
i.prepare(&stm, v).map_err(|e| match e {
Error::InvalidStatementTarget {
value: v,
} => Error::UpdateStatement {

View file

@ -56,7 +56,7 @@ impl UpsertStatement {
// Loop over the upsert targets
for w in self.what.0.iter() {
let v = w.compute(stk, &ctx, opt, doc).await?;
i.prepare(stk, &ctx, opt, &stm, v).await.map_err(|e| match e {
i.prepare(&stm, v).map_err(|e| match e {
Error::InvalidStatementTarget {
value: v,
} => Error::UpsertStatement {

View file

@ -76,6 +76,21 @@ impl Subquery {
ctx: &Context,
opt: &Options,
doc: Option<&CursorDoc>,
) -> Result<Value, Error> {
match self.compute_unbordered(stk, ctx, opt, doc).await {
Err(Error::Return {
value,
}) => Ok(value),
res => res,
}
}
/// Process this type returning a computed simple Value, without catching errors
pub(crate) async fn compute_unbordered(
&self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
doc: Option<&CursorDoc>,
) -> Result<Value, Error> {
// Duplicate context
let mut ctx = MutableContext::new(ctx);

View file

@ -216,7 +216,11 @@ impl Thing {
pub fn to_raw(&self) -> String {
self.to_string()
}
/// Check if this Thing is a range
pub fn is_range(&self) -> bool {
matches!(self.id, Id::Range(_))
}
/// Check if this Thing is of a certain table type
pub fn is_record_type(&self, types: &[Table]) -> bool {
types.is_empty() || types.iter().any(|tb| tb.0 == self.tb)
}

View file

@ -4,6 +4,6 @@ use crate::sql::value::Value;
impl Value {
pub(crate) fn def(&mut self, val: &Thing) {
self.put(ID.as_ref(), val.clone().into())
self.put(&*ID, val.clone().into())
}
}

View file

@ -2909,6 +2909,7 @@ impl Value {
/// Check if we require a writeable transaction
pub(crate) fn writeable(&self) -> bool {
match self {
Value::Cast(v) => v.writeable(),
Value::Block(v) => v.writeable(),
Value::Idiom(v) => v.writeable(),
Value::Array(v) => v.iter().any(Value::writeable),
@ -2923,8 +2924,21 @@ impl Value {
}
}
/// Process this type returning a computed simple Value
///
/// Is used recursively.
pub(crate) async fn compute(
&self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
doc: Option<&CursorDoc>,
) -> Result<Value, Error> {
match self.compute_unbordered(stk, ctx, opt, doc).await {
Err(Error::Return {
value,
}) => Ok(value),
res => res,
}
}
/// Process this type returning a computed simple Value, without catching errors
pub(crate) async fn compute_unbordered(
&self,
stk: &mut Stk,
@ -2953,21 +2967,6 @@ impl Value {
_ => Ok(self.to_owned()),
}
}
pub(crate) async fn compute(
&self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
doc: Option<&CursorDoc>,
) -> Result<Value, Error> {
match self.compute_unbordered(stk, ctx, opt, doc).await {
Err(Error::Return {
value,
}) => Ok(value),
res => res,
}
}
}
// ------------------------------

View file

@ -527,15 +527,15 @@ async fn insert_thing() {
let table = "user";
let _: Option<ApiRecordId> = db.insert((table, "user1")).await.unwrap();
let _: Option<ApiRecordId> =
db.insert((table, "user1")).content(json!({ "foo": "bar" })).await.unwrap();
let _: Value = db.insert(Resource::from((table, "user2"))).await.unwrap();
db.insert((table, "user2")).content(json!({ "foo": "bar" })).await.unwrap();
let _: Value = db.insert(Resource::from((table, "user3"))).await.unwrap();
let _: Value =
db.insert(Resource::from((table, "user2"))).content(json!({ "foo": "bar" })).await.unwrap();
let user: Option<ApiRecordId> = db.insert((table, "user3")).await.unwrap();
db.insert(Resource::from((table, "user4"))).content(json!({ "foo": "bar" })).await.unwrap();
let user: Option<ApiRecordId> = db.insert((table, "user5")).await.unwrap();
assert_eq!(
user,
Some(ApiRecordId {
id: "user:user3".parse().unwrap(),
id: "user:user5".parse().unwrap(),
})
);
}
@ -586,104 +586,16 @@ async fn insert_relation_table() {
let val = "{in: person:a, out: thing:a}".parse::<Value>().unwrap();
let _: Vec<ApiRecordId> = db.insert("likes").relation(val).await.unwrap();
let vals =
"[{in: person:b, out: thing:a}, {id: likes:2, in: person:a, out: thing:a}, {id: hates:3, in: person:a, out: thing:a}]"
.parse::<Value>()
let vals = r#"[
{ in: person:b, out: thing:a },
{ id: likes:2, in: person:c, out: thing:a },
{ id: hates:3, in: person:d, out: thing:a },
]"#
.parse::<Value>()
.unwrap();
let _: Vec<ApiRecordId> = db.insert("likes").relation(vals).await.unwrap();
}
#[tokio::test]
async fn insert_with_savepoint() -> Result<(), surrealdb_core::err::Error> {
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
let sqls = vec![
("DEFINE INDEX a ON pokemon FIELDS a UNIQUE", "None"),
("DEFINE INDEX b ON pokemon FIELDS b UNIQUE", "None"),
(
"INSERT INTO pokemon (id, b) VALUES (1, 'b')",
"[
{
b: 'b',
id: pokemon:1
}
]"
),
(
"INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b')",
"[
{
b: 'b',
id: pokemon:1
}
]"
),
(
"INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b')",
"[
{
b: 'b',
id: pokemon:1
}
]"
),
(
"INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b') PARALLEL",
"[
{
b: 'b',
id: pokemon:1
}
]"
),
(
"INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b') PARALLEL",
"[
{
b: 'b',
id: pokemon:1
}
]"
),
(
"INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b') ON DUPLICATE KEY UPDATE something = 'else'",
"[
{
b: 'b',
id: pokemon:1,
something: 'else'
}
]"
),
(
"SELECT * FROM pokemon;",
"[
{
b: 'b',
id: pokemon:1,
something: 'else'
}
]"
)
];
let check_fetch = |mut response: Response, expected: &str| {
let val: Value = response.take(0).unwrap();
let exp: Value = expected.parse().unwrap();
assert_eq!(format!("{val:#}"), format!("{exp:#}"));
};
for (sql, expected) in sqls {
let res = db.query(sql).await.unwrap().check().unwrap();
check_fetch(res, expected);
}
Ok(())
}
#[test_log::test(tokio::test)]
async fn select_table() {
let (permit, db) = new_db().await;

View file

@ -113,7 +113,8 @@ fn ok_future_graph_subquery_recursion_depth() -> Result<(), Error> {
fn ok_graph_traversal_depth() -> Result<(), Error> {
// Build the SQL traversal query
fn graph_traversal(n: usize) -> String {
let mut ret = String::from("CREATE node:0;\n");
let mut ret = String::from("DELETE node;\n");
ret.push_str("CREATE node:0;\n");
for i in 1..=n {
let prev = i - 1;
ret.push_str(&format!("CREATE node:{i};\n"));

View file

@ -169,13 +169,13 @@ async fn create_with_id() -> Result<(), Error> {
let tmp = res.remove(0).result;
assert!(matches!(
tmp.err(),
Some(e) if e.to_string() == r#"Found 'tobie' for the id field, but a specific record has been specified"#
Some(e) if e.to_string() == r#"Found 'tobie' for the `id` field, but a specific record has been specified"#
));
//
let tmp = res.remove(0).result;
assert!(matches!(
tmp.err(),
Some(e) if e.to_string() == r#"Found 'tobie' for the id field, but a specific record has been specified"#
Some(e) if e.to_string() == r#"Found 'tobie' for the `id` field, but a specific record has been specified"#
));
//
Ok(())

View file

@ -149,40 +149,66 @@ async fn insert_statement_on_duplicate_key() -> Result<(), Error> {
#[tokio::test]
async fn insert_with_savepoint() -> Result<(), Error> {
let sql = "
DEFINE INDEX a ON pokemon FIELDS a UNIQUE;
DEFINE INDEX b ON pokemon FIELDS b UNIQUE;
INSERT INTO pokemon (id, b) VALUES (1, 'b');
INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b');
INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b');
INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b') PARALLEL;
INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b') PARALLEL;
INSERT INTO pokemon (id, a, b) VALUES (2, 'a', 'b') ON DUPLICATE KEY UPDATE something = 'else';
DEFINE INDEX one ON pokemon FIELDS one UNIQUE;
DEFINE INDEX two ON pokemon FIELDS two UNIQUE;
-- This will INSERT a record with a specific id
INSERT INTO pokemon (id, two) VALUES (1, 'two');
-- This will INSERT a record with a random id
INSERT INTO pokemon (id, one) VALUES ('test', 'one');
-- This will fail, because a UNIQUE index value already exists
INSERT INTO pokemon (two) VALUES ('two');
-- This will fail, because a UNIQUE index value already exists
INSERT INTO pokemon (id, one, two) VALUES (2, 'one', 'two');
-- This will fail, because we are specifying a specific id even though we also have an ON DUPLICATE KEY UPDATE clause
INSERT INTO pokemon (id, one, two) VALUES (2, 'one', 'two') ON DUPLICATE KEY UPDATE two = 'changed';
-- This will succeed, because we are not specifying a specific id and we also have an ON DUPLICATE KEY UPDATE clause
INSERT INTO pokemon (one, two) VALUES ('one', 'two') ON DUPLICATE KEY UPDATE two = 'changed';
SELECT * FROM pokemon;
";
let mut t = Test::new(sql).await?;
t.expect_size(9)?;
t.skip_ok(2)?;
for _ in 0..5 {
t.expect_val(
"[
{
b: 'b',
id: pokemon:1,
}
]",
)?;
}
for _ in 0..2 {
t.expect_val(
"[
{
b: 'b',
id: pokemon:1,
something: 'else'
}
]",
)?;
}
t.expect_val(
"[
{
id: pokemon:1,
two: 'two'
}
]",
)?;
t.expect_val(
"[
{
id: pokemon:test,
one: 'one'
}
]",
)?;
t.expect_error("Database index `two` already contains 'two', with record `pokemon:1`")?;
t.expect_error("Database index `one` already contains 'one', with record `pokemon:test`")?;
t.expect_error("Database index `one` already contains 'one', with record `pokemon:test`")?;
t.expect_val(
"[
{
id: pokemon:test,
one: 'one',
two: 'changed'
}
]",
)?;
t.expect_val(
"[
{
id: pokemon:1,
two: 'two'
},
{
id: pokemon:test,
one: 'one',
two: 'changed'
}
]",
)?;
Ok(())
}
@ -550,7 +576,14 @@ async fn insert_statement_unique_index() -> Result<(), Error> {
assert!(tmp.is_ok());
//
let tmp = res.remove(0).result;
assert!(tmp.is_ok());
match tmp {
Err(Error::IndexExists {
index,
value,
..
}) if index.eq("name") && value.eq("'SurrealDB'") => (),
found => panic!("Expected Err(Error::IndexExists), found '{:?}'", found),
}
//
let tmp = res.remove(0).result?;
let val = Value::parse("[ { count: 1 } ]");