Feature - Add message to query not executed error. (#2049)
This commit is contained in:
parent
d56a574467
commit
e835d27ecc
4 changed files with 82 additions and 33 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -4320,6 +4320,7 @@ dependencies = [
|
|||
"test-log",
|
||||
"thiserror",
|
||||
"tikv-client",
|
||||
"tikv-client-proto",
|
||||
"time 0.3.21",
|
||||
"tokio",
|
||||
"tokio-tungstenite",
|
||||
|
|
|
@ -23,7 +23,7 @@ protocol-ws = ["dep:tokio-tungstenite", "tokio/time"]
|
|||
kv-mem = ["dep:echodb", "tokio/time"]
|
||||
kv-indxdb = ["dep:indxdb"]
|
||||
kv-rocksdb = ["dep:rocksdb", "tokio/time"]
|
||||
kv-tikv = ["dep:tikv"]
|
||||
kv-tikv = ["dep:tikv", "dep:tikv-client-proto"]
|
||||
kv-fdb-5_1 = ["foundationdb/fdb-5_1", "kv-fdb"]
|
||||
kv-fdb-5_2 = ["foundationdb/fdb-5_2", "kv-fdb"]
|
||||
kv-fdb-6_0 = ["foundationdb/fdb-6_0", "kv-fdb"]
|
||||
|
@ -102,6 +102,7 @@ sha2 = "0.10.6"
|
|||
storekey = "0.5.0"
|
||||
thiserror = "1.0.40"
|
||||
tikv = { version = "0.1.0", package = "tikv-client", optional = true }
|
||||
tikv-client-proto = { version = "0.1.0", optional = true }
|
||||
tokio-util = { version = "0.7.8", optional = true, features = ["compat"] }
|
||||
tracing = "0.1.37"
|
||||
trice = "0.3.1"
|
||||
|
|
|
@ -34,12 +34,14 @@ impl<'a> Executor<'a> {
|
|||
}
|
||||
|
||||
fn txn(&self) -> Transaction {
|
||||
match self.txn.as_ref() {
|
||||
Some(txn) => txn.clone(),
|
||||
None => unreachable!(),
|
||||
}
|
||||
self.txn.clone().expect("unreachable: txn was None after successful begin")
|
||||
}
|
||||
|
||||
/// # Return
|
||||
/// - true if a new transaction has begun
|
||||
/// - false if
|
||||
/// - couldn't create transaction (sets err flag)
|
||||
/// - a transaction has already begun
|
||||
async fn begin(&mut self, write: bool) -> bool {
|
||||
match self.txn.as_ref() {
|
||||
Some(_) => false,
|
||||
|
@ -56,33 +58,41 @@ impl<'a> Executor<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
async fn commit(&mut self, local: bool) {
|
||||
/// # Return
|
||||
///
|
||||
/// An `Err` if the transaction could not be commited;
|
||||
/// otherwise returns `Ok`.
|
||||
async fn commit(&mut self, local: bool) -> Result<(), Error> {
|
||||
if local {
|
||||
if let Some(txn) = self.txn.as_ref() {
|
||||
let txn = txn.clone();
|
||||
// Extract the transaction
|
||||
if let Some(txn) = self.txn.take() {
|
||||
let mut txn = txn.lock().await;
|
||||
let result = if self.err {
|
||||
txn.cancel().await
|
||||
} else {
|
||||
txn.commit().await
|
||||
};
|
||||
if result.is_err() {
|
||||
if self.err {
|
||||
// Cancel and ignore any error because the error flag was
|
||||
// already set
|
||||
let _ = txn.cancel().await;
|
||||
} else if let Err(e) = txn.commit().await {
|
||||
// Transaction failed to commit
|
||||
//
|
||||
// TODO: Not all commit errors definitively mean
|
||||
// the transaction didn't commit. Detect that and tell
|
||||
// the user.
|
||||
self.err = true;
|
||||
return Err(e);
|
||||
}
|
||||
self.txn = None;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn cancel(&mut self, local: bool) {
|
||||
if local {
|
||||
if let Some(txn) = self.txn.as_ref() {
|
||||
let txn = txn.clone();
|
||||
// Extract the transaction
|
||||
if let Some(txn) = self.txn.take() {
|
||||
let mut txn = txn.lock().await;
|
||||
if txn.cancel().await.is_err() {
|
||||
self.err = true;
|
||||
}
|
||||
self.txn = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -94,12 +104,17 @@ impl<'a> Executor<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
fn buf_commit(&self, v: Response) -> Response {
|
||||
fn buf_commit(&self, v: Response, commit_error: &Option<Error>) -> Response {
|
||||
match &self.err {
|
||||
true => Response {
|
||||
time: v.time,
|
||||
result: match v.result {
|
||||
Ok(_) => Err(Error::QueryNotExecuted),
|
||||
Ok(_) => Err(commit_error
|
||||
.as_ref()
|
||||
.map(|e| Error::QueryNotExecutedDetail {
|
||||
message: e.to_string(),
|
||||
})
|
||||
.unwrap_or(Error::QueryNotExecuted)),
|
||||
Err(e) => Err(e),
|
||||
},
|
||||
},
|
||||
|
@ -176,15 +191,15 @@ impl<'a> Executor<'a> {
|
|||
self.cancel(true).await;
|
||||
buf = buf.into_iter().map(|v| self.buf_cancel(v)).collect();
|
||||
out.append(&mut buf);
|
||||
self.txn = None;
|
||||
debug_assert!(self.txn.is_none(), "cancel(true) should have unset txn");
|
||||
continue;
|
||||
}
|
||||
// Commit a running transaction
|
||||
Statement::Commit(_) => {
|
||||
self.commit(true).await;
|
||||
buf = buf.into_iter().map(|v| self.buf_commit(v)).collect();
|
||||
let commit_error = self.commit(true).await.err();
|
||||
buf = buf.into_iter().map(|v| self.buf_commit(v, &commit_error)).collect();
|
||||
out.append(&mut buf);
|
||||
self.txn = None;
|
||||
debug_assert!(self.txn.is_none(), "commit(true) should have unset txn");
|
||||
continue;
|
||||
}
|
||||
// Switch to a different NS or DB
|
||||
|
@ -246,13 +261,18 @@ impl<'a> Executor<'a> {
|
|||
let writeable = stm.writeable();
|
||||
// Set the parameter
|
||||
ctx.add_value(stm.name, val);
|
||||
// Finalise transaction
|
||||
match writeable {
|
||||
true => self.commit(loc).await,
|
||||
false => self.cancel(loc).await,
|
||||
// Finalise transaction, returning nothing unless it couldn't commit
|
||||
if writeable {
|
||||
match self.commit(loc).await {
|
||||
Err(e) => Err(Error::QueryNotExecutedDetail {
|
||||
message: e.to_string(),
|
||||
}),
|
||||
Ok(_) => Ok(Value::None),
|
||||
}
|
||||
} else {
|
||||
self.cancel(loc).await;
|
||||
Ok(Value::None)
|
||||
}
|
||||
// Return nothing
|
||||
Ok(Value::None)
|
||||
}
|
||||
Err(err) => {
|
||||
// Cancel transaction
|
||||
|
@ -298,10 +318,11 @@ impl<'a> Executor<'a> {
|
|||
};
|
||||
// Finalise transaction and return the result.
|
||||
if res.is_ok() && stm.writeable() {
|
||||
self.commit(loc).await;
|
||||
if self.err {
|
||||
if let Err(e) = self.commit(loc).await {
|
||||
// The commit failed
|
||||
Err(Error::QueryNotExecuted)
|
||||
Err(Error::QueryNotExecutedDetail {
|
||||
message: e.to_string(),
|
||||
})
|
||||
} else {
|
||||
// Successful, committed result
|
||||
res
|
||||
|
|
|
@ -47,6 +47,18 @@ pub enum Error {
|
|||
#[error("The key being inserted already exists")]
|
||||
TxKeyAlreadyExists,
|
||||
|
||||
/// The key exceeds a limit set by the KV store
|
||||
#[error("Record id or key is too large")]
|
||||
TxKeyTooLarge,
|
||||
|
||||
/// The value exceeds a limit set by the KV store
|
||||
#[error("Record or value is too large")]
|
||||
TxValueTooLarge,
|
||||
|
||||
/// The transaction writes too much data for the KV store
|
||||
#[error("Transaction is too large")]
|
||||
TxTooLarge,
|
||||
|
||||
/// No namespace has been selected
|
||||
#[error("Specify a namespace to use")]
|
||||
NsEmpty,
|
||||
|
@ -155,6 +167,12 @@ pub enum Error {
|
|||
#[error("The query was not executed due to a failed transaction")]
|
||||
QueryNotExecuted,
|
||||
|
||||
/// The query did not execute, because the transaction has failed (with a message)
|
||||
#[error("The query was not executed due to a failed transaction. {message}")]
|
||||
QueryNotExecutedDetail {
|
||||
message: String,
|
||||
},
|
||||
|
||||
/// The permissions do not allow for performing the specified query
|
||||
#[error("You don't have permission to perform this query type")]
|
||||
QueryPermissions,
|
||||
|
@ -452,6 +470,14 @@ impl From<tikv::Error> for Error {
|
|||
fn from(e: tikv::Error) -> Error {
|
||||
match e {
|
||||
tikv::Error::DuplicateKeyInsertion => Error::TxKeyAlreadyExists,
|
||||
tikv::Error::KeyError(tikv_client_proto::kvrpcpb::KeyError {
|
||||
abort,
|
||||
..
|
||||
}) if abort.contains("KeyTooLarge") => Error::TxKeyTooLarge,
|
||||
tikv::Error::RegionError(tikv_client_proto::errorpb::Error {
|
||||
raft_entry_too_large,
|
||||
..
|
||||
}) if raft_entry_too_large.is_some() => Error::TxTooLarge,
|
||||
_ => Error::Tx(e.to_string()),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue