diff --git a/lib/src/kvs/fdb/mod.rs b/lib/src/kvs/fdb/mod.rs index 9febaf1f..b0a3c253 100644 --- a/lib/src/kvs/fdb/mod.rs +++ b/lib/src/kvs/fdb/mod.rs @@ -518,4 +518,25 @@ impl Transaction { } Ok(res) } + + /// Delete a range of keys from the databases + pub(crate) async fn delr(&mut self, rng: Range) -> Result<(), Error> + where + K: Into, + { + // Check to see if transaction is closed + if self.done { + return Err(Error::TxFinished); + } + // Check to see if transaction is writable + if !self.write { + return Err(Error::TxReadonly); + } + let begin: &[u8] = &rng.start.into(); + let end: &[u8] = &rng.end.into(); + let inner = self.inner.lock().await; + let inner = inner.as_ref().unwrap(); + inner.clear_range(begin, end); + Ok(()) + } } diff --git a/lib/src/kvs/tikv/mod.rs b/lib/src/kvs/tikv/mod.rs index 7d4f1ce8..56c3167b 100644 --- a/lib/src/kvs/tikv/mod.rs +++ b/lib/src/kvs/tikv/mod.rs @@ -395,4 +395,31 @@ impl Transaction { // Return result Ok(res) } + /// Delete a range of keys from the databases + pub(crate) async fn delr(&mut self, rng: Range, limit: u32) -> Result<(), Error> + where + K: Into, + { + // Check to see if transaction is closed + if self.done { + return Err(Error::TxFinished); + } + // Check to see if transaction is writable + if !self.write { + return Err(Error::TxReadonly); + } + // Convert the range to bytes + let rng: Range = Range { + start: rng.start.into(), + end: rng.end.into(), + }; + // Scan the keys + let res = self.inner.scan_keys(rng, limit).await?; + // Delete all the keys + for key in res { + self.inner.delete(key).await?; + } + // Return result + Ok(()) + } } diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index e62da1cc..45268ae6 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -850,6 +850,29 @@ impl Transaction { { #[cfg(debug_assertions)] trace!("Delr {:?}..{:?} (limit: {limit})", rng.start, rng.end); + match self { + #[cfg(feature = "kv-tikv")] + Transaction { + inner: Inner::TiKV(v), + .. + } => v.delr(rng, limit).await, + #[cfg(feature = "kv-fdb")] + Transaction { + inner: Inner::FoundationDB(v), + .. + } => v.delr(rng).await, + #[allow(unreachable_patterns)] + _ => self._delr(rng, limit).await, + } + } + + /// Delete a range of keys from the datastore. + /// + /// This function fetches key-value pairs from the underlying datastore in batches of 1000. + async fn _delr(&mut self, rng: Range, limit: u32) -> Result<(), Error> + where + K: Into + Debug, + { let beg: Key = rng.start.into(); let end: Key = rng.end.into(); let mut nxt: Option = None; @@ -955,44 +978,10 @@ impl Transaction { trace!("Delp {:?} (limit: {limit})", key); let beg: Key = key.into(); let end: Key = beg.clone().add(0xff); - let mut nxt: Option = None; - let mut num = limit; - // Start processing - while num > 0 { - // Get records batch - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - let num = std::cmp::min(1000, num); - self.scan(min..max, num).await? - } - Some(ref mut beg) => { - beg.push(0); - let min = beg.clone(); - let max = end.clone(); - let num = std::cmp::min(1000, num); - self.scan(min..max, num).await? - } - }; - // Get total results - let n = res.len(); - // Exit when settled - if n == 0 { - break; - } - // Loop over results - for (i, (k, _)) in res.into_iter().enumerate() { - // Ready the next - if n == i + 1 { - nxt = Some(k.clone()); - } - // Delete - self.del(k).await?; - // Count - num -= 1; - } - } + let min = beg.clone(); + let max = end.clone(); + let num = std::cmp::min(1000, limit); + self.delr(min..max, num).await?; Ok(()) }