From cefb95df28da41a9231dcc52e0dca17dd17835a1 Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Fri, 30 Aug 2024 17:26:01 +0100 Subject: [PATCH] [Bug] Concurrent indexing fails on record updates (#4629) --- core/src/key/category.rs | 3 + core/src/key/index/ip.rs | 63 +++++++++++++++ core/src/key/index/mod.rs | 1 + core/src/kvs/index.rs | 162 +++++++++++++++++++++++++++----------- sdk/tests/define.rs | 146 +++++++++++++++++++++++++++++++++- 5 files changed, 327 insertions(+), 48 deletions(-) create mode 100644 core/src/key/index/ip.rs diff --git a/core/src/key/category.rs b/core/src/key/category.rs index 895594df..df47e8d3 100644 --- a/core/src/key/category.rs +++ b/core/src/key/category.rs @@ -136,6 +136,8 @@ pub enum Category { IndexHnswVec, /// crate::key::index::ia /*{ns}*{db}*{tb}+{ix}!ia{id} IndexAppendings, + /// crate::key::index::ip /*{ns}*{db}*{tb}+{ix}!ip{id} + IndexPrimaryAppending, /// crate::key::index /*{ns}*{db}*{tb}+{ix}*{fd}{id} Index, /// @@ -212,6 +214,7 @@ impl Display for Category { Self::IndexHnswThings => "IndexHnswThings", Self::IndexHnswVec => "IndexHnswVec", Self::IndexAppendings => "IndexAppendings", + Self::IndexPrimaryAppending => "IndexPrimaryAppending", Self::Index => "Index", Self::ChangeFeed => "ChangeFeed", Self::Thing => "Thing", diff --git a/core/src/key/index/ip.rs b/core/src/key/index/ip.rs new file mode 100644 index 00000000..3db06c00 --- /dev/null +++ b/core/src/key/index/ip.rs @@ -0,0 +1,63 @@ +//! Stores the previous value of record for concurrent index building +use crate::sql::Id; +use derive::Key; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Key)] +#[non_exhaustive] +pub struct Ip<'a> { + __: u8, + _a: u8, + pub ns: &'a str, + _b: u8, + pub db: &'a str, + _c: u8, + pub tb: &'a str, + _d: u8, + pub ix: &'a str, + _e: u8, + _f: u8, + _g: u8, + pub id: Id, +} + +impl<'a> Ip<'a> { + pub fn new(ns: &'a str, db: &'a str, tb: &'a str, ix: &'a str, id: Id) -> Self { + Self { + __: b'/', + _a: b'*', + ns, + _b: b'*', + db, + _c: b'*', + tb, + _d: b'+', + ix, + _e: b'!', + _f: b'i', + _g: b'p', + id, + } + } +} + +#[cfg(test)] +mod tests { + + #[test] + fn key() { + use super::*; + let val = Ip::new("testns", "testdb", "testtb", "testix", Id::from("id".to_string())); + let enc = Ip::encode(&val).unwrap(); + assert_eq!( + enc, + b"/*testns\0*testdb\0*testtb\0+testix\0!ip\0\0\0\x01id\0", + "{}", + String::from_utf8_lossy(&enc) + ); + + let dec = Ip::decode(&enc).unwrap(); + assert_eq!(val, dec); + } +} diff --git a/core/src/key/index/mod.rs b/core/src/key/index/mod.rs index 7f2a6cb5..9692cece 100644 --- a/core/src/key/index/mod.rs +++ b/core/src/key/index/mod.rs @@ -18,6 +18,7 @@ pub mod hl; pub mod hs; pub mod hv; pub mod ia; +pub mod ip; pub mod vm; use crate::key::category::Categorise; diff --git a/core/src/kvs/index.rs b/core/src/kvs/index.rs index dc6c8941..13358291 100644 --- a/core/src/kvs/index.rs +++ b/core/src/kvs/index.rs @@ -5,10 +5,11 @@ use crate::doc::{CursorDoc, Document}; use crate::err::Error; use crate::idx::index::IndexOperation; use crate::key::index::ia::Ia; +use crate::key::index::ip::Ip; use crate::key::thing; use crate::kvs::ds::TransactionFactory; use crate::kvs::LockType::Optimistic; -use crate::kvs::{Transaction, TransactionType}; +use crate::kvs::{Key, Transaction, TransactionType, Val}; use crate::sql::statements::DefineIndexStatement; use crate::sql::{Id, Object, Thing, Value}; use dashmap::mapref::entry::Entry; @@ -152,6 +153,11 @@ struct Appending { id: Id, } +#[revisioned(revision = 1)] +#[derive(Serialize, Deserialize, Store, Debug)] +#[non_exhaustive] +struct PrimaryAppending(u32); + #[derive(Default)] struct QueueSequences { /// The index of the next appending to be indexed @@ -240,13 +246,24 @@ impl Building { return Ok(ConsumeResult::Ignored(old_values, new_values)); } } + + let tx = ctx.tx(); let a = Appending { old_values, new_values, id: rid.id.clone(), }; - let ia = self.new_ia_key(queue.add_update())?; - ctx.tx().set(ia, a, None).await?; + // Get the idx of this appended record from the sequence + let idx = queue.add_update(); + // Store the appending + let ia = self.new_ia_key(idx)?; + tx.set(ia, a, None).await?; + // Do we already have a primary appending? + let ip = self.new_ip_key(rid.id.clone())?; + if tx.get(ip.clone(), None).await?.is_none() { + // If not we set it + tx.set(ip, PrimaryAppending(idx), None).await?; + } Ok(ConsumeResult::Enqueued) } @@ -256,6 +273,12 @@ impl Building { Ok(Ia::new(ns, db, &self.ix.what, &self.ix.name, i)) } + fn new_ip_key(&self, id: Id) -> Result { + let ns = self.opt.ns()?; + let db = self.opt.db()?; + Ok(Ip::new(ns, db, &self.ix.what, &self.ix.name, id)) + } + async fn new_read_tx(&self) -> Result { self.tf.transaction(TransactionType::Read, Optimistic).await } @@ -268,8 +291,7 @@ impl Building { } async fn compute(&self) -> Result<(), Error> { - let mut stack = TreeStack::new(); - + // Set the initial status self.set_status(BuildingStatus::InitialIndexing(0)).await; // First iteration, we index every keys let ns = self.opt.ns()?; @@ -280,33 +302,23 @@ impl Building { let mut count = 0; while let Some(rng) = next { // Get the next batch of records - let batch = self.new_read_tx().await?.batch(rng, *INDEXING_BATCH_SIZE, true).await?; + let tx = self.new_read_tx().await?; + let batch = catch!(tx, tx.batch(rng, *INDEXING_BATCH_SIZE, true).await); + // We can release the read transaction + drop(tx); // Set the next scan range next = batch.next; // Check there are records if batch.values.is_empty() { + // If not, we are with the initial indexing break; } // Create a new context with a write transaction let ctx = self.new_write_tx_ctx().await?; - // Index the records - for (k, v) in batch.values.into_iter() { - let key: thing::Thing = (&k).into(); - // Parse the value - let val: Value = (&v).into(); - let rid: Arc = Thing::from((key.tb, key.id)).into(); - let doc = CursorDoc::new(Some(rid.clone()), None, val); - let opt_values = stack - .enter(|stk| Document::build_opt_values(stk, &ctx, &self.opt, &self.ix, &doc)) - .finish() - .await?; - // Index the record - let mut io = IndexOperation::new(&ctx, &self.opt, &self.ix, None, opt_values, &rid); - stack.enter(|stk| io.compute(stk)).finish().await?; - count += 1; - self.set_status(BuildingStatus::InitialIndexing(count)).await; - } - ctx.tx().commit().await?; + let tx = ctx.tx(); + // Index the batch + catch!(tx, self.index_initial_batch(&ctx, &tx, batch.values, &mut count).await); + tx.commit().await?; } // Second iteration, we index/remove any records that has been added or removed since the initial indexing self.set_status(BuildingStatus::UpdatesIndexing(0)).await; @@ -324,32 +336,94 @@ impl Building { if range.is_empty() { continue; } + let next_to_index = range.end; + // Create a new context with a write transaction let ctx = self.new_write_tx_ctx().await?; let tx = ctx.tx(); - let next_to_index = range.end; - for i in range { - let ia = self.new_ia_key(i)?; - if let Some(v) = tx.get(ia.clone(), None).await? { - tx.del(ia).await?; - let a: Appending = v.into(); - let rid = Thing::from((self.tb.clone(), a.id)); - let mut io = IndexOperation::new( - &ctx, - &self.opt, - &self.ix, - a.old_values, - a.new_values, - &rid, - ); - stack.enter(|stk| io.compute(stk)).finish().await?; - count += 1; - self.set_status(BuildingStatus::UpdatesIndexing(count)).await; - } - } + catch!(tx, self.index_appending_range(&ctx, &tx, range, &mut count).await); tx.commit().await?; queue.set_to_index(next_to_index); } Ok(()) } + + async fn index_initial_batch( + &self, + ctx: &Context, + tx: &Transaction, + values: Vec<(Key, Val)>, + count: &mut usize, + ) -> Result<(), Error> { + let mut stack = TreeStack::new(); + // Index the records + for (k, v) in values.into_iter() { + let key: thing::Thing = (&k).into(); + // Parse the value + let val: Value = (&v).into(); + let rid: Arc = Thing::from((key.tb, key.id)).into(); + + let opt_values; + + // Do we already have an appended value? + let ip = self.new_ip_key(rid.id.clone())?; + if let Some(v) = tx.get(ip, None).await? { + // Then we take the old value of the appending value as the initial indexing value + let pa: PrimaryAppending = v.into(); + let ia = self.new_ia_key(pa.0)?; + let v = tx + .get(ia, None) + .await? + .ok_or_else(|| Error::CorruptedIndex("Appending record is missing"))?; + let a: Appending = v.into(); + opt_values = a.old_values; + } else { + // Otherwise, we normally proceed to the indexing + let doc = CursorDoc::new(Some(rid.clone()), None, val); + opt_values = stack + .enter(|stk| Document::build_opt_values(stk, ctx, &self.opt, &self.ix, &doc)) + .finish() + .await?; + } + + // Index the record + let mut io = + IndexOperation::new(ctx, &self.opt, &self.ix, None, opt_values.clone(), &rid); + stack.enter(|stk| io.compute(stk)).finish().await?; + + // Increment the count and update the status + *count += 1; + self.set_status(BuildingStatus::InitialIndexing(*count)).await; + } + Ok(()) + } + + async fn index_appending_range( + &self, + ctx: &Context, + tx: &Transaction, + range: Range, + count: &mut usize, + ) -> Result<(), Error> { + let mut stack = TreeStack::new(); + for i in range { + let ia = self.new_ia_key(i)?; + if let Some(v) = tx.get(ia.clone(), None).await? { + tx.del(ia).await?; + let a: Appending = v.into(); + let rid = Thing::from((self.tb.clone(), a.id)); + let mut io = + IndexOperation::new(ctx, &self.opt, &self.ix, a.old_values, a.new_values, &rid); + stack.enter(|stk| io.compute(stk)).finish().await?; + + // We can delete the ip record if any + let ip = self.new_ip_key(rid.id)?; + tx.del(ip).await?; + + *count += 1; + self.set_status(BuildingStatus::UpdatesIndexing(*count)).await; + } + } + Ok(()) + } } diff --git a/sdk/tests/define.rs b/sdk/tests/define.rs index 171fd787..062d7f2a 100644 --- a/sdk/tests/define.rs +++ b/sdk/tests/define.rs @@ -1,16 +1,20 @@ mod parse; + use parse::Parse; mod helpers; use helpers::*; use std::collections::HashMap; - +use std::time::{Duration, SystemTime}; use surrealdb::dbs::Session; use surrealdb::err::Error; use surrealdb::iam::Role; use surrealdb::sql::Idiom; use surrealdb::sql::{Part, Value}; +use surrealdb_core::cnf::{INDEXING_BATCH_SIZE, NORMAL_FETCH_SIZE}; +use test_log::test; +use tracing::info; #[tokio::test] async fn define_statement_namespace() -> Result<(), Error> { @@ -688,15 +692,19 @@ async fn define_statement_index_single() -> Result<(), Error> { #[tokio::test] async fn define_statement_index_concurrently() -> Result<(), Error> { let sql = " - CREATE user:1 SET email = 'test@surrealdb.com'; - CREATE user:2 SET email = 'test@surrealdb.com'; + CREATE user:1 SET email = 'testA@surrealdb.com'; + CREATE user:2 SET email = 'testA@surrealdb.com'; + CREATE user:3 SET email = 'testB@surrealdb.com'; DEFINE INDEX test ON user FIELDS email CONCURRENTLY; SLEEP 1s; INFO FOR TABLE user; INFO FOR INDEX test ON user; + SELECT * FROM user WHERE email = 'testA@surrealdb.com' EXPLAIN; + SELECT * FROM user WHERE email = 'testA@surrealdb.com'; + SELECT * FROM user WHERE email = 'testB@surrealdb.com'; "; let mut t = Test::new(sql).await?; - t.skip_ok(4)?; + t.skip_ok(5)?; t.expect_val( "{ events: {}, @@ -713,7 +721,137 @@ async fn define_statement_index_concurrently() -> Result<(), Error> { building: { status: 'built' } }", )?; + t.expect_val( + " [ + { + detail: { + plan: { + index: 'test', + operator: '=', + value: 'testA@surrealdb.com' + }, + table: 'user' + }, + operation: 'Iterate Index' + }, + { + detail: { + type: 'Memory' + }, + operation: 'Collector' + } + ]", + )?; + t.expect_val( + "[ + { + email: 'testA@surrealdb.com', + id: user:1 + }, + { + email: 'testA@surrealdb.com', + id: user:2 + } + ]", + )?; + t.expect_val( + "[ + { + email: 'testB@surrealdb.com', + id: user:3 + } + ]", + )?; + Ok(()) +} +#[test(tokio::test)] +async fn define_statement_index_concurrently_building_status() -> Result<(), Error> { + let session = Session::owner().with_ns("test").with_db("test"); + let ds = new_ds().await?; + // Populate initial records + let initial_count = *INDEXING_BATCH_SIZE * 3 / 2; + info!("Populate: {}", initial_count); + for i in 0..initial_count { + let mut responses = ds + .execute( + &format!("CREATE user:{i} SET email = 'test{i}@surrealdb.com';"), + &session, + None, + ) + .await?; + skip_ok(&mut responses, 1)?; + } + // Create the index concurrently + info!("Indexing starts"); + let mut r = + ds.execute("DEFINE INDEX test ON user FIELDS email CONCURRENTLY", &session, None).await?; + skip_ok(&mut r, 1)?; + // + let mut appending_count = *NORMAL_FETCH_SIZE * 3 / 2; + info!("Appending: {}", appending_count); + // Loop until the index is built + let now = SystemTime::now(); + let mut initial_count = None; + let mut updates_count = None; + // While the concurrent indexing is running, we update and delete records + let time_out = Duration::from_secs(120); + loop { + if now.elapsed().map_err(|e| Error::Internal(e.to_string()))?.gt(&time_out) { + panic!("Time-out {time_out:?}"); + } + if appending_count > 0 { + let sql = if appending_count % 2 != 0 { + format!("UPDATE user:{appending_count} SET email = 'new{appending_count}@surrealdb.com';") + } else { + format!("DELETE user:{appending_count}") + }; + let mut responses = ds.execute(&sql, &session, None).await?; + skip_ok(&mut responses, 1)?; + appending_count -= 1; + } + // We monitor the status + let mut r = ds.execute("INFO FOR INDEX test ON user", &session, None).await?; + let tmp = r.remove(0).result?; + if let Value::Object(o) = &tmp { + if let Some(b) = o.get("building") { + if let Value::Object(b) = b { + if let Some(v) = b.get("status") { + if Value::from("started").eq(v) { + continue; + } + let new_count = b.get("count").cloned(); + if Value::from("initial").eq(v) { + if new_count != initial_count { + assert!(new_count > initial_count, "{new_count:?}"); + info!("New initial count: {:?}", new_count); + initial_count = new_count; + } + continue; + } + if Value::from("updates").eq(v) { + if new_count != updates_count { + assert!(new_count > updates_count, "{new_count:?}"); + info!("New updates count: {:?}", new_count); + updates_count = new_count; + } + continue; + } + let val = Value::parse( + "{ + building: { + status: 'built' + } + }", + ); + assert_eq!(format!("{tmp:#}"), format!("{val:#}")); + break; + } + } + } + } + panic!("Unexpected value {tmp:#}"); + } Ok(()) }