Remove custom PARALLEL config support from queries

This commit is contained in:
Tobie Morgan Hitchcock 2019-01-22 20:33:15 +00:00
parent efeb186200
commit 62f02015ba
14 changed files with 1705 additions and 1988 deletions

View file

@ -11,23 +11,23 @@ import (
const ( const (
// ----- content types ---- // ----- content types ----
codecSelferCcUTF84278 = 1 codecSelferCcUTF89522 = 1
codecSelferCcRAW4278 = 0 codecSelferCcRAW9522 = 0
// ----- value types used ---- // ----- value types used ----
codecSelferValueTypeArray4278 = 10 codecSelferValueTypeArray9522 = 10
codecSelferValueTypeMap4278 = 9 codecSelferValueTypeMap9522 = 9
codecSelferValueTypeString4278 = 6 codecSelferValueTypeString9522 = 6
codecSelferValueTypeInt4278 = 2 codecSelferValueTypeInt9522 = 2
codecSelferValueTypeUint4278 = 3 codecSelferValueTypeUint9522 = 3
codecSelferValueTypeFloat4278 = 4 codecSelferValueTypeFloat9522 = 4
codecSelferBitsize4278 = uint8(32 << (^uint(0) >> 63)) codecSelferBitsize9522 = uint8(32 << (^uint(0) >> 63))
) )
var ( var (
errCodecSelferOnlyMapOrArrayEncodeToStruct4278 = errors.New(`only encoded map or array can be decoded into a struct`) errCodecSelferOnlyMapOrArrayEncodeToStruct9522 = errors.New(`only encoded map or array can be decoded into a struct`)
) )
type codecSelfer4278 struct{} type codecSelfer9522 struct{}
func init() { func init() {
if codec1978.GenVersion != 8 { if codec1978.GenVersion != 8 {
@ -39,7 +39,7 @@ func init() {
} }
func (x *Response) CodecEncodeSelf(e *codec1978.Encoder) { func (x *Response) CodecEncodeSelf(e *codec1978.Encoder) {
var h codecSelfer4278 var h codecSelfer9522
z, r := codec1978.GenHelperEncoder(e) z, r := codec1978.GenHelperEncoder(e)
_, _, _ = h, z, r _, _, _ = h, z, r
if x == nil { if x == nil {
@ -77,19 +77,19 @@ func (x *Response) CodecEncodeSelf(e *codec1978.Encoder) {
if yyq2[0] { if yyq2[0] {
if false { if false {
} else { } else {
r.EncodeString(codecSelferCcUTF84278, string(x.Time)) r.EncodeString(codecSelferCcUTF89522, string(x.Time))
} }
} else { } else {
r.EncodeString(codecSelferCcUTF84278, "") r.EncodeString(codecSelferCcUTF89522, "")
} }
} else { } else {
if yyq2[0] { if yyq2[0] {
r.WriteMapElemKey() r.WriteMapElemKey()
r.EncodeString(codecSelferCcUTF84278, `time`) r.EncodeString(codecSelferCcUTF89522, `time`)
r.WriteMapElemValue() r.WriteMapElemValue()
if false { if false {
} else { } else {
r.EncodeString(codecSelferCcUTF84278, string(x.Time)) r.EncodeString(codecSelferCcUTF89522, string(x.Time))
} }
} }
} }
@ -98,19 +98,19 @@ func (x *Response) CodecEncodeSelf(e *codec1978.Encoder) {
if yyq2[1] { if yyq2[1] {
if false { if false {
} else { } else {
r.EncodeString(codecSelferCcUTF84278, string(x.Status)) r.EncodeString(codecSelferCcUTF89522, string(x.Status))
} }
} else { } else {
r.EncodeString(codecSelferCcUTF84278, "") r.EncodeString(codecSelferCcUTF89522, "")
} }
} else { } else {
if yyq2[1] { if yyq2[1] {
r.WriteMapElemKey() r.WriteMapElemKey()
r.EncodeString(codecSelferCcUTF84278, `status`) r.EncodeString(codecSelferCcUTF89522, `status`)
r.WriteMapElemValue() r.WriteMapElemValue()
if false { if false {
} else { } else {
r.EncodeString(codecSelferCcUTF84278, string(x.Status)) r.EncodeString(codecSelferCcUTF89522, string(x.Status))
} }
} }
} }
@ -119,19 +119,19 @@ func (x *Response) CodecEncodeSelf(e *codec1978.Encoder) {
if yyq2[2] { if yyq2[2] {
if false { if false {
} else { } else {
r.EncodeString(codecSelferCcUTF84278, string(x.Detail)) r.EncodeString(codecSelferCcUTF89522, string(x.Detail))
} }
} else { } else {
r.EncodeString(codecSelferCcUTF84278, "") r.EncodeString(codecSelferCcUTF89522, "")
} }
} else { } else {
if yyq2[2] { if yyq2[2] {
r.WriteMapElemKey() r.WriteMapElemKey()
r.EncodeString(codecSelferCcUTF84278, `detail`) r.EncodeString(codecSelferCcUTF89522, `detail`)
r.WriteMapElemValue() r.WriteMapElemValue()
if false { if false {
} else { } else {
r.EncodeString(codecSelferCcUTF84278, string(x.Detail)) r.EncodeString(codecSelferCcUTF89522, string(x.Detail))
} }
} }
} }
@ -152,7 +152,7 @@ func (x *Response) CodecEncodeSelf(e *codec1978.Encoder) {
} else { } else {
if yyq2[3] { if yyq2[3] {
r.WriteMapElemKey() r.WriteMapElemKey()
r.EncodeString(codecSelferCcUTF84278, `result`) r.EncodeString(codecSelferCcUTF89522, `result`)
r.WriteMapElemValue() r.WriteMapElemValue()
if x.Result == nil { if x.Result == nil {
r.EncodeNil() r.EncodeNil()
@ -174,7 +174,7 @@ func (x *Response) CodecEncodeSelf(e *codec1978.Encoder) {
} }
func (x *Response) CodecDecodeSelf(d *codec1978.Decoder) { func (x *Response) CodecDecodeSelf(d *codec1978.Decoder) {
var h codecSelfer4278 var h codecSelfer9522
z, r := codec1978.GenHelperDecoder(d) z, r := codec1978.GenHelperDecoder(d)
_, _, _ = h, z, r _, _, _ = h, z, r
if false { if false {
@ -182,14 +182,14 @@ func (x *Response) CodecDecodeSelf(d *codec1978.Decoder) {
z.DecExtension(x, yyxt1) z.DecExtension(x, yyxt1)
} else { } else {
yyct2 := r.ContainerType() yyct2 := r.ContainerType()
if yyct2 == codecSelferValueTypeMap4278 { if yyct2 == codecSelferValueTypeMap9522 {
yyl2 := r.ReadMapStart() yyl2 := r.ReadMapStart()
if yyl2 == 0 { if yyl2 == 0 {
r.ReadMapEnd() r.ReadMapEnd()
} else { } else {
x.codecDecodeSelfFromMap(yyl2, d) x.codecDecodeSelfFromMap(yyl2, d)
} }
} else if yyct2 == codecSelferValueTypeArray4278 { } else if yyct2 == codecSelferValueTypeArray9522 {
yyl2 := r.ReadArrayStart() yyl2 := r.ReadArrayStart()
if yyl2 == 0 { if yyl2 == 0 {
r.ReadArrayEnd() r.ReadArrayEnd()
@ -197,13 +197,13 @@ func (x *Response) CodecDecodeSelf(d *codec1978.Decoder) {
x.codecDecodeSelfFromArray(yyl2, d) x.codecDecodeSelfFromArray(yyl2, d)
} }
} else { } else {
panic(errCodecSelferOnlyMapOrArrayEncodeToStruct4278) panic(errCodecSelferOnlyMapOrArrayEncodeToStruct9522)
} }
} }
} }
func (x *Response) codecDecodeSelfFromMap(l int, d *codec1978.Decoder) { func (x *Response) codecDecodeSelfFromMap(l int, d *codec1978.Decoder) {
var h codecSelfer4278 var h codecSelfer9522
z, r := codec1978.GenHelperDecoder(d) z, r := codec1978.GenHelperDecoder(d)
_, _, _ = h, z, r _, _, _ = h, z, r
var yyhl3 bool = l >= 0 var yyhl3 bool = l >= 0
@ -256,7 +256,7 @@ func (x *Response) codecDecodeSelfFromMap(l int, d *codec1978.Decoder) {
} }
func (x *Response) codecDecodeSelfFromArray(l int, d *codec1978.Decoder) { func (x *Response) codecDecodeSelfFromArray(l int, d *codec1978.Decoder) {
var h codecSelfer4278 var h codecSelfer9522
z, r := codec1978.GenHelperDecoder(d) z, r := codec1978.GenHelperDecoder(d)
_, _, _ = h, z, r _, _, _ = h, z, r
var yyj9 int var yyj9 int
@ -346,7 +346,7 @@ func (x *Response) codecDecodeSelfFromArray(l int, d *codec1978.Decoder) {
} }
func (x *Dispatch) CodecEncodeSelf(e *codec1978.Encoder) { func (x *Dispatch) CodecEncodeSelf(e *codec1978.Encoder) {
var h codecSelfer4278 var h codecSelfer9522
z, r := codec1978.GenHelperEncoder(e) z, r := codec1978.GenHelperEncoder(e)
_, _, _ = h, z, r _, _, _ = h, z, r
if x == nil { if x == nil {
@ -383,19 +383,19 @@ func (x *Dispatch) CodecEncodeSelf(e *codec1978.Encoder) {
if yyq2[0] { if yyq2[0] {
if false { if false {
} else { } else {
r.EncodeString(codecSelferCcUTF84278, string(x.Query)) r.EncodeString(codecSelferCcUTF89522, string(x.Query))
} }
} else { } else {
r.EncodeString(codecSelferCcUTF84278, "") r.EncodeString(codecSelferCcUTF89522, "")
} }
} else { } else {
if yyq2[0] { if yyq2[0] {
r.WriteMapElemKey() r.WriteMapElemKey()
r.EncodeString(codecSelferCcUTF84278, `query`) r.EncodeString(codecSelferCcUTF89522, `query`)
r.WriteMapElemValue() r.WriteMapElemValue()
if false { if false {
} else { } else {
r.EncodeString(codecSelferCcUTF84278, string(x.Query)) r.EncodeString(codecSelferCcUTF89522, string(x.Query))
} }
} }
} }
@ -404,19 +404,19 @@ func (x *Dispatch) CodecEncodeSelf(e *codec1978.Encoder) {
if yyq2[1] { if yyq2[1] {
if false { if false {
} else { } else {
r.EncodeString(codecSelferCcUTF84278, string(x.Action)) r.EncodeString(codecSelferCcUTF89522, string(x.Action))
} }
} else { } else {
r.EncodeString(codecSelferCcUTF84278, "") r.EncodeString(codecSelferCcUTF89522, "")
} }
} else { } else {
if yyq2[1] { if yyq2[1] {
r.WriteMapElemKey() r.WriteMapElemKey()
r.EncodeString(codecSelferCcUTF84278, `action`) r.EncodeString(codecSelferCcUTF89522, `action`)
r.WriteMapElemValue() r.WriteMapElemValue()
if false { if false {
} else { } else {
r.EncodeString(codecSelferCcUTF84278, string(x.Action)) r.EncodeString(codecSelferCcUTF89522, string(x.Action))
} }
} }
} }
@ -437,7 +437,7 @@ func (x *Dispatch) CodecEncodeSelf(e *codec1978.Encoder) {
} else { } else {
if yyq2[2] { if yyq2[2] {
r.WriteMapElemKey() r.WriteMapElemKey()
r.EncodeString(codecSelferCcUTF84278, `result`) r.EncodeString(codecSelferCcUTF89522, `result`)
r.WriteMapElemValue() r.WriteMapElemValue()
if x.Result == nil { if x.Result == nil {
r.EncodeNil() r.EncodeNil()
@ -459,7 +459,7 @@ func (x *Dispatch) CodecEncodeSelf(e *codec1978.Encoder) {
} }
func (x *Dispatch) CodecDecodeSelf(d *codec1978.Decoder) { func (x *Dispatch) CodecDecodeSelf(d *codec1978.Decoder) {
var h codecSelfer4278 var h codecSelfer9522
z, r := codec1978.GenHelperDecoder(d) z, r := codec1978.GenHelperDecoder(d)
_, _, _ = h, z, r _, _, _ = h, z, r
if false { if false {
@ -467,14 +467,14 @@ func (x *Dispatch) CodecDecodeSelf(d *codec1978.Decoder) {
z.DecExtension(x, yyxt1) z.DecExtension(x, yyxt1)
} else { } else {
yyct2 := r.ContainerType() yyct2 := r.ContainerType()
if yyct2 == codecSelferValueTypeMap4278 { if yyct2 == codecSelferValueTypeMap9522 {
yyl2 := r.ReadMapStart() yyl2 := r.ReadMapStart()
if yyl2 == 0 { if yyl2 == 0 {
r.ReadMapEnd() r.ReadMapEnd()
} else { } else {
x.codecDecodeSelfFromMap(yyl2, d) x.codecDecodeSelfFromMap(yyl2, d)
} }
} else if yyct2 == codecSelferValueTypeArray4278 { } else if yyct2 == codecSelferValueTypeArray9522 {
yyl2 := r.ReadArrayStart() yyl2 := r.ReadArrayStart()
if yyl2 == 0 { if yyl2 == 0 {
r.ReadArrayEnd() r.ReadArrayEnd()
@ -482,13 +482,13 @@ func (x *Dispatch) CodecDecodeSelf(d *codec1978.Decoder) {
x.codecDecodeSelfFromArray(yyl2, d) x.codecDecodeSelfFromArray(yyl2, d)
} }
} else { } else {
panic(errCodecSelferOnlyMapOrArrayEncodeToStruct4278) panic(errCodecSelferOnlyMapOrArrayEncodeToStruct9522)
} }
} }
} }
func (x *Dispatch) codecDecodeSelfFromMap(l int, d *codec1978.Decoder) { func (x *Dispatch) codecDecodeSelfFromMap(l int, d *codec1978.Decoder) {
var h codecSelfer4278 var h codecSelfer9522
z, r := codec1978.GenHelperDecoder(d) z, r := codec1978.GenHelperDecoder(d)
_, _, _ = h, z, r _, _, _ = h, z, r
var yyhl3 bool = l >= 0 var yyhl3 bool = l >= 0
@ -535,7 +535,7 @@ func (x *Dispatch) codecDecodeSelfFromMap(l int, d *codec1978.Decoder) {
} }
func (x *Dispatch) codecDecodeSelfFromArray(l int, d *codec1978.Decoder) { func (x *Dispatch) codecDecodeSelfFromArray(l int, d *codec1978.Decoder) {
var h codecSelfer4278 var h codecSelfer9522
z, r := codec1978.GenHelperDecoder(d) z, r := codec1978.GenHelperDecoder(d)
_, _, _ = h, z, r _, _, _ = h, z, r
var yyj8 int var yyj8 int

View file

@ -358,13 +358,12 @@ func (e *executor) fetchThing(ctx context.Context, val *sql.Thing, doc *data.Doc
} }
res, err := e.executeSelect(ctx, &sql.SelectStatement{ res, err := e.executeSelect(ctx, &sql.SelectStatement{
KV: cnf.Settings.DB.Base, KV: cnf.Settings.DB.Base,
NS: ctx.Value(ctxKeyNs).(string), NS: ctx.Value(ctxKeyNs).(string),
DB: ctx.Value(ctxKeyDb).(string), DB: ctx.Value(ctxKeyDb).(string),
Expr: []*sql.Field{{Expr: &sql.All{}}}, Expr: []*sql.Field{{Expr: &sql.All{}}},
What: []sql.Expr{val}, What: []sql.Expr{val},
Version: sql.Expr(ver), Version: sql.Expr(ver),
Parallel: 1,
}) })
if err != nil { if err != nil {
@ -394,13 +393,12 @@ func (e *executor) fetchArray(ctx context.Context, val []interface{}, doc *data.
} }
res, err := e.executeSelect(ctx, &sql.SelectStatement{ res, err := e.executeSelect(ctx, &sql.SelectStatement{
KV: cnf.Settings.DB.Base, KV: cnf.Settings.DB.Base,
NS: ctx.Value(ctxKeyNs).(string), NS: ctx.Value(ctxKeyNs).(string),
DB: ctx.Value(ctxKeyDb).(string), DB: ctx.Value(ctxKeyDb).(string),
Expr: []*sql.Field{{Expr: &sql.All{}}}, Expr: []*sql.Field{{Expr: &sql.All{}}},
What: []sql.Expr{val}, What: []sql.Expr{val},
Version: sql.Expr(ver), Version: sql.Expr(ver),
Parallel: 1,
}) })
if err != nil { if err != nil {

View file

@ -60,7 +60,6 @@ type iterator struct {
limit int limit int
start int start int
versn int64 versn int64
tasks int
} }
type workable struct { type workable struct {
@ -100,8 +99,8 @@ func newIterator(e *executor, ctx context.Context, stm sql.Statement, vir bool)
i.wait = sync.WaitGroup{} i.wait = sync.WaitGroup{}
i.fail = make(chan error, 1) i.fail = make(chan error, 1)
i.stop = make(chan struct{}) i.stop = make(chan struct{})
i.jobs = make(chan *workable, 1000) i.jobs = make(chan *workable, workerCount)
i.vals = make(chan *doneable, 1000) i.vals = make(chan *doneable, workerCount)
// Comment here // Comment here
@ -111,10 +110,6 @@ func newIterator(e *executor, ctx context.Context, stm sql.Statement, vir bool)
i.setupWorkers(ctx) i.setupWorkers(ctx)
// Comment here ...
i.watchVals(ctx)
return return
} }
@ -140,7 +135,6 @@ func (i *iterator) Close() {
i.limit = -1 i.limit = -1
i.start = -1 i.start = -1
i.versn = 0 i.versn = 0
i.tasks = 0
iteratorPool.Put(i) iteratorPool.Put(i)
@ -167,26 +161,18 @@ func (i *iterator) setupState(ctx context.Context) {
i.split = stm.Split i.split = stm.Split
i.group = stm.Group i.group = stm.Group
i.order = stm.Order i.order = stm.Order
i.tasks = stm.Parallel
case *sql.CreateStatement: case *sql.CreateStatement:
i.what = stm.What i.what = stm.What
i.tasks = stm.Parallel
case *sql.UpdateStatement: case *sql.UpdateStatement:
i.what = stm.What i.what = stm.What
i.cond = stm.Cond i.cond = stm.Cond
i.tasks = stm.Parallel
case *sql.DeleteStatement: case *sql.DeleteStatement:
i.what = stm.What i.what = stm.What
i.cond = stm.Cond i.cond = stm.Cond
i.tasks = stm.Parallel
case *sql.RelateStatement:
i.tasks = stm.Parallel
case *sql.InsertStatement: case *sql.InsertStatement:
i.what = sql.Exprs{stm.Data} i.what = sql.Exprs{stm.Data}
i.tasks = stm.Parallel
case *sql.UpsertStatement: case *sql.UpsertStatement:
i.what = sql.Exprs{stm.Data} i.what = sql.Exprs{stm.Data}
i.tasks = stm.Parallel
} }
if stm, ok := i.stm.(*sql.SelectStatement); ok { if stm, ok := i.stm.(*sql.SelectStatement); ok {
@ -237,27 +223,28 @@ func (i *iterator) checkState(ctx context.Context) bool {
func (i *iterator) setupWorkers(ctx context.Context) { func (i *iterator) setupWorkers(ctx context.Context) {
if i.checkState(ctx) { if !i.checkState(ctx) {
for w := 1; w <= ints.Between(1, workerCount, i.tasks); w++ { return
go i.setupWorker(ctx, i.jobs, i.vals) }
go func(vals <-chan *doneable) {
for v := range vals {
i.receive(v)
} }
}(i.vals)
for w := 1; w <= workerCount; w++ {
go func(jobs <-chan *workable, vals chan<- *doneable) {
for j := range jobs {
res, err := newDocument(i, j.key, j.val, j.doc).query(ctx, i.stm)
vals <- &doneable{res: res, err: err}
}
}(i.jobs, i.vals)
} }
} }
func (i *iterator) setupWorker(ctx context.Context, jobs chan *workable, vals chan *doneable) { func (i *iterator) deliver(key *keys.Thing, val kvs.KV, doc *data.Doc) {
for j := range jobs {
res, err := newDocument(i, j.key, j.val, j.doc).query(ctx, i.stm)
vals <- &doneable{res: res, err: err}
}
}
func (i *iterator) submitTask(key *keys.Thing, val kvs.KV, doc *data.Doc) {
i.wait.Add(1) i.wait.Add(1)
@ -265,16 +252,6 @@ func (i *iterator) submitTask(key *keys.Thing, val kvs.KV, doc *data.Doc) {
} }
func (i *iterator) watchVals(ctx context.Context) {
go func(vals <-chan *doneable) {
for val := range vals {
i.receive(val)
}
}(i.vals)
}
func (i *iterator) receive(val *doneable) { func (i *iterator) receive(val *doneable) {
defer i.wait.Done() defer i.wait.Done()
@ -529,7 +506,7 @@ func (i *iterator) processThing(ctx context.Context, key *keys.Thing) {
i.processPerms(ctx, key.NS, key.DB, key.TB) i.processPerms(ctx, key.NS, key.DB, key.TB)
if i.checkState(ctx) { if i.checkState(ctx) {
i.submitTask(key, nil, nil) i.deliver(key, nil, nil)
} }
} }
@ -578,7 +555,7 @@ func (i *iterator) processTable(ctx context.Context, key *keys.Table) {
for _, val := range vals { for _, val := range vals {
if i.checkState(ctx) { if i.checkState(ctx) {
i.submitTask(nil, val, nil) i.deliver(nil, val, nil)
continue continue
} }
} }
@ -608,7 +585,7 @@ func (i *iterator) processBatch(ctx context.Context, key *keys.Thing, qry *sql.B
if i.checkState(ctx) { if i.checkState(ctx) {
key := key.Copy() key := key.Copy()
key.TB, key.ID = val.TB, val.ID key.TB, key.ID = val.TB, val.ID
i.submitTask(key, nil, nil) i.deliver(key, nil, nil)
continue continue
} }
@ -635,7 +612,7 @@ func (i *iterator) processModel(ctx context.Context, key *keys.Thing, qry *sql.M
if i.checkState(ctx) { if i.checkState(ctx) {
key := key.Copy() key := key.Copy()
key.ID = guid.New().String() key.ID = guid.New().String()
i.submitTask(key, nil, nil) i.deliver(key, nil, nil)
continue continue
} }
@ -656,7 +633,7 @@ func (i *iterator) processModel(ctx context.Context, key *keys.Thing, qry *sql.M
if i.checkState(ctx) { if i.checkState(ctx) {
key := key.Copy() key := key.Copy()
key.ID = num key.ID = num
i.submitTask(key, nil, nil) i.deliver(key, nil, nil)
continue continue
} }
@ -677,7 +654,7 @@ func (i *iterator) processModel(ctx context.Context, key *keys.Thing, qry *sql.M
if i.checkState(ctx) { if i.checkState(ctx) {
key := key.Copy() key := key.Copy()
key.ID = num key.ID = num
i.submitTask(key, nil, nil) i.deliver(key, nil, nil)
continue continue
} }
@ -707,7 +684,7 @@ func (i *iterator) processOther(ctx context.Context, key *keys.Thing, val []inte
if i.checkState(ctx) { if i.checkState(ctx) {
key := key.Copy() key := key.Copy()
key.TB, key.ID = v.TB, v.ID key.TB, key.ID = v.TB, v.ID
i.submitTask(key, nil, nil) i.deliver(key, nil, nil)
continue continue
} }
@ -752,7 +729,7 @@ func (i *iterator) processQuery(ctx context.Context, key *keys.Thing, val []inte
if i.checkState(ctx) { if i.checkState(ctx) {
key := key.Copy() key := key.Copy()
key.TB, key.ID = v.TB, v.ID key.TB, key.ID = v.TB, v.ID
i.submitTask(key, nil, nil) i.deliver(key, nil, nil)
continue continue
} }
@ -762,7 +739,7 @@ func (i *iterator) processQuery(ctx context.Context, key *keys.Thing, val []inte
// of the data so we can process it. // of the data so we can process it.
if i.checkState(ctx) { if i.checkState(ctx) {
i.submitTask(nil, nil, data.Consume(v)) i.deliver(nil, nil, data.Consume(v))
continue continue
} }
@ -790,7 +767,7 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
if i.checkState(ctx) { if i.checkState(ctx) {
key := key.Copy() key := key.Copy()
key.ID = v.ID key.ID = v.ID
i.submitTask(key, nil, nil) i.deliver(key, nil, nil)
continue continue
} }
@ -809,7 +786,7 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
if i.checkState(ctx) { if i.checkState(ctx) {
key := key.Copy() key := key.Copy()
key.ID = thg.ID key.ID = thg.ID
i.submitTask(key, nil, data.Consume(v)) i.deliver(key, nil, data.Consume(v))
continue continue
} }
@ -821,7 +798,7 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
if i.checkState(ctx) { if i.checkState(ctx) {
key := key.Copy() key := key.Copy()
key.ID = fld key.ID = fld
i.submitTask(key, nil, data.Consume(v)) i.deliver(key, nil, data.Consume(v))
continue continue
} }
@ -835,7 +812,7 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
if i.checkState(ctx) { if i.checkState(ctx) {
key := key.Copy() key := key.Copy()
key.ID = guid.New().String() key.ID = guid.New().String()
i.submitTask(key, nil, data.Consume(v)) i.deliver(key, nil, data.Consume(v))
continue continue
} }

View file

@ -157,11 +157,10 @@ func (d *document) table(ctx context.Context, when method) (err error) {
func (d *document) tableDelete(ctx context.Context, tng *sql.Thing, exp sql.Fields) (err error) { func (d *document) tableDelete(ctx context.Context, tng *sql.Thing, exp sql.Fields) (err error) {
stm := &sql.DeleteStatement{ stm := &sql.DeleteStatement{
KV: d.key.KV, KV: d.key.KV,
NS: d.key.NS, NS: d.key.NS,
DB: d.key.DB, DB: d.key.DB,
What: sql.Exprs{tng}, What: sql.Exprs{tng},
Parallel: 1,
} }
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: tng.TB, ID: tng.ID} key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: tng.TB, ID: tng.ID}
@ -184,12 +183,11 @@ func (d *document) tableUpdate(ctx context.Context, tng *sql.Thing, exp sql.Fiel
} }
stm := &sql.UpdateStatement{ stm := &sql.UpdateStatement{
KV: d.key.KV, KV: d.key.KV,
NS: d.key.NS, NS: d.key.NS,
DB: d.key.DB, DB: d.key.DB,
What: sql.Exprs{tng}, What: sql.Exprs{tng},
Data: &sql.ContentExpression{Data: res}, Data: &sql.ContentExpression{Data: res},
Parallel: 1,
} }
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: tng.TB, ID: tng.ID} key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: tng.TB, ID: tng.ID}
@ -297,12 +295,11 @@ func (d *document) tableModify(ctx context.Context, tng *sql.Thing, exp sql.Fiel
} }
stm := &sql.UpdateStatement{ stm := &sql.UpdateStatement{
KV: d.key.KV, KV: d.key.KV,
NS: d.key.NS, NS: d.key.NS,
DB: d.key.DB, DB: d.key.DB,
What: sql.Exprs{tng}, What: sql.Exprs{tng},
Data: set, Data: set,
Parallel: 1,
} }
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: tng.TB, ID: tng.ID} key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: tng.TB, ID: tng.ID}

View file

@ -76,7 +76,7 @@ const (
var ( var (
// workerCount specifies how many workers should be used // workerCount specifies how many workers should be used
// to process each query statement concurrently. // to process each query statement concurrently.
workerCount = runtime.NumCPU() * 2 workerCount = runtime.NumCPU()
// maxRecursiveQueries specifies how many queries will be // maxRecursiveQueries specifies how many queries will be
// processed recursively before the query is cancelled. // processed recursively before the query is cancelled.

File diff suppressed because it is too large Load diff

View file

@ -163,98 +163,91 @@ type KillStatement struct {
// SelectStatement represents a SQL SELECT statement. // SelectStatement represents a SQL SELECT statement.
type SelectStatement struct { type SelectStatement struct {
RW bool RW bool
KV string KV string
NS string NS string
DB string DB string
Expr Fields Expr Fields
What Exprs What Exprs
Cond Expr Cond Expr
Split Idents Split Idents
Group Groups Group Groups
Order Orders Order Orders
Limit Expr Limit Expr
Start Expr Start Expr
Fetch Fetchs Fetch Fetchs
Version Expr Version Expr
Timeout time.Duration Timeout time.Duration
Parallel int
} }
// CreateStatement represents a SQL CREATE statement. // CreateStatement represents a SQL CREATE statement.
type CreateStatement struct { type CreateStatement struct {
KV string KV string
NS string NS string
DB string DB string
What Exprs What Exprs
Data Expr Data Expr
Echo Token Echo Token
Timeout time.Duration Timeout time.Duration
Parallel int
} }
// UpdateStatement represents a SQL UPDATE statement. // UpdateStatement represents a SQL UPDATE statement.
type UpdateStatement struct { type UpdateStatement struct {
KV string KV string
NS string NS string
DB string DB string
What Exprs What Exprs
Data Expr Data Expr
Cond Expr Cond Expr
Echo Token Echo Token
Timeout time.Duration Timeout time.Duration
Parallel int
} }
// DeleteStatement represents a SQL DELETE statement. // DeleteStatement represents a SQL DELETE statement.
type DeleteStatement struct { type DeleteStatement struct {
KV string KV string
NS string NS string
DB string DB string
What Exprs What Exprs
Cond Expr Cond Expr
Echo Token Echo Token
Timeout time.Duration Timeout time.Duration
Parallel int
} }
// RelateStatement represents a SQL RELATE statement. // RelateStatement represents a SQL RELATE statement.
type RelateStatement struct { type RelateStatement struct {
KV string KV string
NS string NS string
DB string DB string
Type Expr Type Expr
From Exprs From Exprs
With Exprs With Exprs
Data Expr Data Expr
Uniq bool Uniq bool
Echo Token Echo Token
Timeout time.Duration Timeout time.Duration
Parallel int
} }
// InsertStatement represents a SQL INSERT statement. // InsertStatement represents a SQL INSERT statement.
type InsertStatement struct { type InsertStatement struct {
KV string KV string
NS string NS string
DB string DB string
Data Expr Data Expr
Into *Table Into *Table
Echo Token Echo Token
Timeout time.Duration Timeout time.Duration
Parallel int
} }
// UpsertStatement represents a SQL UPSERT statement. // UpsertStatement represents a SQL UPSERT statement.
type UpsertStatement struct { type UpsertStatement struct {
KV string KV string
NS string NS string
DB string DB string
Data Expr Data Expr
Into *Table Into *Table
Echo Token Echo Token
Timeout time.Duration Timeout time.Duration
Parallel int
} }
// -------------------------------------------------- // --------------------------------------------------

View file

@ -38,10 +38,6 @@ func (p *parser) parseCreateStatement() (stmt *CreateStatement, err error) {
return nil, err return nil, err
} }
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
return return
} }

View file

@ -40,10 +40,6 @@ func (p *parser) parseDeleteStatement() (stmt *DeleteStatement, err error) {
return nil, err return nil, err
} }
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
return return
} }

View file

@ -44,10 +44,6 @@ func (p *parser) parseInsertStatement() (stmt *InsertStatement, err error) {
return nil, err return nil, err
} }
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
return return
} }

View file

@ -56,10 +56,6 @@ func (p *parser) parseRelateStatement() (stmt *RelateStatement, err error) {
return nil, err return nil, err
} }
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
return return
} }

View file

@ -71,10 +71,6 @@ func (p *parser) parseSelectStatement() (stmt *SelectStatement, err error) {
return nil, err return nil, err
} }
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
if err = checkExpression(aggrs, stmt.Expr, stmt.Group); err != nil { if err = checkExpression(aggrs, stmt.Expr, stmt.Group); err != nil {
return nil, err return nil, err
} }

View file

@ -42,10 +42,6 @@ func (p *parser) parseUpdateStatement() (stmt *UpdateStatement, err error) {
return nil, err return nil, err
} }
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
return return
} }

View file

@ -44,10 +44,6 @@ func (p *parser) parseUpsertStatement() (stmt *UpsertStatement, err error) {
return nil, err return nil, err
} }
if stmt.Parallel, err = p.parseParallel(); err != nil {
return nil, err
}
return return
} }