[Bug] Concurrent indexing fails on record updates (#4629)

This commit is contained in:
Emmanuel Keller 2024-08-30 17:26:01 +01:00 committed by GitHub
parent 3ded680b33
commit cefb95df28
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 327 additions and 48 deletions

View file

@ -136,6 +136,8 @@ pub enum Category {
IndexHnswVec, IndexHnswVec,
/// crate::key::index::ia /*{ns}*{db}*{tb}+{ix}!ia{id} /// crate::key::index::ia /*{ns}*{db}*{tb}+{ix}!ia{id}
IndexAppendings, IndexAppendings,
/// crate::key::index::ip /*{ns}*{db}*{tb}+{ix}!ip{id}
IndexPrimaryAppending,
/// crate::key::index /*{ns}*{db}*{tb}+{ix}*{fd}{id} /// crate::key::index /*{ns}*{db}*{tb}+{ix}*{fd}{id}
Index, Index,
/// ///
@ -212,6 +214,7 @@ impl Display for Category {
Self::IndexHnswThings => "IndexHnswThings", Self::IndexHnswThings => "IndexHnswThings",
Self::IndexHnswVec => "IndexHnswVec", Self::IndexHnswVec => "IndexHnswVec",
Self::IndexAppendings => "IndexAppendings", Self::IndexAppendings => "IndexAppendings",
Self::IndexPrimaryAppending => "IndexPrimaryAppending",
Self::Index => "Index", Self::Index => "Index",
Self::ChangeFeed => "ChangeFeed", Self::ChangeFeed => "ChangeFeed",
Self::Thing => "Thing", Self::Thing => "Thing",

63
core/src/key/index/ip.rs Normal file
View file

@ -0,0 +1,63 @@
//! Stores the previous value of record for concurrent index building
use crate::sql::Id;
use derive::Key;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Key)]
#[non_exhaustive]
pub struct Ip<'a> {
__: u8,
_a: u8,
pub ns: &'a str,
_b: u8,
pub db: &'a str,
_c: u8,
pub tb: &'a str,
_d: u8,
pub ix: &'a str,
_e: u8,
_f: u8,
_g: u8,
pub id: Id,
}
impl<'a> Ip<'a> {
pub fn new(ns: &'a str, db: &'a str, tb: &'a str, ix: &'a str, id: Id) -> Self {
Self {
__: b'/',
_a: b'*',
ns,
_b: b'*',
db,
_c: b'*',
tb,
_d: b'+',
ix,
_e: b'!',
_f: b'i',
_g: b'p',
id,
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
let val = Ip::new("testns", "testdb", "testtb", "testix", Id::from("id".to_string()));
let enc = Ip::encode(&val).unwrap();
assert_eq!(
enc,
b"/*testns\0*testdb\0*testtb\0+testix\0!ip\0\0\0\x01id\0",
"{}",
String::from_utf8_lossy(&enc)
);
let dec = Ip::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

View file

@ -18,6 +18,7 @@ pub mod hl;
pub mod hs; pub mod hs;
pub mod hv; pub mod hv;
pub mod ia; pub mod ia;
pub mod ip;
pub mod vm; pub mod vm;
use crate::key::category::Categorise; use crate::key::category::Categorise;

View file

@ -5,10 +5,11 @@ use crate::doc::{CursorDoc, Document};
use crate::err::Error; use crate::err::Error;
use crate::idx::index::IndexOperation; use crate::idx::index::IndexOperation;
use crate::key::index::ia::Ia; use crate::key::index::ia::Ia;
use crate::key::index::ip::Ip;
use crate::key::thing; use crate::key::thing;
use crate::kvs::ds::TransactionFactory; use crate::kvs::ds::TransactionFactory;
use crate::kvs::LockType::Optimistic; use crate::kvs::LockType::Optimistic;
use crate::kvs::{Transaction, TransactionType}; use crate::kvs::{Key, Transaction, TransactionType, Val};
use crate::sql::statements::DefineIndexStatement; use crate::sql::statements::DefineIndexStatement;
use crate::sql::{Id, Object, Thing, Value}; use crate::sql::{Id, Object, Thing, Value};
use dashmap::mapref::entry::Entry; use dashmap::mapref::entry::Entry;
@ -152,6 +153,11 @@ struct Appending {
id: Id, id: Id,
} }
#[revisioned(revision = 1)]
#[derive(Serialize, Deserialize, Store, Debug)]
#[non_exhaustive]
struct PrimaryAppending(u32);
#[derive(Default)] #[derive(Default)]
struct QueueSequences { struct QueueSequences {
/// The index of the next appending to be indexed /// The index of the next appending to be indexed
@ -240,13 +246,24 @@ impl Building {
return Ok(ConsumeResult::Ignored(old_values, new_values)); return Ok(ConsumeResult::Ignored(old_values, new_values));
} }
} }
let tx = ctx.tx();
let a = Appending { let a = Appending {
old_values, old_values,
new_values, new_values,
id: rid.id.clone(), id: rid.id.clone(),
}; };
let ia = self.new_ia_key(queue.add_update())?; // Get the idx of this appended record from the sequence
ctx.tx().set(ia, a, None).await?; let idx = queue.add_update();
// Store the appending
let ia = self.new_ia_key(idx)?;
tx.set(ia, a, None).await?;
// Do we already have a primary appending?
let ip = self.new_ip_key(rid.id.clone())?;
if tx.get(ip.clone(), None).await?.is_none() {
// If not we set it
tx.set(ip, PrimaryAppending(idx), None).await?;
}
Ok(ConsumeResult::Enqueued) Ok(ConsumeResult::Enqueued)
} }
@ -256,6 +273,12 @@ impl Building {
Ok(Ia::new(ns, db, &self.ix.what, &self.ix.name, i)) Ok(Ia::new(ns, db, &self.ix.what, &self.ix.name, i))
} }
fn new_ip_key(&self, id: Id) -> Result<Ip, Error> {
let ns = self.opt.ns()?;
let db = self.opt.db()?;
Ok(Ip::new(ns, db, &self.ix.what, &self.ix.name, id))
}
async fn new_read_tx(&self) -> Result<Transaction, Error> { async fn new_read_tx(&self) -> Result<Transaction, Error> {
self.tf.transaction(TransactionType::Read, Optimistic).await self.tf.transaction(TransactionType::Read, Optimistic).await
} }
@ -268,8 +291,7 @@ impl Building {
} }
async fn compute(&self) -> Result<(), Error> { async fn compute(&self) -> Result<(), Error> {
let mut stack = TreeStack::new(); // Set the initial status
self.set_status(BuildingStatus::InitialIndexing(0)).await; self.set_status(BuildingStatus::InitialIndexing(0)).await;
// First iteration, we index every keys // First iteration, we index every keys
let ns = self.opt.ns()?; let ns = self.opt.ns()?;
@ -280,33 +302,23 @@ impl Building {
let mut count = 0; let mut count = 0;
while let Some(rng) = next { while let Some(rng) = next {
// Get the next batch of records // Get the next batch of records
let batch = self.new_read_tx().await?.batch(rng, *INDEXING_BATCH_SIZE, true).await?; let tx = self.new_read_tx().await?;
let batch = catch!(tx, tx.batch(rng, *INDEXING_BATCH_SIZE, true).await);
// We can release the read transaction
drop(tx);
// Set the next scan range // Set the next scan range
next = batch.next; next = batch.next;
// Check there are records // Check there are records
if batch.values.is_empty() { if batch.values.is_empty() {
// If not, we are with the initial indexing
break; break;
} }
// Create a new context with a write transaction // Create a new context with a write transaction
let ctx = self.new_write_tx_ctx().await?; let ctx = self.new_write_tx_ctx().await?;
// Index the records let tx = ctx.tx();
for (k, v) in batch.values.into_iter() { // Index the batch
let key: thing::Thing = (&k).into(); catch!(tx, self.index_initial_batch(&ctx, &tx, batch.values, &mut count).await);
// Parse the value tx.commit().await?;
let val: Value = (&v).into();
let rid: Arc<Thing> = Thing::from((key.tb, key.id)).into();
let doc = CursorDoc::new(Some(rid.clone()), None, val);
let opt_values = stack
.enter(|stk| Document::build_opt_values(stk, &ctx, &self.opt, &self.ix, &doc))
.finish()
.await?;
// Index the record
let mut io = IndexOperation::new(&ctx, &self.opt, &self.ix, None, opt_values, &rid);
stack.enter(|stk| io.compute(stk)).finish().await?;
count += 1;
self.set_status(BuildingStatus::InitialIndexing(count)).await;
}
ctx.tx().commit().await?;
} }
// Second iteration, we index/remove any records that has been added or removed since the initial indexing // Second iteration, we index/remove any records that has been added or removed since the initial indexing
self.set_status(BuildingStatus::UpdatesIndexing(0)).await; self.set_status(BuildingStatus::UpdatesIndexing(0)).await;
@ -324,32 +336,94 @@ impl Building {
if range.is_empty() { if range.is_empty() {
continue; continue;
} }
let next_to_index = range.end;
// Create a new context with a write transaction // Create a new context with a write transaction
let ctx = self.new_write_tx_ctx().await?; let ctx = self.new_write_tx_ctx().await?;
let tx = ctx.tx(); let tx = ctx.tx();
let next_to_index = range.end; catch!(tx, self.index_appending_range(&ctx, &tx, range, &mut count).await);
tx.commit().await?;
queue.set_to_index(next_to_index);
}
Ok(())
}
async fn index_initial_batch(
&self,
ctx: &Context,
tx: &Transaction,
values: Vec<(Key, Val)>,
count: &mut usize,
) -> Result<(), Error> {
let mut stack = TreeStack::new();
// Index the records
for (k, v) in values.into_iter() {
let key: thing::Thing = (&k).into();
// Parse the value
let val: Value = (&v).into();
let rid: Arc<Thing> = Thing::from((key.tb, key.id)).into();
let opt_values;
// Do we already have an appended value?
let ip = self.new_ip_key(rid.id.clone())?;
if let Some(v) = tx.get(ip, None).await? {
// Then we take the old value of the appending value as the initial indexing value
let pa: PrimaryAppending = v.into();
let ia = self.new_ia_key(pa.0)?;
let v = tx
.get(ia, None)
.await?
.ok_or_else(|| Error::CorruptedIndex("Appending record is missing"))?;
let a: Appending = v.into();
opt_values = a.old_values;
} else {
// Otherwise, we normally proceed to the indexing
let doc = CursorDoc::new(Some(rid.clone()), None, val);
opt_values = stack
.enter(|stk| Document::build_opt_values(stk, ctx, &self.opt, &self.ix, &doc))
.finish()
.await?;
}
// Index the record
let mut io =
IndexOperation::new(ctx, &self.opt, &self.ix, None, opt_values.clone(), &rid);
stack.enter(|stk| io.compute(stk)).finish().await?;
// Increment the count and update the status
*count += 1;
self.set_status(BuildingStatus::InitialIndexing(*count)).await;
}
Ok(())
}
async fn index_appending_range(
&self,
ctx: &Context,
tx: &Transaction,
range: Range<u32>,
count: &mut usize,
) -> Result<(), Error> {
let mut stack = TreeStack::new();
for i in range { for i in range {
let ia = self.new_ia_key(i)?; let ia = self.new_ia_key(i)?;
if let Some(v) = tx.get(ia.clone(), None).await? { if let Some(v) = tx.get(ia.clone(), None).await? {
tx.del(ia).await?; tx.del(ia).await?;
let a: Appending = v.into(); let a: Appending = v.into();
let rid = Thing::from((self.tb.clone(), a.id)); let rid = Thing::from((self.tb.clone(), a.id));
let mut io = IndexOperation::new( let mut io =
&ctx, IndexOperation::new(ctx, &self.opt, &self.ix, a.old_values, a.new_values, &rid);
&self.opt,
&self.ix,
a.old_values,
a.new_values,
&rid,
);
stack.enter(|stk| io.compute(stk)).finish().await?; stack.enter(|stk| io.compute(stk)).finish().await?;
count += 1;
self.set_status(BuildingStatus::UpdatesIndexing(count)).await; // We can delete the ip record if any
let ip = self.new_ip_key(rid.id)?;
tx.del(ip).await?;
*count += 1;
self.set_status(BuildingStatus::UpdatesIndexing(*count)).await;
} }
} }
tx.commit().await?;
queue.set_to_index(next_to_index);
}
Ok(()) Ok(())
} }
} }

View file

@ -1,16 +1,20 @@
mod parse; mod parse;
use parse::Parse; use parse::Parse;
mod helpers; mod helpers;
use helpers::*; use helpers::*;
use std::collections::HashMap; use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use surrealdb::dbs::Session; use surrealdb::dbs::Session;
use surrealdb::err::Error; use surrealdb::err::Error;
use surrealdb::iam::Role; use surrealdb::iam::Role;
use surrealdb::sql::Idiom; use surrealdb::sql::Idiom;
use surrealdb::sql::{Part, Value}; use surrealdb::sql::{Part, Value};
use surrealdb_core::cnf::{INDEXING_BATCH_SIZE, NORMAL_FETCH_SIZE};
use test_log::test;
use tracing::info;
#[tokio::test] #[tokio::test]
async fn define_statement_namespace() -> Result<(), Error> { async fn define_statement_namespace() -> Result<(), Error> {
@ -688,15 +692,19 @@ async fn define_statement_index_single() -> Result<(), Error> {
#[tokio::test] #[tokio::test]
async fn define_statement_index_concurrently() -> Result<(), Error> { async fn define_statement_index_concurrently() -> Result<(), Error> {
let sql = " let sql = "
CREATE user:1 SET email = 'test@surrealdb.com'; CREATE user:1 SET email = 'testA@surrealdb.com';
CREATE user:2 SET email = 'test@surrealdb.com'; CREATE user:2 SET email = 'testA@surrealdb.com';
CREATE user:3 SET email = 'testB@surrealdb.com';
DEFINE INDEX test ON user FIELDS email CONCURRENTLY; DEFINE INDEX test ON user FIELDS email CONCURRENTLY;
SLEEP 1s; SLEEP 1s;
INFO FOR TABLE user; INFO FOR TABLE user;
INFO FOR INDEX test ON user; INFO FOR INDEX test ON user;
SELECT * FROM user WHERE email = 'testA@surrealdb.com' EXPLAIN;
SELECT * FROM user WHERE email = 'testA@surrealdb.com';
SELECT * FROM user WHERE email = 'testB@surrealdb.com';
"; ";
let mut t = Test::new(sql).await?; let mut t = Test::new(sql).await?;
t.skip_ok(4)?; t.skip_ok(5)?;
t.expect_val( t.expect_val(
"{ "{
events: {}, events: {},
@ -713,7 +721,137 @@ async fn define_statement_index_concurrently() -> Result<(), Error> {
building: { status: 'built' } building: { status: 'built' }
}", }",
)?; )?;
t.expect_val(
" [
{
detail: {
plan: {
index: 'test',
operator: '=',
value: 'testA@surrealdb.com'
},
table: 'user'
},
operation: 'Iterate Index'
},
{
detail: {
type: 'Memory'
},
operation: 'Collector'
}
]",
)?;
t.expect_val(
"[
{
email: 'testA@surrealdb.com',
id: user:1
},
{
email: 'testA@surrealdb.com',
id: user:2
}
]",
)?;
t.expect_val(
"[
{
email: 'testB@surrealdb.com',
id: user:3
}
]",
)?;
Ok(())
}
#[test(tokio::test)]
async fn define_statement_index_concurrently_building_status() -> Result<(), Error> {
let session = Session::owner().with_ns("test").with_db("test");
let ds = new_ds().await?;
// Populate initial records
let initial_count = *INDEXING_BATCH_SIZE * 3 / 2;
info!("Populate: {}", initial_count);
for i in 0..initial_count {
let mut responses = ds
.execute(
&format!("CREATE user:{i} SET email = 'test{i}@surrealdb.com';"),
&session,
None,
)
.await?;
skip_ok(&mut responses, 1)?;
}
// Create the index concurrently
info!("Indexing starts");
let mut r =
ds.execute("DEFINE INDEX test ON user FIELDS email CONCURRENTLY", &session, None).await?;
skip_ok(&mut r, 1)?;
//
let mut appending_count = *NORMAL_FETCH_SIZE * 3 / 2;
info!("Appending: {}", appending_count);
// Loop until the index is built
let now = SystemTime::now();
let mut initial_count = None;
let mut updates_count = None;
// While the concurrent indexing is running, we update and delete records
let time_out = Duration::from_secs(120);
loop {
if now.elapsed().map_err(|e| Error::Internal(e.to_string()))?.gt(&time_out) {
panic!("Time-out {time_out:?}");
}
if appending_count > 0 {
let sql = if appending_count % 2 != 0 {
format!("UPDATE user:{appending_count} SET email = 'new{appending_count}@surrealdb.com';")
} else {
format!("DELETE user:{appending_count}")
};
let mut responses = ds.execute(&sql, &session, None).await?;
skip_ok(&mut responses, 1)?;
appending_count -= 1;
}
// We monitor the status
let mut r = ds.execute("INFO FOR INDEX test ON user", &session, None).await?;
let tmp = r.remove(0).result?;
if let Value::Object(o) = &tmp {
if let Some(b) = o.get("building") {
if let Value::Object(b) = b {
if let Some(v) = b.get("status") {
if Value::from("started").eq(v) {
continue;
}
let new_count = b.get("count").cloned();
if Value::from("initial").eq(v) {
if new_count != initial_count {
assert!(new_count > initial_count, "{new_count:?}");
info!("New initial count: {:?}", new_count);
initial_count = new_count;
}
continue;
}
if Value::from("updates").eq(v) {
if new_count != updates_count {
assert!(new_count > updates_count, "{new_count:?}");
info!("New updates count: {:?}", new_count);
updates_count = new_count;
}
continue;
}
let val = Value::parse(
"{
building: {
status: 'built'
}
}",
);
assert_eq!(format!("{tmp:#}"), format!("{val:#}"));
break;
}
}
}
}
panic!("Unexpected value {tmp:#}");
}
Ok(()) Ok(())
} }