Add pagination to scan (#2951)

Przemyslaw Hugh Kaznowski 2024-01-09 17:47:23 +00:00 committed by GitHub
parent ccb4813886
commit 96249c2a34
16 changed files with 449 additions and 502 deletions
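The change in one picture: instead of callers re-deriving a continuation key after every scan, Transaction::scan_paged takes a ScanPage (a key range plus an overall Limit) and returns a ScanResult carrying the values together with a ready-made next page. A minimal sketch of the new calling pattern, assuming a transaction tx and a beg..end byte-key range are in scope (the names come from the diff below):

// Old shape: one-shot scan; continuation was hand-rolled at every call site.
// let values = tx.scan(beg..end, 1000).await?;
// New shape: loop on the page the transaction hands back.
let mut next_page = Some(ScanPage::from(beg..end));
while let Some(page) = next_page {
    let res = tx.scan_paged(page, 1000).await?;
    next_page = res.next_page;
    if res.values.is_empty() {
        break;
    }
    for (k, v) in res.values {
        // process one key-value pair
    }
}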

View file

@@ -30,7 +30,7 @@ args = ["test", "--workspace", "--no-fail-fast"]
[tasks.cargo-check]
category = "LOCAL USAGE"
command = "cargo"
args = ["check", "--workspace"]
args = ["check", "--workspace", "--features", "${DEV_FEATURES}"]
[tasks.cargo-fmt]
category = "LOCAL USAGE"

View file

@@ -1,7 +1,7 @@
use crate::cf::{ChangeSet, DatabaseMutation, TableMutations};
use crate::err::Error;
use crate::key::change;
use crate::kvs::Transaction;
use crate::kvs::{Limit, ScanPage, Transaction};
use crate::sql::statements::show::ShowSince;
use crate::vs;
@@ -40,14 +40,22 @@ pub async fn read(
let limit = limit.unwrap_or(100);
let _x = tx.scan(beg..end, limit).await?;
let scan = tx
.scan_paged(
ScanPage {
range: beg..end,
limit: Limit::Limited(limit),
},
limit,
)
.await?;
let mut vs: Option<[u8; 10]> = None;
let mut buf: Vec<TableMutations> = Vec::new();
let mut r = Vec::<ChangeSet>::new();
// Iterate over the scanned entries and push the decoded changes into r
for (k, v) in _x {
for (k, v) in scan.values {
trace!("read change feed; {k:?}");
let dec = crate::key::change::Cf::decode(&k).unwrap();
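Worth noting: the changefeed reader fetches exactly one page, because the SHOW CHANGES limit bounds both the page (Limit::Limited(limit)) and the per-call batch, so scan.next_page is deliberately never followed. A hedged sketch of that shape, assuming tx, beg, end and the u32 limit as above:

// batch_limit == limit, so at most one full page can come back and the
// next_page field is simply dropped.
let scan = tx
    .scan_paged(ScanPage { range: beg..end, limit: Limit::Limited(limit) }, limit)
    .await?;
assert!(scan.values.len() <= limit as usize);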

View file

@@ -8,6 +8,7 @@ use crate::err::Error;
use crate::idx::planner::executor::IteratorRef;
use crate::idx::planner::IterationStage;
use crate::key::{graph, thing};
use crate::kvs::ScanPage;
use crate::sql::dir::Dir;
use crate::sql::{Edges, Range, Table, Thing, Value};
#[cfg(not(target_arch = "wasm32"))]
@@ -308,60 +309,43 @@ impl<'a> Processor<'a> {
// Prepare the start and end keys
let beg = thing::prefix(opt.ns(), opt.db(), &v);
let end = thing::suffix(opt.ns(), opt.db(), &v);
// Prepare the next holder key
let mut nxt: Option<Vec<u8>> = None;
// Loop until no more keys
loop {
let mut next_page = Some(ScanPage::from(beg..end));
while let Some(page) = next_page {
// Check if the context is finished
if ctx.is_done() {
break;
}
// Get the next batch of key-value entries
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await?
}
};
// If there are key-value entries then fetch them
if !res.is_empty() {
// Get total results
let n = res.len();
// Loop over results
for (i, (k, v)) in res.into_iter().enumerate() {
// Check the context
if ctx.is_done() {
break;
}
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
// Parse the data from the store
let key: thing::Thing = (&k).into();
let val: Value = (&v).into();
let rid = Thing::from((key.tb, key.id));
// Create a new operable value
let val = Operable::Value(val);
// Process the record
let pro = Processed {
ir: None,
rid: Some(rid),
doc_id: None,
val,
};
self.process(ctx, opt, txn, stm, pro).await?;
}
continue;
let res = txn.clone().lock().await.scan_paged(page, PROCESSOR_BATCH_SIZE).await?;
next_page = res.next_page;
let res = res.values;
// If no results then break
if res.is_empty() {
break;
}
break;
// Loop over results
for (k, v) in res.into_iter() {
// Check the context
if ctx.is_done() {
break;
}
// Parse the data from the store
let key: thing::Thing = (&k).into();
let val: Value = (&v).into();
let rid = Thing::from((key.tb, key.id));
// Create a new operable value
let val = Operable::Value(val);
// Process the record
let pro = Processed {
ir: None,
rid: Some(rid),
doc_id: None,
val,
};
self.process(ctx, opt, txn, stm, pro).await?;
}
continue;
}
// Everything ok
Ok(())
@@ -397,60 +381,43 @@ impl<'a> Processor<'a> {
key
}
};
// Prepare the next holder key
let mut nxt: Option<Vec<u8>> = None;
// Loop until no more keys
loop {
let mut next_page = Some(ScanPage::from(beg..end));
while let Some(page) = next_page {
// Check if the context is finished
if ctx.is_done() {
break;
}
let res = txn.clone().lock().await.scan_paged(page, PROCESSOR_BATCH_SIZE).await?;
next_page = res.next_page;
// Get the next batch of key-value entries
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
txn.clone().lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await?
}
};
let res = res.values;
// If there are key-value entries then fetch them
if !res.is_empty() {
// Get total results
let n = res.len();
// Loop over results
for (i, (k, v)) in res.into_iter().enumerate() {
// Check the context
if ctx.is_done() {
break;
}
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
// Parse the data from the store
let key: thing::Thing = (&k).into();
let val: Value = (&v).into();
let rid = Thing::from((key.tb, key.id));
// Create a new operable value
let val = Operable::Value(val);
// Process the record
let pro = Processed {
ir: None,
rid: Some(rid),
doc_id: None,
val,
};
self.process(ctx, opt, txn, stm, pro).await?;
}
continue;
if res.is_empty() {
break;
}
break;
// Loop over results
for (k, v) in res.into_iter() {
// Check the context
if ctx.is_done() {
break;
}
// Parse the data from the store
let key: thing::Thing = (&k).into();
let val: Value = (&v).into();
let rid = Thing::from((key.tb, key.id));
// Create a new operable value
let val = Operable::Value(val);
// Process the record
let pro = Processed {
ir: None,
rid: Some(rid),
doc_id: None,
val,
};
self.process(ctx, opt, txn, stm, pro).await?;
}
continue;
}
// Everything ok
Ok(())
@@ -534,69 +501,48 @@ impl<'a> Processor<'a> {
};
//
for (beg, end) in keys.iter() {
// Prepare the next holder key
let mut nxt: Option<Vec<u8>> = None;
// Loop until no more keys
loop {
let mut next_page = Some(ScanPage::from(beg.clone()..end.clone()));
while let Some(page) = next_page {
// Check if the context is finished
if ctx.is_done() {
break;
}
// Get the next batch key-value entries
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
txn.lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
txn.lock().await.scan(min..max, PROCESSOR_BATCH_SIZE).await?
}
};
let res = txn.lock().await.scan_paged(page, PROCESSOR_BATCH_SIZE).await?;
next_page = res.next_page;
let res = res.values;
// If there are key-value entries then fetch them
if !res.is_empty() {
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
if res.is_empty() {
break;
}
// Loop over results
for (k, _) in res.into_iter() {
// Check the context
if ctx.is_done() {
break;
}
// Loop over results
for (i, (k, _)) in res.into_iter().enumerate() {
// Check the context
if ctx.is_done() {
break;
}
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
// Parse the data from the store
let gra: graph::Graph = (&k).into();
// Fetch the data from the store
let key = thing::new(opt.ns(), opt.db(), gra.ft, &gra.fk);
let val = txn.lock().await.get(key).await?;
let rid = Thing::from((gra.ft, gra.fk));
// Parse the data from the store
let val = Operable::Value(match val {
Some(v) => Value::from(v),
None => Value::None,
});
// Process the record
let pro = Processed {
ir: None,
rid: Some(rid),
doc_id: None,
val,
};
self.process(ctx, opt, txn, stm, pro).await?;
}
continue;
// Parse the data from the store
let gra: graph::Graph = graph::Graph::decode(&k)?;
// Fetch the data from the store
let key = thing::new(opt.ns(), opt.db(), gra.ft, &gra.fk);
let val = txn.lock().await.get(key).await?;
let rid = Thing::from((gra.ft, gra.fk));
// Parse the data from the store
let val = Operable::Value(match val {
Some(v) => Value::from(v),
None => Value::None,
});
// Process the record
let pro = Processed {
ir: None,
rid: Some(rid),
doc_id: None,
val,
};
self.process(ctx, opt, txn, stm, pro).await?;
}
break;
continue;
}
}
// Everything ok

View file

@@ -766,6 +766,10 @@ pub enum Error {
/// The key being inserted in the transaction already exists
#[error("The key being inserted already exists: {0}")]
TxKeyAlreadyExistsCategory(KeyCategory),
/// The db is running without an available storage engine
#[error("The db is running without an available storage engine")]
MissingStorageEngine,
}
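The new variant is the fall-through arm of scan_paged later in this commit: when none of the kv-* backend features is compiled in, there is no engine to dispatch the scan to. A hedged sketch of how a caller might surface it (this wrapper is ours, not part of the commit; Error::Unimplemented already exists in the codebase):

// Hypothetical helper: translate the engine-less case into a clearer message.
async fn scan_or_explain(
    tx: &mut Transaction,
    page: ScanPage<Vec<u8>>,
) -> Result<Vec<(Key, Val)>, Error> {
    match tx.scan_paged(page, 1000).await {
        // No kv-mem / kv-rocksdb / kv-tikv / ... feature was enabled at build time.
        Err(Error::MissingStorageEngine) => {
            Err(Error::Unimplemented("build with a storage engine feature".to_string()))
        }
        other => other.map(|res| res.values),
    }
}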
impl From<Error> for String {

View file

@@ -5,7 +5,7 @@ use crate::idx::ft::termdocs::TermsDocs;
use crate::idx::ft::{FtIndex, HitsIterator};
use crate::idx::planner::plan::RangeValue;
use crate::key::index::Index;
use crate::kvs::Key;
use crate::kvs::{Key, Limit, ScanPage};
use crate::sql::statements::DefineIndexStatement;
use crate::sql::{Array, Thing, Value};
use std::collections::VecDeque;
@@ -64,7 +64,18 @@ impl IndexEqualThingIterator {
) -> Result<Vec<(Thing, Option<DocId>)>, Error> {
let min = beg.clone();
let max = end.to_owned();
let res = txn.lock().await.scan(min..max, limit).await?;
let res = txn
.lock()
.await
.scan_paged(
ScanPage {
range: min..max,
limit: Limit::Limited(limit),
},
limit,
)
.await?;
let res = res.values;
if let Some((key, _)) = res.last() {
let mut key = key.clone();
key.push(0x00);
@@ -176,7 +187,18 @@ impl IndexRangeThingIterator {
) -> Result<Vec<(Thing, Option<DocId>)>, Error> {
let min = self.r.beg.clone();
let max = self.r.end.clone();
let res = txn.lock().await.scan(min..max, limit).await?;
let res = txn
.lock()
.await
.scan_paged(
ScanPage {
range: min..max,
limit: Limit::Limited(limit),
},
limit,
)
.await?;
let res = res.values;
if let Some((key, _)) = res.last() {
self.r.beg = key.clone();
self.r.beg.push(0x00);
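Both iterators resume by appending 0x00 to the last key they saw: in byte-wise lexicographic order, k ++ 0x00 is the smallest key strictly greater than k, so the next batch starts immediately after the previous one with nothing skipped and nothing repeated. A standalone illustration of the ordering property (not committed code):

fn next_start(last: &[u8]) -> Vec<u8> {
    let mut k = last.to_vec();
    k.push(0x00); // append a zero byte
    k
}

#[test]
fn successor_sorts_immediately_after() {
    let last = b"test1".to_vec();
    // Strictly greater than the key we resumed from...
    assert!(next_start(&last) > last);
    // ...but smaller than any other key that extends it.
    assert!(next_start(&last) < b"test1\x01".to_vec());
}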
@@ -314,7 +336,16 @@ impl UniqueRangeThingIterator {
let max = self.r.end.clone();
limit += 1;
let mut tx = txn.lock().await;
let res = tx.scan(min..max, limit).await?;
let res = tx
.scan_paged(
ScanPage {
range: min..max,
limit: Limit::Limited(limit),
},
limit,
)
.await?;
let res = res.values;
let mut r = Vec::with_capacity(res.len());
for (k, v) in res {
limit -= 1;

View file

@@ -936,7 +936,7 @@ mod tests {
};
use crate::idx::trees::store::{NodeId, TreeNode, TreeNodeProvider};
use crate::idx::VersionedSerdeState;
use crate::kvs::{Datastore, Key, LockType::*, Transaction, TransactionType};
use crate::kvs::{Datastore, Key, LockType::*, ScanPage, Transaction, TransactionType};
use rand::prelude::SliceRandom;
use rand::thread_rng;
use std::cmp::Ordering;
@@ -1370,7 +1370,10 @@ mod tests {
assert_eq!(s.max_depth, 3);
assert_eq!(s.nodes_count, 10);
// There should be one record per node
assert_eq!(10, tx.scan(vec![]..vec![0xf], 100).await.unwrap().len());
assert_eq!(
10,
tx.scan_paged(ScanPage::from(vec![]..vec![0xf]), 100).await.unwrap().values.len()
);
let nodes_count = t
.inspect_nodes(&mut tx, &mut st, |count, depth, node_id, node| match count {
@@ -1510,7 +1513,10 @@ mod tests {
assert_eq!(s.max_depth, 2);
assert_eq!(s.nodes_count, 7);
// There should be one record per node
assert_eq!(7, tx.scan(vec![]..vec![0xf], 100).await.unwrap().len());
assert_eq!(
7,
tx.scan_paged(ScanPage::from(vec![]..vec![0xf]), 100).await.unwrap().values.len()
);
let nodes_count = t
.inspect_nodes(&mut tx, &mut st, |count, depth, node_id, node| match count {
@@ -1635,7 +1641,7 @@ mod tests {
assert_eq!(s.max_depth, 0);
assert_eq!(s.nodes_count, 0);
// There should not be any record in the database
assert_eq!(0, tx.scan(vec![]..vec![0xf], 100).await?.len());
assert_eq!(0, tx.scan_paged(ScanPage::from(vec![]..vec![0xf]), 100).await?.values.len());
tx.cancel().await?;
Ok(())
}

View file

@@ -12,7 +12,7 @@ use crate::key::root::hb::Hb;
use crate::kvs::clock::SizedClock;
#[allow(unused_imports)]
use crate::kvs::clock::SystemClock;
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*, NO_LIMIT};
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
use crate::opt::auth::Root;
#[cfg(feature = "jwks")]
use crate::opt::capabilities::NetTarget;
@@ -38,6 +38,9 @@ use wasmtimer::std::{SystemTime, UNIX_EPOCH};
const HEARTBEAT_BATCH_SIZE: u32 = 1000;
const LQ_CHANNEL_SIZE: usize = 100;
// The batch size used for non-paged operations (i.e. if there are more results, they are ignored)
const NON_PAGED_BATCH_SIZE: u32 = 100_000;
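This constant marks a semantic shift at the call sites below: NO_LIMIT (0) used to signal "scan until exhausted", whereas NON_PAGED_BATCH_SIZE is an explicit batch size. The comment above says overflow is ignored, although the rewritten scan helpers later in the commit appear to follow next_page until the range is exhausted; either way the value now bounds a batch rather than encoding a magic "unlimited". A before/after sketch of one call site:

// Before: a magic zero meant "no limit".
// let cluster = tx.scan_nd(NO_LIMIT).await?;
// After: the same scan, fetched in batches of up to 100,000 entries.
let cluster = tx.scan_nd(NON_PAGED_BATCH_SIZE).await?;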
/// Used for cluster logic to move LQ data to LQ cleanup code
/// Not a stored struct; Used only in this module
#[derive(Debug, Clone, Eq, PartialEq)]
@@ -614,7 +617,7 @@ impl Datastore {
for nd in nodes.iter() {
trace!("Archiving node {}", &nd);
// Scan on node prefix for LQ space
let node_lqs = tx.scan_ndlq(nd, NO_LIMIT).await?;
let node_lqs = tx.scan_ndlq(nd, NON_PAGED_BATCH_SIZE).await?;
trace!("Found {} LQ entries for {:?}", node_lqs.len(), nd);
for lq in node_lqs {
trace!("Archiving query {:?}", &lq);
@@ -654,7 +657,7 @@ impl Datastore {
pub async fn clear_unreachable_state(&self, tx: &mut Transaction) -> Result<(), Error> {
// Scan nodes
let cluster = tx.scan_nd(NO_LIMIT).await?;
let cluster = tx.scan_nd(NON_PAGED_BATCH_SIZE).await?;
trace!("Found {} nodes", cluster.len());
let mut unreachable_nodes = BTreeMap::new();
for cl in &cluster {
@@ -665,7 +668,7 @@ impl Datastore {
// We remove one, because the scan range adds one
value: u64::MAX - 1,
};
let hbs = tx.scan_hb(&end_of_time, NO_LIMIT).await?;
let hbs = tx.scan_hb(&end_of_time, NON_PAGED_BATCH_SIZE).await?;
trace!("Found {} heartbeats", hbs.len());
for hb in hbs {
match unreachable_nodes.remove(&hb.nd.to_string()) {
@@ -691,7 +694,7 @@ impl Datastore {
for cl in &cluster {
let nds = tx.scan_ndlq(&uuid::Uuid::parse_str(&cl.name).map_err(|e| {
Error::Unimplemented(format!("cluster id was not uuid when parsing to aggregate cluster live queries: {:?}", e))
})?, NO_LIMIT).await?;
})?, NON_PAGED_BATCH_SIZE).await?;
nd_lq_set.extend(nds.into_iter().map(LqType::Nd));
}
trace!("Found {} node live queries", nd_lq_set.len());
@@ -700,7 +703,7 @@ impl Datastore {
let mut tb_lq_set: BTreeSet<LqType> = BTreeSet::new();
for ndlq in &nd_lq_set {
let lq = ndlq.get_inner();
let tbs = tx.scan_tblq(&lq.ns, &lq.db, &lq.tb, NO_LIMIT).await?;
let tbs = tx.scan_tblq(&lq.ns, &lq.db, &lq.tb, NON_PAGED_BATCH_SIZE).await?;
tb_lq_set.extend(tbs.into_iter().map(LqType::Tb));
}
trace!("Found {} table live queries", tb_lq_set.len());
@@ -732,7 +735,7 @@ impl Datastore {
// Find all the LQs we own, so that we can get the ns/ds from provided uuids
// We may improve this in future by tracking in web layer
let lqs = tx.scan_ndlq(&self.id, NO_LIMIT).await?;
let lqs = tx.scan_ndlq(&self.id, NON_PAGED_BATCH_SIZE).await?;
let mut hits = vec![];
for lq_value in lqs {
if live_queries.contains(&lq_value.lq) {
@@ -793,7 +796,7 @@ impl Datastore {
) -> Result<Vec<Hb>, Error> {
let dead = tx.scan_hb(ts, HEARTBEAT_BATCH_SIZE).await?;
// Delete the heartbeat and everything nested
tx.delr_hb(dead.clone(), NO_LIMIT).await?;
tx.delr_hb(dead.clone(), NON_PAGED_BATCH_SIZE).await?;
for dead_node in dead.clone() {
tx.del_nd(dead_node.nd).await?;
}

View file

@@ -445,7 +445,7 @@ impl Transaction {
end: rng.end.into(),
};
// Create result set
let mut res = vec![];
let mut res: Vec<(Key, Val)> = vec![];
// Set the key range
let beg = rng.start.as_slice();
let end = rng.end.as_slice();

View file

@@ -18,14 +18,14 @@ async fn write_scan_hb() {
tx.set_hb(t2, Uuid::parse_str("b80ff454-c3e7-46a9-a0b0-7b40e9a62626").unwrap()).await.unwrap();
tx.commit().await.unwrap();
// Scan limit 1000
// Scan in batches of 1
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let vals_lim = tx.scan_hb(&t3, 1000).await.unwrap();
let vals_lim = tx.scan_hb(&t3, 1).await.unwrap();
tx.cancel().await.unwrap();
// Scan limit 0
// Scan in batches of 100k
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let vals_no_lim = tx.scan_hb(&t3, NO_LIMIT).await.unwrap();
let vals_no_lim = tx.scan_hb(&t3, 100_000).await.unwrap();
tx.cancel().await.unwrap();
// Assert equal

View file

@@ -37,7 +37,8 @@ pub(crate) async fn init(
/// Scan the entire storage layer displaying keys
/// Useful to debug scans ;)
async fn _debug_scan(tx: &mut Transaction, message: &str) {
let r = tx.scan(vec![0]..vec![u8::MAX], 100000).await.unwrap();
let r = tx.scan_paged(ScanPage::from(vec![0]..vec![u8::MAX]), u32::MAX).await.unwrap();
let r = r.values;
println!("START OF RANGE SCAN - {}", message);
for (k, _v) in r.iter() {
println!("{}", crate::key::debug::sprint_key(k));

View file

@@ -11,18 +11,18 @@ async fn write_scan_nd() {
tx.set_nd(Uuid::parse_str("cbefc4fe-8ba0-4898-ab69-782e3ebc06f9").unwrap()).await.unwrap();
tx.commit().await.unwrap();
// Scan limit 1000
// Scan in batches of 1
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let vals_lim = tx.scan_nd(1000).await.unwrap();
let res_many_batches = tx.scan_nd(1).await.unwrap();
tx.cancel().await.unwrap();
// Scan limit 0
// Scan in batches of 100k
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let vals_no_lim = tx.scan_nd(NO_LIMIT).await.unwrap();
let res_single_batch = tx.scan_nd(100_000).await.unwrap();
tx.cancel().await.unwrap();
// Assert equal
assert_eq!(vals_lim, vals_no_lim);
assert_eq!(vals_lim.len(), 2);
assert_eq!(vals_no_lim.len(), 2);
assert_eq!(res_many_batches, res_single_batch);
assert_eq!(res_many_batches.len(), 2);
assert_eq!(res_single_batch.len(), 2);
}

View file

@@ -1,4 +1,4 @@
use crate::kvs::{LqValue, NO_LIMIT};
use crate::kvs::LqValue;
#[tokio::test]
#[serial]
@@ -19,11 +19,11 @@ async fn write_scan_ndlq() {
// Verify scan
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let res_lim = tx.scan_ndlq(&nd, 100).await.unwrap();
let res_no_lim = tx.scan_ndlq(&nd, NO_LIMIT).await.unwrap();
let res_many_batches = tx.scan_ndlq(&nd, 1).await.unwrap();
let res_single_batch = tx.scan_ndlq(&nd, 100_000).await.unwrap();
tx.commit().await.unwrap();
assert_eq!(
res_lim,
res_many_batches,
vec![LqValue {
nd: sql::Uuid::from(nd),
ns: ns.to_string(),
@@ -32,5 +32,5 @@ async fn write_scan_ndlq() {
lq
}]
);
assert_eq!(res_lim, res_no_lim);
assert_eq!(res_many_batches, res_single_batch);
}

View file

@@ -240,3 +240,58 @@ async fn scan() {
assert_eq!(val[1].1, b"2");
tx.cancel().await.unwrap();
}
#[tokio::test]
#[serial]
async fn scan_paged() {
// Create a new datastore
let node_id = Uuid::parse_str("6572a13c-a7a0-4e19-be62-18acb4e854f5").unwrap();
let clock = Arc::new(RwLock::new(SizedClock::Fake(FakeClock::new(Timestamp::default()))));
let (ds, _) = new_ds(node_id, clock).await;
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap();
assert!(tx.put(Unknown, "test1", "1").await.is_ok());
assert!(tx.put(Unknown, "test2", "2").await.is_ok());
assert!(tx.put(Unknown, "test3", "3").await.is_ok());
assert!(tx.put(Unknown, "test4", "4").await.is_ok());
assert!(tx.put(Unknown, "test5", "5").await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap();
let val =
tx.scan_paged(ScanPage::from("test1".into().."test9".into()), u32::MAX).await.unwrap();
let val = val.values;
assert_eq!(val.len(), 5);
assert_eq!(val[0].0, b"test1");
assert_eq!(val[0].1, b"1");
assert_eq!(val[1].0, b"test2");
assert_eq!(val[1].1, b"2");
assert_eq!(val[2].0, b"test3");
assert_eq!(val[2].1, b"3");
assert_eq!(val[3].0, b"test4");
assert_eq!(val[3].1, b"4");
assert_eq!(val[4].0, b"test5");
assert_eq!(val[4].1, b"5");
tx.cancel().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap();
let val =
tx.scan_paged(ScanPage::from("test2".into().."test4".into()), u32::MAX).await.unwrap();
let val = val.values;
assert_eq!(val.len(), 2);
assert_eq!(val[0].0, b"test2");
assert_eq!(val[0].1, b"2");
assert_eq!(val[1].0, b"test3");
assert_eq!(val[1].1, b"3");
tx.cancel().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap();
let val = tx.scan_paged(ScanPage::from("test1".into().."test9".into()), 2).await.unwrap();
let val = val.values;
assert_eq!(val.len(), 2);
assert_eq!(val[0].0, b"test1");
assert_eq!(val[0].1, b"1");
assert_eq!(val[1].0, b"test2");
assert_eq!(val[1].1, b"2");
tx.cancel().await.unwrap();
}
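The last assertion stops at the first page; a natural follow-up, not part of the committed test, is to feed next_page back in and check that the scan resumes just after "test2". A hedged sketch reusing the test's datastore:

// Hypothetical continuation: the returned page picks up where the batch ended.
let mut tx = ds.transaction(Read, Optimistic).await.unwrap();
let first = tx.scan_paged(ScanPage::from("test1".into().."test9".into()), 2).await.unwrap();
assert_eq!(first.values.len(), 2);
let rest = tx.scan_paged(first.next_page.unwrap(), u32::MAX).await.unwrap();
assert_eq!(rest.values.len(), 3); // test3, test4, test5
tx.cancel().await.unwrap();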

View file

@@ -1,5 +1,6 @@
use crate::key::database::tb;
use crate::key::database::tb::Tb;
use crate::kvs::ScanPage;
use crate::sql::statements::DefineTableStatement;
#[tokio::test]
@@ -29,10 +30,16 @@ async fn table_definitions_can_be_scanned() {
tx.set(&key, &value).await.unwrap();
// Validate with scan
match tx.scan(tb::prefix(namespace, database)..tb::suffix(namespace, database), 1000).await {
match tx
.scan_paged(
ScanPage::from(tb::prefix(namespace, database)..tb::suffix(namespace, database)),
1000,
)
.await
{
Ok(scan) => {
assert_eq!(scan.len(), 1);
let read = DefineTableStatement::from(&scan[0].1);
assert_eq!(scan.values.len(), 1);
let read = DefineTableStatement::from(&scan.values[0].1);
assert_eq!(&read, &value);
}
Err(e) => panic!("{:?}", e),

View file

@@ -35,11 +35,11 @@ async fn write_scan_tblq() {
// Verify scan
let mut tx = test.db.transaction(Write, Optimistic).await.unwrap();
let res = tx.scan_tblq(ns, db, tb, 100).await.unwrap();
let no_limit = tx.scan_tblq(ns, db, tb, NO_LIMIT).await.unwrap();
let res_many_batches = tx.scan_tblq(ns, db, tb, 1).await.unwrap();
let res_single_batch = tx.scan_tblq(ns, db, tb, 100_000).await.unwrap();
tx.commit().await.unwrap();
assert_eq!(
res,
res_many_batches,
vec![LqValue {
nd: sql::Uuid::from(node_id),
ns: ns.to_string(),
@@ -48,6 +48,6 @@ async fn write_scan_tblq() {
lq: live_id
}]
);
assert_eq!(res, no_limit);
assert_eq!(res_many_batches, res_single_batch);
}
}

View file

@@ -48,7 +48,36 @@ use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
pub(crate) const NO_LIMIT: u32 = 0;
#[derive(Copy, Clone, Debug)]
pub enum Limit {
Unlimited,
Limited(u32),
}
pub struct ScanPage<K>
where
K: Into<Key> + Debug,
{
pub range: Range<K>,
pub limit: Limit,
}
impl From<Range<Vec<u8>>> for ScanPage<Vec<u8>> {
fn from(value: Range<Vec<u8>>) -> Self {
ScanPage {
range: value,
limit: Limit::Unlimited,
}
}
}
pub struct ScanResult<K>
where
K: Into<Key> + Debug,
{
pub next_page: Option<ScanPage<K>>,
pub values: Vec<(Key, Val)>,
}
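The From<Range<Vec<u8>>> impl builds an Unlimited page, which is why the internal rewrites all start with ScanPage::from(beg..end): the per-call batch_limit is what bounds each round trip, while Limit travels with the page for call sites that cap the scan explicitly. A small sketch of both constructions, assuming beg and end are Vec<u8> keys:

// Unbounded page: walk the whole range, one batch per call.
let walk_all: ScanPage<Vec<u8>> = ScanPage::from(beg.clone()..end.clone());
// Explicitly capped page, as the changefeed reader and index iterators build it.
let capped = ScanPage {
    range: beg..end,
    limit: Limit::Limited(100),
};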
/// A set of undoable updates and requests against a dataset.
#[allow(dead_code)]
@@ -595,8 +624,6 @@ impl Transaction {
K: Into<Key> + Debug,
V: Into<Val> + Debug,
{
#[cfg(debug_assertions)]
trace!("Put {:?} => {:?}", key, val);
match self {
#[cfg(feature = "kv-mem")]
Transaction {
@@ -679,6 +706,79 @@ impl Transaction {
}
}
/// Retrieve a specific range of keys from the datastore.
///
/// This function fetches at most one batch of key-value pairs, in a single request to the underlying datastore, and returns the page to request next alongside the values.
#[allow(unused_variables)]
pub async fn scan_paged<K>(
&mut self,
page: ScanPage<K>,
batch_limit: u32,
) -> Result<ScanResult<K>, Error>
where
K: Into<Key> + From<Vec<u8>> + Debug + Clone,
{
#[cfg(debug_assertions)]
trace!("Scan {:?} - {:?}", page.range.start, page.range.end);
let range = page.range.clone();
let res = match self {
#[cfg(feature = "kv-mem")]
Transaction {
inner: Inner::Mem(v),
..
} => v.scan(range, batch_limit),
#[cfg(feature = "kv-rocksdb")]
Transaction {
inner: Inner::RocksDB(v),
..
} => v.scan(range, batch_limit).await,
#[cfg(feature = "kv-speedb")]
Transaction {
inner: Inner::SpeeDB(v),
..
} => v.scan(range, batch_limit).await,
#[cfg(feature = "kv-indxdb")]
Transaction {
inner: Inner::IndxDB(v),
..
} => v.scan(range, batch_limit).await,
#[cfg(feature = "kv-tikv")]
Transaction {
inner: Inner::TiKV(v),
..
} => v.scan(range, batch_limit).await,
#[cfg(feature = "kv-fdb")]
Transaction {
inner: Inner::FoundationDB(v),
..
} => v.scan(range, batch_limit).await,
#[allow(unreachable_patterns)]
_ => Err(Error::MissingStorageEngine),
};
// Construct next page
res.map(|tup_vec: Vec<(Key, Val)>| {
if tup_vec.len() < batch_limit as usize {
ScanResult {
next_page: None,
values: tup_vec,
}
} else {
let (mut rng, limit) = (page.range, page.limit);
rng.start = match tup_vec.last() {
Some((k, _)) => K::from(k.clone().add(0)),
None => rng.start,
};
ScanResult {
next_page: Some(ScanPage {
range: rng,
limit,
}),
values: tup_vec,
}
}
})
}
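The paging rule above deserves spelling out: a short batch (fewer rows than batch_limit) means the range is exhausted and next_page is None; a full batch yields a next page whose range starts strictly after the last returned key, add(0) appearing to append a zero byte just like the index iterators' push(0x00). A minimal model of that decision, illustrative rather than the committed code:

// Where should the next page start? None means the scan is finished.
fn resume_key(batch: &[(Vec<u8>, Vec<u8>)], batch_limit: usize) -> Option<Vec<u8>> {
    if batch.len() < batch_limit {
        None // short batch: nothing left in the range
    } else {
        batch.last().map(|(k, _)| {
            let mut k = k.clone();
            k.push(0x00); // resume strictly after the last key returned
            k
        })
    }
}

One consequence of the code above worth keeping in mind: with batch_limit == 0 the page would never advance, so callers are expected to pass a positive batch size.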
/// Update a key in the datastore if the current value matches a condition.
#[allow(unused_variables)]
pub async fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
@@ -784,43 +884,25 @@ impl Transaction {
trace!("Getr {:?}..{:?} (limit: {limit})", rng.start, rng.end);
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
let mut nxt: Option<Key> = None;
let mut num = limit;
let mut out: Vec<(Key, Val)> = vec![];
let mut next_page = Some(ScanPage {
range: beg..end,
limit: Limit::Limited(limit),
});
// Start processing
while num > 0 {
while let Some(page) = next_page {
// Get records batch
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
}
};
// Get total results
let n = res.len();
let res = self.scan_paged(page, 1000).await?;
next_page = res.next_page;
let res = res.values;
// Exit when settled
if n == 0 {
if res.is_empty() {
break;
}
// Loop over results
for (i, (k, v)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
for (k, v) in res.into_iter() {
// Delete
out.push((k, v));
// Count
num -= 1;
}
}
Ok(out)
@@ -859,42 +941,24 @@ impl Transaction {
{
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
let mut nxt: Option<Key> = None;
let mut num = limit;
// Start processing
while num > 0 {
let mut next_page = Some(ScanPage {
range: beg..end,
limit: Limit::Limited(limit),
});
while let Some(page) = next_page {
// Get records batch
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
}
};
// Get total results
let n = res.len();
let res = self.scan_paged(page, limit).await?;
next_page = res.next_page;
let res = res.values;
// Exit when settled
if n == 0 {
if res.is_empty() {
break;
}
// Loop over results
for (i, (k, _)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
for (k, _) in res.into_iter() {
// Delete
self.del(k).await?;
// Count
num -= 1;
}
}
Ok(())
@@ -910,43 +974,25 @@ impl Transaction {
trace!("Getp {:?} (limit: {limit})", key);
let beg: Key = key.into();
let end: Key = beg.clone().add(0xff);
let mut nxt: Option<Key> = None;
let mut num = limit;
let mut out: Vec<(Key, Val)> = vec![];
// Start processing
while num > 0 {
let mut next_page = Some(ScanPage {
range: beg..end,
limit: Limit::Limited(limit),
});
while let Some(page) = next_page {
let res = self.scan_paged(page, 1000).await?;
next_page = res.next_page;
// Get records batch
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
}
Some(ref mut beg) => {
beg.push(0);
let min = beg.clone();
let max = end.clone();
let num = std::cmp::min(1000, num);
self.scan(min..max, num).await?
}
};
// Get total results
let n = res.len();
let res = res.values;
// Exit when settled
if n == 0 {
if res.is_empty() {
break;
}
};
// Loop over results
for (i, (k, v)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
for (k, v) in res.into_iter() {
// Delete
out.push((k, v));
// Count
num -= 1;
}
}
Ok(out)
@@ -1067,108 +1113,38 @@ impl Transaction {
pub async fn scan_hb(
&mut self,
time_to: &Timestamp,
limit: u32,
batch_size: u32,
) -> Result<Vec<crate::key::root::hb::Hb>, Error> {
let beg = crate::key::root::hb::Hb::prefix();
let end = crate::key::root::hb::Hb::suffix(time_to);
trace!("Scan start: {} ({:?})", String::from_utf8_lossy(&beg).to_string(), &beg);
trace!("Scan end: {} ({:?})", String::from_utf8_lossy(&end).to_string(), &end);
let mut nxt: Option<Key> = None;
let mut num = limit;
let mut out: Vec<crate::key::root::hb::Hb> = vec![];
// Start processing
while limit == NO_LIMIT || num > 0 {
let batch_size = match num {
0 => 1000,
_ => std::cmp::min(1000, num),
};
// Get records batch
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
self.scan(min..max, batch_size).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
self.scan(min..max, batch_size).await?
}
};
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (k, _)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
let mut next_page = Some(ScanPage::from(beg..end));
while let Some(page) = next_page {
let res = self.scan_paged(page, batch_size).await?;
next_page = res.next_page;
for (k, _) in res.values.into_iter() {
out.push(crate::key::root::hb::Hb::decode(k.as_slice())?);
// Count
if limit > 0 {
num -= 1;
}
}
}
trace!("scan_hb: {:?}", out);
Ok(out)
}
/// scan_nd will scan all the cluster membership registers,
/// paging through the key range in batches of `batch_size`
pub async fn scan_nd(&mut self, limit: u32) -> Result<Vec<ClusterMembership>, Error> {
pub async fn scan_nd(&mut self, batch_size: u32) -> Result<Vec<ClusterMembership>, Error> {
let beg = crate::key::root::nd::Nd::prefix();
let end = crate::key::root::nd::Nd::suffix();
trace!("Scan start: {} ({:?})", String::from_utf8_lossy(&beg).to_string(), &beg);
trace!("Scan end: {} ({:?})", String::from_utf8_lossy(&end).to_string(), &end);
let mut nxt: Option<Key> = None;
let mut num = limit;
let mut out: Vec<ClusterMembership> = vec![];
// Start processing
while (limit == NO_LIMIT) || (num > 0) {
let batch_size = match num {
0 => 1000,
_ => std::cmp::min(1000, num),
};
// Get records batch
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
self.scan(min..max, batch_size).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
self.scan(min..max, batch_size).await?
}
};
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (k, v)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
out.push((&v).into());
// Count
if limit > 0 {
num -= 1;
}
let mut next_page = Some(ScanPage::from(beg..end));
while let Some(page) = next_page {
let res = self.scan_paged(page, batch_size).await?;
next_page = res.next_page;
for (_, v) in res.values.into_iter() {
out.push(v.into());
}
}
trace!("scan_nd: {:?}", out);
Ok(out)
}
@@ -1191,62 +1167,28 @@ impl Transaction {
self.del(key).await
}
pub async fn scan_ndlq<'a>(&mut self, node: &Uuid, limit: u32) -> Result<Vec<LqValue>, Error> {
pub async fn scan_ndlq<'a>(
&mut self,
node: &Uuid,
batch_size: u32,
) -> Result<Vec<LqValue>, Error> {
let beg = crate::key::node::lq::prefix_nd(node);
let end = crate::key::node::lq::suffix_nd(node);
trace!(
"Scanning range from pref={}, suff={}",
crate::key::debug::sprint_key(&beg),
crate::key::debug::sprint_key(&end),
);
let mut nxt: Option<Key> = None;
let mut num = limit;
let mut out: Vec<LqValue> = vec![];
while limit == NO_LIMIT || num > 0 {
let batch_size = match num {
0 => 1000,
_ => std::cmp::min(1000, num),
};
// Get records batch
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
self.scan(min..max, batch_size).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
self.scan(min..max, batch_size).await?
}
};
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (key, value)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(key.clone());
}
let lq = crate::key::node::lq::Lq::decode(key.as_slice())?;
let mut next_page = Some(ScanPage::from(beg..end));
while let Some(page) = next_page {
let res = self.scan_paged(page, batch_size).await?;
next_page = res.next_page;
for (key, value) in res.values.into_iter() {
let lv = crate::key::node::lq::Lq::decode(key.as_slice())?;
let tb: String = String::from_utf8(value).unwrap();
trace!("scan_lq Found tb: {:?}", tb);
out.push(LqValue {
nd: lq.nd.into(),
ns: lq.ns.to_string(),
db: lq.db.to_string(),
nd: lv.nd.into(),
ns: lv.ns.to_string(),
db: lv.db.to_string(),
tb,
lq: lq.lq.into(),
lq: lv.lq.into(),
});
// Count
if limit != NO_LIMIT {
num -= 1;
}
}
}
Ok(out)
@@ -1257,49 +1199,16 @@ impl Transaction {
ns: &str,
db: &str,
tb: &str,
limit: u32,
batch_size: u32,
) -> Result<Vec<LqValue>, Error> {
let beg = crate::key::table::lq::prefix(ns, db, tb);
let end = crate::key::table::lq::suffix(ns, db, tb);
trace!(
"Scanning range from pref={}, suff={}",
crate::key::debug::sprint_key(&beg),
crate::key::debug::sprint_key(&end),
);
let mut nxt: Option<Key> = None;
let mut num = limit;
let mut out: Vec<LqValue> = vec![];
while limit == NO_LIMIT || num > 0 {
let batch_size = match num {
0 => 1000,
_ => std::cmp::min(1000, num),
};
// Get records batch
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
self.scan(min..max, batch_size).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
self.scan(min..max, batch_size).await?
}
};
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (key, value)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(key.clone());
}
let mut next_page = Some(ScanPage::from(beg..end));
while let Some(page) = next_page {
let res = self.scan_paged(page, batch_size).await?;
next_page = res.next_page;
for (key, value) in res.values.into_iter() {
let lv = crate::key::table::lq::Lq::decode(key.as_slice())?;
let val: LiveStatement = value.into();
out.push(LqValue {
@@ -1309,10 +1218,6 @@ impl Transaction {
tb: lv.tb.to_string(),
lq: val.id,
});
// Count
if limit != NO_LIMIT {
num -= 1;
}
}
}
Ok(out)
@@ -1329,6 +1234,7 @@ impl Transaction {
) -> Result<(), Error> {
let key = crate::key::table::lq::new(ns, db, tb, live_stm.id.0);
let key_enc = crate::key::table::lq::Lq::encode(&key)?;
#[cfg(debug_assertions)]
trace!("putc_tblq ({:?}): key={:?}", &live_stm.id, crate::key::debug::sprint_key(&key_enc));
self.putc(key_enc, live_stm, expected).await
}
@@ -2528,55 +2434,35 @@ impl Transaction {
// Fetch records
let beg = crate::key::thing::prefix(ns, db, &tb.name);
let end = crate::key::thing::suffix(ns, db, &tb.name);
let mut nxt: Option<Vec<u8>> = None;
loop {
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
self.scan(min..max, 1000).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
self.scan(min..max, 1000).await?
}
};
if !res.is_empty() {
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (k, v)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
// Parse the key and the value
let k: crate::key::thing::Thing = (&k).into();
let v: Value = (&v).into();
let t = Thing::from((k.tb, k.id));
// Check if this is a graph edge
match (v.pick(&*EDGE), v.pick(&*IN), v.pick(&*OUT)) {
// This is a graph edge record
(Value::Bool(true), Value::Thing(l), Value::Thing(r)) => {
let sql = format!("RELATE {l} -> {t} -> {r} CONTENT {v};",);
chn.send(bytes!(sql)).await?;
}
// This is a normal record
_ => {
let sql = format!("UPDATE {t} CONTENT {v};");
chn.send(bytes!(sql)).await?;
}
}
}
continue;
let mut nxt: Option<ScanPage<Vec<u8>>> = Some(ScanPage::from(beg..end));
while nxt.is_some() {
let res = self.scan_paged(nxt.unwrap(), 1000).await?;
nxt = res.next_page;
let res = res.values;
if res.is_empty() {
break;
}
break;
// Loop over results
for (k, v) in res.into_iter() {
// Parse the key and the value
let k: crate::key::thing::Thing = (&k).into();
let v: Value = (&v).into();
let t = Thing::from((k.tb, k.id));
// Check if this is a graph edge
match (v.pick(&*EDGE), v.pick(&*IN), v.pick(&*OUT)) {
// This is a graph edge record
(Value::Bool(true), Value::Thing(l), Value::Thing(r)) => {
let sql = format!("RELATE {l} -> {t} -> {r} CONTENT {v};",);
chn.send(bytes!(sql)).await?;
}
// This is a normal record
_ => {
let sql = format!("UPDATE {t} CONTENT {v};");
chn.send(bytes!(sql)).await?;
}
}
}
continue;
}
chn.send(bytes!("")).await?;
}