Improve transactions

This commit is contained in:
Tobie Morgan Hitchcock 2016-10-18 13:49:46 +01:00
parent 6075e4d1d7
commit 7556a77df0
13 changed files with 78 additions and 227 deletions

View file

@ -24,17 +24,6 @@ import (
func executeCreateStatement(txn kvs.TX, ast *sql.CreateStatement) (out []interface{}, err error) { func executeCreateStatement(txn kvs.TX, ast *sql.CreateStatement) (out []interface{}, err error) {
var local bool
if txn == nil {
local = true
txn, err = db.Txn(true)
if err != nil {
return
}
defer txn.Rollback()
}
for _, w := range ast.What { for _, w := range ast.What {
if what, ok := w.(*sql.Thing); ok { if what, ok := w.(*sql.Thing); ok {
@ -61,10 +50,6 @@ func executeCreateStatement(txn kvs.TX, ast *sql.CreateStatement) (out []interfa
} }
if local {
txn.Commit()
}
return return
} }
@ -76,7 +61,7 @@ func create(doc *item.Doc, ast *sql.CreateStatement) (out interface{}, err error
} }
if !doc.Allow("CREATE") { if !doc.Allow("CREATE") {
return nil, nil return
} }
if err = doc.StoreIndex(); err != nil { if err = doc.StoreIndex(); err != nil {

View file

@ -111,13 +111,27 @@ func detail(e error) interface{} {
} }
} }
func writable(cur kvs.TX, tmp bool) (txn kvs.TX, err error, loc bool) {
if cur == nil {
cur, err = db.Txn(true)
}
return cur, err, tmp
}
func readable(cur kvs.TX, tmp bool) (txn kvs.TX, err error, loc bool) {
if cur == nil {
cur, err = db.Txn(false)
}
return cur, err, tmp
}
func execute(ctx *fibre.Context, ast *sql.Query, chn chan<- interface{}) { func execute(ctx *fibre.Context, ast *sql.Query, chn chan<- interface{}) {
var txn kvs.TX var txn kvs.TX
defer func() { defer func() {
if txn != nil { if txn != nil {
txn.Rollback() txn.Cancel()
} }
if r := recover(); r != nil { if r := recover(); r != nil {
if err, ok := r.(error); ok { if err, ok := r.(error); ok {
@ -130,15 +144,47 @@ func execute(ctx *fibre.Context, ast *sql.Query, chn chan<- interface{}) {
for _, s := range ast.Statements { for _, s := range ast.Statements {
var res []interface{} var loc bool
var err error var err error
var res []interface{}
now := time.Now() now := time.Now()
switch stm := s.(type) { switch s.(type) {
case *sql.UseStatement: case *sql.UseStatement:
continue continue
case *sql.BeginStatement:
break
case *sql.CancelStatement:
break
case *sql.CommitStatement:
break
case *sql.InfoStatement:
txn, err, loc = readable(txn, txn == nil)
default:
txn, err, loc = writable(txn, txn == nil)
}
if err != nil {
chn <- err
}
switch stm := s.(type) {
case *sql.CommitStatement:
txn.Commit()
txn = nil
continue
case *sql.CancelStatement:
txn.Cancel()
txn = nil
continue
case *sql.BeginStatement:
txn, err, loc = writable(txn, false)
continue
case *sql.InfoStatement: case *sql.InfoStatement:
res, err = executeInfoStatement(txn, stm) res, err = executeInfoStatement(txn, stm)
@ -176,36 +222,21 @@ func execute(ctx *fibre.Context, ast *sql.Query, chn chan<- interface{}) {
case *sql.RemoveIndexStatement: case *sql.RemoveIndexStatement:
res, err = executeRemoveIndexStatement(txn, stm) res, err = executeRemoveIndexStatement(txn, stm)
case *sql.BeginStatement:
if txn != nil {
chn <- fibre.NewHTTPError(400, "Transaction already running")
return
} else if txn, err = db.Txn(true); err != nil {
chn <- err
return
}
case *sql.CommitStatement:
if txn != nil {
txn.Commit()
txn = nil
continue
}
case *sql.CancelStatement:
if txn != nil {
txn.Rollback()
txn = nil
continue
}
} }
if err != nil && txn != nil { if err != nil {
txn.Rollback()
chn <- err chn <- err
} }
if loc {
if err != nil {
txn.Cancel()
} else {
txn.Commit()
}
txn = nil
}
chn <- &Response{ chn <- &Response{
Time: time.Since(now).String(), Time: time.Since(now).String(),
Status: status(err), Status: status(err),

View file

@ -24,17 +24,6 @@ import (
func executeDefineTableStatement(txn kvs.TX, ast *sql.DefineTableStatement) (out []interface{}, err error) { func executeDefineTableStatement(txn kvs.TX, ast *sql.DefineTableStatement) (out []interface{}, err error) {
var local bool
if txn == nil {
local = true
txn, err = db.Txn(true)
if err != nil {
return
}
defer txn.Rollback()
}
for _, TB := range ast.What { for _, TB := range ast.What {
// Set the namespace definition // Set the namespace definition
@ -57,27 +46,12 @@ func executeDefineTableStatement(txn kvs.TX, ast *sql.DefineTableStatement) (out
} }
if local {
txn.Commit()
}
return return
} }
func executeDefineRulesStatement(txn kvs.TX, ast *sql.DefineRulesStatement) (out []interface{}, err error) { func executeDefineRulesStatement(txn kvs.TX, ast *sql.DefineRulesStatement) (out []interface{}, err error) {
var local bool
if txn == nil {
local = true
txn, err = db.Txn(true)
if err != nil {
return
}
defer txn.Rollback()
}
for _, TB := range ast.What { for _, TB := range ast.What {
for _, RU := range ast.When { for _, RU := range ast.When {
@ -110,27 +84,12 @@ func executeDefineRulesStatement(txn kvs.TX, ast *sql.DefineRulesStatement) (out
} }
if local {
txn.Commit()
}
return return
} }
func executeDefineFieldStatement(txn kvs.TX, ast *sql.DefineFieldStatement) (out []interface{}, err error) { func executeDefineFieldStatement(txn kvs.TX, ast *sql.DefineFieldStatement) (out []interface{}, err error) {
var local bool
if txn == nil {
local = true
txn, err = db.Txn(true)
if err != nil {
return
}
defer txn.Rollback()
}
for _, TB := range ast.What { for _, TB := range ast.What {
// Set the namespace definition // Set the namespace definition
@ -159,27 +118,12 @@ func executeDefineFieldStatement(txn kvs.TX, ast *sql.DefineFieldStatement) (out
} }
if local {
txn.Commit()
}
return return
} }
func executeDefineIndexStatement(txn kvs.TX, ast *sql.DefineIndexStatement) (out []interface{}, err error) { func executeDefineIndexStatement(txn kvs.TX, ast *sql.DefineIndexStatement) (out []interface{}, err error) {
var local bool
if txn == nil {
local = true
txn, err = db.Txn(true)
if err != nil {
return
}
defer txn.Rollback()
}
for _, TB := range ast.What { for _, TB := range ast.What {
// Set the namespace definition // Set the namespace definition
@ -226,10 +170,6 @@ func executeDefineIndexStatement(txn kvs.TX, ast *sql.DefineIndexStatement) (out
} }
if local {
txn.Commit()
}
return return
} }

View file

@ -23,17 +23,6 @@ import (
func executeDeleteStatement(txn kvs.TX, ast *sql.DeleteStatement) (out []interface{}, err error) { func executeDeleteStatement(txn kvs.TX, ast *sql.DeleteStatement) (out []interface{}, err error) {
var local bool
if txn == nil {
local = true
txn, err = db.Txn(true)
if err != nil {
return
}
defer txn.Rollback()
}
for _, w := range ast.What { for _, w := range ast.What {
if what, ok := w.(*sql.Thing); ok { if what, ok := w.(*sql.Thing); ok {
@ -63,10 +52,6 @@ func executeDeleteStatement(txn kvs.TX, ast *sql.DeleteStatement) (out []interfa
} }
if local {
txn.Commit()
}
return return
} }

View file

@ -25,14 +25,6 @@ import (
func executeInfoStatement(txn kvs.TX, ast *sql.InfoStatement) (out []interface{}, err error) { func executeInfoStatement(txn kvs.TX, ast *sql.InfoStatement) (out []interface{}, err error) {
if txn == nil {
txn, err = db.Txn(false)
if err != nil {
return
}
defer txn.Rollback()
}
if ast.What == "" { if ast.What == "" {
res := data.New() res := data.New()

View file

@ -23,17 +23,6 @@ import (
func executeModifyStatement(txn kvs.TX, ast *sql.ModifyStatement) (out []interface{}, err error) { func executeModifyStatement(txn kvs.TX, ast *sql.ModifyStatement) (out []interface{}, err error) {
var local bool
if txn == nil {
local = true
txn, err = db.Txn(true)
if err != nil {
return
}
defer txn.Rollback()
}
for _, w := range ast.What { for _, w := range ast.What {
if what, ok := w.(*sql.Thing); ok { if what, ok := w.(*sql.Thing); ok {
@ -63,10 +52,6 @@ func executeModifyStatement(txn kvs.TX, ast *sql.ModifyStatement) (out []interfa
} }
if local {
txn.Commit()
}
return return
} }

View file

@ -22,17 +22,6 @@ import (
func executeRemoveTableStatement(txn kvs.TX, ast *sql.RemoveTableStatement) (out []interface{}, err error) { func executeRemoveTableStatement(txn kvs.TX, ast *sql.RemoveTableStatement) (out []interface{}, err error) {
var local bool
if txn == nil {
local = true
txn, err = db.Txn(true)
if err != nil {
return
}
defer txn.Rollback()
}
for _, TB := range ast.What { for _, TB := range ast.What {
// Remove the table config // Remove the table config
@ -67,27 +56,12 @@ func executeRemoveTableStatement(txn kvs.TX, ast *sql.RemoveTableStatement) (out
} }
if local {
txn.Commit()
}
return return
} }
func executeRemoveRulesStatement(txn kvs.TX, ast *sql.RemoveRulesStatement) (out []interface{}, err error) { func executeRemoveRulesStatement(txn kvs.TX, ast *sql.RemoveRulesStatement) (out []interface{}, err error) {
var local bool
if txn == nil {
local = true
txn, err = db.Txn(true)
if err != nil {
return
}
defer txn.Rollback()
}
for _, TB := range ast.What { for _, TB := range ast.What {
for _, RU := range ast.When { for _, RU := range ast.When {
@ -102,27 +76,12 @@ func executeRemoveRulesStatement(txn kvs.TX, ast *sql.RemoveRulesStatement) (out
} }
if local {
txn.Commit()
}
return return
} }
func executeRemoveFieldStatement(txn kvs.TX, ast *sql.RemoveFieldStatement) (out []interface{}, err error) { func executeRemoveFieldStatement(txn kvs.TX, ast *sql.RemoveFieldStatement) (out []interface{}, err error) {
var local bool
if txn == nil {
local = true
txn, err = db.Txn(true)
if err != nil {
return
}
defer txn.Rollback()
}
for _, TB := range ast.What { for _, TB := range ast.What {
// Remove the field config // Remove the field config
@ -133,27 +92,12 @@ func executeRemoveFieldStatement(txn kvs.TX, ast *sql.RemoveFieldStatement) (out
} }
if local {
txn.Commit()
}
return return
} }
func executeRemoveIndexStatement(txn kvs.TX, ast *sql.RemoveIndexStatement) (out []interface{}, err error) { func executeRemoveIndexStatement(txn kvs.TX, ast *sql.RemoveIndexStatement) (out []interface{}, err error) {
var local bool
if txn == nil {
local = true
txn, err = db.Txn(true)
if err != nil {
return
}
defer txn.Rollback()
}
for _, TB := range ast.What { for _, TB := range ast.What {
// Remove the index config // Remove the index config
@ -170,10 +114,6 @@ func executeRemoveIndexStatement(txn kvs.TX, ast *sql.RemoveIndexStatement) (out
} }
if local {
txn.Commit()
}
return return
} }

View file

@ -23,14 +23,6 @@ import (
func executeSelectStatement(txn kvs.TX, ast *sql.SelectStatement) (out []interface{}, err error) { func executeSelectStatement(txn kvs.TX, ast *sql.SelectStatement) (out []interface{}, err error) {
if txn == nil {
txn, err = db.Txn(false)
if err != nil {
return
}
defer txn.Close()
}
for _, w := range ast.What { for _, w := range ast.What {
if what, ok := w.(*sql.Thing); ok { if what, ok := w.(*sql.Thing); ok {
@ -71,7 +63,7 @@ func detect(doc *item.Doc, ast *sql.SelectStatement) (out interface{}, err error
} }
if !doc.Allow("SELECT") { if !doc.Allow("SELECT") {
return nil, nil return
} }
out = doc.Blaze(ast) out = doc.Blaze(ast)

View file

@ -23,17 +23,6 @@ import (
func executeUpdateStatement(txn kvs.TX, ast *sql.UpdateStatement) (out []interface{}, err error) { func executeUpdateStatement(txn kvs.TX, ast *sql.UpdateStatement) (out []interface{}, err error) {
var local bool
if txn == nil {
local = true
txn, err = db.Txn(true)
if err != nil {
return
}
defer txn.Rollback()
}
for _, w := range ast.What { for _, w := range ast.What {
if what, ok := w.(*sql.Thing); ok { if what, ok := w.(*sql.Thing); ok {
@ -63,10 +52,6 @@ func executeUpdateStatement(txn kvs.TX, ast *sql.UpdateStatement) (out []interfa
} }
if local {
txn.Commit()
}
return return
} }

View file

@ -333,8 +333,15 @@ func (tx *TX) Close() (err error) {
return tx.Rollback() return tx.Rollback()
} }
func (tx *TX) Cancel() (err error) {
return tx.Rollback()
}
func (tx *TX) Commit() (err error) { func (tx *TX) Commit() (err error) {
return tx.tx.Commit() if tx.tx.Writable() {
return tx.tx.Commit()
}
return tx.tx.Rollback()
} }
func (tx *TX) Rollback() (err error) { func (tx *TX) Rollback() (err error) {

View file

@ -314,6 +314,10 @@ func (tx *TX) Close() (err error) {
return tx.Rollback() return tx.Rollback()
} }
func (tx *TX) Cancel() (err error) {
return tx.Rollback()
}
func (tx *TX) Commit() (err error) { func (tx *TX) Commit() (err error) {
return tx.tx.Commit() return tx.tx.Commit()
} }

View file

@ -314,6 +314,10 @@ func (tx *TX) Close() (err error) {
return tx.Rollback() return tx.Rollback()
} }
func (tx *TX) Cancel() (err error) {
return tx.Rollback()
}
func (tx *TX) Commit() (err error) { func (tx *TX) Commit() (err error) {
return tx.tx.Commit() return tx.tx.Commit()
} }

View file

@ -29,6 +29,7 @@ type TX interface {
PDel([]byte) error PDel([]byte) error
RDel([]byte, []byte, uint64) error RDel([]byte, []byte, uint64) error
Close() error Close() error
Cancel() error
Commit() error Commit() error
Rollback() error Rollback() error
} }