Make Remove{Table,Database,Namespace} faster for TiKV and FDB (#2915)
This commit is contained in:
parent
f8b559ace1
commit
cf040b36ba
3 changed files with 75 additions and 38 deletions
|
@ -518,4 +518,25 @@ impl Transaction {
|
||||||
}
|
}
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete a range of keys from the databases
|
||||||
|
pub(crate) async fn delr<K>(&mut self, rng: Range<K>) -> Result<(), Error>
|
||||||
|
where
|
||||||
|
K: Into<Key>,
|
||||||
|
{
|
||||||
|
// Check to see if transaction is closed
|
||||||
|
if self.done {
|
||||||
|
return Err(Error::TxFinished);
|
||||||
|
}
|
||||||
|
// Check to see if transaction is writable
|
||||||
|
if !self.write {
|
||||||
|
return Err(Error::TxReadonly);
|
||||||
|
}
|
||||||
|
let begin: &[u8] = &rng.start.into();
|
||||||
|
let end: &[u8] = &rng.end.into();
|
||||||
|
let inner = self.inner.lock().await;
|
||||||
|
let inner = inner.as_ref().unwrap();
|
||||||
|
inner.clear_range(begin, end);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -395,4 +395,31 @@ impl Transaction {
|
||||||
// Return result
|
// Return result
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
/// Delete a range of keys from the databases
|
||||||
|
pub(crate) async fn delr<K>(&mut self, rng: Range<K>, limit: u32) -> Result<(), Error>
|
||||||
|
where
|
||||||
|
K: Into<Key>,
|
||||||
|
{
|
||||||
|
// Check to see if transaction is closed
|
||||||
|
if self.done {
|
||||||
|
return Err(Error::TxFinished);
|
||||||
|
}
|
||||||
|
// Check to see if transaction is writable
|
||||||
|
if !self.write {
|
||||||
|
return Err(Error::TxReadonly);
|
||||||
|
}
|
||||||
|
// Convert the range to bytes
|
||||||
|
let rng: Range<Key> = Range {
|
||||||
|
start: rng.start.into(),
|
||||||
|
end: rng.end.into(),
|
||||||
|
};
|
||||||
|
// Scan the keys
|
||||||
|
let res = self.inner.scan_keys(rng, limit).await?;
|
||||||
|
// Delete all the keys
|
||||||
|
for key in res {
|
||||||
|
self.inner.delete(key).await?;
|
||||||
|
}
|
||||||
|
// Return result
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -850,6 +850,29 @@ impl Transaction {
|
||||||
{
|
{
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
trace!("Delr {:?}..{:?} (limit: {limit})", rng.start, rng.end);
|
trace!("Delr {:?}..{:?} (limit: {limit})", rng.start, rng.end);
|
||||||
|
match self {
|
||||||
|
#[cfg(feature = "kv-tikv")]
|
||||||
|
Transaction {
|
||||||
|
inner: Inner::TiKV(v),
|
||||||
|
..
|
||||||
|
} => v.delr(rng, limit).await,
|
||||||
|
#[cfg(feature = "kv-fdb")]
|
||||||
|
Transaction {
|
||||||
|
inner: Inner::FoundationDB(v),
|
||||||
|
..
|
||||||
|
} => v.delr(rng).await,
|
||||||
|
#[allow(unreachable_patterns)]
|
||||||
|
_ => self._delr(rng, limit).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete a range of keys from the datastore.
|
||||||
|
///
|
||||||
|
/// This function fetches key-value pairs from the underlying datastore in batches of 1000.
|
||||||
|
async fn _delr<K>(&mut self, rng: Range<K>, limit: u32) -> Result<(), Error>
|
||||||
|
where
|
||||||
|
K: Into<Key> + Debug,
|
||||||
|
{
|
||||||
let beg: Key = rng.start.into();
|
let beg: Key = rng.start.into();
|
||||||
let end: Key = rng.end.into();
|
let end: Key = rng.end.into();
|
||||||
let mut nxt: Option<Key> = None;
|
let mut nxt: Option<Key> = None;
|
||||||
|
@ -955,44 +978,10 @@ impl Transaction {
|
||||||
trace!("Delp {:?} (limit: {limit})", key);
|
trace!("Delp {:?} (limit: {limit})", key);
|
||||||
let beg: Key = key.into();
|
let beg: Key = key.into();
|
||||||
let end: Key = beg.clone().add(0xff);
|
let end: Key = beg.clone().add(0xff);
|
||||||
let mut nxt: Option<Key> = None;
|
|
||||||
let mut num = limit;
|
|
||||||
// Start processing
|
|
||||||
while num > 0 {
|
|
||||||
// Get records batch
|
|
||||||
let res = match nxt {
|
|
||||||
None => {
|
|
||||||
let min = beg.clone();
|
let min = beg.clone();
|
||||||
let max = end.clone();
|
let max = end.clone();
|
||||||
let num = std::cmp::min(1000, num);
|
let num = std::cmp::min(1000, limit);
|
||||||
self.scan(min..max, num).await?
|
self.delr(min..max, num).await?;
|
||||||
}
|
|
||||||
Some(ref mut beg) => {
|
|
||||||
beg.push(0);
|
|
||||||
let min = beg.clone();
|
|
||||||
let max = end.clone();
|
|
||||||
let num = std::cmp::min(1000, num);
|
|
||||||
self.scan(min..max, num).await?
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// Get total results
|
|
||||||
let n = res.len();
|
|
||||||
// Exit when settled
|
|
||||||
if n == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// Loop over results
|
|
||||||
for (i, (k, _)) in res.into_iter().enumerate() {
|
|
||||||
// Ready the next
|
|
||||||
if n == i + 1 {
|
|
||||||
nxt = Some(k.clone());
|
|
||||||
}
|
|
||||||
// Delete
|
|
||||||
self.del(k).await?;
|
|
||||||
// Count
|
|
||||||
num -= 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue