From c3b0bf3a26b4026defad0a6f1ecd74547b0c3df5 Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Wed, 20 Nov 2019 16:59:17 +0000 Subject: [PATCH] =?UTF-8?q?Don=E2=80=99t=20use=20concurrent=20workers=20in?= =?UTF-8?q?=20iterator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- db/iterator.go | 177 +++++++++++++------------------------------------ 1 file changed, 46 insertions(+), 131 deletions(-) diff --git a/db/iterator.go b/db/iterator.go index e89b4f88..aced2f01 100644 --- a/db/iterator.go +++ b/db/iterator.go @@ -18,7 +18,6 @@ import ( "fmt" "math" "sort" - "sync" "context" @@ -40,16 +39,12 @@ type iterator struct { id int - err error vir bool + err error stm sql.Statement res []interface{} - wait sync.WaitGroup - fail chan error stop chan struct{} - jobs chan *workable - vals chan *doneable expr sql.Fields what sql.Exprs @@ -60,18 +55,6 @@ type iterator struct { limit int start int versn int64 - async bool -} - -type workable struct { - key *keys.Thing - val kvs.KV - doc *data.Doc -} - -type doneable struct { - res interface{} - err error } type groupable struct { @@ -92,24 +75,14 @@ func newIterator(e *executor, ctx context.Context, stm sql.Statement, vir bool) i.id = rand.Int() + i.vir = vir i.err = nil i.stm = stm - i.vir = vir i.res = make([]interface{}, 0) - - i.wait = sync.WaitGroup{} - i.fail = make(chan error, 1) i.stop = make(chan struct{}) - i.jobs = make(chan *workable, workerCount) - i.vals = make(chan *doneable, workerCount) // Comment here - - i.setupState(ctx) - - // Comment here ... - - i.setupWorkers(ctx) + i.setup(ctx) return @@ -121,11 +94,7 @@ func (i *iterator) Close() { i.err = nil i.stm = nil i.res = nil - - i.fail = nil i.stop = nil - i.jobs = nil - i.vals = nil i.expr = nil i.what = nil @@ -136,13 +105,12 @@ func (i *iterator) Close() { i.limit = -1 i.start = -1 i.versn = 0 - i.async = false iteratorPool.Put(i) } -func (i *iterator) setupState(ctx context.Context) { +func (i *iterator) setup(ctx context.Context) { i.expr = nil i.what = nil @@ -150,7 +118,6 @@ func (i *iterator) setupState(ctx context.Context) { i.split = nil i.group = nil i.order = nil - i.split = nil i.limit = -1 i.start = -1 i.versn = math.MaxInt64 @@ -163,24 +130,18 @@ func (i *iterator) setupState(ctx context.Context) { i.split = stm.Split i.group = stm.Group i.order = stm.Order - i.async = stm.Parallel case *sql.CreateStatement: i.what = stm.What - i.async = stm.Parallel case *sql.UpdateStatement: i.what = stm.What i.cond = stm.Cond - i.async = stm.Parallel case *sql.DeleteStatement: i.what = stm.What i.cond = stm.Cond - i.async = stm.Parallel case *sql.InsertStatement: i.what = sql.Exprs{stm.Data} - i.async = stm.Parallel case *sql.UpsertStatement: i.what = sql.Exprs{stm.Data} - i.async = stm.Parallel } if stm, ok := i.stm.(*sql.SelectStatement); ok { @@ -216,7 +177,7 @@ func (i *iterator) setupState(ctx context.Context) { } -func (i *iterator) checkState(ctx context.Context) bool { +func (i *iterator) check(ctx context.Context) bool { select { case <-ctx.Done(): @@ -229,57 +190,22 @@ func (i *iterator) checkState(ctx context.Context) bool { } -func (i *iterator) setupWorkers(ctx context.Context) { +func (i *iterator) process(ctx context.Context, key *keys.Thing, val kvs.KV, doc *data.Doc) { - if !i.checkState(ctx) { - return - } - - go func(vals <-chan *doneable) { - for v := range vals { - i.receive(v) - } - }(i.vals) - - workers := 1 - - if i.async { - workers = workerCount - } - - for w := 1; w <= workers; w++ { - go func(jobs <-chan *workable, vals chan<- *doneable) { - for j := range jobs { - res, err := newDocument(i, j.key, j.val, j.doc).query(ctx, i.stm) - vals <- &doneable{res: res, err: err} - } - }(i.jobs, i.vals) - } - -} - -func (i *iterator) deliver(key *keys.Thing, val kvs.KV, doc *data.Doc) { - - i.wait.Add(1) - - i.jobs <- &workable{key: key, val: val, doc: doc} - -} - -func (i *iterator) receive(val *doneable) { - - defer i.wait.Done() + res, err := newDocument(i, key, val, doc).query(ctx, i.stm) // If an error was received from the // worker, then set the error if no // previous iterator error has occured. - if val.err != nil { + if err != nil { select { + case <-ctx.Done(): + return case <-i.stop: return default: - i.fail <- val.err + i.err = err close(i.stop) return } @@ -289,8 +215,8 @@ func (i *iterator) receive(val *doneable) { // to the iterator result slice so // that it is ready for processing. - if val.res != nil { - i.res = append(i.res, val.res) + if res != nil { + i.res = append(i.res, res) } // The statement does not have a limit @@ -326,6 +252,8 @@ func (i *iterator) receive(val *doneable) { // query statement. select { + case <-ctx.Done(): + return case <-i.stop: return default: @@ -519,8 +447,8 @@ func (i *iterator) processThing(ctx context.Context, key *keys.Thing) { i.processPerms(ctx, key.NS, key.DB, key.TB) - if i.checkState(ctx) { - i.deliver(key, nil, nil) + if i.check(ctx) { + i.process(ctx, key, nil, nil) } } @@ -545,7 +473,7 @@ func (i *iterator) processTable(ctx context.Context, key *keys.Table) { var vals []kvs.KV - if !i.checkState(ctx) { + if !i.check(ctx) { return } @@ -568,8 +496,8 @@ func (i *iterator) processTable(ctx context.Context, key *keys.Table) { // the items and process the records. for _, val := range vals { - if i.checkState(ctx) { - i.deliver(nil, val, nil) + if i.check(ctx) { + i.process(ctx, nil, val, nil) continue } } @@ -596,10 +524,10 @@ func (i *iterator) processBatch(ctx context.Context, key *keys.Thing, qry *sql.B // and specify the TB and ID for // each record. - if i.checkState(ctx) { + if i.check(ctx) { key := key.Copy() key.TB, key.ID = val.TB, val.ID - i.deliver(key, nil, nil) + i.process(ctx, key, nil, nil) continue } @@ -623,10 +551,10 @@ func (i *iterator) processModel(ctx context.Context, key *keys.Thing, qry *sql.M for j := 1; j <= int(qry.MAX); j++ { - if i.checkState(ctx) { + if i.check(ctx) { key := key.Copy() key.ID = guid.New().String() - i.deliver(key, nil, nil) + i.process(ctx, key, nil, nil) continue } @@ -644,10 +572,10 @@ func (i *iterator) processModel(ctx context.Context, key *keys.Thing, qry *sql.M for num := qry.MIN; num <= qry.MAX; num = nums.FormatPlaces(num+qry.INC, dec) { - if i.checkState(ctx) { + if i.check(ctx) { key := key.Copy() key.ID = num - i.deliver(key, nil, nil) + i.process(ctx, key, nil, nil) continue } @@ -665,10 +593,10 @@ func (i *iterator) processModel(ctx context.Context, key *keys.Thing, qry *sql.M for num := qry.MIN; num >= qry.MAX; num = nums.FormatPlaces(num-qry.INC, dec) { - if i.checkState(ctx) { + if i.check(ctx) { key := key.Copy() key.ID = num - i.deliver(key, nil, nil) + i.process(ctx, key, nil, nil) continue } @@ -695,10 +623,10 @@ func (i *iterator) processOther(ctx context.Context, key *keys.Thing, val []inte // the ID only, and we can query the // record further after loading it. - if i.checkState(ctx) { + if i.check(ctx) { key := key.Copy() key.TB, key.ID = v.TB, v.ID - i.deliver(key, nil, nil) + i.process(ctx, key, nil, nil) continue } @@ -706,13 +634,13 @@ func (i *iterator) processOther(ctx context.Context, key *keys.Thing, val []inte switch i.stm.(type) { case *sql.CreateStatement: - i.fail <- fmt.Errorf("Can not execute CREATE query using value '%v'", val) + i.err = fmt.Errorf("Can not execute CREATE query using value '%v'", val) case *sql.UpdateStatement: - i.fail <- fmt.Errorf("Can not execute UPDATE query using value '%v'", val) + i.err = fmt.Errorf("Can not execute UPDATE query using value '%v'", val) case *sql.DeleteStatement: - i.fail <- fmt.Errorf("Can not execute DELETE query using value '%v'", val) + i.err = fmt.Errorf("Can not execute DELETE query using value '%v'", val) case *sql.RelateStatement: - i.fail <- fmt.Errorf("Can not execute RELATE query using value '%v'", val) + i.err = fmt.Errorf("Can not execute RELATE query using value '%v'", val) } close(i.stop) @@ -740,10 +668,10 @@ func (i *iterator) processQuery(ctx context.Context, key *keys.Thing, val []inte // the ID only, and we can query the // record further after loading it. - if i.checkState(ctx) { + if i.check(ctx) { key := key.Copy() key.TB, key.ID = v.TB, v.ID - i.deliver(key, nil, nil) + i.process(ctx, key, nil, nil) continue } @@ -752,8 +680,8 @@ func (i *iterator) processQuery(ctx context.Context, key *keys.Thing, val []inte // Otherwise let's just load up all // of the data so we can process it. - if i.checkState(ctx) { - i.deliver(nil, nil, data.Consume(v)) + if i.check(ctx) { + i.process(ctx, nil, nil, data.Consume(v)) continue } @@ -778,10 +706,10 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte // If the item is a *sql.Thing then // this was a subquery, so use the ID. - if i.checkState(ctx) { + if i.check(ctx) { key := key.Copy() key.ID = v.ID - i.deliver(key, nil, nil) + i.process(ctx, key, nil, nil) continue } @@ -797,10 +725,10 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte // If the ID is a *sql.Thing then this // was a subquery, so use the ID. - if i.checkState(ctx) { + if i.check(ctx) { key := key.Copy() key.ID = thg.ID - i.deliver(key, nil, data.Consume(v)) + i.process(ctx, key, nil, data.Consume(v)) continue } @@ -809,10 +737,10 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte // If not, then take the whole ID and // use that as the ID of the new record. - if i.checkState(ctx) { + if i.check(ctx) { key := key.Copy() key.ID = fld - i.deliver(key, nil, data.Consume(v)) + i.process(ctx, key, nil, data.Consume(v)) continue } @@ -823,10 +751,10 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte // If there is no ID field, then create // a unique id for the new record. - if i.checkState(ctx) { + if i.check(ctx) { key := key.Copy() key.ID = guid.New().String() - i.deliver(key, nil, data.Consume(v)) + i.process(ctx, key, nil, data.Consume(v)) continue } @@ -844,23 +772,10 @@ func (i *iterator) Yield(ctx context.Context) (out []interface{}, err error) { defer i.Close() - i.wait.Wait() - - close(i.jobs) - close(i.vals) - if i.err != nil { return nil, i.err } - if i.err == nil { - select { - default: - case i.err = <-i.fail: - return nil, i.err - } - } - if len(i.split) > 0 { i.res = i.Split(ctx, i.res) }