Pass channels into goroutines to prevent data race
This commit is contained in:
parent
001d7f866c
commit
34aa10f9b8
1 changed files with 11 additions and 11 deletions
|
@ -236,24 +236,24 @@ func (i *iterator) setupWorkers(ctx context.Context) {
|
||||||
switch {
|
switch {
|
||||||
case i.tasks == 0:
|
case i.tasks == 0:
|
||||||
for w := 1; w <= workerCount; w++ {
|
for w := 1; w <= workerCount; w++ {
|
||||||
go i.setupWorker(ctx)
|
go i.setupWorker(ctx, i.jobs, i.vals)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
for w := 1; w <= ints.Between(1, workerCount, i.tasks); w++ {
|
for w := 1; w <= ints.Between(1, workerCount, i.tasks); w++ {
|
||||||
go i.setupWorker(ctx)
|
go i.setupWorker(ctx, i.jobs, i.vals)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *iterator) setupWorker(ctx context.Context) {
|
func (i *iterator) setupWorker(ctx context.Context, jobs chan *workable, vals chan *doneable) {
|
||||||
|
|
||||||
for j := range i.jobs {
|
for j := range jobs {
|
||||||
|
|
||||||
res, err := newDocument(i, j.key, j.val, j.doc).query(ctx, i.stm)
|
res, err := newDocument(i, j.key, j.val, j.doc).query(ctx, i.stm)
|
||||||
|
|
||||||
i.vals <- &doneable{res: res, err: err}
|
vals <- &doneable{res: res, err: err}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,17 +269,17 @@ func (i *iterator) submitTask(key *keys.Thing, val kvs.KV, doc *data.Doc) {
|
||||||
|
|
||||||
func (i *iterator) checkWorker(ctx context.Context) {
|
func (i *iterator) checkWorker(ctx context.Context) {
|
||||||
|
|
||||||
go func() {
|
go func(fail chan error) {
|
||||||
for err := range i.fail {
|
for err := range fail {
|
||||||
i.receivedError(err)
|
i.receivedError(err)
|
||||||
}
|
}
|
||||||
}()
|
}(i.fail)
|
||||||
|
|
||||||
go func() {
|
go func(vals chan *doneable) {
|
||||||
for val := range i.vals {
|
for val := range vals {
|
||||||
i.receivedResult(val)
|
i.receivedResult(val)
|
||||||
}
|
}
|
||||||
}()
|
}(i.vals)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue