From b1ae2b409432aa70a68e21f0fe9104c3d5a0f1d0 Mon Sep 17 00:00:00 2001 From: Salvador Girones Gil Date: Fri, 8 Sep 2023 17:53:26 +0200 Subject: [PATCH] [dbs/processor] Reduce batch size for scan operations (#2651) --- lib/src/cnf/mod.rs | 3 +++ lib/src/dbs/processor.rs | 23 ++++++++++++----------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/lib/src/cnf/mod.rs b/lib/src/cnf/mod.rs index 321cf32e..595df9b2 100644 --- a/lib/src/cnf/mod.rs +++ b/lib/src/cnf/mod.rs @@ -32,3 +32,6 @@ pub const ID_CHARS: [char; 36] = [ /// The publicly visible name of the server pub const SERVER_NAME: &str = "SurrealDB"; + +/// Datastore processor batch size for scan operations +pub const PROCESSOR_BATCH_SIZE: u32 = 50; diff --git a/lib/src/dbs/processor.rs b/lib/src/dbs/processor.rs index 49604407..4a7f09fd 100644 --- a/lib/src/dbs/processor.rs +++ b/lib/src/dbs/processor.rs @@ -1,3 +1,4 @@ +use crate::cnf::PROCESSOR_BATCH_SIZE; use crate::ctx::Context; #[cfg(not(target_arch = "wasm32"))] use crate::dbs::distinct::AsyncDistinct; @@ -248,18 +249,18 @@ impl<'a> Processor<'a> { if ctx.is_done() { break; } - // Get the next 1000 key-value entries + // Get the next batch of key-value entries let res = match nxt { None => { let min = beg.clone(); let max = end.clone(); - txn.clone().lock().await.scan(min..max, 1000).await? + txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await? } Some(ref mut beg) => { beg.push(0x00); let min = beg.clone(); let max = end.clone(); - txn.clone().lock().await.scan(min..max, 1000).await? + txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await? } }; // If there are key-value entries then fetch them @@ -337,18 +338,18 @@ impl<'a> Processor<'a> { if ctx.is_done() { break; } - // Get the next 1000 key-value entries + // Get the next batch of key-value entries let res = match nxt { None => { let min = beg.clone(); let max = end.clone(); - txn.clone().lock().await.scan(min..max, 1000).await? + txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await? } Some(ref mut beg) => { beg.push(0x00); let min = beg.clone(); let max = end.clone(); - txn.clone().lock().await.scan(min..max, 1000).await? + txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await? } }; // If there are key-value entries then fetch them @@ -474,18 +475,18 @@ impl<'a> Processor<'a> { if ctx.is_done() { break; } - // Get the next 1000 key-value entries + // Get the next batch key-value entries let res = match nxt { None => { let min = beg.clone(); let max = end.clone(); - txn.lock().await.scan(min..max, 1000).await? + txn.lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await? } Some(ref mut beg) => { beg.push(0x00); let min = beg.clone(); let max = end.clone(); - txn.lock().await.scan(min..max, 1000).await? + txn.lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await? } }; // If there are key-value entries then fetch them @@ -551,7 +552,7 @@ impl<'a> Processor<'a> { if let Some(pla) = ctx.get_query_planner() { if let Some(exe) = pla.get_query_executor(&table.0) { if let Some(mut iterator) = exe.new_iterator(opt, ir, io).await? { - let mut things = iterator.next_batch(txn, 1000).await?; + let mut things = iterator.next_batch(txn, PROCESSOR_BATCH_SIZE).await?; while !things.is_empty() { // Check if the context is finished if ctx.is_done() { @@ -589,7 +590,7 @@ impl<'a> Processor<'a> { } // Collect the next batch of ids - things = iterator.next_batch(txn, 1000).await?; + things = iterator.next_batch(txn, PROCESSOR_BATCH_SIZE).await?; } // Everything ok return Ok(());