diff --git a/core/src/cnf/mod.rs b/core/src/cnf/mod.rs index d36ae5ff..2c51b1d5 100644 --- a/core/src/cnf/mod.rs +++ b/core/src/cnf/mod.rs @@ -31,10 +31,14 @@ pub static NORMAL_FETCH_SIZE: Lazy = lazy_env_parse!("SURREAL_NORMAL_FETCH_ /// The maximum number of keys that should be scanned at once for export queries. pub static EXPORT_BATCH_SIZE: Lazy = lazy_env_parse!("SURREAL_EXPORT_BATCH_SIZE", u32, 1000); -/// The maximum number of keys that should be fetched when streaming range scanns in a Scanner. +/// The maximum number of keys that should be fetched when streaming range scans in a Scanner. pub static MAX_STREAM_BATCH_SIZE: Lazy = lazy_env_parse!("SURREAL_MAX_STREAM_BATCH_SIZE", u32, 1000); +/// The maximum number of keys that should be scanned at once per concurrent indexing batch. +pub static INDEXING_BATCH_SIZE: Lazy = + lazy_env_parse!("SURREAL_INDEXING_BATCH_SIZE", u32, 250); + /// Forward all signup/signin/authenticate query errors to a client performing authentication. Do not use in production. pub static INSECURE_FORWARD_ACCESS_ERRORS: Lazy = lazy_env_parse!("SURREAL_INSECURE_FORWARD_ACCESS_ERRORS", bool, false); diff --git a/core/src/ctx/context.rs b/core/src/ctx/context.rs index f2720cb0..568f2d42 100644 --- a/core/src/ctx/context.rs +++ b/core/src/ctx/context.rs @@ -8,6 +8,8 @@ use crate::err::Error; use crate::idx::planner::executor::QueryExecutor; use crate::idx::planner::{IterationStage, QueryPlanner}; use crate::idx::trees::store::IndexStores; +#[cfg(not(target_arch = "wasm32"))] +use crate::kvs::IndexBuilder; use crate::kvs::Transaction; use crate::sql::value::Value; use channel::Sender; @@ -63,6 +65,9 @@ pub struct MutableContext { iteration_stage: Option, // The index store index_stores: IndexStores, + // The index concurrent builders + #[cfg(not(target_arch = "wasm32"))] + index_builder: Option, // Capabilities capabilities: Arc, #[cfg(any( @@ -110,6 +115,7 @@ impl MutableContext { time_out: Option, capabilities: Capabilities, index_stores: IndexStores, + #[cfg(not(target_arch = "wasm32"))] index_builder: IndexBuilder, #[cfg(any( feature = "kv-mem", feature = "kv-surrealkv", @@ -130,6 +136,8 @@ impl MutableContext { iteration_stage: None, capabilities: Arc::new(capabilities), index_stores, + #[cfg(not(target_arch = "wasm32"))] + index_builder: Some(index_builder), #[cfg(any( feature = "kv-mem", feature = "kv-surrealkv", @@ -159,6 +167,8 @@ impl MutableContext { iteration_stage: None, capabilities: Arc::new(Capabilities::default()), index_stores: IndexStores::default(), + #[cfg(not(target_arch = "wasm32"))] + index_builder: None, #[cfg(any( feature = "kv-mem", feature = "kv-surrealkv", @@ -184,6 +194,8 @@ impl MutableContext { iteration_stage: parent.iteration_stage.clone(), capabilities: parent.capabilities.clone(), index_stores: parent.index_stores.clone(), + #[cfg(not(target_arch = "wasm32"))] + index_builder: parent.index_builder.clone(), #[cfg(any( feature = "kv-mem", feature = "kv-surrealkv", @@ -220,6 +232,8 @@ impl MutableContext { iteration_stage: parent.iteration_stage.clone(), capabilities: parent.capabilities.clone(), index_stores: parent.index_stores.clone(), + #[cfg(not(target_arch = "wasm32"))] + index_builder: parent.index_builder.clone(), #[cfg(any( feature = "kv-mem", feature = "kv-surrealkv", @@ -234,6 +248,34 @@ impl MutableContext { } } + /// Create a new child from a frozen context. + pub fn new_concurrent(from: &Context) -> Self { + Self { + values: HashMap::default(), + deadline: None, + cancelled: Arc::new(AtomicBool::new(false)), + notifications: from.notifications.clone(), + query_planner: from.query_planner.clone(), + query_executor: from.query_executor.clone(), + iteration_stage: from.iteration_stage.clone(), + capabilities: from.capabilities.clone(), + index_stores: from.index_stores.clone(), + #[cfg(not(target_arch = "wasm32"))] + index_builder: from.index_builder.clone(), + #[cfg(any( + feature = "kv-mem", + feature = "kv-surrealkv", + feature = "kv-rocksdb", + feature = "kv-fdb", + feature = "kv-tikv", + ))] + temporary_directory: from.temporary_directory.clone(), + transaction: None, + isolated: false, + parent: None, + } + } + /// Add a value to the context. It overwrites any previously set values /// with the same key. pub fn add_value(&mut self, key: K, value: Arc) @@ -327,6 +369,12 @@ impl MutableContext { &self.index_stores } + /// Get the index_builder for this context/ds + #[cfg(not(target_arch = "wasm32"))] + pub(crate) fn get_index_builder(&self) -> Option<&IndexBuilder> { + self.index_builder.as_ref() + } + /// Check if the context is done. If it returns `None` the operation may /// proceed, otherwise the operation should be stopped. pub fn done(&self) -> Option { diff --git a/core/src/doc/index.rs b/core/src/doc/index.rs index 58928eef..1c549919 100644 --- a/core/src/doc/index.rs +++ b/core/src/doc/index.rs @@ -7,6 +7,8 @@ use crate::idx::ft::FtIndex; use crate::idx::trees::mtree::MTreeIndex; use crate::idx::IndexKeyBase; use crate::key; +#[cfg(not(target_arch = "wasm32"))] +use crate::kvs::ConsumeResult; use crate::kvs::TransactionType; use crate::sql::array::Array; use crate::sql::index::{HnswParams, Index, MTreeParams, SearchParams}; @@ -46,51 +48,77 @@ impl Document { // Loop through all index statements for ix in ixs.iter() { // Calculate old values - let o = build_opt_values(stk, ctx, opt, ix, &self.initial).await?; + let o = Self::build_opt_values(stk, ctx, opt, ix, &self.initial).await?; // Calculate new values - let n = build_opt_values(stk, ctx, opt, ix, &self.current).await?; + let n = Self::build_opt_values(stk, ctx, opt, ix, &self.current).await?; // Update the index entries if targeted_force || o != n { - // Store all the variable and parameters required by the index operation - let mut ic = IndexOperation::new(opt, ix, o, n, rid); - - // Index operation dispatching - match &ix.index { - Index::Uniq => ic.index_unique(ctx).await?, - Index::Idx => ic.index_non_unique(ctx).await?, - Index::Search(p) => ic.index_full_text(stk, ctx, p).await?, - Index::MTree(p) => ic.index_mtree(stk, ctx, p).await?, - Index::Hnsw(p) => ic.index_hnsw(ctx, p).await?, - }; + Self::one_index(stk, ctx, opt, ix, o, n, rid).await?; } } // Carry on Ok(()) } -} -/// Extract from the given document, the values required by the index and put then in an array. -/// Eg. IF the index is composed of the columns `name` and `instrument` -/// Given this doc: { "id": 1, "instrument":"piano", "name":"Tobie" } -/// It will return: ["Tobie", "piano"] -async fn build_opt_values( - stk: &mut Stk, - ctx: &Context, - opt: &Options, - ix: &DefineIndexStatement, - doc: &CursorDoc, -) -> Result>, Error> { - if !doc.doc.as_ref().is_some() { - return Ok(None); + async fn one_index( + stk: &mut Stk, + ctx: &Context, + opt: &Options, + ix: &DefineIndexStatement, + o: Option>, + n: Option>, + rid: &Thing, + ) -> Result<(), Error> { + #[cfg(not(target_arch = "wasm32"))] + let (o, n) = if let Some(ib) = ctx.get_index_builder() { + match ib.consume(ix, o, n, rid).await? { + // The index builder consumed the value, which means it is currently building the index asynchronously, + // we don't index the document and let the index builder do it later. + ConsumeResult::Enqueued => return Ok(()), + // The index builder is done, the index has been built, we can proceed normally + ConsumeResult::Ignored(o, n) => (o, n), + } + } else { + (o, n) + }; + + // Store all the variable and parameters required by the index operation + let mut ic = IndexOperation::new(opt, ix, o, n, rid); + + // Index operation dispatching + match &ix.index { + Index::Uniq => ic.index_unique(ctx).await?, + Index::Idx => ic.index_non_unique(ctx).await?, + Index::Search(p) => ic.index_full_text(stk, ctx, p).await?, + Index::MTree(p) => ic.index_mtree(stk, ctx, p).await?, + Index::Hnsw(p) => ic.index_hnsw(ctx, p).await?, + } + Ok(()) } - let mut o = Vec::with_capacity(ix.cols.len()); - for i in ix.cols.iter() { - let v = i.compute(stk, ctx, opt, Some(doc)).await?; - o.push(v); + + /// Extract from the given document, the values required by the index and put then in an array. + /// Eg. IF the index is composed of the columns `name` and `instrument` + /// Given this doc: { "id": 1, "instrument":"piano", "name":"Tobie" } + /// It will return: ["Tobie", "piano"] + pub(crate) async fn build_opt_values( + stk: &mut Stk, + ctx: &Context, + opt: &Options, + ix: &DefineIndexStatement, + doc: &CursorDoc, + ) -> Result>, Error> { + if !doc.doc.as_ref().is_some() { + return Ok(None); + } + let mut o = Vec::with_capacity(ix.cols.len()); + for i in ix.cols.iter() { + let v = i.compute(stk, ctx, opt, Some(doc)).await?; + o.push(v); + } + Ok(Some(o)) } - Ok(Some(o)) } /// Extract from the given document, the values required by the index and put then in an array. diff --git a/core/src/err/mod.rs b/core/src/err/mod.rs index 6f876faa..9c46a278 100644 --- a/core/src/err/mod.rs +++ b/core/src/err/mod.rs @@ -933,6 +933,12 @@ pub enum Error { db: String, }, + /// A database index entry for the specified table is already building + #[error("Database index `{index}` is currently building")] + IndexAlreadyBuilding { + index: String, + }, + /// The session has expired either because the token used /// to establish it has expired or because an expiration /// was explicitly defined when establishing it diff --git a/core/src/idx/index.rs b/core/src/idx/index.rs new file mode 100644 index 00000000..87215a95 --- /dev/null +++ b/core/src/idx/index.rs @@ -0,0 +1,337 @@ +use crate::ctx::Context; +use crate::dbs::Options; +use crate::err::Error; +use crate::idx::ft::FtIndex; +use crate::idx::trees::mtree::MTreeIndex; +use crate::idx::IndexKeyBase; +use crate::key; +use crate::kvs::TransactionType; +use crate::sql::index::{HnswParams, MTreeParams, SearchParams}; +use crate::sql::statements::DefineIndexStatement; +use crate::sql::{Array, Index, Part, Thing, Value}; +use reblessive::tree::Stk; + +pub(crate) struct IndexOperation<'a> { + ctx: &'a Context, + opt: &'a Options, + ix: &'a DefineIndexStatement, + /// The old values (if existing) + o: Option>, + /// The new values (if existing) + n: Option>, + rid: &'a Thing, +} + +impl<'a> IndexOperation<'a> { + pub(crate) fn new( + ctx: &'a Context, + opt: &'a Options, + ix: &'a DefineIndexStatement, + o: Option>, + n: Option>, + rid: &'a Thing, + ) -> Self { + Self { + ctx, + opt, + ix, + o, + n, + rid, + } + } + + pub(crate) async fn compute(&mut self, stk: &mut Stk) -> Result<(), Error> { + // Index operation dispatching + match &self.ix.index { + Index::Uniq => self.index_unique().await, + Index::Idx => self.index_non_unique().await, + Index::Search(p) => self.index_full_text(stk, p).await, + Index::MTree(p) => self.index_mtree(stk, p).await, + Index::Hnsw(p) => self.index_hnsw(p).await, + } + } + + fn get_unique_index_key(&self, v: &'a Array) -> Result { + Ok(key::index::Index::new( + self.opt.ns()?, + self.opt.db()?, + &self.ix.what, + &self.ix.name, + v, + None, + )) + } + + fn get_non_unique_index_key(&self, v: &'a Array) -> Result { + Ok(key::index::Index::new( + self.opt.ns()?, + self.opt.db()?, + &self.ix.what, + &self.ix.name, + v, + Some(&self.rid.id), + )) + } + + async fn index_unique(&mut self) -> Result<(), Error> { + // Lock the transaction + let tx = self.ctx.tx(); + let mut txn = tx.lock().await; + // Delete the old index data + if let Some(o) = self.o.take() { + let i = Indexable::new(o, self.ix); + for o in i { + let key = self.get_unique_index_key(&o)?; + match txn.delc(key, Some(self.rid)).await { + Err(Error::TxConditionNotMet) => Ok(()), + Err(e) => Err(e), + Ok(v) => Ok(v), + }? + } + } + // Create the new index data + if let Some(n) = self.n.take() { + let i = Indexable::new(n, self.ix); + for n in i { + if !n.is_all_none_or_null() { + let key = self.get_unique_index_key(&n)?; + if txn.putc(key, self.rid, None).await.is_err() { + let key = self.get_unique_index_key(&n)?; + let val = txn.get(key, None).await?.unwrap(); + let rid: Thing = val.into(); + return self.err_index_exists(rid, n); + } + } + } + } + Ok(()) + } + + async fn index_non_unique(&mut self) -> Result<(), Error> { + // Lock the transaction + let tx = self.ctx.tx(); + let mut txn = tx.lock().await; + // Delete the old index data + if let Some(o) = self.o.take() { + let i = Indexable::new(o, self.ix); + for o in i { + let key = self.get_non_unique_index_key(&o)?; + match txn.delc(key, Some(self.rid)).await { + Err(Error::TxConditionNotMet) => Ok(()), + Err(e) => Err(e), + Ok(v) => Ok(v), + }? + } + } + // Create the new index data + if let Some(n) = self.n.take() { + let i = Indexable::new(n, self.ix); + for n in i { + let key = self.get_non_unique_index_key(&n)?; + if txn.putc(key, self.rid, None).await.is_err() { + let key = self.get_non_unique_index_key(&n)?; + let val = txn.get(key, None).await?.unwrap(); + let rid: Thing = val.into(); + return self.err_index_exists(rid, n); + } + } + } + Ok(()) + } + + fn err_index_exists(&self, rid: Thing, n: Array) -> Result<(), Error> { + Err(Error::IndexExists { + thing: rid, + index: self.ix.name.to_string(), + value: match n.len() { + 1 => n.first().unwrap().to_string(), + _ => n.to_string(), + }, + }) + } + + async fn index_full_text(&mut self, stk: &mut Stk, p: &SearchParams) -> Result<(), Error> { + let ikb = IndexKeyBase::new(self.opt.ns()?, self.opt.db()?, self.ix)?; + + let mut ft = + FtIndex::new(self.ctx, self.opt, &p.az, ikb, p, TransactionType::Write).await?; + + if let Some(n) = self.n.take() { + ft.index_document(stk, self.ctx, self.opt, self.rid, n).await?; + } else { + ft.remove_document(self.ctx, self.rid).await?; + } + ft.finish(self.ctx).await + } + + async fn index_mtree(&mut self, stk: &mut Stk, p: &MTreeParams) -> Result<(), Error> { + let txn = self.ctx.tx(); + let ikb = IndexKeyBase::new(self.opt.ns()?, self.opt.db()?, self.ix)?; + let mut mt = + MTreeIndex::new(self.ctx.get_index_stores(), &txn, ikb, p, TransactionType::Write) + .await?; + // Delete the old index data + if let Some(o) = self.o.take() { + mt.remove_document(stk, &txn, self.rid, &o).await?; + } + // Create the new index data + if let Some(n) = self.n.take() { + mt.index_document(stk, &txn, self.rid, &n).await?; + } + mt.finish(&txn).await + } + + async fn index_hnsw(&mut self, p: &HnswParams) -> Result<(), Error> { + let hnsw = self.ctx.get_index_stores().get_index_hnsw(self.opt, self.ix, p).await?; + let mut hnsw = hnsw.write().await; + // Delete the old index data + if let Some(o) = self.o.take() { + hnsw.remove_document(self.rid, &o)?; + } + // Create the new index data + if let Some(n) = self.n.take() { + hnsw.index_document(self.rid, &n)?; + } + Ok(()) + } +} + +/// Extract from the given document, the values required by the index and put then in an array. +/// Eg. IF the index is composed of the columns `name` and `instrument` +/// Given this doc: { "id": 1, "instrument":"piano", "name":"Tobie" } +/// It will return: ["Tobie", "piano"] +struct Indexable(Vec<(Value, bool)>); + +impl Indexable { + fn new(vals: Vec, ix: &DefineIndexStatement) -> Self { + let mut source = Vec::with_capacity(vals.len()); + for (v, i) in vals.into_iter().zip(ix.cols.0.iter()) { + let f = matches!(i.0.last(), Some(&Part::Flatten)); + source.push((v, f)); + } + Self(source) + } +} + +impl IntoIterator for Indexable { + type Item = Array; + type IntoIter = Combinator; + + fn into_iter(self) -> Self::IntoIter { + Combinator::new(self.0) + } +} + +struct Combinator { + iterators: Vec>, + has_next: bool, +} + +impl Combinator { + fn new(source: Vec<(Value, bool)>) -> Self { + let mut iterators: Vec> = Vec::new(); + // We create an iterator for each idiom + for (v, f) in source { + if !f { + // Iterator for not flattened values + if let Value::Array(v) = v { + iterators.push(Box::new(MultiValuesIterator::new(v.0))); + continue; + } + } + iterators.push(Box::new(SingleValueIterator(v))); + } + Self { + iterators, + has_next: true, + } + } +} + +impl Iterator for Combinator { + type Item = Array; + + fn next(&mut self) -> Option { + if !self.has_next { + return None; + } + let mut o = Vec::with_capacity(self.iterators.len()); + // Create the combination and advance to the next + self.has_next = false; + for i in &mut self.iterators { + o.push(i.current().clone()); + if !self.has_next { + // We advance only one iterator per iteration + if i.next() { + self.has_next = true; + } + } + } + let o = Array::from(o); + Some(o) + } +} + +trait ValuesIterator: Send { + fn next(&mut self) -> bool; + fn current(&self) -> &Value; +} + +struct MultiValuesIterator { + vals: Vec, + done: bool, + current: usize, + end: usize, +} + +impl MultiValuesIterator { + fn new(vals: Vec) -> Self { + let len = vals.len(); + if len == 0 { + Self { + vals, + done: true, + current: 0, + end: 0, + } + } else { + Self { + vals, + done: false, + current: 0, + end: len - 1, + } + } + } +} + +impl ValuesIterator for MultiValuesIterator { + fn next(&mut self) -> bool { + if self.done { + return false; + } + if self.current == self.end { + self.done = true; + return false; + } + self.current += 1; + true + } + + fn current(&self) -> &Value { + self.vals.get(self.current).unwrap_or(&Value::Null) + } +} + +struct SingleValueIterator(Value); + +impl ValuesIterator for SingleValueIterator { + fn next(&mut self) -> bool { + false + } + + fn current(&self) -> &Value { + &self.0 + } +} diff --git a/core/src/idx/mod.rs b/core/src/idx/mod.rs index 87d2f710..727c1b4d 100644 --- a/core/src/idx/mod.rs +++ b/core/src/idx/mod.rs @@ -1,5 +1,6 @@ pub mod docids; pub(crate) mod ft; +pub(crate) mod index; pub mod planner; pub mod trees; diff --git a/core/src/kvs/ds.rs b/core/src/kvs/ds.rs index 5393cf36..a9ba11dd 100644 --- a/core/src/kvs/ds.rs +++ b/core/src/kvs/ds.rs @@ -4,6 +4,7 @@ use crate::cf; use crate::ctx::MutableContext; #[cfg(feature = "jwks")] use crate::dbs::capabilities::NetTarget; +use crate::dbs::node::Timestamp; use crate::dbs::{ Attach, Capabilities, Executor, Notification, Options, Response, Session, Variables, }; @@ -15,6 +16,8 @@ use crate::idx::trees::store::IndexStores; use crate::kvs::clock::SizedClock; #[allow(unused_imports)] use crate::kvs::clock::SystemClock; +#[cfg(not(target_arch = "wasm32"))] +use crate::kvs::index::IndexBuilder; use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*}; use crate::sql::{statements::DefineUserStatement, Base, Query, Value}; use crate::syn; @@ -55,8 +58,7 @@ const INITIAL_USER_ROLE: &str = "owner"; #[allow(dead_code)] #[non_exhaustive] pub struct Datastore { - // The inner datastore type - inner: Inner, + transaction_factory: TransactionFactory, // The unique id of this datastore, used in notifications id: Uuid, // Whether this datastore runs in strict mode by default @@ -71,10 +73,11 @@ pub struct Datastore { capabilities: Capabilities, // Whether this datastore enables live query notifications to subscribers pub(super) notification_channel: Option<(Sender, Receiver)>, - // Clock for tracking time. It is read only and accessible to all transactions. It is behind a mutex as tests may write to it. - pub(super) clock: Arc, // The index store cache index_stores: IndexStores, + // The index asynchronous builder + #[cfg(not(target_arch = "wasm32"))] + index_builder: IndexBuilder, #[cfg(feature = "jwks")] // The JWKS object cache jwks_cache: Arc>, @@ -89,8 +92,80 @@ pub struct Datastore { temporary_directory: Option>, } +#[derive(Clone)] +pub(super) struct TransactionFactory { + // Clock for tracking time. It is read only and accessible to all transactions. It is behind a mutex as tests may write to it. + clock: Arc, + // The inner datastore type + flavor: Arc, +} + +impl TransactionFactory { + #[allow(unreachable_code)] + pub async fn transaction( + &self, + write: TransactionType, + lock: LockType, + ) -> Result { + // Specify if the transaction is writeable + #[allow(unused_variables)] + let write = match write { + Read => false, + Write => true, + }; + // Specify if the transaction is lockable + #[allow(unused_variables)] + let lock = match lock { + Pessimistic => true, + Optimistic => false, + }; + // Create a new transaction on the datastore + #[allow(unused_variables)] + let inner = match self.flavor.as_ref() { + #[cfg(feature = "kv-mem")] + DatastoreFlavor::Mem(v) => { + let tx = v.transaction(write, lock).await?; + super::tr::Inner::Mem(tx) + } + #[cfg(feature = "kv-rocksdb")] + DatastoreFlavor::RocksDB(v) => { + let tx = v.transaction(write, lock).await?; + super::tr::Inner::RocksDB(tx) + } + #[cfg(feature = "kv-indxdb")] + DatastoreFlavor::IndxDB(v) => { + let tx = v.transaction(write, lock).await?; + super::tr::Inner::IndxDB(tx) + } + #[cfg(feature = "kv-tikv")] + DatastoreFlavor::TiKV(v) => { + let tx = v.transaction(write, lock).await?; + super::tr::Inner::TiKV(tx) + } + #[cfg(feature = "kv-fdb")] + DatastoreFlavor::FoundationDB(v) => { + let tx = v.transaction(write, lock).await?; + super::tr::Inner::FoundationDB(tx) + } + #[cfg(feature = "kv-surrealkv")] + DatastoreFlavor::SurrealKV(v) => { + let tx = v.transaction(write, lock).await?; + super::tr::Inner::SurrealKV(tx) + } + #[allow(unreachable_patterns)] + _ => unreachable!(), + }; + Ok(Transaction::new(Transactor { + inner, + stash: super::stash::Stash::default(), + cf: cf::Writer::new(), + clock: self.clock.clone(), + })) + } +} + #[allow(clippy::large_enum_variant)] -pub(super) enum Inner { +pub(super) enum DatastoreFlavor { #[cfg(feature = "kv-mem")] Mem(super::mem::Datastore), #[cfg(feature = "kv-rocksdb")] @@ -108,19 +183,19 @@ pub(super) enum Inner { impl fmt::Display for Datastore { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { #![allow(unused_variables)] - match &self.inner { + match self.transaction_factory.flavor.as_ref() { #[cfg(feature = "kv-mem")] - Inner::Mem(_) => write!(f, "memory"), + DatastoreFlavor::Mem(_) => write!(f, "memory"), #[cfg(feature = "kv-rocksdb")] - Inner::RocksDB(_) => write!(f, "rocksdb"), + DatastoreFlavor::RocksDB(_) => write!(f, "rocksdb"), #[cfg(feature = "kv-indxdb")] - Inner::IndxDB(_) => write!(f, "indxdb"), + DatastoreFlavor::IndxDB(_) => write!(f, "indxdb"), #[cfg(feature = "kv-tikv")] - Inner::TiKV(_) => write!(f, "tikv"), + DatastoreFlavor::TiKV(_) => write!(f, "tikv"), #[cfg(feature = "kv-fdb")] - Inner::FoundationDB(_) => write!(f, "fdb"), + DatastoreFlavor::FoundationDB(_) => write!(f, "fdb"), #[cfg(feature = "kv-surrealkv")] - Inner::SurrealKV(_) => write!(f, "surrealkv"), + DatastoreFlavor::SurrealKV(_) => write!(f, "surrealkv"), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -175,13 +250,13 @@ impl Datastore { clock: Option>, ) -> Result { // Initiate the desired datastore - let (inner, clock): (Result, Arc) = match path { + let (flavor, clock): (Result, Arc) = match path { // Initiate an in-memory datastore "memory" => { #[cfg(feature = "kv-mem")] { info!(target: TARGET, "Starting kvs store in {}", path); - let v = super::mem::Datastore::new().await.map(Inner::Mem); + let v = super::mem::Datastore::new().await.map(DatastoreFlavor::Mem); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); info!(target: TARGET, "Started kvs store in {}", path); Ok((v, c)) @@ -196,7 +271,7 @@ impl Datastore { info!(target: TARGET, "Starting kvs store at {}", path); let s = s.trim_start_matches("file://"); let s = s.trim_start_matches("file:"); - let v = super::rocksdb::Datastore::new(s).await.map(Inner::RocksDB); + let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); info!(target: TARGET, "Started kvs store at {}", path); Ok((v, c)) @@ -211,7 +286,7 @@ impl Datastore { info!(target: TARGET, "Starting kvs store at {}", path); let s = s.trim_start_matches("rocksdb://"); let s = s.trim_start_matches("rocksdb:"); - let v = super::rocksdb::Datastore::new(s).await.map(Inner::RocksDB); + let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); info!(target: TARGET, "Started kvs store at {}", path); Ok((v, c)) @@ -226,7 +301,7 @@ impl Datastore { info!(target: TARGET, "Starting kvs store at {}", path); let s = s.trim_start_matches("indxdb://"); let s = s.trim_start_matches("indxdb:"); - let v = super::indxdb::Datastore::new(s).await.map(Inner::IndxDB); + let v = super::indxdb::Datastore::new(s).await.map(DatastoreFlavor::IndxDB); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); info!(target: TARGET, "Started kvs store at {}", path); Ok((v, c)) @@ -241,7 +316,7 @@ impl Datastore { info!(target: TARGET, "Connecting to kvs store at {}", path); let s = s.trim_start_matches("tikv://"); let s = s.trim_start_matches("tikv:"); - let v = super::tikv::Datastore::new(s).await.map(Inner::TiKV); + let v = super::tikv::Datastore::new(s).await.map(DatastoreFlavor::TiKV); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); info!(target: TARGET, "Connected to kvs store at {}", path); Ok((v, c)) @@ -256,7 +331,7 @@ impl Datastore { info!(target: TARGET, "Connecting to kvs store at {}", path); let s = s.trim_start_matches("fdb://"); let s = s.trim_start_matches("fdb:"); - let v = super::fdb::Datastore::new(s).await.map(Inner::FoundationDB); + let v = super::fdb::Datastore::new(s).await.map(DatastoreFlavor::FoundationDB); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); info!(target: TARGET, "Connected to kvs store at {}", path); Ok((v, c)) @@ -271,7 +346,8 @@ impl Datastore { info!(target: TARGET, "Starting kvs store at {}", path); let s = s.trim_start_matches("surrealkv://"); let s = s.trim_start_matches("surrealkv:"); - let v = super::surrealkv::Datastore::new(s).await.map(Inner::SurrealKV); + let v = + super::surrealkv::Datastore::new(s).await.map(DatastoreFlavor::SurrealKV); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); info!(target: TARGET, "Started to kvs store at {}", path); Ok((v, c)) @@ -286,27 +362,34 @@ impl Datastore { } }?; // Set the properties on the datastore - inner.map(|inner| Self { - id: Uuid::new_v4(), - inner, - clock, - strict: false, - auth_enabled: false, - query_timeout: None, - transaction_timeout: None, - notification_channel: None, - capabilities: Capabilities::default(), - index_stores: IndexStores::default(), - #[cfg(feature = "jwks")] - jwks_cache: Arc::new(RwLock::new(JwksCache::new())), - #[cfg(any( - feature = "kv-mem", - feature = "kv-surrealkv", - feature = "kv-rocksdb", - feature = "kv-fdb", - feature = "kv-tikv", - ))] - temporary_directory: None, + flavor.map(|flavor| { + let tf = TransactionFactory { + clock, + flavor: Arc::new(flavor), + }; + Self { + id: Uuid::new_v4(), + transaction_factory: tf.clone(), + strict: false, + auth_enabled: false, + query_timeout: None, + transaction_timeout: None, + notification_channel: None, + capabilities: Capabilities::default(), + index_stores: IndexStores::default(), + #[cfg(not(target_arch = "wasm32"))] + index_builder: IndexBuilder::new(tf), + #[cfg(feature = "jwks")] + jwks_cache: Arc::new(RwLock::new(JwksCache::new())), + #[cfg(any( + feature = "kv-mem", + feature = "kv-surrealkv", + feature = "kv-rocksdb", + feature = "kv-fdb", + feature = "kv-tikv", + ))] + temporary_directory: None, + } }) } @@ -389,6 +472,10 @@ impl Datastore { &self.jwks_cache } + pub(super) async fn clock_now(&self) -> Timestamp { + self.transaction_factory.clock.now().await + } + // Initialise the cluster and run bootstrap utilities #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)] pub async fn bootstrap(&self) -> Result<(), Error> { @@ -551,60 +638,7 @@ impl Datastore { write: TransactionType, lock: LockType, ) -> Result { - // Specify if the transaction is writeable - #[allow(unused_variables)] - let write = match write { - Read => false, - Write => true, - }; - // Specify if the transaction is lockable - #[allow(unused_variables)] - let lock = match lock { - Pessimistic => true, - Optimistic => false, - }; - // Create a new transaction on the datastore - #[allow(unused_variables)] - let inner = match &self.inner { - #[cfg(feature = "kv-mem")] - Inner::Mem(v) => { - let tx = v.transaction(write, lock).await?; - super::tr::Inner::Mem(tx) - } - #[cfg(feature = "kv-rocksdb")] - Inner::RocksDB(v) => { - let tx = v.transaction(write, lock).await?; - super::tr::Inner::RocksDB(tx) - } - #[cfg(feature = "kv-indxdb")] - Inner::IndxDB(v) => { - let tx = v.transaction(write, lock).await?; - super::tr::Inner::IndxDB(tx) - } - #[cfg(feature = "kv-tikv")] - Inner::TiKV(v) => { - let tx = v.transaction(write, lock).await?; - super::tr::Inner::TiKV(tx) - } - #[cfg(feature = "kv-fdb")] - Inner::FoundationDB(v) => { - let tx = v.transaction(write, lock).await?; - super::tr::Inner::FoundationDB(tx) - } - #[cfg(feature = "kv-surrealkv")] - Inner::SurrealKV(v) => { - let tx = v.transaction(write, lock).await?; - super::tr::Inner::SurrealKV(tx) - } - #[allow(unreachable_patterns)] - _ => unreachable!(), - }; - Ok(Transaction::new(Transactor { - inner, - stash: super::stash::Stash::default(), - cf: cf::Writer::new(), - clock: self.clock.clone(), - })) + self.transaction_factory.transaction(write, lock).await } /// Parse and execute an SQL query @@ -690,6 +724,8 @@ impl Datastore { self.query_timeout, self.capabilities.clone(), self.index_stores.clone(), + #[cfg(not(target_arch = "wasm32"))] + self.index_builder.clone(), #[cfg(any( feature = "kv-mem", feature = "kv-surrealkv", diff --git a/core/src/kvs/index.rs b/core/src/kvs/index.rs new file mode 100644 index 00000000..f5770cba --- /dev/null +++ b/core/src/kvs/index.rs @@ -0,0 +1,294 @@ +use crate::cnf::{INDEXING_BATCH_SIZE, NORMAL_FETCH_SIZE}; +use crate::ctx::{Context, MutableContext}; +use crate::dbs::Options; +use crate::doc::{CursorDoc, Document}; +use crate::err::Error; +use crate::idx::index::IndexOperation; +use crate::key::thing; +use crate::kvs::ds::TransactionFactory; +use crate::kvs::LockType::Optimistic; +use crate::kvs::{Transaction, TransactionType}; +use crate::sql::statements::DefineIndexStatement; +use crate::sql::{Id, Object, Thing, Value}; +use dashmap::mapref::entry::Entry; +use dashmap::DashMap; +use reblessive::TreeStack; +use std::collections::VecDeque; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::task; +use tokio::task::JoinHandle; + +#[derive(Clone)] +pub(crate) enum BuildingStatus { + Started, + InitialIndexing(usize), + UpdatesIndexing(usize), + Error(Arc), + Built, +} + +pub(crate) enum ConsumeResult { + /// The document has been enqueued to be indexed + Enqueued, + /// The index has been built, the document can be indexed normally + Ignored(Option>, Option>), +} + +impl BuildingStatus { + fn is_error(&self) -> bool { + matches!(self, Self::Error(_)) + } + + fn is_built(&self) -> bool { + matches!(self, Self::Built) + } +} + +impl From for Value { + fn from(st: BuildingStatus) -> Self { + let mut o = Object::default(); + let s = match st { + BuildingStatus::Started => "started", + BuildingStatus::InitialIndexing(count) => { + o.insert("count".to_string(), count.into()); + "initial" + } + BuildingStatus::UpdatesIndexing(count) => { + o.insert("count".to_string(), count.into()); + "updates" + } + BuildingStatus::Error(error) => { + o.insert("error".to_string(), error.to_string().into()); + "error" + } + BuildingStatus::Built => "built", + }; + o.insert("status".to_string(), s.into()); + o.into() + } +} + +type IndexBuilding = (Arc, JoinHandle<()>); + +#[derive(Clone)] +pub(crate) struct IndexBuilder { + tf: TransactionFactory, + indexes: Arc, IndexBuilding>>, +} + +impl IndexBuilder { + pub(super) fn new(tf: TransactionFactory) -> Self { + Self { + tf, + indexes: Default::default(), + } + } + + pub(crate) fn build( + &self, + ctx: &Context, + opt: Options, + ix: Arc, + ) -> Result<(), Error> { + match self.indexes.entry(ix) { + Entry::Occupied(e) => { + // If the building is currently running we return error + if !e.get().1.is_finished() { + return Err(Error::IndexAlreadyBuilding { + index: e.key().name.to_string(), + }); + } + } + Entry::Vacant(e) => { + // No index is currently building, we can start building it + let building = Arc::new(Building::new(ctx, self.tf.clone(), opt, e.key().clone())?); + let b = building.clone(); + let jh = task::spawn(async move { + if let Err(err) = b.compute().await { + b.set_status(BuildingStatus::Error(err.into())).await; + } + }); + e.insert((building, jh)); + } + } + Ok(()) + } + + pub(crate) async fn consume( + &self, + ix: &DefineIndexStatement, + old_values: Option>, + new_values: Option>, + rid: &Thing, + ) -> Result { + if let Some(r) = self.indexes.get(ix) { + let (b, _) = r.value(); + return Ok(b.maybe_consume(old_values, new_values, rid).await); + } + Ok(ConsumeResult::Ignored(old_values, new_values)) + } + + pub(crate) async fn get_status(&self, ix: &DefineIndexStatement) -> Option { + if let Some(a) = self.indexes.get(ix) { + Some(a.value().0.status.lock().await.clone()) + } else { + None + } + } +} + +struct Appending { + old_values: Option>, + new_values: Option>, + id: Id, +} + +struct Building { + ctx: Context, + opt: Options, + tf: TransactionFactory, + ix: Arc, + tb: String, + status: Arc>, + // Should be stored on a temporary table + appended: Arc>>, +} + +impl Building { + fn new( + ctx: &Context, + tf: TransactionFactory, + opt: Options, + ix: Arc, + ) -> Result { + Ok(Self { + ctx: MutableContext::new_concurrent(ctx).freeze(), + opt, + tf, + tb: ix.what.to_string(), + ix, + status: Arc::new(Mutex::new(BuildingStatus::Started)), + appended: Default::default(), + }) + } + + async fn set_status(&self, status: BuildingStatus) { + let mut s = self.status.lock().await; + // We want to keep only the first error + if !s.is_error() { + *s = status; + } + } + + async fn maybe_consume( + &self, + old_values: Option>, + new_values: Option>, + rid: &Thing, + ) -> ConsumeResult { + let mut a = self.appended.lock().await; + // Now that the queue is locked, we have the possibility to assess if the asynchronous build is done. + if a.is_empty() { + // If the appending queue is empty and the index is built... + if self.status.lock().await.is_built() { + // ... we return the values back, so the document can be updated the usual way + return ConsumeResult::Ignored(old_values, new_values); + } + } + a.push_back(Appending { + old_values, + new_values, + id: rid.id.clone(), + }); + ConsumeResult::Enqueued + } + + async fn new_read_tx(&self) -> Result { + self.tf.transaction(TransactionType::Read, Optimistic).await + } + + async fn new_write_tx_ctx(&self) -> Result { + let tx = self.tf.transaction(TransactionType::Write, Optimistic).await?.into(); + let mut ctx = MutableContext::new(&self.ctx); + ctx.set_transaction(tx); + Ok(ctx.freeze()) + } + + async fn compute(&self) -> Result<(), Error> { + let mut stack = TreeStack::new(); + + self.set_status(BuildingStatus::InitialIndexing(0)).await; + // First iteration, we index every keys + let ns = self.opt.ns()?; + let db = self.opt.db()?; + let beg = thing::prefix(ns, db, &self.tb); + let end = thing::suffix(ns, db, &self.tb); + let mut next = Some(beg..end); + let mut count = 0; + while let Some(rng) = next { + // Get the next batch of records + let batch = self.new_read_tx().await?.batch(rng, *INDEXING_BATCH_SIZE, true).await?; + // Set the next scan range + next = batch.next; + // Check there are records + if batch.values.is_empty() { + break; + } + // Create a new context with a write transaction + let ctx = self.new_write_tx_ctx().await?; + // Index the records + for (k, v) in batch.values.into_iter() { + let key: thing::Thing = (&k).into(); + // Parse the value + let val: Value = (&v).into(); + let rid: Arc = Thing::from((key.tb, key.id)).into(); + let doc = CursorDoc::new(Some(rid.clone()), None, val); + let opt_values = stack + .enter(|stk| Document::build_opt_values(stk, &ctx, &self.opt, &self.ix, &doc)) + .finish() + .await?; + // Index the record + let mut io = IndexOperation::new(&ctx, &self.opt, &self.ix, None, opt_values, &rid); + stack.enter(|stk| io.compute(stk)).finish().await?; + count += 1; + self.set_status(BuildingStatus::InitialIndexing(count)).await; + } + ctx.tx().commit().await?; + } + // Second iteration, we index/remove any records that has been added or removed since the initial indexing + self.set_status(BuildingStatus::UpdatesIndexing(0)).await; + loop { + let mut batch = self.appended.lock().await; + if batch.is_empty() { + // If the batch is empty, we are done. + // Due to the lock on self.appended, we know that no external process can add an item to the queue. + self.set_status(BuildingStatus::Built).await; + // This is here to be sure the lock on back is not released early + batch.clear(); + break; + } + let fetch = (*NORMAL_FETCH_SIZE as usize).min(batch.len()); + let drain = batch.drain(0..fetch); + // Create a new context with a write transaction + let ctx = self.new_write_tx_ctx().await?; + + for a in drain { + let rid = Thing::from((self.tb.clone(), a.id)); + let mut io = IndexOperation::new( + &ctx, + &self.opt, + &self.ix, + a.old_values, + a.new_values, + &rid, + ); + stack.enter(|stk| io.compute(stk)).finish().await?; + count += 1; + self.set_status(BuildingStatus::UpdatesIndexing(count)).await; + } + ctx.tx().commit().await?; + } + Ok(()) + } +} diff --git a/core/src/kvs/mod.rs b/core/src/kvs/mod.rs index fe3e5b14..81f90351 100644 --- a/core/src/kvs/mod.rs +++ b/core/src/kvs/mod.rs @@ -33,10 +33,14 @@ mod rocksdb; mod surrealkv; mod tikv; +#[cfg(not(target_arch = "wasm32"))] +mod index; #[cfg(test)] mod tests; pub use self::ds::*; +#[cfg(not(target_arch = "wasm32"))] +pub(crate) use self::index::*; pub use self::kv::*; pub use self::live::*; pub use self::tr::*; diff --git a/core/src/kvs/node.rs b/core/src/kvs/node.rs index a0b030e1..eac15b1e 100644 --- a/core/src/kvs/node.rs +++ b/core/src/kvs/node.rs @@ -25,7 +25,7 @@ impl Datastore { // Open transaction and set node data let txn = self.transaction(Write, Optimistic).await?; let key = crate::key::root::nd::Nd::new(id); - let now = self.clock.now().await; + let now = self.clock_now().await; let val = Node::new(id, now, false); match run!(txn, txn.put(key, val)) { Err(Error::TxKeyAlreadyExists) => Err(Error::ClAlreadyExists { @@ -49,7 +49,7 @@ impl Datastore { // Open transaction and set node data let txn = self.transaction(Write, Optimistic).await?; let key = crate::key::root::nd::new(id); - let now = self.clock.now().await; + let now = self.clock_now().await; let val = Node::new(id, now, false); run!(txn, txn.set(key, val)) } @@ -86,7 +86,7 @@ impl Datastore { trace!(target: TARGET, "Archiving expired nodes in the cluster"); // Open transaction and fetch nodes let txn = self.transaction(Write, Optimistic).await?; - let now = self.clock.now().await; + let now = self.clock_now().await; let nds = catch!(txn, txn.all_nodes()); for nd in nds.iter() { // Check that the node is active diff --git a/core/src/sql/statements/define/index.rs b/core/src/sql/statements/define/index.rs index cbbf47e1..96a474d1 100644 --- a/core/src/sql/statements/define/index.rs +++ b/core/src/sql/statements/define/index.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use std::fmt::{self, Display}; use std::sync::Arc; -#[revisioned(revision = 3)] +#[revisioned(revision = 4)] #[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] #[non_exhaustive] @@ -27,6 +27,8 @@ pub struct DefineIndexStatement { pub if_not_exists: bool, #[revision(start = 3)] pub overwrite: bool, + #[revision(start = 4)] + pub concurrently: bool, } impl DefineIndexStatement { @@ -89,6 +91,25 @@ impl DefineIndexStatement { .await?; // Clear the cache txn.clear(); + #[cfg(not(target_arch = "wasm32"))] + if self.concurrently { + self.async_index(ctx, opt)?; + } else { + self.sync_index(stk, ctx, opt, doc).await?; + } + #[cfg(target_arch = "wasm32")] + self.sync_index(stk, ctx, opt, doc).await?; + // Ok all good + Ok(Value::None) + } + + async fn sync_index( + &self, + stk: &mut Stk, + ctx: &Context, + opt: &Options, + doc: Option<&CursorDoc>, + ) -> Result<(), Error> { // Force queries to run let opt = &opt.new_with_force(Force::Index(Arc::new([self.clone()]))); // Update the index data @@ -98,8 +119,16 @@ impl DefineIndexStatement { ..UpdateStatement::default() }; stm.compute(stk, ctx, opt, doc).await?; - // Ok all good - Ok(Value::None) + Ok(()) + } + + #[cfg(not(target_arch = "wasm32"))] + fn async_index(&self, ctx: &Context, opt: &Options) -> Result<(), Error> { + ctx.get_index_builder().ok_or(Error::Unreachable("No Index Builder"))?.build( + ctx, + opt.clone(), + self.clone().into(), + ) } } @@ -119,6 +148,9 @@ impl Display for DefineIndexStatement { if let Some(ref v) = self.comment { write!(f, " COMMENT {v}")? } + if self.concurrently { + write!(f, " CONCURRENTLY")? + } Ok(()) } } diff --git a/core/src/sql/statements/info.rs b/core/src/sql/statements/info.rs index c737ef0a..2d076d9b 100644 --- a/core/src/sql/statements/info.rs +++ b/core/src/sql/statements/info.rs @@ -9,9 +9,10 @@ use derive::Store; use revision::revisioned; use serde::{Deserialize, Serialize}; use std::fmt; +use std::ops::Deref; use std::sync::Arc; -#[revisioned(revision = 2)] +#[revisioned(revision = 3)] #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] #[non_exhaustive] @@ -36,6 +37,8 @@ pub enum InfoStatement { User(Ident, Option), #[revision(start = 2)] User(Ident, Option, bool), + #[revision(start = 3)] + Index(Ident, Ident, bool), } impl InfoStatement { @@ -304,6 +307,23 @@ impl InfoStatement { false => Value::from(res.to_string()), }) } + InfoStatement::Index(index, table, _structured) => { + // Allowed to run? + opt.is_allowed(Action::View, ResourceKind::Actor, &Base::Db)?; + // Get the transaction + let txn = ctx.tx(); + // Obtain the index + let res = txn.get_tb_index(opt.ns()?, opt.db()?, table, index).await?; + // Output + let mut out = Object::default(); + #[cfg(not(target_arch = "wasm32"))] + if let Some(ib) = ctx.get_index_builder() { + if let Some(status) = ib.get_status(res.deref()).await { + out.insert("building".to_string(), status.into()); + } + } + Ok(out.into()) + } } } } @@ -327,6 +347,8 @@ impl fmt::Display for InfoStatement { Some(ref b) => write!(f, "INFO FOR USER {u} ON {b} STRUCTURE"), None => write!(f, "INFO FOR USER {u} STRUCTURE"), }, + Self::Index(ref i, ref t, false) => write!(f, "INFO FOR INDEX {i} ON {t}"), + Self::Index(ref i, ref t, true) => write!(f, "INFO FOR INDEX {i} ON {t} STRUCTURE"), } } } @@ -343,6 +365,7 @@ impl InfoStatement { InfoStatement::Db(_) => InfoStatement::Db(true), InfoStatement::Tb(t, _) => InfoStatement::Tb(t, true), InfoStatement::User(u, b, _) => InfoStatement::User(u, b, true), + InfoStatement::Index(i, t, _) => InfoStatement::Index(i, t, true), } } } diff --git a/core/src/syn/lexer/keywords.rs b/core/src/syn/lexer/keywords.rs index 47639989..c2aaba97 100644 --- a/core/src/syn/lexer/keywords.rs +++ b/core/src/syn/lexer/keywords.rs @@ -85,6 +85,7 @@ pub(crate) static KEYWORDS: phf::Map, TokenKind> = phf_map UniCase::ascii("CLASS") => TokenKind::Keyword(Keyword::Class), UniCase::ascii("COMMENT") => TokenKind::Keyword(Keyword::Comment), UniCase::ascii("COMMIT") => TokenKind::Keyword(Keyword::Commit), + UniCase::ascii("CONCURRENTLY") => TokenKind::Keyword(Keyword::Concurrently), UniCase::ascii("CONTENT") => TokenKind::Keyword(Keyword::Content), UniCase::ascii("CONTINUE") => TokenKind::Keyword(Keyword::Continue), UniCase::ascii("CREATE") => TokenKind::Keyword(Keyword::Create), diff --git a/core/src/syn/parser/stmt/define.rs b/core/src/syn/parser/stmt/define.rs index 5df637f8..bee3776f 100644 --- a/core/src/syn/parser/stmt/define.rs +++ b/core/src/syn/parser/stmt/define.rs @@ -1070,6 +1070,10 @@ impl Parser<'_> { keep_pruned_connections, )); } + t!("CONCURRENTLY") => { + self.pop_peek(); + res.concurrently = true; + } t!("COMMENT") => { self.pop_peek(); res.comment = Some(self.next_token_value()?); diff --git a/core/src/syn/parser/stmt/mod.rs b/core/src/syn/parser/stmt/mod.rs index dd78157f..328e23ae 100644 --- a/core/src/syn/parser/stmt/mod.rs +++ b/core/src/syn/parser/stmt/mod.rs @@ -531,6 +531,13 @@ impl Parser<'_> { let base = self.eat(t!("ON")).then(|| self.parse_base(false)).transpose()?; InfoStatement::User(ident, base, false) } + t!("INDEX") => { + let index = self.next_token_value()?; + expected!(self, t!("ON")); + self.eat(t!("TABLE")); + let table = self.next_token_value()?; + InfoStatement::Index(index, table, false) + } x => unexpected!(self, x, "an info target"), }; diff --git a/core/src/syn/parser/test/stmt.rs b/core/src/syn/parser/test/stmt.rs index 76685dbc..451a9b9e 100644 --- a/core/src/syn/parser/test/stmt.rs +++ b/core/src/syn/parser/test/stmt.rs @@ -1519,6 +1519,7 @@ fn parse_define_index() { comment: None, if_not_exists: false, overwrite: false, + concurrently: false })) ); @@ -1535,6 +1536,7 @@ fn parse_define_index() { comment: None, if_not_exists: false, overwrite: false, + concurrently: false })) ); @@ -1560,6 +1562,7 @@ fn parse_define_index() { comment: None, if_not_exists: false, overwrite: false, + concurrently: false })) ); @@ -1586,6 +1589,7 @@ fn parse_define_index() { comment: None, if_not_exists: false, overwrite: false, + concurrently: false })) ); } diff --git a/core/src/syn/parser/test/streaming.rs b/core/src/syn/parser/test/streaming.rs index 0df465e0..594d520c 100644 --- a/core/src/syn/parser/test/streaming.rs +++ b/core/src/syn/parser/test/streaming.rs @@ -343,6 +343,7 @@ fn statements() -> Vec { comment: None, if_not_exists: false, overwrite: false, + concurrently: false, })), Statement::Define(DefineStatement::Index(DefineIndexStatement { name: Ident("index".to_owned()), @@ -352,6 +353,7 @@ fn statements() -> Vec { comment: None, if_not_exists: false, overwrite: false, + concurrently: false, })), Statement::Define(DefineStatement::Index(DefineIndexStatement { name: Ident("index".to_owned()), @@ -370,6 +372,7 @@ fn statements() -> Vec { comment: None, if_not_exists: false, overwrite: false, + concurrently: false, })), Statement::Define(DefineStatement::Analyzer(DefineAnalyzerStatement { name: Ident("ana".to_owned()), diff --git a/core/src/syn/token/keyword.rs b/core/src/syn/token/keyword.rs index dfcbe376..bb1b9fb8 100644 --- a/core/src/syn/token/keyword.rs +++ b/core/src/syn/token/keyword.rs @@ -52,6 +52,7 @@ keyword! { Class => "CLASS", Comment => "COMMENT", Commit => "COMMIT", + Concurrently => "CONCURRENTLY", Content => "CONTENT", Continue => "CONTINUE", Create => "CREATE", diff --git a/lib/tests/define.rs b/lib/tests/define.rs index 9a844bbc..9c64a053 100644 --- a/lib/tests/define.rs +++ b/lib/tests/define.rs @@ -685,6 +685,38 @@ async fn define_statement_index_single() -> Result<(), Error> { Ok(()) } +#[tokio::test] +async fn define_statement_index_concurrently() -> Result<(), Error> { + let sql = " + CREATE user:1 SET email = 'test@surrealdb.com'; + CREATE user:2 SET email = 'test@surrealdb.com'; + DEFINE INDEX test ON user FIELDS email CONCURRENTLY; + SLEEP 1s; + INFO FOR TABLE user; + INFO FOR INDEX test ON user; + "; + let mut t = Test::new(sql).await?; + t.skip_ok(4)?; + t.expect_val( + "{ + events: {}, + fields: {}, + tables: {}, + indexes: { + test: 'DEFINE INDEX test ON user FIELDS email CONCURRENTLY', + }, + lives: {}, + }", + )?; + t.expect_val( + "{ + building: { status: 'built' } + }", + )?; + + Ok(()) +} + #[tokio::test] async fn define_statement_index_multiple() -> Result<(), Error> { let sql = "