Fix incremental table processing in iterator
This commit is contained in:
parent
e24cc5b620
commit
6c939c756c
2 changed files with 24 additions and 1 deletions
|
@ -579,13 +579,15 @@ func (i *iterator) processTable(ctx context.Context, key *keys.Table) {
|
||||||
beg := &keys.Thing{KV: key.KV, NS: key.NS, DB: key.DB, TB: key.TB, ID: keys.Ignore}
|
beg := &keys.Thing{KV: key.KV, NS: key.NS, DB: key.DB, TB: key.TB, ID: keys.Ignore}
|
||||||
end := &keys.Thing{KV: key.KV, NS: key.NS, DB: key.DB, TB: key.TB, ID: keys.Suffix}
|
end := &keys.Thing{KV: key.KV, NS: key.NS, DB: key.DB, TB: key.TB, ID: keys.Suffix}
|
||||||
|
|
||||||
|
min, max := beg.Encode(), end.Encode()
|
||||||
|
|
||||||
for x := 0; ; x = 1 {
|
for x := 0; ; x = 1 {
|
||||||
|
|
||||||
if !i.checkState(ctx) {
|
if !i.checkState(ctx) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
vals, err := i.e.dbo.GetR(i.versn, beg.Encode(), end.Encode(), 1000)
|
vals, err := i.e.dbo.GetR(i.versn, min, max, 1000)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
i.fail <- err
|
i.fail <- err
|
||||||
return
|
return
|
||||||
|
@ -616,6 +618,8 @@ func (i *iterator) processTable(ctx context.Context, key *keys.Table) {
|
||||||
|
|
||||||
beg.Decode(vals[len(vals)-1].Key())
|
beg.Decode(vals[len(vals)-1].Key())
|
||||||
|
|
||||||
|
min = append(beg.Encode(), byte(0))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1073,6 +1073,25 @@ func TestSelect(t *testing.T) {
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Convey("Group and retrieve more than 1000 records to test incremental processing", t, func() {
|
||||||
|
|
||||||
|
setupDB()
|
||||||
|
|
||||||
|
txt := `
|
||||||
|
USE NS test DB test;
|
||||||
|
CREATE |person:1..10000|;
|
||||||
|
SELECT meta.tb, count(*) AS test FROM person GROUP BY meta.tb;
|
||||||
|
`
|
||||||
|
|
||||||
|
res, err := Execute(setupKV(), txt, nil)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(res, ShouldHaveLength, 3)
|
||||||
|
So(res[1].Result, ShouldHaveLength, 10000)
|
||||||
|
So(res[2].Result, ShouldHaveLength, 1)
|
||||||
|
So(data.Consume(res[2].Result[0]).Get("test").Data(), ShouldEqual, 10000)
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
Convey("Order records ascending", t, func() {
|
Convey("Order records ascending", t, func() {
|
||||||
|
|
||||||
setupDB()
|
setupDB()
|
||||||
|
|
Loading…
Reference in a new issue