[Feat] Asynchronous indexing (#4480)

Co-authored-by: Micha de Vries <micha@devrie.sh>
This commit is contained in:
Emmanuel Keller 2024-08-20 10:20:53 +01:00 committed by GitHub
parent c7457ffc56
commit d86a734d04
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 1000 additions and 135 deletions

View file

@ -31,10 +31,14 @@ pub static NORMAL_FETCH_SIZE: Lazy<u32> = lazy_env_parse!("SURREAL_NORMAL_FETCH_
/// The maximum number of keys that should be scanned at once for export queries. /// The maximum number of keys that should be scanned at once for export queries.
pub static EXPORT_BATCH_SIZE: Lazy<u32> = lazy_env_parse!("SURREAL_EXPORT_BATCH_SIZE", u32, 1000); pub static EXPORT_BATCH_SIZE: Lazy<u32> = lazy_env_parse!("SURREAL_EXPORT_BATCH_SIZE", u32, 1000);
/// The maximum number of keys that should be fetched when streaming range scanns in a Scanner. /// The maximum number of keys that should be fetched when streaming range scans in a Scanner.
pub static MAX_STREAM_BATCH_SIZE: Lazy<u32> = pub static MAX_STREAM_BATCH_SIZE: Lazy<u32> =
lazy_env_parse!("SURREAL_MAX_STREAM_BATCH_SIZE", u32, 1000); lazy_env_parse!("SURREAL_MAX_STREAM_BATCH_SIZE", u32, 1000);
/// The maximum number of keys that should be scanned at once per concurrent indexing batch.
pub static INDEXING_BATCH_SIZE: Lazy<u32> =
lazy_env_parse!("SURREAL_INDEXING_BATCH_SIZE", u32, 250);
/// Forward all signup/signin/authenticate query errors to a client performing authentication. Do not use in production. /// Forward all signup/signin/authenticate query errors to a client performing authentication. Do not use in production.
pub static INSECURE_FORWARD_ACCESS_ERRORS: Lazy<bool> = pub static INSECURE_FORWARD_ACCESS_ERRORS: Lazy<bool> =
lazy_env_parse!("SURREAL_INSECURE_FORWARD_ACCESS_ERRORS", bool, false); lazy_env_parse!("SURREAL_INSECURE_FORWARD_ACCESS_ERRORS", bool, false);

View file

@ -8,6 +8,8 @@ use crate::err::Error;
use crate::idx::planner::executor::QueryExecutor; use crate::idx::planner::executor::QueryExecutor;
use crate::idx::planner::{IterationStage, QueryPlanner}; use crate::idx::planner::{IterationStage, QueryPlanner};
use crate::idx::trees::store::IndexStores; use crate::idx::trees::store::IndexStores;
#[cfg(not(target_arch = "wasm32"))]
use crate::kvs::IndexBuilder;
use crate::kvs::Transaction; use crate::kvs::Transaction;
use crate::sql::value::Value; use crate::sql::value::Value;
use channel::Sender; use channel::Sender;
@ -63,6 +65,9 @@ pub struct MutableContext {
iteration_stage: Option<IterationStage>, iteration_stage: Option<IterationStage>,
// The index store // The index store
index_stores: IndexStores, index_stores: IndexStores,
// The index concurrent builders
#[cfg(not(target_arch = "wasm32"))]
index_builder: Option<IndexBuilder>,
// Capabilities // Capabilities
capabilities: Arc<Capabilities>, capabilities: Arc<Capabilities>,
#[cfg(any( #[cfg(any(
@ -110,6 +115,7 @@ impl MutableContext {
time_out: Option<Duration>, time_out: Option<Duration>,
capabilities: Capabilities, capabilities: Capabilities,
index_stores: IndexStores, index_stores: IndexStores,
#[cfg(not(target_arch = "wasm32"))] index_builder: IndexBuilder,
#[cfg(any( #[cfg(any(
feature = "kv-mem", feature = "kv-mem",
feature = "kv-surrealkv", feature = "kv-surrealkv",
@ -130,6 +136,8 @@ impl MutableContext {
iteration_stage: None, iteration_stage: None,
capabilities: Arc::new(capabilities), capabilities: Arc::new(capabilities),
index_stores, index_stores,
#[cfg(not(target_arch = "wasm32"))]
index_builder: Some(index_builder),
#[cfg(any( #[cfg(any(
feature = "kv-mem", feature = "kv-mem",
feature = "kv-surrealkv", feature = "kv-surrealkv",
@ -159,6 +167,8 @@ impl MutableContext {
iteration_stage: None, iteration_stage: None,
capabilities: Arc::new(Capabilities::default()), capabilities: Arc::new(Capabilities::default()),
index_stores: IndexStores::default(), index_stores: IndexStores::default(),
#[cfg(not(target_arch = "wasm32"))]
index_builder: None,
#[cfg(any( #[cfg(any(
feature = "kv-mem", feature = "kv-mem",
feature = "kv-surrealkv", feature = "kv-surrealkv",
@ -184,6 +194,8 @@ impl MutableContext {
iteration_stage: parent.iteration_stage.clone(), iteration_stage: parent.iteration_stage.clone(),
capabilities: parent.capabilities.clone(), capabilities: parent.capabilities.clone(),
index_stores: parent.index_stores.clone(), index_stores: parent.index_stores.clone(),
#[cfg(not(target_arch = "wasm32"))]
index_builder: parent.index_builder.clone(),
#[cfg(any( #[cfg(any(
feature = "kv-mem", feature = "kv-mem",
feature = "kv-surrealkv", feature = "kv-surrealkv",
@ -220,6 +232,8 @@ impl MutableContext {
iteration_stage: parent.iteration_stage.clone(), iteration_stage: parent.iteration_stage.clone(),
capabilities: parent.capabilities.clone(), capabilities: parent.capabilities.clone(),
index_stores: parent.index_stores.clone(), index_stores: parent.index_stores.clone(),
#[cfg(not(target_arch = "wasm32"))]
index_builder: parent.index_builder.clone(),
#[cfg(any( #[cfg(any(
feature = "kv-mem", feature = "kv-mem",
feature = "kv-surrealkv", feature = "kv-surrealkv",
@ -234,6 +248,34 @@ impl MutableContext {
} }
} }
	/// Create a new child from a frozen context.
	///
	/// Unlike the other child constructors, the produced context:
	/// - starts with an empty value map and no deadline,
	/// - gets a fresh cancellation flag (it is not cancelled when `from` is),
	/// - does not inherit the parent transaction nor keep a `parent` link.
	/// Shared infrastructure (notifications, planner/executor, iteration stage,
	/// capabilities, index stores and builder) is cloned from the frozen context.
	pub fn new_concurrent(from: &Context) -> Self {
		Self {
			// Fresh state: values set on the source context are not visible here
			values: HashMap::default(),
			// No deadline: the concurrent task is not bound by the parent timeout
			deadline: None,
			// Independent cancellation flag
			cancelled: Arc::new(AtomicBool::new(false)),
			notifications: from.notifications.clone(),
			query_planner: from.query_planner.clone(),
			query_executor: from.query_executor.clone(),
			iteration_stage: from.iteration_stage.clone(),
			capabilities: from.capabilities.clone(),
			index_stores: from.index_stores.clone(),
			// The asynchronous index builder only exists on non-wasm targets
			#[cfg(not(target_arch = "wasm32"))]
			index_builder: from.index_builder.clone(),
			#[cfg(any(
				feature = "kv-mem",
				feature = "kv-surrealkv",
				feature = "kv-rocksdb",
				feature = "kv-fdb",
				feature = "kv-tikv",
			))]
			temporary_directory: from.temporary_directory.clone(),
			// No inherited transaction: the concurrent context supplies its own
			transaction: None,
			isolated: false,
			parent: None,
		}
	}
/// Add a value to the context. It overwrites any previously set values /// Add a value to the context. It overwrites any previously set values
/// with the same key. /// with the same key.
pub fn add_value<K>(&mut self, key: K, value: Arc<Value>) pub fn add_value<K>(&mut self, key: K, value: Arc<Value>)
@ -327,6 +369,12 @@ impl MutableContext {
&self.index_stores &self.index_stores
} }
	/// Get the asynchronous index builder for this context/datastore.
	///
	/// Returns `None` when the context was constructed without an index
	/// builder (only datastore-created contexts receive one — TODO confirm).
	#[cfg(not(target_arch = "wasm32"))]
	pub(crate) fn get_index_builder(&self) -> Option<&IndexBuilder> {
		self.index_builder.as_ref()
	}
/// Check if the context is done. If it returns `None` the operation may /// Check if the context is done. If it returns `None` the operation may
/// proceed, otherwise the operation should be stopped. /// proceed, otherwise the operation should be stopped.
pub fn done(&self) -> Option<Reason> { pub fn done(&self) -> Option<Reason> {

View file

@ -7,6 +7,8 @@ use crate::idx::ft::FtIndex;
use crate::idx::trees::mtree::MTreeIndex; use crate::idx::trees::mtree::MTreeIndex;
use crate::idx::IndexKeyBase; use crate::idx::IndexKeyBase;
use crate::key; use crate::key;
#[cfg(not(target_arch = "wasm32"))]
use crate::kvs::ConsumeResult;
use crate::kvs::TransactionType; use crate::kvs::TransactionType;
use crate::sql::array::Array; use crate::sql::array::Array;
use crate::sql::index::{HnswParams, Index, MTreeParams, SearchParams}; use crate::sql::index::{HnswParams, Index, MTreeParams, SearchParams};
@ -46,13 +48,42 @@ impl Document {
// Loop through all index statements // Loop through all index statements
for ix in ixs.iter() { for ix in ixs.iter() {
// Calculate old values // Calculate old values
let o = build_opt_values(stk, ctx, opt, ix, &self.initial).await?; let o = Self::build_opt_values(stk, ctx, opt, ix, &self.initial).await?;
// Calculate new values // Calculate new values
let n = build_opt_values(stk, ctx, opt, ix, &self.current).await?; let n = Self::build_opt_values(stk, ctx, opt, ix, &self.current).await?;
// Update the index entries // Update the index entries
if targeted_force || o != n { if targeted_force || o != n {
Self::one_index(stk, ctx, opt, ix, o, n, rid).await?;
}
}
// Carry on
Ok(())
}
async fn one_index(
stk: &mut Stk,
ctx: &Context,
opt: &Options,
ix: &DefineIndexStatement,
o: Option<Vec<Value>>,
n: Option<Vec<Value>>,
rid: &Thing,
) -> Result<(), Error> {
#[cfg(not(target_arch = "wasm32"))]
let (o, n) = if let Some(ib) = ctx.get_index_builder() {
match ib.consume(ix, o, n, rid).await? {
// The index builder consumed the value, which means it is currently building the index asynchronously,
// we don't index the document and let the index builder do it later.
ConsumeResult::Enqueued => return Ok(()),
// The index builder is done, the index has been built, we can proceed normally
ConsumeResult::Ignored(o, n) => (o, n),
}
} else {
(o, n)
};
// Store all the variable and parameters required by the index operation // Store all the variable and parameters required by the index operation
let mut ic = IndexOperation::new(opt, ix, o, n, rid); let mut ic = IndexOperation::new(opt, ix, o, n, rid);
@ -63,25 +94,21 @@ impl Document {
Index::Search(p) => ic.index_full_text(stk, ctx, p).await?, Index::Search(p) => ic.index_full_text(stk, ctx, p).await?,
Index::MTree(p) => ic.index_mtree(stk, ctx, p).await?, Index::MTree(p) => ic.index_mtree(stk, ctx, p).await?,
Index::Hnsw(p) => ic.index_hnsw(ctx, p).await?, Index::Hnsw(p) => ic.index_hnsw(ctx, p).await?,
};
} }
}
// Carry on
Ok(()) Ok(())
} }
}
/// Extract from the given document, the values required by the index and put then in an array. /// Extract from the given document, the values required by the index and put then in an array.
/// Eg. IF the index is composed of the columns `name` and `instrument` /// Eg. IF the index is composed of the columns `name` and `instrument`
/// Given this doc: { "id": 1, "instrument":"piano", "name":"Tobie" } /// Given this doc: { "id": 1, "instrument":"piano", "name":"Tobie" }
/// It will return: ["Tobie", "piano"] /// It will return: ["Tobie", "piano"]
async fn build_opt_values( pub(crate) async fn build_opt_values(
stk: &mut Stk, stk: &mut Stk,
ctx: &Context, ctx: &Context,
opt: &Options, opt: &Options,
ix: &DefineIndexStatement, ix: &DefineIndexStatement,
doc: &CursorDoc, doc: &CursorDoc,
) -> Result<Option<Vec<Value>>, Error> { ) -> Result<Option<Vec<Value>>, Error> {
if !doc.doc.as_ref().is_some() { if !doc.doc.as_ref().is_some() {
return Ok(None); return Ok(None);
} }
@ -91,6 +118,7 @@ async fn build_opt_values(
o.push(v); o.push(v);
} }
Ok(Some(o)) Ok(Some(o))
}
} }
/// Extract from the given document, the values required by the index and put then in an array. /// Extract from the given document, the values required by the index and put then in an array.

View file

@ -933,6 +933,12 @@ pub enum Error {
db: String, db: String,
}, },
/// A database index entry for the specified table is already building
#[error("Database index `{index}` is currently building")]
IndexAlreadyBuilding {
index: String,
},
/// The session has expired either because the token used /// The session has expired either because the token used
/// to establish it has expired or because an expiration /// to establish it has expired or because an expiration
/// was explicitly defined when establishing it /// was explicitly defined when establishing it

337
core/src/idx/index.rs Normal file
View file

@ -0,0 +1,337 @@
use crate::ctx::Context;
use crate::dbs::Options;
use crate::err::Error;
use crate::idx::ft::FtIndex;
use crate::idx::trees::mtree::MTreeIndex;
use crate::idx::IndexKeyBase;
use crate::key;
use crate::kvs::TransactionType;
use crate::sql::index::{HnswParams, MTreeParams, SearchParams};
use crate::sql::statements::DefineIndexStatement;
use crate::sql::{Array, Index, Part, Thing, Value};
use reblessive::tree::Stk;
/// A single index-maintenance operation for one record: removes the entries
/// derived from the old values and adds the entries derived from the new ones.
pub(crate) struct IndexOperation<'a> {
	ctx: &'a Context,
	opt: &'a Options,
	/// The index definition being maintained
	ix: &'a DefineIndexStatement,
	/// The old values (if existing)
	o: Option<Vec<Value>>,
	/// The new values (if existing)
	n: Option<Vec<Value>>,
	/// The id of the record being (re)indexed
	rid: &'a Thing,
}
impl<'a> IndexOperation<'a> {
	/// Collect everything required to (re)index one record for one index.
	pub(crate) fn new(
		ctx: &'a Context,
		opt: &'a Options,
		ix: &'a DefineIndexStatement,
		o: Option<Vec<Value>>,
		n: Option<Vec<Value>>,
		rid: &'a Thing,
	) -> Self {
		Self {
			ctx,
			opt,
			ix,
			o,
			n,
			rid,
		}
	}

	/// Apply the operation, dispatching to the handler matching the index kind.
	pub(crate) async fn compute(&mut self, stk: &mut Stk) -> Result<(), Error> {
		// Index operation dispatching
		match &self.ix.index {
			Index::Uniq => self.index_unique().await,
			Index::Idx => self.index_non_unique().await,
			Index::Search(p) => self.index_full_text(stk, p).await,
			Index::MTree(p) => self.index_mtree(stk, p).await,
			Index::Hnsw(p) => self.index_hnsw(p).await,
		}
	}

	/// Build the key for a unique index entry. No record id suffix: at most
	/// one record may own a given value combination.
	fn get_unique_index_key(&self, v: &'a Array) -> Result<key::index::Index, Error> {
		Ok(key::index::Index::new(
			self.opt.ns()?,
			self.opt.db()?,
			&self.ix.what,
			&self.ix.name,
			v,
			None,
		))
	}

	/// Build the key for a non-unique index entry. The record id is part of
	/// the key so several records can share the same value combination.
	fn get_non_unique_index_key(&self, v: &'a Array) -> Result<key::index::Index, Error> {
		Ok(key::index::Index::new(
			self.opt.ns()?,
			self.opt.db()?,
			&self.ix.what,
			&self.ix.name,
			v,
			Some(&self.rid.id),
		))
	}

	/// Maintain a unique index: delete the entries for the old values, then
	/// insert the entries for the new ones, failing with `IndexExists` when a
	/// value combination is already owned by another record.
	async fn index_unique(&mut self) -> Result<(), Error> {
		// Lock the transaction
		let tx = self.ctx.tx();
		let mut txn = tx.lock().await;
		// Delete the old index data
		if let Some(o) = self.o.take() {
			let i = Indexable::new(o, self.ix);
			for o in i {
				let key = self.get_unique_index_key(&o)?;
				match txn.delc(key, Some(self.rid)).await {
					// The entry no longer points at this record — treat as already removed
					Err(Error::TxConditionNotMet) => Ok(()),
					Err(e) => Err(e),
					Ok(v) => Ok(v),
				}?
			}
		}
		// Create the new index data
		if let Some(n) = self.n.take() {
			let i = Indexable::new(n, self.ix);
			for n in i {
				// Combinations that are entirely NONE/NULL are not indexed
				if !n.is_all_none_or_null() {
					let key = self.get_unique_index_key(&n)?;
					if txn.putc(key, self.rid, None).await.is_err() {
						// A conflicting entry exists: fetch its owner to report the duplicate
						let key = self.get_unique_index_key(&n)?;
						let val = txn.get(key, None).await?.unwrap();
						let rid: Thing = val.into();
						return self.err_index_exists(rid, n);
					}
				}
			}
		}
		Ok(())
	}

	/// Maintain a non-unique index: delete the entries for the old values,
	/// then insert the entries for the new ones.
	async fn index_non_unique(&mut self) -> Result<(), Error> {
		// Lock the transaction
		let tx = self.ctx.tx();
		let mut txn = tx.lock().await;
		// Delete the old index data
		if let Some(o) = self.o.take() {
			let i = Indexable::new(o, self.ix);
			for o in i {
				let key = self.get_non_unique_index_key(&o)?;
				match txn.delc(key, Some(self.rid)).await {
					// The entry no longer points at this record — treat as already removed
					Err(Error::TxConditionNotMet) => Ok(()),
					Err(e) => Err(e),
					Ok(v) => Ok(v),
				}?
			}
		}
		// Create the new index data
		if let Some(n) = self.n.take() {
			let i = Indexable::new(n, self.ix);
			for n in i {
				let key = self.get_non_unique_index_key(&n)?;
				if txn.putc(key, self.rid, None).await.is_err() {
					// NOTE(review): putc on a non-unique key includes the record id,
					// so a conflict here presumably means a same-key race — confirm
					let key = self.get_non_unique_index_key(&n)?;
					let val = txn.get(key, None).await?.unwrap();
					let rid: Thing = val.into();
					return self.err_index_exists(rid, n);
				}
			}
		}
		Ok(())
	}

	/// Build the `IndexExists` error, reporting the single value when the
	/// combination has one column, or the whole array otherwise.
	fn err_index_exists(&self, rid: Thing, n: Array) -> Result<(), Error> {
		Err(Error::IndexExists {
			thing: rid,
			index: self.ix.name.to_string(),
			value: match n.len() {
				1 => n.first().unwrap().to_string(),
				_ => n.to_string(),
			},
		})
	}

	/// Maintain a full-text (SEARCH) index: index the new values if present,
	/// otherwise remove the document, then flush the index state.
	async fn index_full_text(&mut self, stk: &mut Stk, p: &SearchParams) -> Result<(), Error> {
		let ikb = IndexKeyBase::new(self.opt.ns()?, self.opt.db()?, self.ix)?;
		let mut ft =
			FtIndex::new(self.ctx, self.opt, &p.az, ikb, p, TransactionType::Write).await?;
		if let Some(n) = self.n.take() {
			ft.index_document(stk, self.ctx, self.opt, self.rid, n).await?;
		} else {
			ft.remove_document(self.ctx, self.rid).await?;
		}
		ft.finish(self.ctx).await
	}

	/// Maintain an MTree vector index: remove the old vector, index the new
	/// one, then flush the tree state.
	async fn index_mtree(&mut self, stk: &mut Stk, p: &MTreeParams) -> Result<(), Error> {
		let txn = self.ctx.tx();
		let ikb = IndexKeyBase::new(self.opt.ns()?, self.opt.db()?, self.ix)?;
		let mut mt =
			MTreeIndex::new(self.ctx.get_index_stores(), &txn, ikb, p, TransactionType::Write)
				.await?;
		// Delete the old index data
		if let Some(o) = self.o.take() {
			mt.remove_document(stk, &txn, self.rid, &o).await?;
		}
		// Create the new index data
		if let Some(n) = self.n.take() {
			mt.index_document(stk, &txn, self.rid, &n).await?;
		}
		mt.finish(&txn).await
	}

	/// Maintain an HNSW vector index (held in the index store, guarded by a
	/// write lock rather than the transaction).
	async fn index_hnsw(&mut self, p: &HnswParams) -> Result<(), Error> {
		let hnsw = self.ctx.get_index_stores().get_index_hnsw(self.opt, self.ix, p).await?;
		let mut hnsw = hnsw.write().await;
		// Delete the old index data
		if let Some(o) = self.o.take() {
			hnsw.remove_document(self.rid, &o)?;
		}
		// Create the new index data
		if let Some(n) = self.n.take() {
			hnsw.index_document(self.rid, &n)?;
		}
		Ok(())
	}
}
/// Pairs each indexed value with a flag telling whether the corresponding
/// index column ends with a `Flatten` part. Iterating over it (via
/// `IntoIterator`) yields every `Array` combination to store in the index.
struct Indexable(Vec<(Value, bool)>);

impl Indexable {
	/// Zip the extracted values with their column definitions, recording for
	/// each value whether its idiom's last part is `Flatten`.
	fn new(vals: Vec<Value>, ix: &DefineIndexStatement) -> Self {
		let source = vals
			.into_iter()
			.zip(ix.cols.0.iter())
			.map(|(v, i)| (v, matches!(i.0.last(), Some(&Part::Flatten))))
			.collect();
		Self(source)
	}
}
impl IntoIterator for Indexable {
	type Item = Array;
	type IntoIter = Combinator;

	/// Expand the collected (value, flatten-flag) pairs into the iterator of
	/// index key combinations.
	fn into_iter(self) -> Self::IntoIter {
		Combinator::new(self.0)
	}
}
/// Walks the per-column value iterators, producing one `Array` combination
/// per call to `next`.
struct Combinator {
	iterators: Vec<Box<dyn ValuesIterator>>,
	has_next: bool,
}

impl Combinator {
	/// Build one value cursor per indexed column: an array value whose column
	/// is not flattened expands into its elements, anything else contributes
	/// a single value.
	fn new(source: Vec<(Value, bool)>) -> Self {
		let iterators = source
			.into_iter()
			.map(|(v, flattened)| -> Box<dyn ValuesIterator> {
				match v {
					Value::Array(a) if !flattened => Box::new(MultiValuesIterator::new(a.0)),
					other => Box::new(SingleValueIterator(other)),
				}
			})
			.collect();
		Self {
			iterators,
			has_next: true,
		}
	}
}
impl Iterator for Combinator {
	type Item = Array;

	/// Emit the current combination (one value per column), then advance the
	/// first iterator that still has a successor.
	fn next(&mut self) -> Option<Self::Item> {
		if !self.has_next {
			return None;
		}
		let mut o = Vec::with_capacity(self.iterators.len());
		// Create the combination and advance to the next
		self.has_next = false;
		for i in &mut self.iterators {
			o.push(i.current().clone());
			if !self.has_next {
				// We advance only one iterator per iteration
				// NOTE(review): exhausted iterators are not reset before a later
				// one advances, so this does not enumerate the full Cartesian
				// product of all columns — confirm this coverage is intended.
				if i.next() {
					self.has_next = true;
				}
			}
		}
		let o = Array::from(o);
		Some(o)
	}
}
/// A position-based cursor over the values contributed by one index column.
trait ValuesIterator: Send {
	/// Advance to the next value; returns `false` when there is no successor.
	fn next(&mut self) -> bool;
	/// The value at the current position.
	fn current(&self) -> &Value;
}
/// Cursor over the elements of a non-flattened array value.
struct MultiValuesIterator {
	vals: Vec<Value>,
	done: bool,
	current: usize,
	end: usize,
}

impl MultiValuesIterator {
	/// Start at the first element; an empty array is immediately exhausted.
	fn new(vals: Vec<Value>) -> Self {
		let len = vals.len();
		Self {
			done: len == 0,
			current: 0,
			// Index of the last element (0 for an empty array, which is
			// already marked done so `end` is never read in that case)
			end: len.saturating_sub(1),
			vals,
		}
	}
}
impl ValuesIterator for MultiValuesIterator {
	/// Step to the next element, flagging exhaustion once the last index is
	/// reached.
	fn next(&mut self) -> bool {
		// Either already exhausted, or sitting on the last element:
		// there is nothing to advance to.
		if self.done || self.current == self.end {
			self.done = true;
			return false;
		}
		self.current += 1;
		true
	}

	/// The element at the cursor; `Null` is only returned if the index were
	/// ever out of range.
	fn current(&self) -> &Value {
		self.vals.get(self.current).unwrap_or(&Value::Null)
	}
}
/// Cursor over a single, non-expandable value: it never advances.
struct SingleValueIterator(Value);

impl ValuesIterator for SingleValueIterator {
	fn next(&mut self) -> bool {
		// A single value has no successor
		false
	}

	fn current(&self) -> &Value {
		&self.0
	}
}

View file

@ -1,5 +1,6 @@
pub mod docids; pub mod docids;
pub(crate) mod ft; pub(crate) mod ft;
pub(crate) mod index;
pub mod planner; pub mod planner;
pub mod trees; pub mod trees;

View file

@ -4,6 +4,7 @@ use crate::cf;
use crate::ctx::MutableContext; use crate::ctx::MutableContext;
#[cfg(feature = "jwks")] #[cfg(feature = "jwks")]
use crate::dbs::capabilities::NetTarget; use crate::dbs::capabilities::NetTarget;
use crate::dbs::node::Timestamp;
use crate::dbs::{ use crate::dbs::{
Attach, Capabilities, Executor, Notification, Options, Response, Session, Variables, Attach, Capabilities, Executor, Notification, Options, Response, Session, Variables,
}; };
@ -15,6 +16,8 @@ use crate::idx::trees::store::IndexStores;
use crate::kvs::clock::SizedClock; use crate::kvs::clock::SizedClock;
#[allow(unused_imports)] #[allow(unused_imports)]
use crate::kvs::clock::SystemClock; use crate::kvs::clock::SystemClock;
#[cfg(not(target_arch = "wasm32"))]
use crate::kvs::index::IndexBuilder;
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*}; use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
use crate::sql::{statements::DefineUserStatement, Base, Query, Value}; use crate::sql::{statements::DefineUserStatement, Base, Query, Value};
use crate::syn; use crate::syn;
@ -55,8 +58,7 @@ const INITIAL_USER_ROLE: &str = "owner";
#[allow(dead_code)] #[allow(dead_code)]
#[non_exhaustive] #[non_exhaustive]
pub struct Datastore { pub struct Datastore {
// The inner datastore type transaction_factory: TransactionFactory,
inner: Inner,
// The unique id of this datastore, used in notifications // The unique id of this datastore, used in notifications
id: Uuid, id: Uuid,
// Whether this datastore runs in strict mode by default // Whether this datastore runs in strict mode by default
@ -71,10 +73,11 @@ pub struct Datastore {
capabilities: Capabilities, capabilities: Capabilities,
// Whether this datastore enables live query notifications to subscribers // Whether this datastore enables live query notifications to subscribers
pub(super) notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>, pub(super) notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>,
// Clock for tracking time. It is read only and accessible to all transactions. It is behind a mutex as tests may write to it.
pub(super) clock: Arc<SizedClock>,
// The index store cache // The index store cache
index_stores: IndexStores, index_stores: IndexStores,
// The index asynchronous builder
#[cfg(not(target_arch = "wasm32"))]
index_builder: IndexBuilder,
#[cfg(feature = "jwks")] #[cfg(feature = "jwks")]
// The JWKS object cache // The JWKS object cache
jwks_cache: Arc<RwLock<JwksCache>>, jwks_cache: Arc<RwLock<JwksCache>>,
@ -89,8 +92,80 @@ pub struct Datastore {
temporary_directory: Option<Arc<PathBuf>>, temporary_directory: Option<Arc<PathBuf>>,
} }
/// Creates transactions for a datastore. It is cheap to clone, so it can be
/// handed to background components (e.g. the asynchronous index builder).
#[derive(Clone)]
pub(super) struct TransactionFactory {
	// Clock for tracking time. It is read only and accessible to all transactions. It is behind a mutex as tests may write to it.
	clock: Arc<SizedClock>,
	// The inner datastore type
	flavor: Arc<DatastoreFlavor>,
}
impl TransactionFactory {
	/// Start a new transaction on the underlying datastore flavor.
	///
	/// `write` selects read-only vs writeable, `lock` selects pessimistic vs
	/// optimistic locking; how each flag is honoured depends on the backend.
	/// The `unreachable_code`/`unused_variables` allowances cover builds where
	/// no kv backend feature is enabled and the match has no reachable arm.
	#[allow(unreachable_code)]
	pub async fn transaction(
		&self,
		write: TransactionType,
		lock: LockType,
	) -> Result<Transaction, Error> {
		// Specify if the transaction is writeable
		#[allow(unused_variables)]
		let write = match write {
			Read => false,
			Write => true,
		};
		// Specify if the transaction is lockable
		#[allow(unused_variables)]
		let lock = match lock {
			Pessimistic => true,
			Optimistic => false,
		};
		// Create a new transaction on the datastore
		#[allow(unused_variables)]
		let inner = match self.flavor.as_ref() {
			#[cfg(feature = "kv-mem")]
			DatastoreFlavor::Mem(v) => {
				let tx = v.transaction(write, lock).await?;
				super::tr::Inner::Mem(tx)
			}
			#[cfg(feature = "kv-rocksdb")]
			DatastoreFlavor::RocksDB(v) => {
				let tx = v.transaction(write, lock).await?;
				super::tr::Inner::RocksDB(tx)
			}
			#[cfg(feature = "kv-indxdb")]
			DatastoreFlavor::IndxDB(v) => {
				let tx = v.transaction(write, lock).await?;
				super::tr::Inner::IndxDB(tx)
			}
			#[cfg(feature = "kv-tikv")]
			DatastoreFlavor::TiKV(v) => {
				let tx = v.transaction(write, lock).await?;
				super::tr::Inner::TiKV(tx)
			}
			#[cfg(feature = "kv-fdb")]
			DatastoreFlavor::FoundationDB(v) => {
				let tx = v.transaction(write, lock).await?;
				super::tr::Inner::FoundationDB(tx)
			}
			#[cfg(feature = "kv-surrealkv")]
			DatastoreFlavor::SurrealKV(v) => {
				let tx = v.transaction(write, lock).await?;
				super::tr::Inner::SurrealKV(tx)
			}
			// Only reachable when no kv backend feature is compiled in
			#[allow(unreachable_patterns)]
			_ => unreachable!(),
		};
		// Wrap the backend transaction with the shared stash, change-feed
		// writer and clock
		Ok(Transaction::new(Transactor {
			inner,
			stash: super::stash::Stash::default(),
			cf: cf::Writer::new(),
			clock: self.clock.clone(),
		}))
	}
}
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
pub(super) enum Inner { pub(super) enum DatastoreFlavor {
#[cfg(feature = "kv-mem")] #[cfg(feature = "kv-mem")]
Mem(super::mem::Datastore), Mem(super::mem::Datastore),
#[cfg(feature = "kv-rocksdb")] #[cfg(feature = "kv-rocksdb")]
@ -108,19 +183,19 @@ pub(super) enum Inner {
impl fmt::Display for Datastore { impl fmt::Display for Datastore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#![allow(unused_variables)] #![allow(unused_variables)]
match &self.inner { match self.transaction_factory.flavor.as_ref() {
#[cfg(feature = "kv-mem")] #[cfg(feature = "kv-mem")]
Inner::Mem(_) => write!(f, "memory"), DatastoreFlavor::Mem(_) => write!(f, "memory"),
#[cfg(feature = "kv-rocksdb")] #[cfg(feature = "kv-rocksdb")]
Inner::RocksDB(_) => write!(f, "rocksdb"), DatastoreFlavor::RocksDB(_) => write!(f, "rocksdb"),
#[cfg(feature = "kv-indxdb")] #[cfg(feature = "kv-indxdb")]
Inner::IndxDB(_) => write!(f, "indxdb"), DatastoreFlavor::IndxDB(_) => write!(f, "indxdb"),
#[cfg(feature = "kv-tikv")] #[cfg(feature = "kv-tikv")]
Inner::TiKV(_) => write!(f, "tikv"), DatastoreFlavor::TiKV(_) => write!(f, "tikv"),
#[cfg(feature = "kv-fdb")] #[cfg(feature = "kv-fdb")]
Inner::FoundationDB(_) => write!(f, "fdb"), DatastoreFlavor::FoundationDB(_) => write!(f, "fdb"),
#[cfg(feature = "kv-surrealkv")] #[cfg(feature = "kv-surrealkv")]
Inner::SurrealKV(_) => write!(f, "surrealkv"), DatastoreFlavor::SurrealKV(_) => write!(f, "surrealkv"),
#[allow(unreachable_patterns)] #[allow(unreachable_patterns)]
_ => unreachable!(), _ => unreachable!(),
} }
@ -175,13 +250,13 @@ impl Datastore {
clock: Option<Arc<SizedClock>>, clock: Option<Arc<SizedClock>>,
) -> Result<Datastore, Error> { ) -> Result<Datastore, Error> {
// Initiate the desired datastore // Initiate the desired datastore
let (inner, clock): (Result<Inner, Error>, Arc<SizedClock>) = match path { let (flavor, clock): (Result<DatastoreFlavor, Error>, Arc<SizedClock>) = match path {
// Initiate an in-memory datastore // Initiate an in-memory datastore
"memory" => { "memory" => {
#[cfg(feature = "kv-mem")] #[cfg(feature = "kv-mem")]
{ {
info!(target: TARGET, "Starting kvs store in {}", path); info!(target: TARGET, "Starting kvs store in {}", path);
let v = super::mem::Datastore::new().await.map(Inner::Mem); let v = super::mem::Datastore::new().await.map(DatastoreFlavor::Mem);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Started kvs store in {}", path); info!(target: TARGET, "Started kvs store in {}", path);
Ok((v, c)) Ok((v, c))
@ -196,7 +271,7 @@ impl Datastore {
info!(target: TARGET, "Starting kvs store at {}", path); info!(target: TARGET, "Starting kvs store at {}", path);
let s = s.trim_start_matches("file://"); let s = s.trim_start_matches("file://");
let s = s.trim_start_matches("file:"); let s = s.trim_start_matches("file:");
let v = super::rocksdb::Datastore::new(s).await.map(Inner::RocksDB); let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Started kvs store at {}", path); info!(target: TARGET, "Started kvs store at {}", path);
Ok((v, c)) Ok((v, c))
@ -211,7 +286,7 @@ impl Datastore {
info!(target: TARGET, "Starting kvs store at {}", path); info!(target: TARGET, "Starting kvs store at {}", path);
let s = s.trim_start_matches("rocksdb://"); let s = s.trim_start_matches("rocksdb://");
let s = s.trim_start_matches("rocksdb:"); let s = s.trim_start_matches("rocksdb:");
let v = super::rocksdb::Datastore::new(s).await.map(Inner::RocksDB); let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Started kvs store at {}", path); info!(target: TARGET, "Started kvs store at {}", path);
Ok((v, c)) Ok((v, c))
@ -226,7 +301,7 @@ impl Datastore {
info!(target: TARGET, "Starting kvs store at {}", path); info!(target: TARGET, "Starting kvs store at {}", path);
let s = s.trim_start_matches("indxdb://"); let s = s.trim_start_matches("indxdb://");
let s = s.trim_start_matches("indxdb:"); let s = s.trim_start_matches("indxdb:");
let v = super::indxdb::Datastore::new(s).await.map(Inner::IndxDB); let v = super::indxdb::Datastore::new(s).await.map(DatastoreFlavor::IndxDB);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Started kvs store at {}", path); info!(target: TARGET, "Started kvs store at {}", path);
Ok((v, c)) Ok((v, c))
@ -241,7 +316,7 @@ impl Datastore {
info!(target: TARGET, "Connecting to kvs store at {}", path); info!(target: TARGET, "Connecting to kvs store at {}", path);
let s = s.trim_start_matches("tikv://"); let s = s.trim_start_matches("tikv://");
let s = s.trim_start_matches("tikv:"); let s = s.trim_start_matches("tikv:");
let v = super::tikv::Datastore::new(s).await.map(Inner::TiKV); let v = super::tikv::Datastore::new(s).await.map(DatastoreFlavor::TiKV);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Connected to kvs store at {}", path); info!(target: TARGET, "Connected to kvs store at {}", path);
Ok((v, c)) Ok((v, c))
@ -256,7 +331,7 @@ impl Datastore {
info!(target: TARGET, "Connecting to kvs store at {}", path); info!(target: TARGET, "Connecting to kvs store at {}", path);
let s = s.trim_start_matches("fdb://"); let s = s.trim_start_matches("fdb://");
let s = s.trim_start_matches("fdb:"); let s = s.trim_start_matches("fdb:");
let v = super::fdb::Datastore::new(s).await.map(Inner::FoundationDB); let v = super::fdb::Datastore::new(s).await.map(DatastoreFlavor::FoundationDB);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Connected to kvs store at {}", path); info!(target: TARGET, "Connected to kvs store at {}", path);
Ok((v, c)) Ok((v, c))
@ -271,7 +346,8 @@ impl Datastore {
info!(target: TARGET, "Starting kvs store at {}", path); info!(target: TARGET, "Starting kvs store at {}", path);
let s = s.trim_start_matches("surrealkv://"); let s = s.trim_start_matches("surrealkv://");
let s = s.trim_start_matches("surrealkv:"); let s = s.trim_start_matches("surrealkv:");
let v = super::surrealkv::Datastore::new(s).await.map(Inner::SurrealKV); let v =
super::surrealkv::Datastore::new(s).await.map(DatastoreFlavor::SurrealKV);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system())); let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Started to kvs store at {}", path); info!(target: TARGET, "Started to kvs store at {}", path);
Ok((v, c)) Ok((v, c))
@ -286,10 +362,14 @@ impl Datastore {
} }
}?; }?;
// Set the properties on the datastore // Set the properties on the datastore
inner.map(|inner| Self { flavor.map(|flavor| {
id: Uuid::new_v4(), let tf = TransactionFactory {
inner,
clock, clock,
flavor: Arc::new(flavor),
};
Self {
id: Uuid::new_v4(),
transaction_factory: tf.clone(),
strict: false, strict: false,
auth_enabled: false, auth_enabled: false,
query_timeout: None, query_timeout: None,
@ -297,6 +377,8 @@ impl Datastore {
notification_channel: None, notification_channel: None,
capabilities: Capabilities::default(), capabilities: Capabilities::default(),
index_stores: IndexStores::default(), index_stores: IndexStores::default(),
#[cfg(not(target_arch = "wasm32"))]
index_builder: IndexBuilder::new(tf),
#[cfg(feature = "jwks")] #[cfg(feature = "jwks")]
jwks_cache: Arc::new(RwLock::new(JwksCache::new())), jwks_cache: Arc::new(RwLock::new(JwksCache::new())),
#[cfg(any( #[cfg(any(
@ -307,6 +389,7 @@ impl Datastore {
feature = "kv-tikv", feature = "kv-tikv",
))] ))]
temporary_directory: None, temporary_directory: None,
}
}) })
} }
@ -389,6 +472,10 @@ impl Datastore {
&self.jwks_cache &self.jwks_cache
} }
	/// Read the current timestamp from the datastore clock (shared through
	/// the transaction factory).
	pub(super) async fn clock_now(&self) -> Timestamp {
		self.transaction_factory.clock.now().await
	}
// Initialise the cluster and run bootstrap utilities // Initialise the cluster and run bootstrap utilities
#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)] #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn bootstrap(&self) -> Result<(), Error> { pub async fn bootstrap(&self) -> Result<(), Error> {
@ -551,60 +638,7 @@ impl Datastore {
write: TransactionType, write: TransactionType,
lock: LockType, lock: LockType,
) -> Result<Transaction, Error> { ) -> Result<Transaction, Error> {
// Specify if the transaction is writeable self.transaction_factory.transaction(write, lock).await
#[allow(unused_variables)]
let write = match write {
Read => false,
Write => true,
};
// Specify if the transaction is lockable
#[allow(unused_variables)]
let lock = match lock {
Pessimistic => true,
Optimistic => false,
};
// Create a new transaction on the datastore
#[allow(unused_variables)]
let inner = match &self.inner {
#[cfg(feature = "kv-mem")]
Inner::Mem(v) => {
let tx = v.transaction(write, lock).await?;
super::tr::Inner::Mem(tx)
}
#[cfg(feature = "kv-rocksdb")]
Inner::RocksDB(v) => {
let tx = v.transaction(write, lock).await?;
super::tr::Inner::RocksDB(tx)
}
#[cfg(feature = "kv-indxdb")]
Inner::IndxDB(v) => {
let tx = v.transaction(write, lock).await?;
super::tr::Inner::IndxDB(tx)
}
#[cfg(feature = "kv-tikv")]
Inner::TiKV(v) => {
let tx = v.transaction(write, lock).await?;
super::tr::Inner::TiKV(tx)
}
#[cfg(feature = "kv-fdb")]
Inner::FoundationDB(v) => {
let tx = v.transaction(write, lock).await?;
super::tr::Inner::FoundationDB(tx)
}
#[cfg(feature = "kv-surrealkv")]
Inner::SurrealKV(v) => {
let tx = v.transaction(write, lock).await?;
super::tr::Inner::SurrealKV(tx)
}
#[allow(unreachable_patterns)]
_ => unreachable!(),
};
Ok(Transaction::new(Transactor {
inner,
stash: super::stash::Stash::default(),
cf: cf::Writer::new(),
clock: self.clock.clone(),
}))
} }
/// Parse and execute an SQL query /// Parse and execute an SQL query
@ -690,6 +724,8 @@ impl Datastore {
self.query_timeout, self.query_timeout,
self.capabilities.clone(), self.capabilities.clone(),
self.index_stores.clone(), self.index_stores.clone(),
#[cfg(not(target_arch = "wasm32"))]
self.index_builder.clone(),
#[cfg(any( #[cfg(any(
feature = "kv-mem", feature = "kv-mem",
feature = "kv-surrealkv", feature = "kv-surrealkv",

294
core/src/kvs/index.rs Normal file
View file

@ -0,0 +1,294 @@
use crate::cnf::{INDEXING_BATCH_SIZE, NORMAL_FETCH_SIZE};
use crate::ctx::{Context, MutableContext};
use crate::dbs::Options;
use crate::doc::{CursorDoc, Document};
use crate::err::Error;
use crate::idx::index::IndexOperation;
use crate::key::thing;
use crate::kvs::ds::TransactionFactory;
use crate::kvs::LockType::Optimistic;
use crate::kvs::{Transaction, TransactionType};
use crate::sql::statements::DefineIndexStatement;
use crate::sql::{Id, Object, Thing, Value};
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use reblessive::TreeStack;
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task;
use tokio::task::JoinHandle;
/// The status of an asynchronous index build, as reported by
/// `INFO FOR INDEX`.
#[derive(Clone)]
pub(crate) enum BuildingStatus {
	/// The build task has been created but has not indexed anything yet.
	Started,
	/// First phase: scanning the table; holds the number of records indexed so far.
	InitialIndexing(usize),
	/// Second phase: draining queued updates; holds the progress counter.
	UpdatesIndexing(usize),
	/// The build failed; only the first error is kept (see `Building::set_status`).
	Error(Arc<Error>),
	/// The index is fully built.
	Built,
}
/// Outcome of offering a document mutation to a background index build.
pub(crate) enum ConsumeResult {
	/// The document has been enqueued to be indexed
	Enqueued,
	/// The index has been built, the document can be indexed normally;
	/// the old/new values are handed back to the caller.
	Ignored(Option<Vec<Value>>, Option<Vec<Value>>),
}
impl BuildingStatus {
fn is_error(&self) -> bool {
matches!(self, Self::Error(_))
}
fn is_built(&self) -> bool {
matches!(self, Self::Built)
}
}
impl From<BuildingStatus> for Value {
fn from(st: BuildingStatus) -> Self {
let mut o = Object::default();
let s = match st {
BuildingStatus::Started => "started",
BuildingStatus::InitialIndexing(count) => {
o.insert("count".to_string(), count.into());
"initial"
}
BuildingStatus::UpdatesIndexing(count) => {
o.insert("count".to_string(), count.into());
"updates"
}
BuildingStatus::Error(error) => {
o.insert("error".to_string(), error.to_string().into());
"error"
}
BuildingStatus::Built => "built",
};
o.insert("status".to_string(), s.into());
o.into()
}
}
type IndexBuilding = (Arc<Building>, JoinHandle<()>);
/// Registry of asynchronous index builds, shared across the datastore.
#[derive(Clone)]
pub(crate) struct IndexBuilder {
	// Factory used to open new transactions for the background builds
	tf: TransactionFactory,
	// One entry per index for which a build has been started
	indexes: Arc<DashMap<Arc<DefineIndexStatement>, IndexBuilding>>,
}
impl IndexBuilder {
	/// Creates a new builder registry using the given transaction factory.
	pub(super) fn new(tf: TransactionFactory) -> Self {
		Self {
			tf,
			indexes: Default::default(),
		}
	}

	/// Starts building the given index in a background tokio task.
	///
	/// Returns `Error::IndexAlreadyBuilding` if a build for this index is
	/// still running.
	// NOTE(review): a finished entry (built or failed) stays in the map and is
	// not replaced here, so a later DEFINE ... CONCURRENTLY for the same index
	// is a no-op and a failed build cannot be retried — confirm this is intended.
	pub(crate) fn build(
		&self,
		ctx: &Context,
		opt: Options,
		ix: Arc<DefineIndexStatement>,
	) -> Result<(), Error> {
		match self.indexes.entry(ix) {
			Entry::Occupied(e) => {
				// If the building is currently running we return error
				if !e.get().1.is_finished() {
					return Err(Error::IndexAlreadyBuilding {
						index: e.key().name.to_string(),
					});
				}
			}
			Entry::Vacant(e) => {
				// No index is currently building, we can start building it
				let building = Arc::new(Building::new(ctx, self.tf.clone(), opt, e.key().clone())?);
				let b = building.clone();
				// Any error raised by the build is recorded in its status
				let jh = task::spawn(async move {
					if let Err(err) = b.compute().await {
						b.set_status(BuildingStatus::Error(err.into())).await;
					}
				});
				e.insert((building, jh));
			}
		}
		Ok(())
	}

	/// Offers a document mutation to a running build for `ix`.
	///
	/// If a build exists the values may be queued (`Enqueued`); otherwise they
	/// are handed back (`Ignored`) so the caller indexes them directly.
	pub(crate) async fn consume(
		&self,
		ix: &DefineIndexStatement,
		old_values: Option<Vec<Value>>,
		new_values: Option<Vec<Value>>,
		rid: &Thing,
	) -> Result<ConsumeResult, Error> {
		if let Some(r) = self.indexes.get(ix) {
			let (b, _) = r.value();
			return Ok(b.maybe_consume(old_values, new_values, rid).await);
		}
		Ok(ConsumeResult::Ignored(old_values, new_values))
	}

	/// Returns the current build status for `ix`, or `None` if no build was
	/// ever started for this index.
	pub(crate) async fn get_status(&self, ix: &DefineIndexStatement) -> Option<BuildingStatus> {
		if let Some(a) = self.indexes.get(ix) {
			Some(a.value().0.status.lock().await.clone())
		} else {
			None
		}
	}
}
/// A document mutation queued while the initial table scan is still running.
struct Appending {
	// Previous index values of the document, if any
	old_values: Option<Vec<Value>>,
	// New index values of the document, if any
	new_values: Option<Vec<Value>>,
	// The id of the mutated record
	id: Id,
}
/// The shared state of a single asynchronous index build.
struct Building {
	// Context the build runs under (detached from the defining query)
	ctx: Context,
	opt: Options,
	// Used to open the read and write transactions of the build
	tf: TransactionFactory,
	// The index being built
	ix: Arc<DefineIndexStatement>,
	// Name of the table being indexed
	tb: String,
	// Current status, reported by INFO FOR INDEX
	status: Arc<Mutex<BuildingStatus>>,
	// Should be stored on a temporary table
	appended: Arc<Mutex<VecDeque<Appending>>>,
}
impl Building {
	/// Creates the shared state for building `ix` on its table.
	fn new(
		ctx: &Context,
		tf: TransactionFactory,
		opt: Options,
		ix: Arc<DefineIndexStatement>,
	) -> Result<Self, Error> {
		Ok(Self {
			// Detach the context so the build can outlive the defining query
			ctx: MutableContext::new_concurrent(ctx).freeze(),
			opt,
			tf,
			tb: ix.what.to_string(),
			ix,
			status: Arc::new(Mutex::new(BuildingStatus::Started)),
			appended: Default::default(),
		})
	}

	/// Updates the build status, preserving any previously recorded error.
	async fn set_status(&self, status: BuildingStatus) {
		let mut s = self.status.lock().await;
		// We want to keep only the first error
		if !s.is_error() {
			*s = status;
		}
	}

	/// Queues a document mutation, unless the index is already built, in which
	/// case the values are returned so the caller can index them directly.
	async fn maybe_consume(
		&self,
		old_values: Option<Vec<Value>>,
		new_values: Option<Vec<Value>>,
		rid: &Thing,
	) -> ConsumeResult {
		let mut a = self.appended.lock().await;
		// Now that the queue is locked, we have the possibility to assess if the asynchronous build is done.
		if a.is_empty() {
			// If the appending queue is empty and the index is built...
			if self.status.lock().await.is_built() {
				// ... we return the values back, so the document can be updated the usual way
				return ConsumeResult::Ignored(old_values, new_values);
			}
		}
		a.push_back(Appending {
			old_values,
			new_values,
			id: rid.id.clone(),
		});
		ConsumeResult::Enqueued
	}

	/// Opens a read-only transaction for scanning the table.
	async fn new_read_tx(&self) -> Result<Transaction, Error> {
		self.tf.transaction(TransactionType::Read, Optimistic).await
	}

	/// Opens a write transaction and wraps it in a fresh child context.
	async fn new_write_tx_ctx(&self) -> Result<Context, Error> {
		let tx = self.tf.transaction(TransactionType::Write, Optimistic).await?.into();
		let mut ctx = MutableContext::new(&self.ctx);
		ctx.set_transaction(tx);
		Ok(ctx.freeze())
	}

	/// Builds the index in two phases:
	/// 1. scan every record of the table and index it, in batches, each batch
	///    committed in its own write transaction;
	/// 2. drain the queue of mutations that arrived during the scan, then mark
	///    the index as built.
	///
	/// # Errors
	/// Returns any transaction or indexing error; the caller records it in the
	/// build status.
	async fn compute(&self) -> Result<(), Error> {
		let mut stack = TreeStack::new();
		self.set_status(BuildingStatus::InitialIndexing(0)).await;
		// First iteration, we index every key
		let ns = self.opt.ns()?;
		let db = self.opt.db()?;
		let beg = thing::prefix(ns, db, &self.tb);
		let end = thing::suffix(ns, db, &self.tb);
		let mut next = Some(beg..end);
		let mut count = 0;
		while let Some(rng) = next {
			// Get the next batch of records
			let batch = self.new_read_tx().await?.batch(rng, *INDEXING_BATCH_SIZE, true).await?;
			// Set the next scan range
			next = batch.next;
			// Check there are records
			if batch.values.is_empty() {
				break;
			}
			// Create a new context with a write transaction
			let ctx = self.new_write_tx_ctx().await?;
			// Index the records
			for (k, v) in batch.values.into_iter() {
				let key: thing::Thing = (&k).into();
				// Parse the value
				let val: Value = (&v).into();
				let rid: Arc<Thing> = Thing::from((key.tb, key.id)).into();
				let doc = CursorDoc::new(Some(rid.clone()), None, val);
				let opt_values = stack
					.enter(|stk| Document::build_opt_values(stk, &ctx, &self.opt, &self.ix, &doc))
					.finish()
					.await?;
				// Index the record
				let mut io = IndexOperation::new(&ctx, &self.opt, &self.ix, None, opt_values, &rid);
				stack.enter(|stk| io.compute(stk)).finish().await?;
				count += 1;
				self.set_status(BuildingStatus::InitialIndexing(count)).await;
			}
			ctx.tx().commit().await?;
		}
		// Second iteration, we index/remove any records that have been added or removed since the initial indexing
		self.set_status(BuildingStatus::UpdatesIndexing(0)).await;
		// Restart the counter so the reported count only covers this phase,
		// matching the `UpdatesIndexing(0)` status just set above
		count = 0;
		loop {
			// Holding this lock while the batch is flushed blocks producers,
			// guaranteeing queued mutations are applied in order
			let mut batch = self.appended.lock().await;
			if batch.is_empty() {
				// If the batch is empty, we are done.
				// Due to the lock on self.appended, we know that no external process can add an item to the queue.
				self.set_status(BuildingStatus::Built).await;
				// This is here to be sure the lock on back is not released early
				batch.clear();
				break;
			}
			let fetch = (*NORMAL_FETCH_SIZE as usize).min(batch.len());
			let drain = batch.drain(0..fetch);
			// Create a new context with a write transaction
			let ctx = self.new_write_tx_ctx().await?;
			for a in drain {
				let rid = Thing::from((self.tb.clone(), a.id));
				let mut io = IndexOperation::new(
					&ctx,
					&self.opt,
					&self.ix,
					a.old_values,
					a.new_values,
					&rid,
				);
				stack.enter(|stk| io.compute(stk)).finish().await?;
				count += 1;
				self.set_status(BuildingStatus::UpdatesIndexing(count)).await;
			}
			ctx.tx().commit().await?;
		}
		Ok(())
	}
}

View file

@ -33,10 +33,14 @@ mod rocksdb;
mod surrealkv; mod surrealkv;
mod tikv; mod tikv;
#[cfg(not(target_arch = "wasm32"))]
mod index;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
pub use self::ds::*; pub use self::ds::*;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) use self::index::*;
pub use self::kv::*; pub use self::kv::*;
pub use self::live::*; pub use self::live::*;
pub use self::tr::*; pub use self::tr::*;

View file

@ -25,7 +25,7 @@ impl Datastore {
// Open transaction and set node data // Open transaction and set node data
let txn = self.transaction(Write, Optimistic).await?; let txn = self.transaction(Write, Optimistic).await?;
let key = crate::key::root::nd::Nd::new(id); let key = crate::key::root::nd::Nd::new(id);
let now = self.clock.now().await; let now = self.clock_now().await;
let val = Node::new(id, now, false); let val = Node::new(id, now, false);
match run!(txn, txn.put(key, val)) { match run!(txn, txn.put(key, val)) {
Err(Error::TxKeyAlreadyExists) => Err(Error::ClAlreadyExists { Err(Error::TxKeyAlreadyExists) => Err(Error::ClAlreadyExists {
@ -49,7 +49,7 @@ impl Datastore {
// Open transaction and set node data // Open transaction and set node data
let txn = self.transaction(Write, Optimistic).await?; let txn = self.transaction(Write, Optimistic).await?;
let key = crate::key::root::nd::new(id); let key = crate::key::root::nd::new(id);
let now = self.clock.now().await; let now = self.clock_now().await;
let val = Node::new(id, now, false); let val = Node::new(id, now, false);
run!(txn, txn.set(key, val)) run!(txn, txn.set(key, val))
} }
@ -86,7 +86,7 @@ impl Datastore {
trace!(target: TARGET, "Archiving expired nodes in the cluster"); trace!(target: TARGET, "Archiving expired nodes in the cluster");
// Open transaction and fetch nodes // Open transaction and fetch nodes
let txn = self.transaction(Write, Optimistic).await?; let txn = self.transaction(Write, Optimistic).await?;
let now = self.clock.now().await; let now = self.clock_now().await;
let nds = catch!(txn, txn.all_nodes()); let nds = catch!(txn, txn.all_nodes());
for nd in nds.iter() { for nd in nds.iter() {
// Check that the node is active // Check that the node is active

View file

@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use std::fmt::{self, Display}; use std::fmt::{self, Display};
use std::sync::Arc; use std::sync::Arc;
#[revisioned(revision = 3)] #[revisioned(revision = 4)]
#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] #[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[non_exhaustive] #[non_exhaustive]
@ -27,6 +27,8 @@ pub struct DefineIndexStatement {
pub if_not_exists: bool, pub if_not_exists: bool,
#[revision(start = 3)] #[revision(start = 3)]
pub overwrite: bool, pub overwrite: bool,
#[revision(start = 4)]
pub concurrently: bool,
} }
impl DefineIndexStatement { impl DefineIndexStatement {
@ -89,6 +91,25 @@ impl DefineIndexStatement {
.await?; .await?;
// Clear the cache // Clear the cache
txn.clear(); txn.clear();
#[cfg(not(target_arch = "wasm32"))]
if self.concurrently {
self.async_index(ctx, opt)?;
} else {
self.sync_index(stk, ctx, opt, doc).await?;
}
#[cfg(target_arch = "wasm32")]
self.sync_index(stk, ctx, opt, doc).await?;
// Ok all good
Ok(Value::None)
}
async fn sync_index(
&self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
doc: Option<&CursorDoc>,
) -> Result<(), Error> {
// Force queries to run // Force queries to run
let opt = &opt.new_with_force(Force::Index(Arc::new([self.clone()]))); let opt = &opt.new_with_force(Force::Index(Arc::new([self.clone()])));
// Update the index data // Update the index data
@ -98,8 +119,16 @@ impl DefineIndexStatement {
..UpdateStatement::default() ..UpdateStatement::default()
}; };
stm.compute(stk, ctx, opt, doc).await?; stm.compute(stk, ctx, opt, doc).await?;
// Ok all good Ok(())
Ok(Value::None) }
#[cfg(not(target_arch = "wasm32"))]
fn async_index(&self, ctx: &Context, opt: &Options) -> Result<(), Error> {
ctx.get_index_builder().ok_or(Error::Unreachable("No Index Builder"))?.build(
ctx,
opt.clone(),
self.clone().into(),
)
} }
} }
@ -119,6 +148,9 @@ impl Display for DefineIndexStatement {
if let Some(ref v) = self.comment { if let Some(ref v) = self.comment {
write!(f, " COMMENT {v}")? write!(f, " COMMENT {v}")?
} }
if self.concurrently {
write!(f, " CONCURRENTLY")?
}
Ok(()) Ok(())
} }
} }

View file

@ -9,9 +9,10 @@ use derive::Store;
use revision::revisioned; use revision::revisioned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fmt; use std::fmt;
use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
#[revisioned(revision = 2)] #[revisioned(revision = 3)]
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[non_exhaustive] #[non_exhaustive]
@ -36,6 +37,8 @@ pub enum InfoStatement {
User(Ident, Option<Base>), User(Ident, Option<Base>),
#[revision(start = 2)] #[revision(start = 2)]
User(Ident, Option<Base>, bool), User(Ident, Option<Base>, bool),
#[revision(start = 3)]
Index(Ident, Ident, bool),
} }
impl InfoStatement { impl InfoStatement {
@ -304,6 +307,23 @@ impl InfoStatement {
false => Value::from(res.to_string()), false => Value::from(res.to_string()),
}) })
} }
InfoStatement::Index(index, table, _structured) => {
// Allowed to run?
opt.is_allowed(Action::View, ResourceKind::Actor, &Base::Db)?;
// Get the transaction
let txn = ctx.tx();
// Obtain the index
let res = txn.get_tb_index(opt.ns()?, opt.db()?, table, index).await?;
// Output
let mut out = Object::default();
#[cfg(not(target_arch = "wasm32"))]
if let Some(ib) = ctx.get_index_builder() {
if let Some(status) = ib.get_status(res.deref()).await {
out.insert("building".to_string(), status.into());
}
}
Ok(out.into())
}
} }
} }
} }
@ -327,6 +347,8 @@ impl fmt::Display for InfoStatement {
Some(ref b) => write!(f, "INFO FOR USER {u} ON {b} STRUCTURE"), Some(ref b) => write!(f, "INFO FOR USER {u} ON {b} STRUCTURE"),
None => write!(f, "INFO FOR USER {u} STRUCTURE"), None => write!(f, "INFO FOR USER {u} STRUCTURE"),
}, },
Self::Index(ref i, ref t, false) => write!(f, "INFO FOR INDEX {i} ON {t}"),
Self::Index(ref i, ref t, true) => write!(f, "INFO FOR INDEX {i} ON {t} STRUCTURE"),
} }
} }
} }
@ -343,6 +365,7 @@ impl InfoStatement {
InfoStatement::Db(_) => InfoStatement::Db(true), InfoStatement::Db(_) => InfoStatement::Db(true),
InfoStatement::Tb(t, _) => InfoStatement::Tb(t, true), InfoStatement::Tb(t, _) => InfoStatement::Tb(t, true),
InfoStatement::User(u, b, _) => InfoStatement::User(u, b, true), InfoStatement::User(u, b, _) => InfoStatement::User(u, b, true),
InfoStatement::Index(i, t, _) => InfoStatement::Index(i, t, true),
} }
} }
} }

View file

@ -85,6 +85,7 @@ pub(crate) static KEYWORDS: phf::Map<UniCase<&'static str>, TokenKind> = phf_map
UniCase::ascii("CLASS") => TokenKind::Keyword(Keyword::Class), UniCase::ascii("CLASS") => TokenKind::Keyword(Keyword::Class),
UniCase::ascii("COMMENT") => TokenKind::Keyword(Keyword::Comment), UniCase::ascii("COMMENT") => TokenKind::Keyword(Keyword::Comment),
UniCase::ascii("COMMIT") => TokenKind::Keyword(Keyword::Commit), UniCase::ascii("COMMIT") => TokenKind::Keyword(Keyword::Commit),
UniCase::ascii("CONCURRENTLY") => TokenKind::Keyword(Keyword::Concurrently),
UniCase::ascii("CONTENT") => TokenKind::Keyword(Keyword::Content), UniCase::ascii("CONTENT") => TokenKind::Keyword(Keyword::Content),
UniCase::ascii("CONTINUE") => TokenKind::Keyword(Keyword::Continue), UniCase::ascii("CONTINUE") => TokenKind::Keyword(Keyword::Continue),
UniCase::ascii("CREATE") => TokenKind::Keyword(Keyword::Create), UniCase::ascii("CREATE") => TokenKind::Keyword(Keyword::Create),

View file

@ -1070,6 +1070,10 @@ impl Parser<'_> {
keep_pruned_connections, keep_pruned_connections,
)); ));
} }
t!("CONCURRENTLY") => {
self.pop_peek();
res.concurrently = true;
}
t!("COMMENT") => { t!("COMMENT") => {
self.pop_peek(); self.pop_peek();
res.comment = Some(self.next_token_value()?); res.comment = Some(self.next_token_value()?);

View file

@ -531,6 +531,13 @@ impl Parser<'_> {
let base = self.eat(t!("ON")).then(|| self.parse_base(false)).transpose()?; let base = self.eat(t!("ON")).then(|| self.parse_base(false)).transpose()?;
InfoStatement::User(ident, base, false) InfoStatement::User(ident, base, false)
} }
t!("INDEX") => {
let index = self.next_token_value()?;
expected!(self, t!("ON"));
self.eat(t!("TABLE"));
let table = self.next_token_value()?;
InfoStatement::Index(index, table, false)
}
x => unexpected!(self, x, "an info target"), x => unexpected!(self, x, "an info target"),
}; };

View file

@ -1519,6 +1519,7 @@ fn parse_define_index() {
comment: None, comment: None,
if_not_exists: false, if_not_exists: false,
overwrite: false, overwrite: false,
concurrently: false
})) }))
); );
@ -1535,6 +1536,7 @@ fn parse_define_index() {
comment: None, comment: None,
if_not_exists: false, if_not_exists: false,
overwrite: false, overwrite: false,
concurrently: false
})) }))
); );
@ -1560,6 +1562,7 @@ fn parse_define_index() {
comment: None, comment: None,
if_not_exists: false, if_not_exists: false,
overwrite: false, overwrite: false,
concurrently: false
})) }))
); );
@ -1586,6 +1589,7 @@ fn parse_define_index() {
comment: None, comment: None,
if_not_exists: false, if_not_exists: false,
overwrite: false, overwrite: false,
concurrently: false
})) }))
); );
} }

View file

@ -343,6 +343,7 @@ fn statements() -> Vec<Statement> {
comment: None, comment: None,
if_not_exists: false, if_not_exists: false,
overwrite: false, overwrite: false,
concurrently: false,
})), })),
Statement::Define(DefineStatement::Index(DefineIndexStatement { Statement::Define(DefineStatement::Index(DefineIndexStatement {
name: Ident("index".to_owned()), name: Ident("index".to_owned()),
@ -352,6 +353,7 @@ fn statements() -> Vec<Statement> {
comment: None, comment: None,
if_not_exists: false, if_not_exists: false,
overwrite: false, overwrite: false,
concurrently: false,
})), })),
Statement::Define(DefineStatement::Index(DefineIndexStatement { Statement::Define(DefineStatement::Index(DefineIndexStatement {
name: Ident("index".to_owned()), name: Ident("index".to_owned()),
@ -370,6 +372,7 @@ fn statements() -> Vec<Statement> {
comment: None, comment: None,
if_not_exists: false, if_not_exists: false,
overwrite: false, overwrite: false,
concurrently: false,
})), })),
Statement::Define(DefineStatement::Analyzer(DefineAnalyzerStatement { Statement::Define(DefineStatement::Analyzer(DefineAnalyzerStatement {
name: Ident("ana".to_owned()), name: Ident("ana".to_owned()),

View file

@ -52,6 +52,7 @@ keyword! {
Class => "CLASS", Class => "CLASS",
Comment => "COMMENT", Comment => "COMMENT",
Commit => "COMMIT", Commit => "COMMIT",
Concurrently => "CONCURRENTLY",
Content => "CONTENT", Content => "CONTENT",
Continue => "CONTINUE", Continue => "CONTINUE",
Create => "CREATE", Create => "CREATE",

View file

@ -685,6 +685,38 @@ async fn define_statement_index_single() -> Result<(), Error> {
Ok(()) Ok(())
} }
// Builds an index with the CONCURRENTLY clause and verifies, via
// INFO FOR INDEX, that the background build reports 'built'.
// NOTE(review): relies on `SLEEP 1s` to let the asynchronous build finish —
// timing-dependent, could be flaky on a heavily loaded machine.
#[tokio::test]
async fn define_statement_index_concurrently() -> Result<(), Error> {
	let sql = "
		CREATE user:1 SET email = 'test@surrealdb.com';
		CREATE user:2 SET email = 'test@surrealdb.com';
		DEFINE INDEX test ON user FIELDS email CONCURRENTLY;
		SLEEP 1s;
		INFO FOR TABLE user;
		INFO FOR INDEX test ON user;
	";
	let mut t = Test::new(sql).await?;
	// Skip the two CREATE statements, the DEFINE and the SLEEP
	t.skip_ok(4)?;
	t.expect_val(
		"{
			events: {},
			fields: {},
			tables: {},
			indexes: {
				test: 'DEFINE INDEX test ON user FIELDS email CONCURRENTLY',
			},
			lives: {},
		}",
	)?;
	t.expect_val(
		"{
			building: { status: 'built' }
		}",
	)?;
	Ok(())
}
#[tokio::test] #[tokio::test]
async fn define_statement_index_multiple() -> Result<(), Error> { async fn define_statement_index_multiple() -> Result<(), Error> {
let sql = " let sql = "