Prevent panics when remote datastore transactions fail to start
This commit is contained in:
parent
cdf244f0f5
commit
b9fc84bf18
2 changed files with 64 additions and 47 deletions
|
@ -79,16 +79,13 @@ impl<'a> Executor<'a> {
|
||||||
|
|
||||||
async fn cancel(&mut self, local: bool) {
|
async fn cancel(&mut self, local: bool) {
|
||||||
if local {
|
if local {
|
||||||
match self.txn.as_ref() {
|
if let Some(txn) = self.txn.as_ref() {
|
||||||
Some(txn) => {
|
let txn = txn.clone();
|
||||||
let txn = txn.clone();
|
let mut txn = txn.lock().await;
|
||||||
let mut txn = txn.lock().await;
|
if txn.cancel().await.is_err() {
|
||||||
if txn.cancel().await.is_err() {
|
self.err = true;
|
||||||
self.err = true;
|
|
||||||
}
|
|
||||||
self.txn = None;
|
|
||||||
}
|
}
|
||||||
None => unreachable!(),
|
self.txn = None;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -212,20 +209,28 @@ impl<'a> Executor<'a> {
|
||||||
Statement::Set(stm) => {
|
Statement::Set(stm) => {
|
||||||
// Create a transaction
|
// Create a transaction
|
||||||
let loc = self.begin(stm.writeable()).await;
|
let loc = self.begin(stm.writeable()).await;
|
||||||
// Process the statement
|
// Check the transaction
|
||||||
match stm.compute(&ctx, &opt, &self.txn(), None).await {
|
match self.err {
|
||||||
Ok(val) => {
|
// We failed to create a transaction
|
||||||
ctx.add_value(stm.name.to_owned(), val);
|
true => Err(Error::TxFailure),
|
||||||
|
// The transaction began successfully
|
||||||
|
false => {
|
||||||
|
// Process the statement
|
||||||
|
match stm.compute(&ctx, &opt, &self.txn(), None).await {
|
||||||
|
Ok(val) => {
|
||||||
|
ctx.add_value(stm.name.to_owned(), val);
|
||||||
|
}
|
||||||
|
_ => break,
|
||||||
|
}
|
||||||
|
// Cancel transaction
|
||||||
|
match stm.writeable() {
|
||||||
|
true => self.commit(loc).await,
|
||||||
|
false => self.cancel(loc).await,
|
||||||
|
};
|
||||||
|
// Return nothing
|
||||||
|
Ok(Value::None)
|
||||||
}
|
}
|
||||||
_ => break,
|
|
||||||
}
|
}
|
||||||
// Cancel transaction
|
|
||||||
match stm.writeable() {
|
|
||||||
true => self.commit(loc).await,
|
|
||||||
false => self.cancel(loc).await,
|
|
||||||
};
|
|
||||||
// Return nothing
|
|
||||||
Ok(Value::None)
|
|
||||||
}
|
}
|
||||||
// Process all other normal statements
|
// Process all other normal statements
|
||||||
_ => match self.err {
|
_ => match self.err {
|
||||||
|
@ -235,34 +240,42 @@ impl<'a> Executor<'a> {
|
||||||
false => {
|
false => {
|
||||||
// Create a transaction
|
// Create a transaction
|
||||||
let loc = self.begin(stm.writeable()).await;
|
let loc = self.begin(stm.writeable()).await;
|
||||||
// Process the statement
|
// Check the transaction
|
||||||
let res = match stm.timeout() {
|
match self.err {
|
||||||
// There is a timeout clause
|
// We failed to create a transaction
|
||||||
Some(timeout) => {
|
true => Err(Error::TxFailure),
|
||||||
// Set statement timeout
|
// The transaction began successfully
|
||||||
let mut ctx = Context::new(&ctx);
|
false => {
|
||||||
ctx.add_timeout(timeout);
|
|
||||||
// Process the statement
|
// Process the statement
|
||||||
let res = stm.compute(&ctx, &opt, &self.txn(), None).await;
|
let res = match stm.timeout() {
|
||||||
// Catch statement timeout
|
// There is a timeout clause
|
||||||
match ctx.is_timedout() {
|
Some(timeout) => {
|
||||||
true => Err(Error::QueryTimedout),
|
// Set statement timeout
|
||||||
false => res,
|
let mut ctx = Context::new(&ctx);
|
||||||
}
|
ctx.add_timeout(timeout);
|
||||||
|
// Process the statement
|
||||||
|
let res = stm.compute(&ctx, &opt, &self.txn(), None).await;
|
||||||
|
// Catch statement timeout
|
||||||
|
match ctx.is_timedout() {
|
||||||
|
true => Err(Error::QueryTimedout),
|
||||||
|
false => res,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// There is no timeout clause
|
||||||
|
None => stm.compute(&ctx, &opt, &self.txn(), None).await,
|
||||||
|
};
|
||||||
|
// Finalise transaction
|
||||||
|
match &res {
|
||||||
|
Ok(_) => match stm.writeable() {
|
||||||
|
true => self.commit(loc).await,
|
||||||
|
false => self.cancel(loc).await,
|
||||||
|
},
|
||||||
|
Err(_) => self.cancel(loc).await,
|
||||||
|
};
|
||||||
|
// Return the result
|
||||||
|
res
|
||||||
}
|
}
|
||||||
// There is no timeout clause
|
}
|
||||||
None => stm.compute(&ctx, &opt, &self.txn(), None).await,
|
|
||||||
};
|
|
||||||
// Finalise transaction
|
|
||||||
match &res {
|
|
||||||
Ok(_) => match stm.writeable() {
|
|
||||||
true => self.commit(loc).await,
|
|
||||||
false => self.cancel(loc).await,
|
|
||||||
},
|
|
||||||
Err(_) => self.cancel(loc).await,
|
|
||||||
};
|
|
||||||
// Return the result
|
|
||||||
res
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
|
@ -21,6 +21,10 @@ pub enum Error {
|
||||||
#[error("There was a problem with a datastore transaction: {0}")]
|
#[error("There was a problem with a datastore transaction: {0}")]
|
||||||
Tx(String),
|
Tx(String),
|
||||||
|
|
||||||
|
/// There was an error when starting a new datastore transaction
|
||||||
|
#[error("There was an error when starting a new datastore transaction")]
|
||||||
|
TxFailure,
|
||||||
|
|
||||||
/// The transaction was already cancelled or committed
|
/// The transaction was already cancelled or committed
|
||||||
#[error("Couldn't update a finished transaction")]
|
#[error("Couldn't update a finished transaction")]
|
||||||
TxFinished,
|
TxFinished,
|
||||||
|
|
Loading…
Reference in a new issue