Implement query statement timeout functionality
This commit is contained in:
parent
378bbe6dae
commit
98db89a2d7
14 changed files with 206 additions and 46 deletions
16
db/db.go
16
db/db.go
|
@ -338,6 +338,14 @@ func (e *executor) operate(ast sql.Statement) (res []interface{}, err error) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mark the beginning of this statement so we
|
||||||
|
// can monitor the running time, and ensure
|
||||||
|
// it runs no longer than specified.
|
||||||
|
|
||||||
|
if stm, ok := ast.(sql.KillableStatement); ok {
|
||||||
|
stm.Begin()
|
||||||
|
}
|
||||||
|
|
||||||
// Execute the defined statement, receiving the
|
// Execute the defined statement, receiving the
|
||||||
// result set, and any errors which occured
|
// result set, and any errors which occured
|
||||||
// while processing the query.
|
// while processing the query.
|
||||||
|
@ -419,6 +427,14 @@ func (e *executor) operate(ast sql.Statement) (res []interface{}, err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The statement has successfully cancelled
|
||||||
|
// or committed, so stop all the transaction
|
||||||
|
// timeout timers if any were set.
|
||||||
|
|
||||||
|
if stm, ok := ast.(sql.KillableStatement); ok {
|
||||||
|
stm.Cease()
|
||||||
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
25
sql/ast.go
25
sql/ast.go
|
@ -34,6 +34,21 @@ type Statement interface{}
|
||||||
// Statements represents multiple SQL ASTs
|
// Statements represents multiple SQL ASTs
|
||||||
type Statements []Statement
|
type Statements []Statement
|
||||||
|
|
||||||
|
// --------------------------------------------------
|
||||||
|
// Other
|
||||||
|
// --------------------------------------------------
|
||||||
|
|
||||||
|
type KillableStatement interface {
|
||||||
|
Begin()
|
||||||
|
Cease()
|
||||||
|
Timedout() <-chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type killable struct {
|
||||||
|
ticker *time.Timer
|
||||||
|
closer chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
// --------------------------------------------------
|
// --------------------------------------------------
|
||||||
// Trans
|
// Trans
|
||||||
// --------------------------------------------------
|
// --------------------------------------------------
|
||||||
|
@ -108,6 +123,7 @@ type LiveStatement struct {
|
||||||
|
|
||||||
// SelectStatement represents a SQL SELECT statement.
|
// SelectStatement represents a SQL SELECT statement.
|
||||||
type SelectStatement struct {
|
type SelectStatement struct {
|
||||||
|
killable
|
||||||
KV string `cork:"-" codec:"-"`
|
KV string `cork:"-" codec:"-"`
|
||||||
NS string `cork:"-" codec:"-"`
|
NS string `cork:"-" codec:"-"`
|
||||||
DB string `cork:"-" codec:"-"`
|
DB string `cork:"-" codec:"-"`
|
||||||
|
@ -119,20 +135,24 @@ type SelectStatement struct {
|
||||||
Limit Expr `cork:"limit" codec:"limit"`
|
Limit Expr `cork:"limit" codec:"limit"`
|
||||||
Start Expr `cork:"start" codec:"start"`
|
Start Expr `cork:"start" codec:"start"`
|
||||||
Version Expr `cork:"version" codec:"version"`
|
Version Expr `cork:"version" codec:"version"`
|
||||||
|
Timeout time.Duration `cork:"timeout" codec:"timeout"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateStatement represents a SQL CREATE statement.
|
// CreateStatement represents a SQL CREATE statement.
|
||||||
type CreateStatement struct {
|
type CreateStatement struct {
|
||||||
|
killable
|
||||||
KV string `cork:"-" codec:"-"`
|
KV string `cork:"-" codec:"-"`
|
||||||
NS string `cork:"-" codec:"-"`
|
NS string `cork:"-" codec:"-"`
|
||||||
DB string `cork:"-" codec:"-"`
|
DB string `cork:"-" codec:"-"`
|
||||||
What Exprs `cork:"what" codec:"what"`
|
What Exprs `cork:"what" codec:"what"`
|
||||||
Data Expr `cork:"data" codec:"data"`
|
Data Expr `cork:"data" codec:"data"`
|
||||||
Echo Token `cork:"echo" codec:"echo"`
|
Echo Token `cork:"echo" codec:"echo"`
|
||||||
|
Timeout time.Duration `cork:"timeout" codec:"timeout"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateStatement represents a SQL UPDATE statement.
|
// UpdateStatement represents a SQL UPDATE statement.
|
||||||
type UpdateStatement struct {
|
type UpdateStatement struct {
|
||||||
|
killable
|
||||||
KV string `cork:"-" codec:"-"`
|
KV string `cork:"-" codec:"-"`
|
||||||
NS string `cork:"-" codec:"-"`
|
NS string `cork:"-" codec:"-"`
|
||||||
DB string `cork:"-" codec:"-"`
|
DB string `cork:"-" codec:"-"`
|
||||||
|
@ -141,10 +161,12 @@ type UpdateStatement struct {
|
||||||
Data Expr `cork:"data" codec:"data"`
|
Data Expr `cork:"data" codec:"data"`
|
||||||
Cond Expr `cork:"cond" codec:"cond"`
|
Cond Expr `cork:"cond" codec:"cond"`
|
||||||
Echo Token `cork:"echo" codec:"echo"`
|
Echo Token `cork:"echo" codec:"echo"`
|
||||||
|
Timeout time.Duration `cork:"timeout" codec:"timeout"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteStatement represents a SQL DELETE statement.
|
// DeleteStatement represents a SQL DELETE statement.
|
||||||
type DeleteStatement struct {
|
type DeleteStatement struct {
|
||||||
|
killable
|
||||||
KV string `cork:"-" codec:"-"`
|
KV string `cork:"-" codec:"-"`
|
||||||
NS string `cork:"-" codec:"-"`
|
NS string `cork:"-" codec:"-"`
|
||||||
DB string `cork:"-" codec:"-"`
|
DB string `cork:"-" codec:"-"`
|
||||||
|
@ -152,10 +174,12 @@ type DeleteStatement struct {
|
||||||
What Exprs `cork:"what" codec:"what"`
|
What Exprs `cork:"what" codec:"what"`
|
||||||
Cond Expr `cork:"cond" codec:"cond"`
|
Cond Expr `cork:"cond" codec:"cond"`
|
||||||
Echo Token `cork:"echo" codec:"echo"`
|
Echo Token `cork:"echo" codec:"echo"`
|
||||||
|
Timeout time.Duration `cork:"timeout" codec:"timeout"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// RelateStatement represents a SQL RELATE statement.
|
// RelateStatement represents a SQL RELATE statement.
|
||||||
type RelateStatement struct {
|
type RelateStatement struct {
|
||||||
|
killable
|
||||||
KV string `cork:"-" codec:"-"`
|
KV string `cork:"-" codec:"-"`
|
||||||
NS string `cork:"-" codec:"-"`
|
NS string `cork:"-" codec:"-"`
|
||||||
DB string `cork:"-" codec:"-"`
|
DB string `cork:"-" codec:"-"`
|
||||||
|
@ -165,6 +189,7 @@ type RelateStatement struct {
|
||||||
Data Expr `cork:"data" codec:"data"`
|
Data Expr `cork:"data" codec:"data"`
|
||||||
Uniq bool `cork:"uniq" codec:"uniq"`
|
Uniq bool `cork:"uniq" codec:"uniq"`
|
||||||
Echo Token `cork:"echo" codec:"echo"`
|
Echo Token `cork:"echo" codec:"echo"`
|
||||||
|
Timeout time.Duration `cork:"timeout" codec:"timeout"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// --------------------------------------------------
|
// --------------------------------------------------
|
||||||
|
|
|
@ -34,6 +34,10 @@ func (p *parser) parseCreateStatement() (stmt *CreateStatement, err error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if stmt.Timeout, err = p.parseTimeout(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if _, _, err = p.shouldBe(EOF, RPAREN, SEMICOLON); err != nil {
|
if _, _, err = p.shouldBe(EOF, RPAREN, SEMICOLON); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,10 @@ func (p *parser) parseDeleteStatement() (stmt *DeleteStatement, err error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if stmt.Timeout, err = p.parseTimeout(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if _, _, err = p.shouldBe(EOF, RPAREN, SEMICOLON); err != nil {
|
if _, _, err = p.shouldBe(EOF, RPAREN, SEMICOLON); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
10
sql/exprs.go
10
sql/exprs.go
|
@ -367,6 +367,16 @@ func (p *parser) parseDuration() (time.Duration, error) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *parser) parseTimeout() (time.Duration, error) {
|
||||||
|
|
||||||
|
if _, _, exi := p.mightBe(TIMEOUT); !exi {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.parseDuration()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (p *parser) parseBcrypt() ([]byte, error) {
|
func (p *parser) parseBcrypt() ([]byte, error) {
|
||||||
|
|
||||||
_, lit, err := p.shouldBe(STRING)
|
_, lit, err := p.shouldBe(STRING)
|
||||||
|
|
|
@ -14,4 +14,7 @@
|
||||||
|
|
||||||
package sql
|
package sql
|
||||||
|
|
||||||
|
//go:generate go get -u github.com/abcum/tmpl
|
||||||
|
//go:generate tmpl -file=kill.gen.json kill.gen.go.tmpl
|
||||||
|
|
||||||
//go:generate codecgen -o ast.gen.go ast.go
|
//go:generate codecgen -o ast.gen.go ast.go
|
||||||
|
|
59
sql/kill.gen.go.tmpl
Normal file
59
sql/kill.gen.go.tmpl
Normal file
|
@ -0,0 +1,59 @@
|
||||||
|
// Copyright © 2016 Abcum Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package sql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
{{with $types := .}}{{range $k := $types}}
|
||||||
|
|
||||||
|
func (s *{{$k.name}}Statement) Begin() {
|
||||||
|
if s.Timeout == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.killable.closer == nil {
|
||||||
|
s.killable.closer = make(chan struct{})
|
||||||
|
}
|
||||||
|
s.killable.ticker = time.AfterFunc(s.Timeout, func() {
|
||||||
|
s.killable.ticker.Stop()
|
||||||
|
s.killable.ticker = nil
|
||||||
|
close(s.killable.closer)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *{{$k.name}}Statement) Cease() {
|
||||||
|
if s.Timeout == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.killable.closer == nil {
|
||||||
|
s.killable.closer = make(chan struct{})
|
||||||
|
}
|
||||||
|
if s.killable.ticker != nil {
|
||||||
|
s.killable.ticker.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *{{$k.name}}Statement) Timedout() <-chan struct{} {
|
||||||
|
if s.Timeout == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if s.killable.closer == nil {
|
||||||
|
s.killable.closer = make(chan struct{})
|
||||||
|
}
|
||||||
|
return s.killable.closer
|
||||||
|
}
|
||||||
|
|
||||||
|
{{end}}{{end}}
|
7
sql/kill.gen.json
Normal file
7
sql/kill.gen.json
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
[
|
||||||
|
{ "name": "Select" },
|
||||||
|
{ "name": "Create" },
|
||||||
|
{ "name": "Update" },
|
||||||
|
{ "name": "Delete" },
|
||||||
|
{ "name": "Relate" }
|
||||||
|
]
|
|
@ -52,6 +52,10 @@ func (p *parser) parseRelateStatement() (stmt *RelateStatement, err error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if stmt.Timeout, err = p.parseTimeout(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if _, _, err = p.shouldBe(EOF, RPAREN, SEMICOLON); err != nil {
|
if _, _, err = p.shouldBe(EOF, RPAREN, SEMICOLON); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -591,6 +591,9 @@ func (s *scanner) scanNumber(chp ...rune) (tok Token, lit string, val interface{
|
||||||
if chn := s.next(); chn == 's' {
|
if chn := s.next(); chn == 's' {
|
||||||
tok = DURATION
|
tok = DURATION
|
||||||
buf.WriteRune(chn)
|
buf.WriteRune(chn)
|
||||||
|
} else if ch == 'm' {
|
||||||
|
tok = DURATION
|
||||||
|
s.undo()
|
||||||
} else {
|
} else {
|
||||||
s.undo()
|
s.undo()
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,6 +59,10 @@ func (p *parser) parseSelectStatement() (stmt *SelectStatement, err error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if stmt.Timeout, err = p.parseTimeout(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if _, _, err = p.shouldBe(EOF, RPAREN, SEMICOLON); err != nil {
|
if _, _, err = p.shouldBe(EOF, RPAREN, SEMICOLON); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func orNil(v interface{}) string {
|
func orNil(v interface{}) string {
|
||||||
|
@ -147,6 +148,15 @@ func stringFromInterface(v interface{}, y, n string) string {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func stringFromDuration(v time.Duration, y, n string) string {
|
||||||
|
switch v {
|
||||||
|
case 0:
|
||||||
|
return n
|
||||||
|
default:
|
||||||
|
return y
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
// Statements
|
// Statements
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
|
@ -199,7 +209,7 @@ func (this ReturnStatement) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this SelectStatement) String() string {
|
func (this SelectStatement) String() string {
|
||||||
return fmt.Sprintf("SELECT %v FROM %v%v%v%v%v%v%v",
|
return fmt.Sprintf("SELECT %v FROM %v%v%v%v%v%v%v%v",
|
||||||
this.Expr,
|
this.Expr,
|
||||||
this.What,
|
this.What,
|
||||||
stringFromInterface(this.Cond, fmt.Sprintf(" WHERE %v", this.Cond), ""),
|
stringFromInterface(this.Cond, fmt.Sprintf(" WHERE %v", this.Cond), ""),
|
||||||
|
@ -208,43 +218,48 @@ func (this SelectStatement) String() string {
|
||||||
stringFromInterface(this.Limit, fmt.Sprintf(" LIMIT %v", this.Limit), ""),
|
stringFromInterface(this.Limit, fmt.Sprintf(" LIMIT %v", this.Limit), ""),
|
||||||
stringFromInterface(this.Start, fmt.Sprintf(" START %v", this.Start), ""),
|
stringFromInterface(this.Start, fmt.Sprintf(" START %v", this.Start), ""),
|
||||||
stringFromInterface(this.Version, fmt.Sprintf(" VERSION %v", this.Version), ""),
|
stringFromInterface(this.Version, fmt.Sprintf(" VERSION %v", this.Version), ""),
|
||||||
|
stringFromDuration(this.Timeout, fmt.Sprintf(" TIMEOUT %v", this.Timeout.String()), ""),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this CreateStatement) String() string {
|
func (this CreateStatement) String() string {
|
||||||
return fmt.Sprintf("CREATE %v%v RETURN %v",
|
return fmt.Sprintf("CREATE %v%v RETURN %v%v",
|
||||||
this.What,
|
this.What,
|
||||||
this.Data,
|
this.Data,
|
||||||
this.Echo,
|
this.Echo,
|
||||||
|
stringFromDuration(this.Timeout, fmt.Sprintf(" TIMEOUT %v", this.Timeout.String()), ""),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this UpdateStatement) String() string {
|
func (this UpdateStatement) String() string {
|
||||||
return fmt.Sprintf("CREATE %v%v%v RETURN %v",
|
return fmt.Sprintf("CREATE %v%v%v RETURN %v%v",
|
||||||
this.What,
|
this.What,
|
||||||
this.Data,
|
this.Data,
|
||||||
this.Cond,
|
this.Cond,
|
||||||
this.Echo,
|
this.Echo,
|
||||||
|
stringFromDuration(this.Timeout, fmt.Sprintf(" TIMEOUT %v", this.Timeout.String()), ""),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this DeleteStatement) String() string {
|
func (this DeleteStatement) String() string {
|
||||||
return fmt.Sprintf("DELETE %v%v%v RETURN %v",
|
return fmt.Sprintf("DELETE %v%v%v RETURN %v%v",
|
||||||
stringFromBool(this.Hard, "AND EXPUNGE ", ""),
|
stringFromBool(this.Hard, "AND EXPUNGE ", ""),
|
||||||
this.What,
|
this.What,
|
||||||
this.Cond,
|
this.Cond,
|
||||||
this.Echo,
|
this.Echo,
|
||||||
|
stringFromDuration(this.Timeout, fmt.Sprintf(" TIMEOUT %v", this.Timeout.String()), ""),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this RelateStatement) String() string {
|
func (this RelateStatement) String() string {
|
||||||
return fmt.Sprintf("RELATE %v FROM %v WITH %v%v%v RETURN %v",
|
return fmt.Sprintf("RELATE %v FROM %v WITH %v%v%v RETURN %v%v",
|
||||||
this.Type,
|
this.Type,
|
||||||
this.From,
|
this.From,
|
||||||
this.With,
|
this.With,
|
||||||
this.Data,
|
this.Data,
|
||||||
stringFromBool(this.Uniq, " UNIQUE", ""),
|
stringFromBool(this.Uniq, " UNIQUE", ""),
|
||||||
this.Echo,
|
this.Echo,
|
||||||
|
stringFromDuration(this.Timeout, fmt.Sprintf(" TIMEOUT %v", this.Timeout.String()), ""),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -180,6 +180,7 @@ const (
|
||||||
SOMECONTAINEDIN
|
SOMECONTAINEDIN
|
||||||
START
|
START
|
||||||
TABLE
|
TABLE
|
||||||
|
TIMEOUT
|
||||||
TO
|
TO
|
||||||
TOKEN
|
TOKEN
|
||||||
TRANSACTION
|
TRANSACTION
|
||||||
|
@ -346,6 +347,7 @@ var tokens = [...]string{
|
||||||
SOMECONTAINEDIN: "SOMECONTAINEDIN",
|
SOMECONTAINEDIN: "SOMECONTAINEDIN",
|
||||||
START: "START",
|
START: "START",
|
||||||
TABLE: "TABLE",
|
TABLE: "TABLE",
|
||||||
|
TIMEOUT: "TIMEOUT",
|
||||||
TO: "TO",
|
TO: "TO",
|
||||||
TOKEN: "TOKEN",
|
TOKEN: "TOKEN",
|
||||||
TRANSACTION: "TRANSACTION",
|
TRANSACTION: "TRANSACTION",
|
||||||
|
|
|
@ -45,6 +45,10 @@ func (p *parser) parseUpdateStatement() (stmt *UpdateStatement, err error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if stmt.Timeout, err = p.parseTimeout(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if _, _, err = p.shouldBe(EOF, RPAREN, SEMICOLON); err != nil {
|
if _, _, err = p.shouldBe(EOF, RPAREN, SEMICOLON); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue