From 9c0d1075885e2c848d89467c91fe865548c9c54d Mon Sep 17 00:00:00 2001
From: Tobie Morgan Hitchcock
Date: Mon, 20 Nov 2023 18:47:44 +0000
Subject: [PATCH] Improve RocksDB and SpeeDB performance (#3000)

---
 Cargo.lock                   |  1 +
 lib/Cargo.toml               |  5 ++-
 lib/src/doc/pluck.rs         |  2 +-
 lib/src/key/database/lg.rs   | 75 ------------------------------------
 lib/src/key/database/mod.rs  |  1 -
 lib/src/key/debug.rs         |  3 +-
 lib/src/key/namespace/lg.rs  | 70 ---------------------------------
 lib/src/key/namespace/mod.rs |  1 -
 lib/src/kvs/rocksdb/cnf.rs   | 57 +++++++++++++++++++++++++++
 lib/src/kvs/rocksdb/mod.rs   | 55 ++++++++++++++++++++++++--
 lib/src/kvs/speedb/cnf.rs    | 57 +++++++++++++++++++++++++++
 lib/src/kvs/speedb/mod.rs    | 54 ++++++++++++++++++++++++--
 lib/src/kvs/tx.rs            | 18 +++------
 13 files changed, 227 insertions(+), 172 deletions(-)
 delete mode 100644 lib/src/key/database/lg.rs
 delete mode 100644 lib/src/key/namespace/lg.rs
 create mode 100644 lib/src/kvs/rocksdb/cnf.rs
 create mode 100644 lib/src/kvs/speedb/cnf.rs

diff --git a/Cargo.lock b/Cargo.lock
index 000c8efc..7aad450c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5044,6 +5044,7 @@ dependencies = [
  "nanoid",
  "native-tls",
  "nom",
+ "num_cpus",
  "once_cell",
  "path-clean",
  "pbkdf2",
diff --git a/lib/Cargo.toml b/lib/Cargo.toml
index 07d91f58..bfe05413 100644
--- a/lib/Cargo.toml
+++ b/lib/Cargo.toml
@@ -86,6 +86,7 @@ md-5 = "0.10.6"
 nanoid = "0.4.0"
 native-tls = { version = "0.2.11", optional = true }
 nom = { version = "7.1.3", features = ["alloc"] }
+num_cpus = "1.16.0"
 once_cell = "1.18.0"
 path-clean = "1.0.1"
 pbkdf2 = { version = "0.12.2", features = ["simple"] }
@@ -96,7 +97,7 @@ regex = "1.10.2"
 reqwest = { version = "0.11.22", default-features = false, features = ["json", "stream", "multipart"], optional = true }
 revision = "0.5.0"
 roaring = { version = "0.10.2", features = ["serde"] }
-rocksdb = { version = "0.21.0", optional = true }
+rocksdb = { version = "0.21.0", features = ["lz4", "snappy"], optional = true }
 rust_decimal = { version = "1.32.0", features = ["maths", "serde-str"] }
 rust-stemmers = "1.2.0"
 rustls = { version = "=0.21.6", optional = true }
@@ -107,7 +108,7 @@ serde_json = "1.0.108"
 sha1 = "0.10.6"
 sha2 = "0.10.8"
 snap = "1.1.0"
-speedb = { version = "0.0.2", optional = true }
+speedb = { version = "0.0.2", features = ["lz4", "snappy"], optional = true }
 storekey = "0.5.0"
 thiserror = "1.0.50"
 tikv = { version = "0.2.0-surreal.2", default-features = false, package = "surrealdb-tikv-client", optional = true }
diff --git a/lib/src/doc/pluck.rs b/lib/src/doc/pluck.rs
index 21db2e73..2d116e18 100644
--- a/lib/src/doc/pluck.rs
+++ b/lib/src/doc/pluck.rs
@@ -110,7 +110,7 @@ impl<'a> Document<'a> {
             }
         }
         // Remove metadata fields on output
-        out.del(ctx, opt, txn, &*META).await?;
+        out.cut(&*META);
         // Output result
         Ok(out)
     }
diff --git a/lib/src/key/database/lg.rs b/lib/src/key/database/lg.rs
deleted file mode 100644
index af93c8b0..00000000
--- a/lib/src/key/database/lg.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-//! Stores a DEFINE LOGIN ON DATABASE config definition
-use crate::key::error::KeyCategory;
-use crate::key::key_req::KeyRequirements;
-use derive::Key;
-use serde::{Deserialize, Serialize};
-
-#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
-pub struct Lg<'a> {
-    __: u8,
-    _a: u8,
-    pub ns: &'a str,
-    _b: u8,
-    pub db: &'a str,
-    _c: u8,
-    _d: u8,
-    _e: u8,
-    pub dl: &'a str,
-}
-
-pub fn new<'a>(ns: &'a str, db: &'a str, dl: &'a str) -> Lg<'a> {
-    Lg::new(ns, db, dl)
-}
-
-pub fn prefix(ns: &str, db: &str) -> Vec<u8> {
-    let mut k = super::all::new(ns, db).encode().unwrap();
-    k.extend_from_slice(&[b'!', b'l', b'g', 0x00]);
-    k
-}
-
-pub fn suffix(ns: &str, db: &str) -> Vec<u8> {
-    let mut k = super::all::new(ns, db).encode().unwrap();
-    k.extend_from_slice(&[b'!', b'l', b'g', 0xff]);
-    k
-}
-
-impl KeyRequirements for Lg<'_> {
-    fn key_category(&self) -> KeyCategory {
-        KeyCategory::DatabaseLog
-    }
-}
-
-impl<'a> Lg<'a> {
-    pub fn new(ns: &'a str, db: &'a str, dl: &'a str) -> Self {
-        Self {
-            __: b'/',
-            _a: b'*',
-            ns,
-            _b: b'*',
-            db,
-            _c: b'!',
-            _d: b'l',
-            _e: b'g',
-            dl,
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn key() {
-        use super::*;
-        #[rustfmt::skip]
-        let val = Lg::new(
-            "testns",
-            "testdb",
-            "testdl",
-        );
-        let enc = Lg::encode(&val).unwrap();
-        assert_eq!(enc, b"/*testns\0*testdb\0!lgtestdl\0");
-
-        let dec = Lg::decode(&enc).unwrap();
-        assert_eq!(val, dec);
-    }
-}
diff --git a/lib/src/key/database/mod.rs b/lib/src/key/database/mod.rs
index 2ee6cc4b..2c13da86 100644
--- a/lib/src/key/database/mod.rs
+++ b/lib/src/key/database/mod.rs
@@ -1,7 +1,6 @@
 pub mod all;
 pub mod az;
 pub mod fc;
-pub mod lg;
 pub mod pa;
 pub mod sc;
 pub mod tb;
diff --git a/lib/src/key/debug.rs b/lib/src/key/debug.rs
index e7961502..66a581d7 100644
--- a/lib/src/key/debug.rs
+++ b/lib/src/key/debug.rs
@@ -6,8 +6,7 @@ use crate::kvs::Key;
 /// sprint_key converts a key to an escaped string.
 /// This is used for logging and debugging tests and should not be used in implementation code.
 pub fn sprint_key(key: &Key) -> String {
-    key.clone()
-        .iter()
+    key.iter()
         .flat_map(|&byte| std::ascii::escape_default(byte))
         .map(|byte| byte as char)
         .collect::<String>()
 }
diff --git a/lib/src/key/namespace/lg.rs b/lib/src/key/namespace/lg.rs
deleted file mode 100644
index 3750e863..00000000
--- a/lib/src/key/namespace/lg.rs
+++ /dev/null
@@ -1,70 +0,0 @@
-//! Stores a DEFINE LOGIN ON NAMESPACE config definition
-use crate::key::error::KeyCategory;
-use crate::key::key_req::KeyRequirements;
-use derive::Key;
-use serde::{Deserialize, Serialize};
-
-#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
-pub struct Lg<'a> {
-    __: u8,
-    _a: u8,
-    pub ns: &'a str,
-    _b: u8,
-    _c: u8,
-    _d: u8,
-    pub us: &'a str,
-}
-
-pub fn new<'a>(ns: &'a str, us: &'a str) -> Lg<'a> {
-    Lg::new(ns, us)
-}
-
-pub fn prefix(ns: &str) -> Vec<u8> {
-    let mut k = super::all::new(ns).encode().unwrap();
-    k.extend_from_slice(&[b'!', b'l', b'g', 0x00]);
-    k
-}
-
-pub fn suffix(ns: &str) -> Vec<u8> {
-    let mut k = super::all::new(ns).encode().unwrap();
-    k.extend_from_slice(&[b'!', b'l', b'g', 0xff]);
-    k
-}
-
-impl KeyRequirements for Lg<'_> {
-    fn key_category(&self) -> KeyCategory {
-        KeyCategory::DatabaseLogAlias
-    }
-}
-
-impl<'a> Lg<'a> {
-    pub fn new(ns: &'a str, us: &'a str) -> Self {
-        Self {
-            __: b'/',
-            _a: b'*',
-            ns,
-            _b: b'!',
-            _c: b'l',
-            _d: b'g',
-            us,
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn key() {
-        use super::*;
-        #[rustfmt::skip]
-        let val = Lg::new(
-            "testns",
-            "testus",
-        );
-        let enc = Lg::encode(&val).unwrap();
-        assert_eq!(enc, b"/*testns\0!lgtestus\0");
-
-        let dec = Lg::decode(&enc).unwrap();
-        assert_eq!(val, dec);
-    }
-}
diff --git a/lib/src/key/namespace/mod.rs b/lib/src/key/namespace/mod.rs
index f4d3724c..a3e6d74a 100644
--- a/lib/src/key/namespace/mod.rs
+++ b/lib/src/key/namespace/mod.rs
@@ -1,6 +1,5 @@
 pub mod all;
 pub mod db;
 pub mod di;
-pub mod lg;
 pub mod tk;
 pub mod us;
diff --git a/lib/src/kvs/rocksdb/cnf.rs b/lib/src/kvs/rocksdb/cnf.rs
new file mode 100644
index 00000000..84c1fca6
--- /dev/null
+++ b/lib/src/kvs/rocksdb/cnf.rs
@@ -0,0 +1,57 @@
+use once_cell::sync::Lazy;
+
+pub static ROCKSDB_THREAD_COUNT: Lazy<i32> = Lazy::new(|| {
+    let default = num_cpus::get() as i32;
+    std::env::var("SURREAL_ROCKSDB_THREAD_COUNT")
+        .map(|v| v.parse::<i32>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static ROCKSDB_WRITE_BUFFER_SIZE: Lazy<usize> = Lazy::new(|| {
+    let default = 256 * 1024 * 1024;
+    std::env::var("SURREAL_ROCKSDB_WRITE_BUFFER_SIZE")
+        .map(|v| v.parse::<usize>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static ROCKSDB_TARGET_FILE_SIZE_BASE: Lazy<u64> = Lazy::new(|| {
+    let default = 512 * 1024 * 1024;
+    std::env::var("SURREAL_ROCKSDB_TARGET_FILE_SIZE_BASE")
+        .map(|v| v.parse::<u64>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static ROCKSDB_MAX_WRITE_BUFFER_NUMBER: Lazy<i32> = Lazy::new(|| {
+    let default = 32;
+    std::env::var("SURREAL_ROCKSDB_MAX_WRITE_BUFFER_NUMBER")
+        .map(|v| v.parse::<i32>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static ROCKSDB_MIN_WRITE_BUFFER_NUMBER_TO_MERGE: Lazy<i32> = Lazy::new(|| {
+    let default = 4;
+    std::env::var("SURREAL_ROCKSDB_MIN_WRITE_BUFFER_NUMBER_TO_MERGE")
+        .map(|v| v.parse::<i32>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static ROCKSDB_ENABLE_PIPELINED_WRITES: Lazy<bool> = Lazy::new(|| {
+    let default = true;
+    std::env::var("SURREAL_ROCKSDB_ENABLE_PIPELINED_WRITES")
+        .map(|v| v.parse::<bool>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static ROCKSDB_ENABLE_BLOB_FILES: Lazy<bool> = Lazy::new(|| {
+    let default = true;
+    std::env::var("SURREAL_ROCKSDB_ENABLE_BLOB_FILES")
+        .map(|v| v.parse::<bool>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static ROCKSDB_MIN_BLOB_SIZE: Lazy<u64> = Lazy::new(|| {
+    let default = 4 * 1024;
+    std::env::var("SURREAL_ROCKSDB_MIN_BLOB_SIZE")
+        .map(|v| v.parse::<u64>().unwrap_or(default))
+        .unwrap_or(default)
+});
diff --git a/lib/src/kvs/rocksdb/mod.rs b/lib/src/kvs/rocksdb/mod.rs
index 65fd0d87..42a518a4 100644
--- a/lib/src/kvs/rocksdb/mod.rs
+++ b/lib/src/kvs/rocksdb/mod.rs
@@ -1,5 +1,7 @@
 #![cfg(feature = "kv-rocksdb")]
 
+mod cnf;
+
 use crate::err::Error;
 use crate::key::error::KeyCategory;
 use crate::kvs::Check;
@@ -7,7 +9,10 @@ use crate::kvs::Key;
 use crate::kvs::Val;
 use crate::vs::{try_to_u64_be, u64_to_versionstamp, Versionstamp};
 use futures::lock::Mutex;
-use rocksdb::{OptimisticTransactionDB, OptimisticTransactionOptions, ReadOptions, WriteOptions};
+use rocksdb::{
+    DBCompactionStyle, DBCompressionType, LogLevel, OptimisticTransactionDB,
+    OptimisticTransactionOptions, Options, ReadOptions, WriteOptions,
+};
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -68,17 +73,57 @@ impl Drop for Transaction {
 impl Datastore {
     /// Open a new database
     pub(crate) async fn new(path: &str) -> Result<Datastore, Error> {
+        // Configure custom options
+        let mut opts = Options::default();
+        // Ensure we use fdatasync
+        opts.set_use_fsync(false);
+        // Only use warning log level
+        opts.set_log_level(LogLevel::Warn);
+        // Create database if missing
+        opts.create_if_missing(true);
+        // Create column families if missing
+        opts.create_missing_column_families(true);
+        // Set the datastore compaction style
+        opts.set_compaction_style(DBCompactionStyle::Level);
+        // Increase the background thread count
+        opts.increase_parallelism(*cnf::ROCKSDB_THREAD_COUNT);
+        // Set the maximum number of write buffers
+        opts.set_max_write_buffer_number(*cnf::ROCKSDB_MAX_WRITE_BUFFER_NUMBER);
+        // Set the amount of data to build up in memory
+        opts.set_write_buffer_size(*cnf::ROCKSDB_WRITE_BUFFER_SIZE);
+        // Set the target file size for compaction
+        opts.set_target_file_size_base(*cnf::ROCKSDB_TARGET_FILE_SIZE_BASE);
+        // Set minimum number of write buffers to merge
+        opts.set_min_write_buffer_number_to_merge(*cnf::ROCKSDB_MIN_WRITE_BUFFER_NUMBER_TO_MERGE);
+        // Use separate write thread queues
+        opts.set_enable_pipelined_write(*cnf::ROCKSDB_ENABLE_PIPELINED_WRITES);
+        // Enable separation of keys and values
+        opts.set_enable_blob_files(*cnf::ROCKSDB_ENABLE_BLOB_FILES);
+        // Store 4KB values separate from keys
+        opts.set_min_blob_size(*cnf::ROCKSDB_MIN_BLOB_SIZE);
+        // Set specific compression levels
+        opts.set_compression_per_level(&[
+            DBCompressionType::None,
+            DBCompressionType::None,
+            DBCompressionType::Lz4hc,
+            DBCompressionType::Lz4hc,
+            DBCompressionType::Lz4hc,
+        ]);
+        // Create the datastore
         Ok(Datastore {
-            db: Arc::pin(OptimisticTransactionDB::open_default(path)?),
+            db: Arc::pin(OptimisticTransactionDB::open(&opts, path)?),
         })
     }
     /// Start a new transaction
     pub(crate) async fn transaction(&self, write: bool, _: bool) -> Result<Transaction, Error> {
-        // Activate the snapshot options
+        // Set the transaction options
        let mut to = OptimisticTransactionOptions::default();
        to.set_snapshot(true);
+        // Set the write options
+        let mut wo = WriteOptions::default();
+        wo.set_sync(false);
         // Create a new transaction
-        let inner = self.db.transaction_opt(&WriteOptions::default(), &to);
+        let inner = self.db.transaction_opt(&wo, &to);
         // The database reference must always outlive
         // the transaction. If it doesn't then this
         // is undefined behaviour. This unsafe block
@@ -93,6 +138,8 @@ impl Datastore {
         };
         let mut ro = ReadOptions::default();
         ro.set_snapshot(&inner.snapshot());
+        ro.set_async_io(true);
+        ro.fill_cache(true);
         // Specify the check level
         #[cfg(not(debug_assertions))]
         let check = Check::Warn;
diff --git a/lib/src/kvs/speedb/cnf.rs b/lib/src/kvs/speedb/cnf.rs
new file mode 100644
index 00000000..41ce30a2
--- /dev/null
+++ b/lib/src/kvs/speedb/cnf.rs
@@ -0,0 +1,57 @@
+use once_cell::sync::Lazy;
+
+pub static SPEEDB_THREAD_COUNT: Lazy<i32> = Lazy::new(|| {
+    let default = num_cpus::get() as i32;
+    std::env::var("SURREAL_SPEEDB_THREAD_COUNT")
+        .map(|v| v.parse::<i32>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static SPEEDB_WRITE_BUFFER_SIZE: Lazy<usize> = Lazy::new(|| {
+    let default = 256 * 1024 * 1024;
+    std::env::var("SURREAL_SPEEDB_WRITE_BUFFER_SIZE")
+        .map(|v| v.parse::<usize>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static SPEEDB_TARGET_FILE_SIZE_BASE: Lazy<u64> = Lazy::new(|| {
+    let default = 512 * 1024 * 1024;
+    std::env::var("SURREAL_SPEEDB_TARGET_FILE_SIZE_BASE")
+        .map(|v| v.parse::<u64>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static SPEEDB_MAX_WRITE_BUFFER_NUMBER: Lazy<i32> = Lazy::new(|| {
+    let default = 32;
+    std::env::var("SURREAL_SPEEDB_MAX_WRITE_BUFFER_NUMBER")
+        .map(|v| v.parse::<i32>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static SPEEDB_MIN_WRITE_BUFFER_NUMBER_TO_MERGE: Lazy<i32> = Lazy::new(|| {
+    let default = 4;
+    std::env::var("SURREAL_SPEEDB_MIN_WRITE_BUFFER_NUMBER_TO_MERGE")
+        .map(|v| v.parse::<i32>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static SPEEDB_ENABLE_PIPELINED_WRITES: Lazy<bool> = Lazy::new(|| {
+    let default = true;
+    std::env::var("SURREAL_SPEEDB_ENABLE_PIPELINED_WRITES")
+        .map(|v| v.parse::<bool>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static SPEEDB_ENABLE_BLOB_FILES: Lazy<bool> = Lazy::new(|| {
+    let default = true;
+    std::env::var("SURREAL_SPEEDB_ENABLE_BLOB_FILES")
+        .map(|v| v.parse::<bool>().unwrap_or(default))
+        .unwrap_or(default)
+});
+
+pub static SPEEDB_MIN_BLOB_SIZE: Lazy<u64> = Lazy::new(|| {
+    let default = 4 * 1024;
+    std::env::var("SURREAL_SPEEDB_MIN_BLOB_SIZE")
+        .map(|v| v.parse::<u64>().unwrap_or(default))
+        .unwrap_or(default)
+});
diff --git a/lib/src/kvs/speedb/mod.rs b/lib/src/kvs/speedb/mod.rs
index 508e931f..2727b347 100644
--- a/lib/src/kvs/speedb/mod.rs
+++ b/lib/src/kvs/speedb/mod.rs
@@ -1,5 +1,7 @@
 #![cfg(feature = "kv-speedb")]
 
+mod cnf;
+
 use crate::err::Error;
 use crate::key::error::KeyCategory;
 use crate::kvs::Check;
@@ -7,7 +9,10 @@ use crate::kvs::Key;
 use crate::kvs::Val;
 use crate::vs::{try_to_u64_be, u64_to_versionstamp, Versionstamp};
 use futures::lock::Mutex;
-use speedb::{OptimisticTransactionDB, OptimisticTransactionOptions, ReadOptions, WriteOptions};
+use speedb::{
+    DBCompactionStyle, DBCompressionType, LogLevel, OptimisticTransactionDB,
+    OptimisticTransactionOptions, Options, ReadOptions, WriteOptions,
+};
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -68,17 +73,57 @@ impl Drop for Transaction {
 impl Datastore {
     /// Open a new database
     pub(crate) async fn new(path: &str) -> Result<Datastore, Error> {
+        // Configure custom options
+        let mut opts = Options::default();
+        // Ensure we use fdatasync
+        opts.set_use_fsync(false);
+        // Only use warning log level
+        opts.set_log_level(LogLevel::Warn);
+        // Create database if missing
+        opts.create_if_missing(true);
+        // Create column families if missing
+        opts.create_missing_column_families(true);
+        // Set the datastore compaction style
+        opts.set_compaction_style(DBCompactionStyle::Level);
+        // Increase the background thread count
+        opts.increase_parallelism(*cnf::SPEEDB_THREAD_COUNT);
+        // Set the maximum number of write buffers
+        opts.set_max_write_buffer_number(*cnf::SPEEDB_MAX_WRITE_BUFFER_NUMBER);
+        // Set the amount of data to build up in memory
+        opts.set_write_buffer_size(*cnf::SPEEDB_WRITE_BUFFER_SIZE);
+        // Set the target file size for compaction
+        opts.set_target_file_size_base(*cnf::SPEEDB_TARGET_FILE_SIZE_BASE);
+        // Set minimum number of write buffers to merge
+        opts.set_min_write_buffer_number_to_merge(*cnf::SPEEDB_MIN_WRITE_BUFFER_NUMBER_TO_MERGE);
+        // Use separate write thread queues
+        opts.set_enable_pipelined_write(*cnf::SPEEDB_ENABLE_PIPELINED_WRITES);
+        // Enable separation of keys and values
+        opts.set_enable_blob_files(*cnf::SPEEDB_ENABLE_BLOB_FILES);
+        // Store 4KB values separate from keys
+        opts.set_min_blob_size(*cnf::SPEEDB_MIN_BLOB_SIZE);
+        // Set specific compression levels
+        opts.set_compression_per_level(&[
+            DBCompressionType::None,
+            DBCompressionType::None,
+            DBCompressionType::Lz4hc,
+            DBCompressionType::Lz4hc,
+            DBCompressionType::Lz4hc,
+        ]);
+        // Create the datastore
         Ok(Datastore {
-            db: Arc::pin(OptimisticTransactionDB::open_default(path)?),
+            db: Arc::pin(OptimisticTransactionDB::open(&opts, path)?),
         })
     }
     /// Start a new transaction
     pub(crate) async fn transaction(&self, write: bool, _: bool) -> Result<Transaction, Error> {
-        // Activate the snapshot options
+        // Set the transaction options
        let mut to = OptimisticTransactionOptions::default();
        to.set_snapshot(true);
+        // Set the write options
+        let mut wo = WriteOptions::default();
+        wo.set_sync(false);
         // Create a new transaction
-        let inner = self.db.transaction_opt(&WriteOptions::default(), &to);
+        let inner = self.db.transaction_opt(&wo, &to);
         // The database reference must always outlive
         // the transaction. If it doesn't then this
         // is undefined behaviour. This unsafe block
@@ -93,6 +138,7 @@ impl Datastore {
         };
         let mut ro = ReadOptions::default();
         ro.set_snapshot(&inner.snapshot());
+        ro.fill_cache(true);
         // Specify the check level
         #[cfg(not(debug_assertions))]
         let check = Check::Warn;
diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs
index 480ab5e0..96bf5d61 100644
--- a/lib/src/kvs/tx.rs
+++ b/lib/src/kvs/tx.rs
@@ -8,8 +8,6 @@ use crate::dbs::node::Timestamp;
 use crate::err::Error;
 use crate::idg::u32::U32;
 use crate::idx::trees::store::TreeStoreType;
-#[cfg(debug_assertions)]
-use crate::key::debug;
 use crate::key::error::KeyCategory;
 use crate::key::key_req::KeyRequirements;
 use crate::kvs::cache::Cache;
@@ -290,10 +288,10 @@ impl Transaction {
     #[allow(unused_variables)]
     pub async fn del<K>(&mut self, key: K) -> Result<(), Error>
     where
-        K: Into<Key> + Debug + Into<Vec<u8>> + Clone,
+        K: Into<Key> + Debug,
     {
         #[cfg(debug_assertions)]
-        trace!("Del {:?}", crate::key::debug::sprint_key(&key.clone().into()));
+        trace!("Del {:?}", key);
         match self {
             #[cfg(feature = "kv-mem")]
             Transaction {
@@ -544,7 +542,7 @@ impl Transaction {
         val: V,
     ) -> Result<(), Error>
     where
-        K: Into<Key> + Debug + Clone,
+        K: Into<Key> + Debug,
         V: Into<Val> + Debug,
     {
         #[cfg(debug_assertions)]
@@ -651,14 +649,10 @@ impl Transaction {
     #[allow(unused_variables)]
     pub async fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
     where
-        K: Into<Key> + Debug + Clone,
+        K: Into<Key> + Debug,
     {
         #[cfg(debug_assertions)]
-        trace!(
-            "Scan {:?} - {:?}",
-            debug::sprint_key(&rng.start.clone().into()),
-            debug::sprint_key(&rng.end.clone().into())
-        );
+        trace!("Scan {:?} - {:?}", rng.start, rng.end);
         match self {
             #[cfg(feature = "kv-mem")]
             Transaction {
@@ -794,7 +788,7 @@ impl Transaction {
     /// This function fetches key-value pairs from the underlying datastore in batches of 1000.
     pub async fn getr<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
     where
-        K: Into<Key> + Debug + Clone,
+        K: Into<Key> + Debug,
     {
         #[cfg(debug_assertions)]
         trace!("Getr {:?}..{:?} (limit: {limit})", rng.start, rng.end);
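
Each of the new cnf.rs settings above follows the same pattern: a lazily-initialised static reads a SURREAL_* environment variable and falls back to a built-in default when the variable is unset or fails to parse. A minimal standalone sketch of that pattern follows; it assumes only the once_cell crate, and the EXAMPLE_WRITE_BUFFER_SIZE static and SURREAL_EXAMPLE_WRITE_BUFFER_SIZE variable are illustrative names rather than part of this patch.

use once_cell::sync::Lazy;

// Illustrative sketch of the cnf.rs pattern used in this patch.
// The static name and environment variable below are hypothetical.
pub static EXAMPLE_WRITE_BUFFER_SIZE: Lazy<usize> = Lazy::new(|| {
    // Built-in default of 256 MiB, matching the patch's write buffer default
    let default = 256 * 1024 * 1024;
    std::env::var("SURREAL_EXAMPLE_WRITE_BUFFER_SIZE")
        // A value which fails to parse falls back to the default
        .map(|v| v.parse::<usize>().unwrap_or(default))
        // An unset variable also falls back to the default
        .unwrap_or(default)
});

fn main() {
    // Dereferencing the Lazy forces initialisation and reads the resolved value
    println!("write buffer size = {} bytes", *EXAMPLE_WRITE_BUFFER_SIZE);
}

Note that a value which does not parse is silently replaced by the default, so a typo in one of these variables changes behaviour without any warning at startup.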