[dbs/processor] Reduce batch size for scan operations (#2651)
This commit is contained in:
parent
8ebd1b4a66
commit
b1ae2b4094
2 changed files with 15 additions and 11 deletions
|
@ -32,3 +32,6 @@ pub const ID_CHARS: [char; 36] = [
|
|||
|
||||
/// The publicly visible name of the server
|
||||
pub const SERVER_NAME: &str = "SurrealDB";
|
||||
|
||||
/// Datastore processor batch size for scan operations
|
||||
pub const PROCESSOR_BATCH_SIZE: u32 = 50;
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use crate::cnf::PROCESSOR_BATCH_SIZE;
|
||||
use crate::ctx::Context;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use crate::dbs::distinct::AsyncDistinct;
|
||||
|
@ -248,18 +249,18 @@ impl<'a> Processor<'a> {
|
|||
if ctx.is_done() {
|
||||
break;
|
||||
}
|
||||
// Get the next 1000 key-value entries
|
||||
// Get the next batch of key-value entries
|
||||
let res = match nxt {
|
||||
None => {
|
||||
let min = beg.clone();
|
||||
let max = end.clone();
|
||||
txn.clone().lock().await.scan(min..max, 1000).await?
|
||||
txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await?
|
||||
}
|
||||
Some(ref mut beg) => {
|
||||
beg.push(0x00);
|
||||
let min = beg.clone();
|
||||
let max = end.clone();
|
||||
txn.clone().lock().await.scan(min..max, 1000).await?
|
||||
txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await?
|
||||
}
|
||||
};
|
||||
// If there are key-value entries then fetch them
|
||||
|
@ -337,18 +338,18 @@ impl<'a> Processor<'a> {
|
|||
if ctx.is_done() {
|
||||
break;
|
||||
}
|
||||
// Get the next 1000 key-value entries
|
||||
// Get the next batch of key-value entries
|
||||
let res = match nxt {
|
||||
None => {
|
||||
let min = beg.clone();
|
||||
let max = end.clone();
|
||||
txn.clone().lock().await.scan(min..max, 1000).await?
|
||||
txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await?
|
||||
}
|
||||
Some(ref mut beg) => {
|
||||
beg.push(0x00);
|
||||
let min = beg.clone();
|
||||
let max = end.clone();
|
||||
txn.clone().lock().await.scan(min..max, 1000).await?
|
||||
txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await?
|
||||
}
|
||||
};
|
||||
// If there are key-value entries then fetch them
|
||||
|
@ -474,18 +475,18 @@ impl<'a> Processor<'a> {
|
|||
if ctx.is_done() {
|
||||
break;
|
||||
}
|
||||
// Get the next 1000 key-value entries
|
||||
// Get the next batch key-value entries
|
||||
let res = match nxt {
|
||||
None => {
|
||||
let min = beg.clone();
|
||||
let max = end.clone();
|
||||
txn.lock().await.scan(min..max, 1000).await?
|
||||
txn.lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await?
|
||||
}
|
||||
Some(ref mut beg) => {
|
||||
beg.push(0x00);
|
||||
let min = beg.clone();
|
||||
let max = end.clone();
|
||||
txn.lock().await.scan(min..max, 1000).await?
|
||||
txn.lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await?
|
||||
}
|
||||
};
|
||||
// If there are key-value entries then fetch them
|
||||
|
@ -551,7 +552,7 @@ impl<'a> Processor<'a> {
|
|||
if let Some(pla) = ctx.get_query_planner() {
|
||||
if let Some(exe) = pla.get_query_executor(&table.0) {
|
||||
if let Some(mut iterator) = exe.new_iterator(opt, ir, io).await? {
|
||||
let mut things = iterator.next_batch(txn, 1000).await?;
|
||||
let mut things = iterator.next_batch(txn, PROCESSOR_BATCH_SIZE).await?;
|
||||
while !things.is_empty() {
|
||||
// Check if the context is finished
|
||||
if ctx.is_done() {
|
||||
|
@ -589,7 +590,7 @@ impl<'a> Processor<'a> {
|
|||
}
|
||||
|
||||
// Collect the next batch of ids
|
||||
things = iterator.next_batch(txn, 1000).await?;
|
||||
things = iterator.next_batch(txn, PROCESSOR_BATCH_SIZE).await?;
|
||||
}
|
||||
// Everything ok
|
||||
return Ok(());
|
||||
|
|
Loading…
Reference in a new issue