[Feat] Async indexing: appending queue is persistent (#4622)

This commit is contained in:
Emmanuel Keller 2024-08-28 11:08:59 +01:00 committed by GitHub
parent a5abc66e06
commit e5bf40ae01
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 159 additions and 32 deletions

View file

@ -73,7 +73,7 @@ impl Document {
) -> Result<(), Error> { ) -> Result<(), Error> {
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
let (o, n) = if let Some(ib) = ctx.get_index_builder() { let (o, n) = if let Some(ib) = ctx.get_index_builder() {
match ib.consume(ix, o, n, rid).await? { match ib.consume(ctx, ix, o, n, rid).await? {
// The index builder consumed the value, which means it is currently building the index asynchronously, // The index builder consumed the value, which means it is currently building the index asynchronously,
// we don't index the document and let the index builder do it later. // we don't index the document and let the index builder do it later.
ConsumeResult::Enqueued => return Ok(()), ConsumeResult::Enqueued => return Ok(()),

View file

@ -134,6 +134,8 @@ pub enum Category {
IndexHnswThings, IndexHnswThings,
/// crate::key::index::hv /*{ns}*{db}*{tb}+{ix}!hv{vec} /// crate::key::index::hv /*{ns}*{db}*{tb}+{ix}!hv{vec}
IndexHnswVec, IndexHnswVec,
/// crate::key::index::ia /*{ns}*{db}*{tb}+{ix}!ia{id}
IndexAppendings,
/// crate::key::index /*{ns}*{db}*{tb}+{ix}*{fd}{id} /// crate::key::index /*{ns}*{db}*{tb}+{ix}*{fd}{id}
Index, Index,
/// ///
@ -209,6 +211,7 @@ impl Display for Category {
Self::IndexHnswDocIds => "IndexHnswDocIds", Self::IndexHnswDocIds => "IndexHnswDocIds",
Self::IndexHnswThings => "IndexHnswThings", Self::IndexHnswThings => "IndexHnswThings",
Self::IndexHnswVec => "IndexHnswVec", Self::IndexHnswVec => "IndexHnswVec",
Self::IndexAppendings => "IndexAppendings",
Self::Index => "Index", Self::Index => "Index",
Self::ChangeFeed => "ChangeFeed", Self::ChangeFeed => "ChangeFeed",
Self::Thing => "Thing", Self::Thing => "Thing",

62
core/src/key/index/ia.rs Normal file
View file

@ -0,0 +1,62 @@
//! Store appended records for concurrent index building
use derive::Key;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
/// Key addressing one appended record queued while an index is being built
/// asynchronously.
///
/// Encoded form: `/*{ns}*{db}*{tb}+{ix}!ia{i}` (category `IndexAppendings`).
/// The field order below IS the key layout — the `Key` derive serializes
/// fields in declaration order, so do not reorder them.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Key)]
#[non_exhaustive]
pub struct Ia<'a> {
	__: u8, // b'/' — key-space root
	_a: u8, // b'*' — namespace separator
	pub ns: &'a str,
	_b: u8, // b'*' — database separator
	pub db: &'a str,
	_c: u8, // b'*' — table separator
	pub tb: &'a str,
	_d: u8, // b'+' — index separator
	pub ix: &'a str,
	_e: u8, // b'!' — index-internal data marker
	_f: u8, // b'i'
	_g: u8, // b'a' — "ia" tag for this key type
	// Queue position of the appended record; encoded big-endian so keys
	// sort in append order.
	pub i: u32,
}
impl<'a> Ia<'a> {
pub fn new(ns: &'a str, db: &'a str, tb: &'a str, ix: &'a str, i: u32) -> Self {
Self {
__: b'/',
_a: b'*',
ns,
_b: b'*',
db,
_c: b'*',
tb,
_d: b'+',
ix,
_e: b'!',
_f: b'i',
_g: b'a',
i,
}
}
}
#[cfg(test)]
mod tests {
	use super::*;

	/// Round-trips an `ia` key through encode/decode and pins its exact
	/// byte layout.
	#[test]
	fn key() {
		let key = Ia::new("testns", "testdb", "testtb", "testix", 1);
		let encoded = Ia::encode(&key).unwrap();
		// Components are NUL-terminated; the trailing u32 is big-endian.
		let expected: &[u8] = b"/*testns\0*testdb\0*testtb\0+testix\0!ia\x00\x00\x00\x01";
		assert_eq!(encoded, expected, "{}", String::from_utf8_lossy(&encoded));
		let decoded = Ia::decode(&encoded).unwrap();
		assert_eq!(decoded, key);
	}
}

View file

@ -17,6 +17,7 @@ pub mod hi;
pub mod hl; pub mod hl;
pub mod hs; pub mod hs;
pub mod hv; pub mod hv;
pub mod ia;
pub mod vm; pub mod vm;
use crate::key::category::Categorise; use crate::key::category::Categorise;

View file

@ -4,6 +4,7 @@ use crate::dbs::Options;
use crate::doc::{CursorDoc, Document}; use crate::doc::{CursorDoc, Document};
use crate::err::Error; use crate::err::Error;
use crate::idx::index::IndexOperation; use crate::idx::index::IndexOperation;
use crate::key::index::ia::Ia;
use crate::key::thing; use crate::key::thing;
use crate::kvs::ds::TransactionFactory; use crate::kvs::ds::TransactionFactory;
use crate::kvs::LockType::Optimistic; use crate::kvs::LockType::Optimistic;
@ -12,8 +13,11 @@ use crate::sql::statements::DefineIndexStatement;
use crate::sql::{Id, Object, Thing, Value}; use crate::sql::{Id, Object, Thing, Value};
use dashmap::mapref::entry::Entry; use dashmap::mapref::entry::Entry;
use dashmap::DashMap; use dashmap::DashMap;
use derive::Store;
use reblessive::TreeStack; use reblessive::TreeStack;
use std::collections::VecDeque; use revision::revisioned;
use serde::{Deserialize, Serialize};
use std::ops::Range;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::task; use tokio::task;
@ -117,6 +121,7 @@ impl IndexBuilder {
pub(crate) async fn consume( pub(crate) async fn consume(
&self, &self,
ctx: &Context,
ix: &DefineIndexStatement, ix: &DefineIndexStatement,
old_values: Option<Vec<Value>>, old_values: Option<Vec<Value>>,
new_values: Option<Vec<Value>>, new_values: Option<Vec<Value>>,
@ -124,7 +129,7 @@ impl IndexBuilder {
) -> Result<ConsumeResult, Error> { ) -> Result<ConsumeResult, Error> {
if let Some(r) = self.indexes.get(ix) { if let Some(r) = self.indexes.get(ix) {
let (b, _) = r.value(); let (b, _) = r.value();
return Ok(b.maybe_consume(old_values, new_values, rid).await); return b.maybe_consume(ctx, old_values, new_values, rid).await;
} }
Ok(ConsumeResult::Ignored(old_values, new_values)) Ok(ConsumeResult::Ignored(old_values, new_values))
} }
@ -138,12 +143,50 @@ impl IndexBuilder {
} }
} }
#[revisioned(revision = 1)]
#[derive(Serialize, Deserialize, Store, Debug)]
#[non_exhaustive]
struct Appending { struct Appending {
old_values: Option<Vec<Value>>, old_values: Option<Vec<Value>>,
new_values: Option<Vec<Value>>, new_values: Option<Vec<Value>>,
id: Id, id: Id,
} }
#[derive(Default)]
struct QueueSequences {
/// The index of the next appending to be indexed
to_index: u32,
/// The index of the next appending to be added
next: u32,
}
impl QueueSequences {
fn is_empty(&self) -> bool {
self.to_index == self.next
}
fn add_update(&mut self) -> u32 {
let i = self.next;
self.next += 1;
i
}
fn clear(&mut self) {
self.to_index = 0;
self.next = 0;
}
fn set_to_index(&mut self, i: u32) {
self.to_index = i;
}
fn next_indexing_batch(&self, page: u32) -> Range<u32> {
let s = self.to_index;
let e = (s + page).min(self.next);
s..e
}
}
struct Building { struct Building {
ctx: Context, ctx: Context,
opt: Options, opt: Options,
@ -152,7 +195,7 @@ struct Building {
tb: String, tb: String,
status: Arc<Mutex<BuildingStatus>>, status: Arc<Mutex<BuildingStatus>>,
// Should be stored on a temporary table // Should be stored on a temporary table
appended: Arc<Mutex<VecDeque<Appending>>>, queue: Arc<Mutex<QueueSequences>>,
} }
impl Building { impl Building {
@ -169,7 +212,7 @@ impl Building {
tb: ix.what.to_string(), tb: ix.what.to_string(),
ix, ix,
status: Arc::new(Mutex::new(BuildingStatus::Started)), status: Arc::new(Mutex::new(BuildingStatus::Started)),
appended: Default::default(), queue: Default::default(),
}) })
} }
@ -183,25 +226,34 @@ impl Building {
async fn maybe_consume( async fn maybe_consume(
&self, &self,
ctx: &Context,
old_values: Option<Vec<Value>>, old_values: Option<Vec<Value>>,
new_values: Option<Vec<Value>>, new_values: Option<Vec<Value>>,
rid: &Thing, rid: &Thing,
) -> ConsumeResult { ) -> Result<ConsumeResult, Error> {
let mut a = self.appended.lock().await; let mut queue = self.queue.lock().await;
// Now that the queue is locked, we have the possibility to assess if the asynchronous build is done. // Now that the queue is locked, we have the possibility to assess if the asynchronous build is done.
if a.is_empty() { if queue.is_empty() {
// If the appending queue is empty and the index is built... // If the appending queue is empty and the index is built...
if self.status.lock().await.is_built() { if self.status.lock().await.is_built() {
// ... we return the values back, so the document can be updated the usual way // ... we return the values back, so the document can be updated the usual way
return ConsumeResult::Ignored(old_values, new_values); return Ok(ConsumeResult::Ignored(old_values, new_values));
} }
} }
a.push_back(Appending { let a = Appending {
old_values, old_values,
new_values, new_values,
id: rid.id.clone(), id: rid.id.clone(),
}); };
ConsumeResult::Enqueued let ia = self.new_ia_key(queue.add_update())?;
ctx.tx().set(ia, a, None).await?;
Ok(ConsumeResult::Enqueued)
}
fn new_ia_key(&self, i: u32) -> Result<Ia, Error> {
let ns = self.opt.ns()?;
let db = self.opt.db()?;
Ok(Ia::new(ns, db, &self.ix.what, &self.ix.name, i))
} }
async fn new_read_tx(&self) -> Result<Transaction, Error> { async fn new_read_tx(&self) -> Result<Transaction, Error> {
@ -259,35 +311,44 @@ impl Building {
// Second iteration, we index/remove any records that has been added or removed since the initial indexing // Second iteration, we index/remove any records that has been added or removed since the initial indexing
self.set_status(BuildingStatus::UpdatesIndexing(0)).await; self.set_status(BuildingStatus::UpdatesIndexing(0)).await;
loop { loop {
let mut batch = self.appended.lock().await; let mut queue = self.queue.lock().await;
if batch.is_empty() { if queue.is_empty() {
// If the batch is empty, we are done. // If the batch is empty, we are done.
// Due to the lock on self.appended, we know that no external process can add an item to the queue. // Due to the lock on self.appended, we know that no external process can add an item to the queue.
self.set_status(BuildingStatus::Built).await; self.set_status(BuildingStatus::Built).await;
// This is here to be sure the lock on back is not released early // This is here to be sure the lock on back is not released early
batch.clear(); queue.clear();
break; break;
} }
let fetch = (*NORMAL_FETCH_SIZE as usize).min(batch.len()); let range = queue.next_indexing_batch(*NORMAL_FETCH_SIZE);
let drain = batch.drain(0..fetch); if range.is_empty() {
continue;
}
// Create a new context with a write transaction // Create a new context with a write transaction
let ctx = self.new_write_tx_ctx().await?; let ctx = self.new_write_tx_ctx().await?;
let tx = ctx.tx();
for a in drain { let next_to_index = range.end;
let rid = Thing::from((self.tb.clone(), a.id)); for i in range {
let mut io = IndexOperation::new( let ia = self.new_ia_key(i)?;
&ctx, if let Some(v) = tx.get(ia.clone(), None).await? {
&self.opt, tx.del(ia).await?;
&self.ix, let a: Appending = v.into();
a.old_values, let rid = Thing::from((self.tb.clone(), a.id));
a.new_values, let mut io = IndexOperation::new(
&rid, &ctx,
); &self.opt,
stack.enter(|stk| io.compute(stk)).finish().await?; &self.ix,
count += 1; a.old_values,
self.set_status(BuildingStatus::UpdatesIndexing(count)).await; a.new_values,
&rid,
);
stack.enter(|stk| io.compute(stk)).finish().await?;
count += 1;
self.set_status(BuildingStatus::UpdatesIndexing(count)).await;
}
} }
ctx.tx().commit().await?; tx.commit().await?;
queue.set_to_index(next_to_index);
} }
Ok(()) Ok(())
} }