Make cluster tick handle errors and rollback gracefully (#2860)

This commit is contained in:
Przemyslaw Hugh Kaznowski 2023-10-26 23:47:12 +01:00 committed by GitHub
parent b8ff68b464
commit 4b255efaea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -783,6 +783,24 @@ impl Datastore {
// save_timestamp_for_versionstamp saves the current timestamp for the each database's current versionstamp. // save_timestamp_for_versionstamp saves the current timestamp for the each database's current versionstamp.
pub async fn save_timestamp_for_versionstamp(&self, ts: u64) -> Result<(), Error> { pub async fn save_timestamp_for_versionstamp(&self, ts: u64) -> Result<(), Error> {
let mut tx = self.transaction(Write, Optimistic).await?; let mut tx = self.transaction(Write, Optimistic).await?;
if let Err(e) = self.save_timestamp_for_versionstamp_impl(ts, &mut tx).await {
return match tx.cancel().await {
Ok(_) => {
Err(e)
}
Err(txe) => {
Err(Error::Tx(format!("Error saving timestamp for versionstamp: {:?} and error cancelling transaction: {:?}", e, txe)))
}
};
}
Ok(())
}
async fn save_timestamp_for_versionstamp_impl(
&self,
ts: u64,
tx: &mut Transaction,
) -> Result<(), Error> {
let nses = tx.all_ns().await?; let nses = tx.all_ns().await?;
let nses = nses.as_ref(); let nses = nses.as_ref();
for ns in nses { for ns in nses {
@ -801,8 +819,26 @@ impl Datastore {
// garbage_collect_stale_change_feeds deletes all change feed entries that are older than the watermarks. // garbage_collect_stale_change_feeds deletes all change feed entries that are older than the watermarks.
pub async fn garbage_collect_stale_change_feeds(&self, ts: u64) -> Result<(), Error> { pub async fn garbage_collect_stale_change_feeds(&self, ts: u64) -> Result<(), Error> {
let mut tx = self.transaction(Write, Optimistic).await?; let mut tx = self.transaction(Write, Optimistic).await?;
if let Err(e) = self.garbage_collect_stale_change_feeds_impl(ts, &mut tx).await {
return match tx.cancel().await {
Ok(_) => {
Err(e)
}
Err(txe) => {
Err(Error::Tx(format!("Error garbage collecting stale change feeds: {:?} and error cancelling transaction: {:?}", e, txe)))
}
};
}
Ok(())
}
async fn garbage_collect_stale_change_feeds_impl(
&self,
ts: u64,
tx: &mut Transaction,
) -> Result<(), Error> {
// TODO Make gc batch size/limit configurable? // TODO Make gc batch size/limit configurable?
crate::cf::gc_all_at(&mut tx, ts, Some(100)).await?; cf::gc_all_at(tx, ts, Some(100)).await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
} }