Add support for both writeable and read-only transactions

This commit is contained in:
Tobie Morgan Hitchcock 2022-05-14 22:30:49 +01:00
parent 1017e2fffb
commit 75de89d9a1
13 changed files with 107 additions and 11 deletions

View file

@ -35,10 +35,10 @@ impl<'a> Executor<'a> {
}
}
async fn begin(&mut self) -> bool {
async fn begin(&mut self, write: bool) -> bool {
match self.txn.as_ref() {
Some(_) => false,
None => match self.kvs.transaction(true, false).await {
None => match self.kvs.transaction(write, false).await {
Ok(v) => {
self.txn = Some(Arc::new(Mutex::new(v)));
true
@ -155,7 +155,7 @@ impl<'a> Executor<'a> {
}
// Begin a new transaction
Statement::Begin(_) => {
self.begin().await;
self.begin(true).await;
continue;
}
// Cancel a running transaction
@ -208,7 +208,7 @@ impl<'a> Executor<'a> {
// Process param definition statements
Statement::Set(stm) => {
// Create a transaction
let loc = self.begin().await;
let loc = self.begin(stm.writeable()).await;
// Process the statement
match stm.compute(&ctx, &opt, &self.txn(), None).await {
Ok(val) => {
@ -228,7 +228,7 @@ impl<'a> Executor<'a> {
// Compute the statement normally
false => {
// Create a transaction
let loc = self.begin().await;
let loc = self.begin(stm.writeable()).await;
// Process the statement
let res = match stm.timeout() {
// There is a timeout clause

View file

@ -59,8 +59,8 @@ pub fn statements(i: &str) -> IResult<&str, Statements> {
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Statement {
Set(SetStatement),
Use(UseStatement),
Set(SetStatement),
Info(InfoStatement),
Live(LiveStatement),
Kill(KillStatement),
@ -92,9 +92,29 @@ impl Statement {
_ => None,
}
}
}
impl Statement {
pub(crate) fn writeable(&self) -> bool {
match self {
Statement::Use(_) => false,
Statement::Set(v) => v.writeable(),
Statement::Info(_) => false,
Statement::Live(_) => true,
Statement::Kill(_) => true,
Statement::Output(v) => v.writeable(),
Statement::Ifelse(v) => v.writeable(),
Statement::Select(v) => v.writeable(),
Statement::Create(v) => v.writeable(),
Statement::Update(v) => v.writeable(),
Statement::Relate(v) => v.writeable(),
Statement::Delete(v) => v.writeable(),
Statement::Insert(v) => v.writeable(),
Statement::Define(_) => true,
Statement::Remove(_) => true,
Statement::Option(_) => false,
_ => unreachable!(),
}
}
pub(crate) async fn compute(
&self,
ctx: &Context<'_>,

View file

@ -28,6 +28,10 @@ pub struct CreateStatement {
}
impl CreateStatement {
pub(crate) fn writeable(&self) -> bool {
true
}
pub(crate) async fn compute(
&self,
ctx: &Context<'_>,

View file

@ -29,6 +29,10 @@ pub struct DeleteStatement {
}
impl DeleteStatement {
pub(crate) fn writeable(&self) -> bool {
true
}
pub(crate) async fn compute(
&self,
ctx: &Context<'_>,

View file

@ -19,6 +19,15 @@ pub struct IfelseStatement {
}
impl IfelseStatement {
pub(crate) fn writeable(&self) -> bool {
for (cond, then) in self.exprs.iter() {
if cond.writeable() || then.writeable() {
return true;
}
}
self.close.as_ref().map_or(false, |v| v.writeable())
}
pub(crate) async fn compute(
&self,
ctx: &Context<'_>,

View file

@ -32,6 +32,10 @@ pub struct InsertStatement {
}
impl InsertStatement {
pub(crate) fn writeable(&self) -> bool {
true
}
pub(crate) async fn compute(
&self,
ctx: &Context<'_>,

View file

@ -16,6 +16,10 @@ pub struct OutputStatement {
}
impl OutputStatement {
pub(crate) fn writeable(&self) -> bool {
self.what.writeable()
}
pub(crate) async fn compute(
&self,
ctx: &Context<'_>,

View file

@ -35,6 +35,10 @@ pub struct RelateStatement {
}
impl RelateStatement {
pub(crate) fn writeable(&self) -> bool {
true
}
pub(crate) async fn compute(
&self,
ctx: &Context<'_>,

View file

@ -9,7 +9,7 @@ use crate::sql::comment::shouldbespace;
use crate::sql::cond::{cond, Cond};
use crate::sql::error::IResult;
use crate::sql::fetch::{fetch, Fetchs};
use crate::sql::field::{fields, Fields};
use crate::sql::field::{fields, Field, Fields};
use crate::sql::group::{group, Groups};
use crate::sql::limit::{limit, Limit};
use crate::sql::order::{order, Orders};
@ -42,21 +42,36 @@ pub struct SelectStatement {
}
impl SelectStatement {
/// Return the statement limit number or 0 if not set
pub fn limit(&self) -> usize {
match self.limit {
Some(Limit(v)) => v,
None => 0,
}
}
/// Return the statement start number or 0 if not set
pub fn start(&self) -> usize {
match self.start {
Some(Start(v)) => v,
None => 0,
}
}
}
impl SelectStatement {
pub(crate) fn writeable(&self) -> bool {
if self.expr.iter().any(|v| match v {
Field::All => false,
Field::Alone(v) => v.writeable(),
Field::Alias(v, _) => v.writeable(),
}) {
return true;
}
if self.what.iter().any(|v| v.writeable()) {
return true;
}
self.cond.as_ref().map_or(false, |v| v.writeable())
}
pub(crate) async fn compute(
&self,
ctx: &Context<'_>,

View file

@ -21,6 +21,10 @@ pub struct SetStatement {
}
impl SetStatement {
pub(crate) fn writeable(&self) -> bool {
self.what.writeable()
}
pub(crate) async fn compute(
&self,
ctx: &Context<'_>,

View file

@ -30,6 +30,10 @@ pub struct UpdateStatement {
}
impl UpdateStatement {
pub(crate) fn writeable(&self) -> bool {
true
}
pub(crate) async fn compute(
&self,
ctx: &Context<'_>,

View file

@ -38,6 +38,19 @@ impl PartialOrd for Subquery {
}
impl Subquery {
pub(crate) fn writeable(&self) -> bool {
match self {
Subquery::Value(v) => v.writeable(),
Subquery::Ifelse(v) => v.writeable(),
Subquery::Select(v) => v.writeable(),
Subquery::Create(v) => v.writeable(),
Subquery::Update(v) => v.writeable(),
Subquery::Delete(v) => v.writeable(),
Subquery::Relate(v) => v.writeable(),
Subquery::Insert(v) => v.writeable(),
}
}
pub(crate) async fn compute(
&self,
ctx: &Context<'_>,

View file

@ -967,6 +967,17 @@ impl fmt::Display for Value {
}
impl Value {
pub(crate) fn writeable(&self) -> bool {
match self {
Value::Array(v) => v.iter().any(|v| v.writeable()),
Value::Object(v) => v.iter().any(|(_, v)| v.writeable()),
Value::Function(v) => v.args().iter().any(|v| v.writeable()),
Value::Subquery(v) => v.writeable(),
Value::Expression(v) => v.l.writeable() || v.r.writeable(),
_ => false,
}
}
#[cfg_attr(feature = "parallel", async_recursion)]
#[cfg_attr(not(feature = "parallel"), async_recursion(?Send))]
pub(crate) async fn compute(