diff --git a/db/executor.go b/db/executor.go index e3ef0e40..26a9a004 100644 --- a/db/executor.go +++ b/db/executor.go @@ -144,13 +144,21 @@ func (e *executor) execute(ctx context.Context, ast *sql.Query) { continue case *sql.CancelStatement: err, buf = e.cancel(buf, err, e.send) + if err != nil { + clear() + } else { + clear() + } trc.Finish() - clear() continue case *sql.CommitStatement: err, buf = e.commit(buf, err, e.send) + if err != nil { + clear() + } else { + flush() + } trc.Finish() - flush() continue } @@ -339,9 +347,9 @@ func (e *executor) operate(ctx context.Context, stm sql.Statement) (res []interf } - // If this is a local transaction for only the - // current statement, then commit or cancel - // depending on the result error. + // If the context is already closed or errord, + // then ignore this result, clear all queued + // changes, and reset the transaction. select { @@ -353,15 +361,44 @@ func (e *executor) operate(ctx context.Context, stm sql.Statement) (res []interf default: + // If this is a local transaction for only the + // current statement, then commit or cancel + // depending on the result error. + if loc && e.dbo.Closed() == false { - if !trw || err != nil { + + // As this is a local transaction then + // make sure we reset the transaction + // context. + + defer e.dbo.Reset() + + // If there was an error with the query + // then clear the queued changes and + // return immediately. + + if err != nil { e.dbo.Cancel() - e.dbo.Reset() clear() + return + } + + // Otherwise check if this is a read or + // a write transaction, and attempt to + // Cancel or Commit, returning any errors. + + if !trw { + if err = e.dbo.Cancel(); err != nil { + clear() + } else { + clear() + } } else { - e.dbo.Commit() - e.dbo.Reset() - flush() + if err = e.dbo.Commit(); err != nil { + clear() + } else { + flush() + } } } @@ -380,18 +417,18 @@ func (e *executor) begin(rw bool) (err error) { func (e *executor) cancel(buf []*Response, err error, chn chan<- *Response) (error, []*Response) { - defer func() { - e.dbo.Reset() - }() + defer e.dbo.Reset() if e.dbo.TX == nil { return nil, buf } - e.dbo.Cancel() + err = e.dbo.Cancel() for _, v := range buf { v.Status = "ERR" + v.Result = []interface{}{} + v.Detail = "Transaction cancelled" chn <- v } @@ -400,29 +437,29 @@ func (e *executor) cancel(buf []*Response, err error, chn chan<- *Response) (err buf = buf[:len(buf)-1] } - return nil, buf + return err, buf } func (e *executor) commit(buf []*Response, err error, chn chan<- *Response) (error, []*Response) { - defer func() { - e.dbo.Reset() - }() + defer e.dbo.Reset() if e.dbo.TX == nil { return nil, buf } if err != nil { - e.dbo.Cancel() + err = e.dbo.Cancel() } else { - e.dbo.Commit() + err = e.dbo.Commit() } for _, v := range buf { if err != nil { v.Status = "ERR" + v.Result = []interface{}{} + v.Detail = "Transaction failed: " + err.Error() } chn <- v } @@ -432,7 +469,7 @@ func (e *executor) commit(buf []*Response, err error, chn chan<- *Response) (err buf = buf[:len(buf)-1] } - return nil, buf + return err, buf }