Implement async document processing in queries

This commit is contained in:
Tobie Morgan Hitchcock 2019-06-14 18:33:41 +01:00
parent bcb1d3174c
commit db1864224b
12 changed files with 565 additions and 393 deletions

View file

@ -60,6 +60,7 @@ type iterator struct {
limit int
start int
versn int64
async bool
}
type workable struct {
@ -135,6 +136,7 @@ func (i *iterator) Close() {
i.limit = -1
i.start = -1
i.versn = 0
i.async = false
iteratorPool.Put(i)
@ -161,18 +163,24 @@ func (i *iterator) setupState(ctx context.Context) {
i.split = stm.Split
i.group = stm.Group
i.order = stm.Order
i.async = stm.Parallel
case *sql.CreateStatement:
i.what = stm.What
i.async = stm.Parallel
case *sql.UpdateStatement:
i.what = stm.What
i.cond = stm.Cond
i.async = stm.Parallel
case *sql.DeleteStatement:
i.what = stm.What
i.cond = stm.Cond
i.async = stm.Parallel
case *sql.InsertStatement:
i.what = sql.Exprs{stm.Data}
i.async = stm.Parallel
case *sql.UpsertStatement:
i.what = sql.Exprs{stm.Data}
i.async = stm.Parallel
}
if stm, ok := i.stm.(*sql.SelectStatement); ok {
@ -233,7 +241,13 @@ func (i *iterator) setupWorkers(ctx context.Context) {
}
}(i.vals)
for w := 1; w <= workerCount; w++ {
workers := 1
if i.async {
workers = workerCount
}
for w := 1; w <= workers; w++ {
go func(jobs <-chan *workable, vals chan<- *doneable) {
for j := range jobs {
res, err := newDocument(i, j.key, j.val, j.doc).query(ctx, i.stm)

View file

@ -16,6 +16,7 @@ package db
import (
"errors"
"runtime"
)
type method int8
@ -85,7 +86,7 @@ const (
var (
// workerCount specifies how many workers should be used
// to process each query statement concurrently.
workerCount = 1
workerCount = runtime.NumCPU()
// maxRecursiveQueries specifies how many queries will be
// processed recursively before the query is cancelled.

View file

@ -160,70 +160,77 @@ type IfelseStatement struct {
// SelectStatement represents a SQL SELECT statement.
type SelectStatement struct {
RW bool
Expr Fields
What Exprs
Cond Expr
Split Idents
Group Groups
Order Orders
Limit Expr
Start Expr
Fetch Fetchs
Version Expr
Timeout time.Duration
RW bool
Expr Fields
What Exprs
Cond Expr
Split Idents
Group Groups
Order Orders
Limit Expr
Start Expr
Fetch Fetchs
Version Expr
Timeout time.Duration
Parallel bool
}
// CreateStatement represents a SQL CREATE statement.
type CreateStatement struct {
What Exprs
Data Expr
Echo Token
Timeout time.Duration
What Exprs
Data Expr
Echo Token
Timeout time.Duration
Parallel bool
}
// UpdateStatement represents a SQL UPDATE statement.
type UpdateStatement struct {
What Exprs
Data Expr
Cond Expr
Echo Token
Timeout time.Duration
What Exprs
Data Expr
Cond Expr
Echo Token
Timeout time.Duration
Parallel bool
}
// DeleteStatement represents a SQL DELETE statement.
type DeleteStatement struct {
What Exprs
Cond Expr
Echo Token
Timeout time.Duration
What Exprs
Cond Expr
Echo Token
Timeout time.Duration
Parallel bool
}
// RelateStatement represents a SQL RELATE statement.
type RelateStatement struct {
Type Expr
From Exprs
With Exprs
Data Expr
Uniq bool
Echo Token
Timeout time.Duration
Type Expr
From Exprs
With Exprs
Data Expr
Uniq bool
Echo Token
Timeout time.Duration
Parallel bool
}
// InsertStatement represents a SQL INSERT statement.
type InsertStatement struct {
Data Expr
Into *Table
Echo Token
Timeout time.Duration
Data Expr
Into *Table
Echo Token
Timeout time.Duration
Parallel bool
}
// UpsertStatement represents a SQL UPSERT statement.
type UpsertStatement struct {
Data Expr
Into *Table
Echo Token
Timeout time.Duration
Data Expr
Into *Table
Echo Token
Timeout time.Duration
Parallel bool
}
// --------------------------------------------------

View file

@ -34,6 +34,10 @@ func (p *parser) parseCreateStatement() (stmt *CreateStatement, err error) {
return nil, err
}
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
return
}

View file

@ -36,6 +36,10 @@ func (p *parser) parseDeleteStatement() (stmt *DeleteStatement, err error) {
return nil, err
}
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
return
}

View file

@ -244,20 +244,20 @@ func (p *parser) parsePriority() (float64, error) {
}
func (p *parser) parseParallel() (int, error) {
func (p *parser) parseParallel() (bool, error) {
if _, _, exi := p.mightBe(PARALLEL); !exi {
return 0, nil
return true, nil
}
tok, lit, err := p.shouldBe(NUMBER)
tok, lit, err := p.shouldBe(TRUE, FALSE)
if err != nil {
return 0, &ParseError{Found: lit, Expected: []string{"number"}}
return true, &ParseError{Found: lit, Expected: []string{"true", "false"}}
}
val, err := p.declare(tok, lit)
return int(val.(float64)), err
return val.(bool), err
}

View file

@ -40,6 +40,10 @@ func (p *parser) parseInsertStatement() (stmt *InsertStatement, err error) {
return nil, err
}
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
return
}

View file

@ -52,6 +52,10 @@ func (p *parser) parseRelateStatement() (stmt *RelateStatement, err error) {
return nil, err
}
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
return
}

View file

@ -67,6 +67,10 @@ func (p *parser) parseSelectStatement() (stmt *SelectStatement, err error) {
return nil, err
}
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
if err = checkExpression(aggrs, stmt.Expr, stmt.Group); err != nil {
return nil, err
}

File diff suppressed because it is too large Load diff

View file

@ -38,6 +38,10 @@ func (p *parser) parseUpdateStatement() (stmt *UpdateStatement, err error) {
return nil, err
}
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
return
}

View file

@ -40,6 +40,10 @@ func (p *parser) parseUpsertStatement() (stmt *UpsertStatement, err error) {
return nil, err
}
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
return
}