From 96249c2a34bd70a17fd54af01c50a80d6d3df707 Mon Sep 17 00:00:00 2001 From: Przemyslaw Hugh Kaznowski Date: Tue, 9 Jan 2024 17:47:23 +0000 Subject: [PATCH] Add pagination to scan (#2951) --- Makefile.local.toml | 2 +- lib/src/cf/reader.rs | 14 +- lib/src/dbs/processor.rs | 238 ++++++--------- lib/src/err/mod.rs | 4 + lib/src/idx/planner/iterators.rs | 39 ++- lib/src/idx/trees/btree.rs | 14 +- lib/src/kvs/ds.rs | 19 +- lib/src/kvs/speedb/mod.rs | 2 +- lib/src/kvs/tests/hb.rs | 8 +- lib/src/kvs/tests/helper.rs | 3 +- lib/src/kvs/tests/nd.rs | 14 +- lib/src/kvs/tests/ndlq.rs | 10 +- lib/src/kvs/tests/raw.rs | 55 ++++ lib/src/kvs/tests/tb.rs | 13 +- lib/src/kvs/tests/tblq.rs | 8 +- lib/src/kvs/tx.rs | 508 ++++++++++++------------------- 16 files changed, 449 insertions(+), 502 deletions(-) diff --git a/Makefile.local.toml b/Makefile.local.toml index 77fd32b5..2a256bd2 100644 --- a/Makefile.local.toml +++ b/Makefile.local.toml @@ -30,7 +30,7 @@ args = ["test", "--workspace", "--no-fail-fast"] [tasks.cargo-check] category = "LOCAL USAGE" command = "cargo" -args = ["check", "--workspace"] +args = ["check", "--workspace", "--features", "${DEV_FEATURES}"] [tasks.cargo-fmt] category = "LOCAL USAGE" diff --git a/lib/src/cf/reader.rs b/lib/src/cf/reader.rs index b02214d6..952fefeb 100644 --- a/lib/src/cf/reader.rs +++ b/lib/src/cf/reader.rs @@ -1,7 +1,7 @@ use crate::cf::{ChangeSet, DatabaseMutation, TableMutations}; use crate::err::Error; use crate::key::change; -use crate::kvs::Transaction; +use crate::kvs::{Limit, ScanPage, Transaction}; use crate::sql::statements::show::ShowSince; use crate::vs; @@ -40,14 +40,22 @@ pub async fn read( let limit = limit.unwrap_or(100); - let _x = tx.scan(beg..end, limit).await?; + let scan = tx + .scan_paged( + ScanPage { + range: beg..end, + limit: Limit::Limited(limit), + }, + limit, + ) + .await?; let mut vs: Option<[u8; 10]> = None; let mut buf: Vec = Vec::new(); let mut r = Vec::::new(); // iterate over _x and put decoded elements to r - for (k, v) in _x { + for (k, v) in scan.values { trace!("read change feed; {k:?}"); let dec = crate::key::change::Cf::decode(&k).unwrap(); diff --git a/lib/src/dbs/processor.rs b/lib/src/dbs/processor.rs index 2689ebbe..22191d23 100644 --- a/lib/src/dbs/processor.rs +++ b/lib/src/dbs/processor.rs @@ -8,6 +8,7 @@ use crate::err::Error; use crate::idx::planner::executor::IteratorRef; use crate::idx::planner::IterationStage; use crate::key::{graph, thing}; +use crate::kvs::ScanPage; use crate::sql::dir::Dir; use crate::sql::{Edges, Range, Table, Thing, Value}; #[cfg(not(target_arch = "wasm32"))] @@ -308,60 +309,43 @@ impl<'a> Processor<'a> { // Prepare the start and end keys let beg = thing::prefix(opt.ns(), opt.db(), &v); let end = thing::suffix(opt.ns(), opt.db(), &v); - // Prepare the next holder key - let mut nxt: Option> = None; // Loop until no more keys - loop { + let mut next_page = Some(ScanPage::from(beg..end)); + while let Some(page) = next_page { // Check if the context is finished if ctx.is_done() { break; } // Get the next batch of key-value entries - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await? - } - Some(ref mut beg) => { - beg.push(0x00); - let min = beg.clone(); - let max = end.clone(); - txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await? 
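
(A minimal sketch of the paginated pattern that replaces the manual cursor being removed here, i.e. clone the bounds, scan, then push 0x00 onto the last key; it assumes the same locked transaction and beg..end byte keys as above:

	let mut next_page = Some(ScanPage::from(beg..end));
	while let Some(page) = next_page {
		let res = txn.clone().lock().await.scan_paged(page, PROCESSOR_BATCH_SIZE).await?;
		next_page = res.next_page;
		for (k, v) in res.values {
			// decode and process each record as before
		}
	}

The continuation key now travels inside ScanResult::next_page, so the processor no longer re-derives it from the last key of each batch.)
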
- } - }; - // If there are key-value entries then fetch them - if !res.is_empty() { - // Get total results - let n = res.len(); - // Loop over results - for (i, (k, v)) in res.into_iter().enumerate() { - // Check the context - if ctx.is_done() { - break; - } - // Ready the next - if n == i + 1 { - nxt = Some(k.clone()); - } - // Parse the data from the store - let key: thing::Thing = (&k).into(); - let val: Value = (&v).into(); - let rid = Thing::from((key.tb, key.id)); - // Create a new operable value - let val = Operable::Value(val); - // Process the record - let pro = Processed { - ir: None, - rid: Some(rid), - doc_id: None, - val, - }; - self.process(ctx, opt, txn, stm, pro).await?; - } - continue; + let res = txn.clone().lock().await.scan_paged(page, PROCESSOR_BATCH_SIZE).await?; + next_page = res.next_page; + let res = res.values; + // If no results then break + if res.is_empty() { + break; } - break; + // Loop over results + for (k, v) in res.into_iter() { + // Check the context + if ctx.is_done() { + break; + } + // Parse the data from the store + let key: thing::Thing = (&k).into(); + let val: Value = (&v).into(); + let rid = Thing::from((key.tb, key.id)); + // Create a new operable value + let val = Operable::Value(val); + // Process the record + let pro = Processed { + ir: None, + rid: Some(rid), + doc_id: None, + val, + }; + self.process(ctx, opt, txn, stm, pro).await?; + } + continue; } // Everything ok Ok(()) @@ -397,60 +381,43 @@ impl<'a> Processor<'a> { key } }; - // Prepare the next holder key - let mut nxt: Option> = None; // Loop until no more keys - loop { + let mut next_page = Some(ScanPage::from(beg..end)); + while let Some(page) = next_page { // Check if the context is finished if ctx.is_done() { break; } + let res = txn.clone().lock().await.scan_paged(page, PROCESSOR_BATCH_SIZE).await?; + next_page = res.next_page; // Get the next batch of key-value entries - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await? - } - Some(ref mut beg) => { - beg.push(0x00); - let min = beg.clone(); - let max = end.clone(); - txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await? 
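
(ScanPage::from(range) yields a page with Limit::Unlimited; call sites that want a hard cap, such as the index iterators later in this patch, spell the page out instead. A sketch, assuming byte keys min/max and a u32 limit:

	let res = txn
		.lock()
		.await
		.scan_paged(
			ScanPage {
				range: min..max,
				limit: Limit::Limited(limit),
			},
			limit,
		)
		.await?;

Here limit doubles as the per-call batch size, so a single call fetches at most limit rows, and the Limit travels with any next_page that comes back.)
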
- } - }; + let res = res.values; // If there are key-value entries then fetch them - if !res.is_empty() { - // Get total results - let n = res.len(); - // Loop over results - for (i, (k, v)) in res.into_iter().enumerate() { - // Check the context - if ctx.is_done() { - break; - } - // Ready the next - if n == i + 1 { - nxt = Some(k.clone()); - } - // Parse the data from the store - let key: thing::Thing = (&k).into(); - let val: Value = (&v).into(); - let rid = Thing::from((key.tb, key.id)); - // Create a new operable value - let val = Operable::Value(val); - // Process the record - let pro = Processed { - ir: None, - rid: Some(rid), - doc_id: None, - val, - }; - self.process(ctx, opt, txn, stm, pro).await?; - } - continue; + if res.is_empty() { + break; } - break; + // Loop over results + for (k, v) in res.into_iter() { + // Check the context + if ctx.is_done() { + break; + } + // Parse the data from the store + let key: thing::Thing = (&k).into(); + let val: Value = (&v).into(); + let rid = Thing::from((key.tb, key.id)); + // Create a new operable value + let val = Operable::Value(val); + // Process the record + let pro = Processed { + ir: None, + rid: Some(rid), + doc_id: None, + val, + }; + self.process(ctx, opt, txn, stm, pro).await?; + } + continue; } // Everything ok Ok(()) @@ -534,69 +501,48 @@ impl<'a> Processor<'a> { }; // for (beg, end) in keys.iter() { - // Prepare the next holder key - let mut nxt: Option> = None; // Loop until no more keys - loop { + let mut next_page = Some(ScanPage::from(beg.clone()..end.clone())); + while let Some(page) = next_page { // Check if the context is finished if ctx.is_done() { break; } // Get the next batch key-value entries - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - txn.lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await? - } - Some(ref mut beg) => { - beg.push(0x00); - let min = beg.clone(); - let max = end.clone(); - txn.lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await? 
- } - }; + let res = txn.lock().await.scan_paged(page, PROCESSOR_BATCH_SIZE).await?; + next_page = res.next_page; + let res = res.values; // If there are key-value entries then fetch them - if !res.is_empty() { - // Get total results - let n = res.len(); - // Exit when settled - if n == 0 { + if res.is_empty() { + break; + } + // Loop over results + for (k, _) in res.into_iter() { + // Check the context + if ctx.is_done() { break; } - // Loop over results - for (i, (k, _)) in res.into_iter().enumerate() { - // Check the context - if ctx.is_done() { - break; - } - // Ready the next - if n == i + 1 { - nxt = Some(k.clone()); - } - // Parse the data from the store - let gra: graph::Graph = (&k).into(); - // Fetch the data from the store - let key = thing::new(opt.ns(), opt.db(), gra.ft, &gra.fk); - let val = txn.lock().await.get(key).await?; - let rid = Thing::from((gra.ft, gra.fk)); - // Parse the data from the store - let val = Operable::Value(match val { - Some(v) => Value::from(v), - None => Value::None, - }); - // Process the record - let pro = Processed { - ir: None, - rid: Some(rid), - doc_id: None, - val, - }; - self.process(ctx, opt, txn, stm, pro).await?; - } - continue; + // Parse the data from the store + let gra: graph::Graph = graph::Graph::decode(&k)?; + // Fetch the data from the store + let key = thing::new(opt.ns(), opt.db(), gra.ft, &gra.fk); + let val = txn.lock().await.get(key).await?; + let rid = Thing::from((gra.ft, gra.fk)); + // Parse the data from the store + let val = Operable::Value(match val { + Some(v) => Value::from(v), + None => Value::None, + }); + // Process the record + let pro = Processed { + ir: None, + rid: Some(rid), + doc_id: None, + val, + }; + self.process(ctx, opt, txn, stm, pro).await?; } - break; + continue; } } // Everything ok diff --git a/lib/src/err/mod.rs b/lib/src/err/mod.rs index 287e43d5..54c39b2e 100644 --- a/lib/src/err/mod.rs +++ b/lib/src/err/mod.rs @@ -766,6 +766,10 @@ pub enum Error { /// The key being inserted in the transaction already exists #[error("The key being inserted already exists: {0}")] TxKeyAlreadyExistsCategory(KeyCategory), + + /// The db is running without an available storage engine + #[error("The db is running without an available storage engine")] + MissingStorageEngine, } impl From for String { diff --git a/lib/src/idx/planner/iterators.rs b/lib/src/idx/planner/iterators.rs index 561b5e38..cd717db8 100644 --- a/lib/src/idx/planner/iterators.rs +++ b/lib/src/idx/planner/iterators.rs @@ -5,7 +5,7 @@ use crate::idx::ft::termdocs::TermsDocs; use crate::idx::ft::{FtIndex, HitsIterator}; use crate::idx::planner::plan::RangeValue; use crate::key::index::Index; -use crate::kvs::Key; +use crate::kvs::{Key, Limit, ScanPage}; use crate::sql::statements::DefineIndexStatement; use crate::sql::{Array, Thing, Value}; use std::collections::VecDeque; @@ -64,7 +64,18 @@ impl IndexEqualThingIterator { ) -> Result)>, Error> { let min = beg.clone(); let max = end.to_owned(); - let res = txn.lock().await.scan(min..max, limit).await?; + let res = txn + .lock() + .await + .scan_paged( + ScanPage { + range: min..max, + limit: Limit::Limited(limit), + }, + limit, + ) + .await?; + let res = res.values; if let Some((key, _)) = res.last() { let mut key = key.clone(); key.push(0x00); @@ -176,7 +187,18 @@ impl IndexRangeThingIterator { ) -> Result)>, Error> { let min = self.r.beg.clone(); let max = self.r.end.clone(); - let res = txn.lock().await.scan(min..max, limit).await?; + let res = txn + .lock() + .await + .scan_paged( + ScanPage { + 
range: min..max, + limit: Limit::Limited(limit), + }, + limit, + ) + .await?; + let res = res.values; if let Some((key, _)) = res.last() { self.r.beg = key.clone(); self.r.beg.push(0x00); @@ -314,7 +336,16 @@ impl UniqueRangeThingIterator { let max = self.r.end.clone(); limit += 1; let mut tx = txn.lock().await; - let res = tx.scan(min..max, limit).await?; + let res = tx + .scan_paged( + ScanPage { + range: min..max, + limit: Limit::Limited(limit), + }, + limit, + ) + .await?; + let res = res.values; let mut r = Vec::with_capacity(res.len()); for (k, v) in res { limit -= 1; diff --git a/lib/src/idx/trees/btree.rs b/lib/src/idx/trees/btree.rs index efa8a8dd..687f9311 100644 --- a/lib/src/idx/trees/btree.rs +++ b/lib/src/idx/trees/btree.rs @@ -936,7 +936,7 @@ mod tests { }; use crate::idx::trees::store::{NodeId, TreeNode, TreeNodeProvider}; use crate::idx::VersionedSerdeState; - use crate::kvs::{Datastore, Key, LockType::*, Transaction, TransactionType}; + use crate::kvs::{Datastore, Key, LockType::*, ScanPage, Transaction, TransactionType}; use rand::prelude::SliceRandom; use rand::thread_rng; use std::cmp::Ordering; @@ -1370,7 +1370,10 @@ mod tests { assert_eq!(s.max_depth, 3); assert_eq!(s.nodes_count, 10); // There should be one record per node - assert_eq!(10, tx.scan(vec![]..vec![0xf], 100).await.unwrap().len()); + assert_eq!( + 10, + tx.scan_paged(ScanPage::from(vec![]..vec![0xf]), 100).await.unwrap().values.len() + ); let nodes_count = t .inspect_nodes(&mut tx, &mut st, |count, depth, node_id, node| match count { @@ -1510,7 +1513,10 @@ mod tests { assert_eq!(s.max_depth, 2); assert_eq!(s.nodes_count, 7); // There should be one record per node - assert_eq!(7, tx.scan(vec![]..vec![0xf], 100).await.unwrap().len()); + assert_eq!( + 7, + tx.scan_paged(ScanPage::from(vec![]..vec![0xf]), 100).await.unwrap().values.len() + ); let nodes_count = t .inspect_nodes(&mut tx, &mut st, |count, depth, node_id, node| match count { @@ -1635,7 +1641,7 @@ mod tests { assert_eq!(s.max_depth, 0); assert_eq!(s.nodes_count, 0); // There should not be any record in the database - assert_eq!(0, tx.scan(vec![]..vec![0xf], 100).await?.len()); + assert_eq!(0, tx.scan_paged(ScanPage::from(vec![]..vec![0xf]), 100).await?.values.len()); tx.cancel().await?; Ok(()) } diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index 26d1e8f3..ef1cba48 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -12,7 +12,7 @@ use crate::key::root::hb::Hb; use crate::kvs::clock::SizedClock; #[allow(unused_imports)] use crate::kvs::clock::SystemClock; -use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*, NO_LIMIT}; +use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*}; use crate::opt::auth::Root; #[cfg(feature = "jwks")] use crate::opt::capabilities::NetTarget; @@ -38,6 +38,9 @@ use wasmtimer::std::{SystemTime, UNIX_EPOCH}; const HEARTBEAT_BATCH_SIZE: u32 = 1000; const LQ_CHANNEL_SIZE: usize = 100; +// The batch size used for non-paged operations (i.e. 
if there are more results, they are ignored) +const NON_PAGED_BATCH_SIZE: u32 = 100_000; + /// Used for cluster logic to move LQ data to LQ cleanup code /// Not a stored struct; Used only in this module #[derive(Debug, Clone, Eq, PartialEq)] @@ -614,7 +617,7 @@ impl Datastore { for nd in nodes.iter() { trace!("Archiving node {}", &nd); // Scan on node prefix for LQ space - let node_lqs = tx.scan_ndlq(nd, NO_LIMIT).await?; + let node_lqs = tx.scan_ndlq(nd, NON_PAGED_BATCH_SIZE).await?; trace!("Found {} LQ entries for {:?}", node_lqs.len(), nd); for lq in node_lqs { trace!("Archiving query {:?}", &lq); @@ -654,7 +657,7 @@ impl Datastore { pub async fn clear_unreachable_state(&self, tx: &mut Transaction) -> Result<(), Error> { // Scan nodes - let cluster = tx.scan_nd(NO_LIMIT).await?; + let cluster = tx.scan_nd(NON_PAGED_BATCH_SIZE).await?; trace!("Found {} nodes", cluster.len()); let mut unreachable_nodes = BTreeMap::new(); for cl in &cluster { @@ -665,7 +668,7 @@ impl Datastore { // We remove one, because the scan range adds one value: u64::MAX - 1, }; - let hbs = tx.scan_hb(&end_of_time, NO_LIMIT).await?; + let hbs = tx.scan_hb(&end_of_time, NON_PAGED_BATCH_SIZE).await?; trace!("Found {} heartbeats", hbs.len()); for hb in hbs { match unreachable_nodes.remove(&hb.nd.to_string()) { @@ -691,7 +694,7 @@ impl Datastore { for cl in &cluster { let nds = tx.scan_ndlq(&uuid::Uuid::parse_str(&cl.name).map_err(|e| { Error::Unimplemented(format!("cluster id was not uuid when parsing to aggregate cluster live queries: {:?}", e)) - })?, NO_LIMIT).await?; + })?, NON_PAGED_BATCH_SIZE).await?; nd_lq_set.extend(nds.into_iter().map(LqType::Nd)); } trace!("Found {} node live queries", nd_lq_set.len()); @@ -700,7 +703,7 @@ impl Datastore { let mut tb_lq_set: BTreeSet = BTreeSet::new(); for ndlq in &nd_lq_set { let lq = ndlq.get_inner(); - let tbs = tx.scan_tblq(&lq.ns, &lq.db, &lq.tb, NO_LIMIT).await?; + let tbs = tx.scan_tblq(&lq.ns, &lq.db, &lq.tb, NON_PAGED_BATCH_SIZE).await?; tb_lq_set.extend(tbs.into_iter().map(LqType::Tb)); } trace!("Found {} table live queries", tb_lq_set.len()); @@ -732,7 +735,7 @@ impl Datastore { // Find all the LQs we own, so that we can get the ns/ds from provided uuids // We may improve this in future by tracking in web layer - let lqs = tx.scan_ndlq(&self.id, NO_LIMIT).await?; + let lqs = tx.scan_ndlq(&self.id, NON_PAGED_BATCH_SIZE).await?; let mut hits = vec![]; for lq_value in lqs { if live_queries.contains(&lq_value.lq) { @@ -793,7 +796,7 @@ impl Datastore { ) -> Result, Error> { let dead = tx.scan_hb(ts, HEARTBEAT_BATCH_SIZE).await?; // Delete the heartbeat and everything nested - tx.delr_hb(dead.clone(), NO_LIMIT).await?; + tx.delr_hb(dead.clone(), NON_PAGED_BATCH_SIZE).await?; for dead_node in dead.clone() { tx.del_nd(dead_node.nd).await?; } diff --git a/lib/src/kvs/speedb/mod.rs b/lib/src/kvs/speedb/mod.rs index 9277442d..7061ff47 100644 --- a/lib/src/kvs/speedb/mod.rs +++ b/lib/src/kvs/speedb/mod.rs @@ -445,7 +445,7 @@ impl Transaction { end: rng.end.into(), }; // Create result set - let mut res = vec![]; + let mut res: Vec<(Key, Val)> = vec![]; // Set the key range let beg = rng.start.as_slice(); let end = rng.end.as_slice(); diff --git a/lib/src/kvs/tests/hb.rs b/lib/src/kvs/tests/hb.rs index fe25a15a..c585ff91 100644 --- a/lib/src/kvs/tests/hb.rs +++ b/lib/src/kvs/tests/hb.rs @@ -18,14 +18,14 @@ async fn write_scan_hb() { tx.set_hb(t2, Uuid::parse_str("b80ff454-c3e7-46a9-a0b0-7b40e9a62626").unwrap()).await.unwrap(); tx.commit().await.unwrap(); - // Scan limit 1000 + // 
Scan in batches of 1 let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); - let vals_lim = tx.scan_hb(&t3, 1000).await.unwrap(); + let vals_lim = tx.scan_hb(&t3, 1).await.unwrap(); tx.cancel().await.unwrap(); - // Scan limit 0 + // Scan in batches of 100k let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); - let vals_no_lim = tx.scan_hb(&t3, NO_LIMIT).await.unwrap(); + let vals_no_lim = tx.scan_hb(&t3, 100_000).await.unwrap(); tx.cancel().await.unwrap(); // Assert equal diff --git a/lib/src/kvs/tests/helper.rs b/lib/src/kvs/tests/helper.rs index d638b597..dcd35a79 100644 --- a/lib/src/kvs/tests/helper.rs +++ b/lib/src/kvs/tests/helper.rs @@ -37,7 +37,8 @@ pub(crate) async fn init( /// Scan the entire storage layer displaying keys /// Useful to debug scans ;) async fn _debug_scan(tx: &mut Transaction, message: &str) { - let r = tx.scan(vec![0]..vec![u8::MAX], 100000).await.unwrap(); + let r = tx.scan_paged(ScanPage::from(vec![0]..vec![u8::MAX]), u32::MAX).await.unwrap(); + let r = r.values; println!("START OF RANGE SCAN - {}", message); for (k, _v) in r.iter() { println!("{}", crate::key::debug::sprint_key(k)); diff --git a/lib/src/kvs/tests/nd.rs b/lib/src/kvs/tests/nd.rs index 88d65cec..a7255dde 100644 --- a/lib/src/kvs/tests/nd.rs +++ b/lib/src/kvs/tests/nd.rs @@ -11,18 +11,18 @@ async fn write_scan_nd() { tx.set_nd(Uuid::parse_str("cbefc4fe-8ba0-4898-ab69-782e3ebc06f9").unwrap()).await.unwrap(); tx.commit().await.unwrap(); - // Scan limit 1000 + // Scan in batches of 1 let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); - let vals_lim = tx.scan_nd(1000).await.unwrap(); + let res_many_batches = tx.scan_nd(1).await.unwrap(); tx.cancel().await.unwrap(); - // Scan limit 0 + // Scan in batches of 100k let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); - let vals_no_lim = tx.scan_nd(NO_LIMIT).await.unwrap(); + let res_single_batch = tx.scan_nd(100_000).await.unwrap(); tx.cancel().await.unwrap(); // Assert equal - assert_eq!(vals_lim, vals_no_lim); - assert_eq!(vals_lim.len(), 2); - assert_eq!(vals_no_lim.len(), 2); + assert_eq!(res_many_batches, res_single_batch); + assert_eq!(res_many_batches.len(), 2); + assert_eq!(res_single_batch.len(), 2); } diff --git a/lib/src/kvs/tests/ndlq.rs b/lib/src/kvs/tests/ndlq.rs index 515c13ab..b05ea8d4 100644 --- a/lib/src/kvs/tests/ndlq.rs +++ b/lib/src/kvs/tests/ndlq.rs @@ -1,4 +1,4 @@ -use crate::kvs::{LqValue, NO_LIMIT}; +use crate::kvs::LqValue; #[tokio::test] #[serial] @@ -19,11 +19,11 @@ async fn write_scan_ndlq() { // Verify scan let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); - let res_lim = tx.scan_ndlq(&nd, 100).await.unwrap(); - let res_no_lim = tx.scan_ndlq(&nd, NO_LIMIT).await.unwrap(); + let res_many_batches = tx.scan_ndlq(&nd, 1).await.unwrap(); + let res_single_batch = tx.scan_ndlq(&nd, 100_000).await.unwrap(); tx.commit().await.unwrap(); assert_eq!( - res_lim, + res_many_batches, vec![LqValue { nd: sql::Uuid::from(nd), ns: ns.to_string(), @@ -32,5 +32,5 @@ async fn write_scan_ndlq() { lq }] ); - assert_eq!(res_lim, res_no_lim); + assert_eq!(res_many_batches, res_single_batch); } diff --git a/lib/src/kvs/tests/raw.rs b/lib/src/kvs/tests/raw.rs index 5bd70faa..2ae800e2 100644 --- a/lib/src/kvs/tests/raw.rs +++ b/lib/src/kvs/tests/raw.rs @@ -240,3 +240,58 @@ async fn scan() { assert_eq!(val[1].1, b"2"); tx.cancel().await.unwrap(); } + +#[tokio::test] +#[serial] +async fn scan_paged() { + // Create a new datastore + let node_id = 
Uuid::parse_str("6572a13c-a7a0-4e19-be62-18acb4e854f5").unwrap(); + let clock = Arc::new(RwLock::new(SizedClock::Fake(FakeClock::new(Timestamp::default())))); + let (ds, _) = new_ds(node_id, clock).await; + // Create a writeable transaction + let mut tx = ds.transaction(Write, Optimistic).await.unwrap(); + assert!(tx.put(Unknown, "test1", "1").await.is_ok()); + assert!(tx.put(Unknown, "test2", "2").await.is_ok()); + assert!(tx.put(Unknown, "test3", "3").await.is_ok()); + assert!(tx.put(Unknown, "test4", "4").await.is_ok()); + assert!(tx.put(Unknown, "test5", "5").await.is_ok()); + tx.commit().await.unwrap(); + // Create a readonly transaction + let mut tx = ds.transaction(Read, Optimistic).await.unwrap(); + let val = + tx.scan_paged(ScanPage::from("test1".into().."test9".into()), u32::MAX).await.unwrap(); + let val = val.values; + assert_eq!(val.len(), 5); + assert_eq!(val[0].0, b"test1"); + assert_eq!(val[0].1, b"1"); + assert_eq!(val[1].0, b"test2"); + assert_eq!(val[1].1, b"2"); + assert_eq!(val[2].0, b"test3"); + assert_eq!(val[2].1, b"3"); + assert_eq!(val[3].0, b"test4"); + assert_eq!(val[3].1, b"4"); + assert_eq!(val[4].0, b"test5"); + assert_eq!(val[4].1, b"5"); + tx.cancel().await.unwrap(); + // Create a readonly transaction + let mut tx = ds.transaction(Read, Optimistic).await.unwrap(); + let val = + tx.scan_paged(ScanPage::from("test2".into().."test4".into()), u32::MAX).await.unwrap(); + let val = val.values; + assert_eq!(val.len(), 2); + assert_eq!(val[0].0, b"test2"); + assert_eq!(val[0].1, b"2"); + assert_eq!(val[1].0, b"test3"); + assert_eq!(val[1].1, b"3"); + tx.cancel().await.unwrap(); + // Create a readonly transaction + let mut tx = ds.transaction(Read, Optimistic).await.unwrap(); + let val = tx.scan_paged(ScanPage::from("test1".into().."test9".into()), 2).await.unwrap(); + let val = val.values; + assert_eq!(val.len(), 2); + assert_eq!(val[0].0, b"test1"); + assert_eq!(val[0].1, b"1"); + assert_eq!(val[1].0, b"test2"); + assert_eq!(val[1].1, b"2"); + tx.cancel().await.unwrap(); +} diff --git a/lib/src/kvs/tests/tb.rs b/lib/src/kvs/tests/tb.rs index 8d8c4f2e..12f9f321 100644 --- a/lib/src/kvs/tests/tb.rs +++ b/lib/src/kvs/tests/tb.rs @@ -1,5 +1,6 @@ use crate::key::database::tb; use crate::key::database::tb::Tb; +use crate::kvs::ScanPage; use crate::sql::statements::DefineTableStatement; #[tokio::test] @@ -29,10 +30,16 @@ async fn table_definitions_can_be_scanned() { tx.set(&key, &value).await.unwrap(); // Validate with scan - match tx.scan(tb::prefix(namespace, database)..tb::suffix(namespace, database), 1000).await { + match tx + .scan_paged( + ScanPage::from(tb::prefix(namespace, database)..tb::suffix(namespace, database)), + 1000, + ) + .await + { Ok(scan) => { - assert_eq!(scan.len(), 1); - let read = DefineTableStatement::from(&scan[0].1); + assert_eq!(scan.values.len(), 1); + let read = DefineTableStatement::from(&scan.values[0].1); assert_eq!(&read, &value); } Err(e) => panic!("{:?}", e), diff --git a/lib/src/kvs/tests/tblq.rs b/lib/src/kvs/tests/tblq.rs index f1dcc656..bc668818 100644 --- a/lib/src/kvs/tests/tblq.rs +++ b/lib/src/kvs/tests/tblq.rs @@ -35,11 +35,11 @@ async fn write_scan_tblq() { // Verify scan let mut tx = test.db.transaction(Write, Optimistic).await.unwrap(); - let res = tx.scan_tblq(ns, db, tb, 100).await.unwrap(); - let no_limit = tx.scan_tblq(ns, db, tb, NO_LIMIT).await.unwrap(); + let res_many_batches = tx.scan_tblq(ns, db, tb, 1).await.unwrap(); + let res_single_batch = tx.scan_tblq(ns, db, tb, 100_000).await.unwrap(); 
tx.commit().await.unwrap(); assert_eq!( - res, + res_many_batches, vec![LqValue { nd: sql::Uuid::from(node_id), ns: ns.to_string(), @@ -48,6 +48,6 @@ async fn write_scan_tblq() { lq: live_id }] ); - assert_eq!(res, no_limit); + assert_eq!(res_many_batches, res_single_batch); } } diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index 57e70064..1323dd1b 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -48,7 +48,36 @@ use std::sync::Arc; use tokio::sync::RwLock; use uuid::Uuid; -pub(crate) const NO_LIMIT: u32 = 0; +#[derive(Copy, Clone, Debug)] +pub enum Limit { + Unlimited, + Limited(u32), +} + +pub struct ScanPage +where + K: Into + Debug, +{ + pub range: Range, + pub limit: Limit, +} + +impl From>> for ScanPage> { + fn from(value: Range>) -> Self { + ScanPage { + range: value, + limit: Limit::Unlimited, + } + } +} + +pub struct ScanResult +where + K: Into + Debug, +{ + pub next_page: Option>, + pub values: Vec<(Key, Val)>, +} /// A set of undoable updates and requests against a dataset. #[allow(dead_code)] @@ -595,8 +624,6 @@ impl Transaction { K: Into + Debug, V: Into + Debug, { - #[cfg(debug_assertions)] - trace!("Put {:?} => {:?}", key, val); match self { #[cfg(feature = "kv-mem")] Transaction { @@ -679,6 +706,79 @@ impl Transaction { } } + /// Retrieve a specific range of keys from the datastore. + /// + /// This function fetches the full range of key-value pairs, in a single request to the underlying datastore. + #[allow(unused_variables)] + pub async fn scan_paged( + &mut self, + page: ScanPage, + batch_limit: u32, + ) -> Result, Error> + where + K: Into + From> + Debug + Clone, + { + #[cfg(debug_assertions)] + trace!("Scan {:?} - {:?}", page.range.start, page.range.end); + let range = page.range.clone(); + let res = match self { + #[cfg(feature = "kv-mem")] + Transaction { + inner: Inner::Mem(v), + .. + } => v.scan(range, batch_limit), + #[cfg(feature = "kv-rocksdb")] + Transaction { + inner: Inner::RocksDB(v), + .. + } => v.scan(range, batch_limit).await, + #[cfg(feature = "kv-speedb")] + Transaction { + inner: Inner::SpeeDB(v), + .. + } => v.scan(range, batch_limit).await, + #[cfg(feature = "kv-indxdb")] + Transaction { + inner: Inner::IndxDB(v), + .. + } => v.scan(range, batch_limit).await, + #[cfg(feature = "kv-tikv")] + Transaction { + inner: Inner::TiKV(v), + .. + } => v.scan(range, batch_limit).await, + #[cfg(feature = "kv-fdb")] + Transaction { + inner: Inner::FoundationDB(v), + .. + } => v.scan(range, batch_limit).await, + #[allow(unreachable_patterns)] + _ => Err(Error::MissingStorageEngine), + }; + // Construct next page + res.map(|tup_vec: Vec<(Key, Val)>| { + if tup_vec.len() < batch_limit as usize { + ScanResult { + next_page: None, + values: tup_vec, + } + } else { + let (mut rng, limit) = (page.range, page.limit); + rng.start = match tup_vec.last() { + Some((k, _)) => K::from(k.clone().add(0)), + None => rng.start, + }; + ScanResult { + next_page: Some(ScanPage { + range: rng, + limit, + }), + values: tup_vec, + } + } + }) + } + /// Update a key in the datastore if the current value matches a condition. 
#[allow(unused_variables)] pub async fn putc(&mut self, key: K, val: V, chk: Option) -> Result<(), Error> @@ -784,43 +884,25 @@ impl Transaction { trace!("Getr {:?}..{:?} (limit: {limit})", rng.start, rng.end); let beg: Key = rng.start.into(); let end: Key = rng.end.into(); - let mut nxt: Option = None; - let mut num = limit; let mut out: Vec<(Key, Val)> = vec![]; + let mut next_page = Some(ScanPage { + range: beg..end, + limit: Limit::Limited(limit), + }); // Start processing - while num > 0 { + while let Some(page) = next_page { // Get records batch - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - let num = std::cmp::min(1000, num); - self.scan(min..max, num).await? - } - Some(ref mut beg) => { - beg.push(0x00); - let min = beg.clone(); - let max = end.clone(); - let num = std::cmp::min(1000, num); - self.scan(min..max, num).await? - } - }; - // Get total results - let n = res.len(); + let res = self.scan_paged(page, 1000).await?; + next_page = res.next_page; + let res = res.values; // Exit when settled - if n == 0 { + if res.is_empty() { break; } // Loop over results - for (i, (k, v)) in res.into_iter().enumerate() { - // Ready the next - if n == i + 1 { - nxt = Some(k.clone()); - } + for (k, v) in res.into_iter() { // Delete out.push((k, v)); - // Count - num -= 1; } } Ok(out) @@ -859,42 +941,24 @@ impl Transaction { { let beg: Key = rng.start.into(); let end: Key = rng.end.into(); - let mut nxt: Option = None; - let mut num = limit; // Start processing - while num > 0 { + let mut next_page = Some(ScanPage { + range: beg..end, + limit: Limit::Limited(limit), + }); + while let Some(page) = next_page { // Get records batch - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - let num = std::cmp::min(1000, num); - self.scan(min..max, num).await? - } - Some(ref mut beg) => { - beg.push(0x00); - let min = beg.clone(); - let max = end.clone(); - let num = std::cmp::min(1000, num); - self.scan(min..max, num).await? - } - }; - // Get total results - let n = res.len(); + let res = self.scan_paged(page, limit).await?; + next_page = res.next_page; + let res = res.values; // Exit when settled - if n == 0 { + if res.is_empty() { break; } // Loop over results - for (i, (k, _)) in res.into_iter().enumerate() { - // Ready the next - if n == i + 1 { - nxt = Some(k.clone()); - } + for (k, _) in res.into_iter() { // Delete self.del(k).await?; - // Count - num -= 1; } } Ok(()) @@ -910,43 +974,25 @@ impl Transaction { trace!("Getp {:?} (limit: {limit})", key); let beg: Key = key.into(); let end: Key = beg.clone().add(0xff); - let mut nxt: Option = None; - let mut num = limit; let mut out: Vec<(Key, Val)> = vec![]; // Start processing - while num > 0 { + let mut next_page = Some(ScanPage { + range: beg..end, + limit: Limit::Limited(limit), + }); + while let Some(page) = next_page { + let res = self.scan_paged(page, 1000).await?; + next_page = res.next_page; // Get records batch - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - let num = std::cmp::min(1000, num); - self.scan(min..max, num).await? - } - Some(ref mut beg) => { - beg.push(0); - let min = beg.clone(); - let max = end.clone(); - let num = std::cmp::min(1000, num); - self.scan(min..max, num).await? 
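
(The getr, delr and getp rewrites in this hunk all drive the same loop; the continuation rule they share lives in scan_paged: a full batch of exactly batch_limit rows yields a next_page whose range restarts just past the last key seen, by appending a 0x00 byte. A sketch of that rule, using a hypothetical next_range helper over Vec<u8> keys:

	fn next_range(mut last_key: Vec<u8>, end: Vec<u8>) -> std::ops::Range<Vec<u8>> {
		// 0x00 is the smallest byte, so last_key followed by 0x00 is the first
		// key strictly greater than last_key in lexicographic order.
		last_key.push(0x00);
		last_key..end
	}

One consequence: a range whose final batch lands exactly on batch_limit costs one extra, empty round trip before next_page becomes None.)
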
- } - }; - // Get total results - let n = res.len(); + let res = res.values; // Exit when settled - if n == 0 { + if res.is_empty() { break; - } + }; // Loop over results - for (i, (k, v)) in res.into_iter().enumerate() { - // Ready the next - if n == i + 1 { - nxt = Some(k.clone()); - } + for (k, v) in res.into_iter() { // Delete out.push((k, v)); - // Count - num -= 1; } } Ok(out) @@ -1067,108 +1113,38 @@ impl Transaction { pub async fn scan_hb( &mut self, time_to: &Timestamp, - limit: u32, + batch_size: u32, ) -> Result, Error> { let beg = crate::key::root::hb::Hb::prefix(); let end = crate::key::root::hb::Hb::suffix(time_to); - trace!("Scan start: {} ({:?})", String::from_utf8_lossy(&beg).to_string(), &beg); - trace!("Scan end: {} ({:?})", String::from_utf8_lossy(&end).to_string(), &end); - let mut nxt: Option = None; - let mut num = limit; let mut out: Vec = vec![]; // Start processing - while limit == NO_LIMIT || num > 0 { - let batch_size = match num { - 0 => 1000, - _ => std::cmp::min(1000, num), - }; - // Get records batch - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - self.scan(min..max, batch_size).await? - } - Some(ref mut beg) => { - beg.push(0x00); - let min = beg.clone(); - let max = end.clone(); - self.scan(min..max, batch_size).await? - } - }; - // Get total results - let n = res.len(); - // Exit when settled - if n == 0 { - break; - } - // Loop over results - for (i, (k, _)) in res.into_iter().enumerate() { - // Ready the next - if n == i + 1 { - nxt = Some(k.clone()); - } + let mut next_page = Some(ScanPage::from(beg..end)); + while let Some(page) = next_page { + let res = self.scan_paged(page, batch_size).await?; + next_page = res.next_page; + for (k, _) in res.values.into_iter() { out.push(crate::key::root::hb::Hb::decode(k.as_slice())?); - // Count - if limit > 0 { - num -= 1; - } } } - trace!("scan_hb: {:?}", out); Ok(out) } /// scan_nd will scan all the cluster membership registers /// setting limit to 0 will result in scanning all entries - pub async fn scan_nd(&mut self, limit: u32) -> Result, Error> { + pub async fn scan_nd(&mut self, batch_size: u32) -> Result, Error> { let beg = crate::key::root::nd::Nd::prefix(); let end = crate::key::root::nd::Nd::suffix(); - trace!("Scan start: {} ({:?})", String::from_utf8_lossy(&beg).to_string(), &beg); - trace!("Scan end: {} ({:?})", String::from_utf8_lossy(&end).to_string(), &end); - let mut nxt: Option = None; - let mut num = limit; let mut out: Vec = vec![]; // Start processing - while (limit == NO_LIMIT) || (num > 0) { - let batch_size = match num { - 0 => 1000, - _ => std::cmp::min(1000, num), - }; - // Get records batch - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - self.scan(min..max, batch_size).await? - } - Some(ref mut beg) => { - beg.push(0x00); - let min = beg.clone(); - let max = end.clone(); - self.scan(min..max, batch_size).await? 
- } - }; - // Get total results - let n = res.len(); - // Exit when settled - if n == 0 { - break; - } - // Loop over results - for (i, (k, v)) in res.into_iter().enumerate() { - // Ready the next - if n == i + 1 { - nxt = Some(k.clone()); - } - out.push((&v).into()); - // Count - if limit > 0 { - num -= 1; - } + let mut next_page = Some(ScanPage::from(beg..end)); + while let Some(page) = next_page { + let res = self.scan_paged(page, batch_size).await?; + next_page = res.next_page; + for (_, v) in res.values.into_iter() { + out.push(v.into()); } } - trace!("scan_nd: {:?}", out); Ok(out) } @@ -1191,62 +1167,28 @@ impl Transaction { self.del(key).await } - pub async fn scan_ndlq<'a>(&mut self, node: &Uuid, limit: u32) -> Result, Error> { + pub async fn scan_ndlq<'a>( + &mut self, + node: &Uuid, + batch_size: u32, + ) -> Result, Error> { let beg = crate::key::node::lq::prefix_nd(node); let end = crate::key::node::lq::suffix_nd(node); - trace!( - "Scanning range from pref={}, suff={}", - crate::key::debug::sprint_key(&beg), - crate::key::debug::sprint_key(&end), - ); - let mut nxt: Option = None; - let mut num = limit; let mut out: Vec = vec![]; - while limit == NO_LIMIT || num > 0 { - let batch_size = match num { - 0 => 1000, - _ => std::cmp::min(1000, num), - }; - // Get records batch - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - self.scan(min..max, batch_size).await? - } - Some(ref mut beg) => { - beg.push(0x00); - let min = beg.clone(); - let max = end.clone(); - self.scan(min..max, batch_size).await? - } - }; - // Get total results - let n = res.len(); - // Exit when settled - if n == 0 { - break; - } - // Loop over results - for (i, (key, value)) in res.into_iter().enumerate() { - // Ready the next - if n == i + 1 { - nxt = Some(key.clone()); - } - let lq = crate::key::node::lq::Lq::decode(key.as_slice())?; + let mut next_page = Some(ScanPage::from(beg..end)); + while let Some(page) = next_page { + let res = self.scan_paged(page, batch_size).await?; + next_page = res.next_page; + for (key, value) in res.values.into_iter() { + let lv = crate::key::node::lq::Lq::decode(key.as_slice())?; let tb: String = String::from_utf8(value).unwrap(); - trace!("scan_lq Found tb: {:?}", tb); out.push(LqValue { - nd: lq.nd.into(), - ns: lq.ns.to_string(), - db: lq.db.to_string(), + nd: lv.nd.into(), + ns: lv.ns.to_string(), + db: lv.db.to_string(), tb, - lq: lq.lq.into(), + lq: lv.lq.into(), }); - // Count - if limit != NO_LIMIT { - num -= 1; - } } } Ok(out) @@ -1257,49 +1199,16 @@ impl Transaction { ns: &str, db: &str, tb: &str, - limit: u32, + batch_size: u32, ) -> Result, Error> { let beg = crate::key::table::lq::prefix(ns, db, tb); let end = crate::key::table::lq::suffix(ns, db, tb); - trace!( - "Scanning range from pref={}, suff={}", - crate::key::debug::sprint_key(&beg), - crate::key::debug::sprint_key(&end), - ); - let mut nxt: Option = None; - let mut num = limit; let mut out: Vec = vec![]; - while limit == NO_LIMIT || num > 0 { - let batch_size = match num { - 0 => 1000, - _ => std::cmp::min(1000, num), - }; - // Get records batch - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - self.scan(min..max, batch_size).await? - } - Some(ref mut beg) => { - beg.push(0x00); - let min = beg.clone(); - let max = end.clone(); - self.scan(min..max, batch_size).await? 
- } - }; - // Get total results - let n = res.len(); - // Exit when settled - if n == 0 { - break; - } - // Loop over results - for (i, (key, value)) in res.into_iter().enumerate() { - // Ready the next - if n == i + 1 { - nxt = Some(key.clone()); - } + let mut next_page = Some(ScanPage::from(beg..end)); + while let Some(page) = next_page { + let res = self.scan_paged(page, batch_size).await?; + next_page = res.next_page; + for (key, value) in res.values.into_iter() { let lv = crate::key::table::lq::Lq::decode(key.as_slice())?; let val: LiveStatement = value.into(); out.push(LqValue { @@ -1309,10 +1218,6 @@ impl Transaction { tb: lv.tb.to_string(), lq: val.id, }); - // Count - if limit != NO_LIMIT { - num -= 1; - } } } Ok(out) @@ -1329,6 +1234,7 @@ impl Transaction { ) -> Result<(), Error> { let key = crate::key::table::lq::new(ns, db, tb, live_stm.id.0); let key_enc = crate::key::table::lq::Lq::encode(&key)?; + #[cfg(debug_assertions)] trace!("putc_tblq ({:?}): key={:?}", &live_stm.id, crate::key::debug::sprint_key(&key_enc)); self.putc(key_enc, live_stm, expected).await } @@ -2528,55 +2434,35 @@ impl Transaction { // Fetch records let beg = crate::key::thing::prefix(ns, db, &tb.name); let end = crate::key::thing::suffix(ns, db, &tb.name); - let mut nxt: Option> = None; - loop { - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - self.scan(min..max, 1000).await? - } - Some(ref mut beg) => { - beg.push(0x00); - let min = beg.clone(); - let max = end.clone(); - self.scan(min..max, 1000).await? - } - }; - if !res.is_empty() { - // Get total results - let n = res.len(); - // Exit when settled - if n == 0 { - break; - } - // Loop over results - for (i, (k, v)) in res.into_iter().enumerate() { - // Ready the next - if n == i + 1 { - nxt = Some(k.clone()); - } - // Parse the key and the value - let k: crate::key::thing::Thing = (&k).into(); - let v: Value = (&v).into(); - let t = Thing::from((k.tb, k.id)); - // Check if this is a graph edge - match (v.pick(&*EDGE), v.pick(&*IN), v.pick(&*OUT)) { - // This is a graph edge record - (Value::Bool(true), Value::Thing(l), Value::Thing(r)) => { - let sql = format!("RELATE {l} -> {t} -> {r} CONTENT {v};",); - chn.send(bytes!(sql)).await?; - } - // This is a normal record - _ => { - let sql = format!("UPDATE {t} CONTENT {v};"); - chn.send(bytes!(sql)).await?; - } - } - } - continue; + let mut nxt: Option>> = Some(ScanPage::from(beg..end)); + while nxt.is_some() { + let res = self.scan_paged(nxt.unwrap(), 1000).await?; + nxt = res.next_page; + let res = res.values; + if res.is_empty() { + break; } - break; + // Loop over results + for (k, v) in res.into_iter() { + // Parse the key and the value + let k: crate::key::thing::Thing = (&k).into(); + let v: Value = (&v).into(); + let t = Thing::from((k.tb, k.id)); + // Check if this is a graph edge + match (v.pick(&*EDGE), v.pick(&*IN), v.pick(&*OUT)) { + // This is a graph edge record + (Value::Bool(true), Value::Thing(l), Value::Thing(r)) => { + let sql = format!("RELATE {l} -> {t} -> {r} CONTENT {v};",); + chn.send(bytes!(sql)).await?; + } + // This is a normal record + _ => { + let sql = format!("UPDATE {t} CONTENT {v};"); + chn.send(bytes!(sql)).await?; + } + } + } + continue; } chn.send(bytes!("")).await?; }
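
A closing note on semantics: for the rewritten scan_* helpers, the batch size argument is now purely a paging granularity rather than a result cap, since their loops run to the end of the range; that is exactly what the batch-1 versus batch-100_000 test pairs above assert. A minimal caller-side sketch, with a hypothetical count_all helper over an arbitrary byte-key range:

	use crate::err::Error;
	use crate::kvs::{ScanPage, Transaction};

	// Hypothetical helper: count every entry in `range`, `batch` rows at a time.
	async fn count_all(
		tx: &mut Transaction,
		range: std::ops::Range<Vec<u8>>,
		batch: u32,
	) -> Result<usize, Error> {
		let mut total = 0;
		let mut next_page = Some(ScanPage::from(range));
		while let Some(page) = next_page {
			let res = tx.scan_paged(page, batch).await?;
			next_page = res.next_page;
			total += res.values.len();
		}
		Ok(total)
	}

Any batch value gives the same total; it only changes how many round trips the scan takes.
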