diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index 00ac763f..d4a0952b 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -783,6 +783,24 @@ impl Datastore { // save_timestamp_for_versionstamp saves the current timestamp for the each database's current versionstamp. pub async fn save_timestamp_for_versionstamp(&self, ts: u64) -> Result<(), Error> { let mut tx = self.transaction(Write, Optimistic).await?; + if let Err(e) = self.save_timestamp_for_versionstamp_impl(ts, &mut tx).await { + return match tx.cancel().await { + Ok(_) => { + Err(e) + } + Err(txe) => { + Err(Error::Tx(format!("Error saving timestamp for versionstamp: {:?} and error cancelling transaction: {:?}", e, txe))) + } + }; + } + Ok(()) + } + + async fn save_timestamp_for_versionstamp_impl( + &self, + ts: u64, + tx: &mut Transaction, + ) -> Result<(), Error> { let nses = tx.all_ns().await?; let nses = nses.as_ref(); for ns in nses { @@ -801,8 +819,26 @@ impl Datastore { // garbage_collect_stale_change_feeds deletes all change feed entries that are older than the watermarks. pub async fn garbage_collect_stale_change_feeds(&self, ts: u64) -> Result<(), Error> { let mut tx = self.transaction(Write, Optimistic).await?; + if let Err(e) = self.garbage_collect_stale_change_feeds_impl(ts, &mut tx).await { + return match tx.cancel().await { + Ok(_) => { + Err(e) + } + Err(txe) => { + Err(Error::Tx(format!("Error garbage collecting stale change feeds: {:?} and error cancelling transaction: {:?}", e, txe))) + } + }; + } + Ok(()) + } + + async fn garbage_collect_stale_change_feeds_impl( + &self, + ts: u64, + tx: &mut Transaction, + ) -> Result<(), Error> { // TODO Make gc batch size/limit configurable? - crate::cf::gc_all_at(&mut tx, ts, Some(100)).await?; + cf::gc_all_at(tx, ts, Some(100)).await?; tx.commit().await?; Ok(()) }