Don’t use concurrent workers in iterator

This commit is contained in:
Tobie Morgan Hitchcock 2019-11-20 16:59:17 +00:00
parent 16e3c6bf4e
commit c3b0bf3a26

View file

@ -18,7 +18,6 @@ import (
"fmt" "fmt"
"math" "math"
"sort" "sort"
"sync"
"context" "context"
@ -40,16 +39,12 @@ type iterator struct {
id int id int
err error
vir bool vir bool
err error
stm sql.Statement stm sql.Statement
res []interface{} res []interface{}
wait sync.WaitGroup
fail chan error
stop chan struct{} stop chan struct{}
jobs chan *workable
vals chan *doneable
expr sql.Fields expr sql.Fields
what sql.Exprs what sql.Exprs
@ -60,18 +55,6 @@ type iterator struct {
limit int limit int
start int start int
versn int64 versn int64
async bool
}
type workable struct {
key *keys.Thing
val kvs.KV
doc *data.Doc
}
type doneable struct {
res interface{}
err error
} }
type groupable struct { type groupable struct {
@ -92,24 +75,14 @@ func newIterator(e *executor, ctx context.Context, stm sql.Statement, vir bool)
i.id = rand.Int() i.id = rand.Int()
i.vir = vir
i.err = nil i.err = nil
i.stm = stm i.stm = stm
i.vir = vir
i.res = make([]interface{}, 0) i.res = make([]interface{}, 0)
i.wait = sync.WaitGroup{}
i.fail = make(chan error, 1)
i.stop = make(chan struct{}) i.stop = make(chan struct{})
i.jobs = make(chan *workable, workerCount)
i.vals = make(chan *doneable, workerCount)
// Comment here // Comment here
i.setup(ctx)
i.setupState(ctx)
// Comment here ...
i.setupWorkers(ctx)
return return
@ -121,11 +94,7 @@ func (i *iterator) Close() {
i.err = nil i.err = nil
i.stm = nil i.stm = nil
i.res = nil i.res = nil
i.fail = nil
i.stop = nil i.stop = nil
i.jobs = nil
i.vals = nil
i.expr = nil i.expr = nil
i.what = nil i.what = nil
@ -136,13 +105,12 @@ func (i *iterator) Close() {
i.limit = -1 i.limit = -1
i.start = -1 i.start = -1
i.versn = 0 i.versn = 0
i.async = false
iteratorPool.Put(i) iteratorPool.Put(i)
} }
func (i *iterator) setupState(ctx context.Context) { func (i *iterator) setup(ctx context.Context) {
i.expr = nil i.expr = nil
i.what = nil i.what = nil
@ -150,7 +118,6 @@ func (i *iterator) setupState(ctx context.Context) {
i.split = nil i.split = nil
i.group = nil i.group = nil
i.order = nil i.order = nil
i.split = nil
i.limit = -1 i.limit = -1
i.start = -1 i.start = -1
i.versn = math.MaxInt64 i.versn = math.MaxInt64
@ -163,24 +130,18 @@ func (i *iterator) setupState(ctx context.Context) {
i.split = stm.Split i.split = stm.Split
i.group = stm.Group i.group = stm.Group
i.order = stm.Order i.order = stm.Order
i.async = stm.Parallel
case *sql.CreateStatement: case *sql.CreateStatement:
i.what = stm.What i.what = stm.What
i.async = stm.Parallel
case *sql.UpdateStatement: case *sql.UpdateStatement:
i.what = stm.What i.what = stm.What
i.cond = stm.Cond i.cond = stm.Cond
i.async = stm.Parallel
case *sql.DeleteStatement: case *sql.DeleteStatement:
i.what = stm.What i.what = stm.What
i.cond = stm.Cond i.cond = stm.Cond
i.async = stm.Parallel
case *sql.InsertStatement: case *sql.InsertStatement:
i.what = sql.Exprs{stm.Data} i.what = sql.Exprs{stm.Data}
i.async = stm.Parallel
case *sql.UpsertStatement: case *sql.UpsertStatement:
i.what = sql.Exprs{stm.Data} i.what = sql.Exprs{stm.Data}
i.async = stm.Parallel
} }
if stm, ok := i.stm.(*sql.SelectStatement); ok { if stm, ok := i.stm.(*sql.SelectStatement); ok {
@ -216,7 +177,7 @@ func (i *iterator) setupState(ctx context.Context) {
} }
func (i *iterator) checkState(ctx context.Context) bool { func (i *iterator) check(ctx context.Context) bool {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -229,57 +190,22 @@ func (i *iterator) checkState(ctx context.Context) bool {
} }
func (i *iterator) setupWorkers(ctx context.Context) { func (i *iterator) process(ctx context.Context, key *keys.Thing, val kvs.KV, doc *data.Doc) {
if !i.checkState(ctx) { res, err := newDocument(i, key, val, doc).query(ctx, i.stm)
return
}
go func(vals <-chan *doneable) {
for v := range vals {
i.receive(v)
}
}(i.vals)
workers := 1
if i.async {
workers = workerCount
}
for w := 1; w <= workers; w++ {
go func(jobs <-chan *workable, vals chan<- *doneable) {
for j := range jobs {
res, err := newDocument(i, j.key, j.val, j.doc).query(ctx, i.stm)
vals <- &doneable{res: res, err: err}
}
}(i.jobs, i.vals)
}
}
func (i *iterator) deliver(key *keys.Thing, val kvs.KV, doc *data.Doc) {
i.wait.Add(1)
i.jobs <- &workable{key: key, val: val, doc: doc}
}
func (i *iterator) receive(val *doneable) {
defer i.wait.Done()
// If an error was received from the // If an error was received from the
// worker, then set the error if no // worker, then set the error if no
// previous iterator error has occured. // previous iterator error has occured.
if val.err != nil { if err != nil {
select { select {
case <-ctx.Done():
return
case <-i.stop: case <-i.stop:
return return
default: default:
i.fail <- val.err i.err = err
close(i.stop) close(i.stop)
return return
} }
@ -289,8 +215,8 @@ func (i *iterator) receive(val *doneable) {
// to the iterator result slice so // to the iterator result slice so
// that it is ready for processing. // that it is ready for processing.
if val.res != nil { if res != nil {
i.res = append(i.res, val.res) i.res = append(i.res, res)
} }
// The statement does not have a limit // The statement does not have a limit
@ -326,6 +252,8 @@ func (i *iterator) receive(val *doneable) {
// query statement. // query statement.
select { select {
case <-ctx.Done():
return
case <-i.stop: case <-i.stop:
return return
default: default:
@ -519,8 +447,8 @@ func (i *iterator) processThing(ctx context.Context, key *keys.Thing) {
i.processPerms(ctx, key.NS, key.DB, key.TB) i.processPerms(ctx, key.NS, key.DB, key.TB)
if i.checkState(ctx) { if i.check(ctx) {
i.deliver(key, nil, nil) i.process(ctx, key, nil, nil)
} }
} }
@ -545,7 +473,7 @@ func (i *iterator) processTable(ctx context.Context, key *keys.Table) {
var vals []kvs.KV var vals []kvs.KV
if !i.checkState(ctx) { if !i.check(ctx) {
return return
} }
@ -568,8 +496,8 @@ func (i *iterator) processTable(ctx context.Context, key *keys.Table) {
// the items and process the records. // the items and process the records.
for _, val := range vals { for _, val := range vals {
if i.checkState(ctx) { if i.check(ctx) {
i.deliver(nil, val, nil) i.process(ctx, nil, val, nil)
continue continue
} }
} }
@ -596,10 +524,10 @@ func (i *iterator) processBatch(ctx context.Context, key *keys.Thing, qry *sql.B
// and specify the TB and ID for // and specify the TB and ID for
// each record. // each record.
if i.checkState(ctx) { if i.check(ctx) {
key := key.Copy() key := key.Copy()
key.TB, key.ID = val.TB, val.ID key.TB, key.ID = val.TB, val.ID
i.deliver(key, nil, nil) i.process(ctx, key, nil, nil)
continue continue
} }
@ -623,10 +551,10 @@ func (i *iterator) processModel(ctx context.Context, key *keys.Thing, qry *sql.M
for j := 1; j <= int(qry.MAX); j++ { for j := 1; j <= int(qry.MAX); j++ {
if i.checkState(ctx) { if i.check(ctx) {
key := key.Copy() key := key.Copy()
key.ID = guid.New().String() key.ID = guid.New().String()
i.deliver(key, nil, nil) i.process(ctx, key, nil, nil)
continue continue
} }
@ -644,10 +572,10 @@ func (i *iterator) processModel(ctx context.Context, key *keys.Thing, qry *sql.M
for num := qry.MIN; num <= qry.MAX; num = nums.FormatPlaces(num+qry.INC, dec) { for num := qry.MIN; num <= qry.MAX; num = nums.FormatPlaces(num+qry.INC, dec) {
if i.checkState(ctx) { if i.check(ctx) {
key := key.Copy() key := key.Copy()
key.ID = num key.ID = num
i.deliver(key, nil, nil) i.process(ctx, key, nil, nil)
continue continue
} }
@ -665,10 +593,10 @@ func (i *iterator) processModel(ctx context.Context, key *keys.Thing, qry *sql.M
for num := qry.MIN; num >= qry.MAX; num = nums.FormatPlaces(num-qry.INC, dec) { for num := qry.MIN; num >= qry.MAX; num = nums.FormatPlaces(num-qry.INC, dec) {
if i.checkState(ctx) { if i.check(ctx) {
key := key.Copy() key := key.Copy()
key.ID = num key.ID = num
i.deliver(key, nil, nil) i.process(ctx, key, nil, nil)
continue continue
} }
@ -695,10 +623,10 @@ func (i *iterator) processOther(ctx context.Context, key *keys.Thing, val []inte
// the ID only, and we can query the // the ID only, and we can query the
// record further after loading it. // record further after loading it.
if i.checkState(ctx) { if i.check(ctx) {
key := key.Copy() key := key.Copy()
key.TB, key.ID = v.TB, v.ID key.TB, key.ID = v.TB, v.ID
i.deliver(key, nil, nil) i.process(ctx, key, nil, nil)
continue continue
} }
@ -706,13 +634,13 @@ func (i *iterator) processOther(ctx context.Context, key *keys.Thing, val []inte
switch i.stm.(type) { switch i.stm.(type) {
case *sql.CreateStatement: case *sql.CreateStatement:
i.fail <- fmt.Errorf("Can not execute CREATE query using value '%v'", val) i.err = fmt.Errorf("Can not execute CREATE query using value '%v'", val)
case *sql.UpdateStatement: case *sql.UpdateStatement:
i.fail <- fmt.Errorf("Can not execute UPDATE query using value '%v'", val) i.err = fmt.Errorf("Can not execute UPDATE query using value '%v'", val)
case *sql.DeleteStatement: case *sql.DeleteStatement:
i.fail <- fmt.Errorf("Can not execute DELETE query using value '%v'", val) i.err = fmt.Errorf("Can not execute DELETE query using value '%v'", val)
case *sql.RelateStatement: case *sql.RelateStatement:
i.fail <- fmt.Errorf("Can not execute RELATE query using value '%v'", val) i.err = fmt.Errorf("Can not execute RELATE query using value '%v'", val)
} }
close(i.stop) close(i.stop)
@ -740,10 +668,10 @@ func (i *iterator) processQuery(ctx context.Context, key *keys.Thing, val []inte
// the ID only, and we can query the // the ID only, and we can query the
// record further after loading it. // record further after loading it.
if i.checkState(ctx) { if i.check(ctx) {
key := key.Copy() key := key.Copy()
key.TB, key.ID = v.TB, v.ID key.TB, key.ID = v.TB, v.ID
i.deliver(key, nil, nil) i.process(ctx, key, nil, nil)
continue continue
} }
@ -752,8 +680,8 @@ func (i *iterator) processQuery(ctx context.Context, key *keys.Thing, val []inte
// Otherwise let's just load up all // Otherwise let's just load up all
// of the data so we can process it. // of the data so we can process it.
if i.checkState(ctx) { if i.check(ctx) {
i.deliver(nil, nil, data.Consume(v)) i.process(ctx, nil, nil, data.Consume(v))
continue continue
} }
@ -778,10 +706,10 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
// If the item is a *sql.Thing then // If the item is a *sql.Thing then
// this was a subquery, so use the ID. // this was a subquery, so use the ID.
if i.checkState(ctx) { if i.check(ctx) {
key := key.Copy() key := key.Copy()
key.ID = v.ID key.ID = v.ID
i.deliver(key, nil, nil) i.process(ctx, key, nil, nil)
continue continue
} }
@ -797,10 +725,10 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
// If the ID is a *sql.Thing then this // If the ID is a *sql.Thing then this
// was a subquery, so use the ID. // was a subquery, so use the ID.
if i.checkState(ctx) { if i.check(ctx) {
key := key.Copy() key := key.Copy()
key.ID = thg.ID key.ID = thg.ID
i.deliver(key, nil, data.Consume(v)) i.process(ctx, key, nil, data.Consume(v))
continue continue
} }
@ -809,10 +737,10 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
// If not, then take the whole ID and // If not, then take the whole ID and
// use that as the ID of the new record. // use that as the ID of the new record.
if i.checkState(ctx) { if i.check(ctx) {
key := key.Copy() key := key.Copy()
key.ID = fld key.ID = fld
i.deliver(key, nil, data.Consume(v)) i.process(ctx, key, nil, data.Consume(v))
continue continue
} }
@ -823,10 +751,10 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
// If there is no ID field, then create // If there is no ID field, then create
// a unique id for the new record. // a unique id for the new record.
if i.checkState(ctx) { if i.check(ctx) {
key := key.Copy() key := key.Copy()
key.ID = guid.New().String() key.ID = guid.New().String()
i.deliver(key, nil, data.Consume(v)) i.process(ctx, key, nil, data.Consume(v))
continue continue
} }
@ -844,23 +772,10 @@ func (i *iterator) Yield(ctx context.Context) (out []interface{}, err error) {
defer i.Close() defer i.Close()
i.wait.Wait()
close(i.jobs)
close(i.vals)
if i.err != nil { if i.err != nil {
return nil, i.err return nil, i.err
} }
if i.err == nil {
select {
default:
case i.err = <-i.fail:
return nil, i.err
}
}
if len(i.split) > 0 { if len(i.split) > 0 {
i.res = i.Split(ctx, i.res) i.res = i.Split(ctx, i.res)
} }