Ensure linked records are fetched using the same query version

This commit is contained in:
Tobie Morgan Hitchcock 2018-04-28 20:17:21 +01:00
parent 14c0d93635
commit 99d050b238
3 changed files with 30 additions and 11 deletions

View file

@ -339,12 +339,18 @@ func (e *executor) fetchPaths(ctx context.Context, doc *data.Doc, exprs ...sql.E
func (e *executor) fetchThing(ctx context.Context, val *sql.Thing, doc *data.Doc) (interface{}, error) { func (e *executor) fetchThing(ctx context.Context, val *sql.Thing, doc *data.Doc) (interface{}, error) {
ver, err := e.fetchVersion(ctx, ctx.Value(ctxKeyVersion))
if err != nil {
return nil, err
}
res, err := e.executeSelect(ctx, &sql.SelectStatement{ res, err := e.executeSelect(ctx, &sql.SelectStatement{
KV: cnf.Settings.DB.Base, KV: cnf.Settings.DB.Base,
NS: ctx.Value(ctxKeyNs).(string), NS: ctx.Value(ctxKeyNs).(string),
DB: ctx.Value(ctxKeyDb).(string), DB: ctx.Value(ctxKeyDb).(string),
Expr: []*sql.Field{{Expr: &sql.All{}}}, Expr: []*sql.Field{{Expr: &sql.All{}}},
What: []sql.Expr{val}, What: []sql.Expr{val},
Version: sql.Expr(ver),
Parallel: 1, Parallel: 1,
}) })
@ -362,12 +368,18 @@ func (e *executor) fetchThing(ctx context.Context, val *sql.Thing, doc *data.Doc
func (e *executor) fetchArray(ctx context.Context, val []interface{}, doc *data.Doc) (interface{}, error) { func (e *executor) fetchArray(ctx context.Context, val []interface{}, doc *data.Doc) (interface{}, error) {
ver, err := e.fetchVersion(ctx, ctx.Value(ctxKeyVersion))
if err != nil {
return nil, err
}
res, err := e.executeSelect(ctx, &sql.SelectStatement{ res, err := e.executeSelect(ctx, &sql.SelectStatement{
KV: cnf.Settings.DB.Base, KV: cnf.Settings.DB.Base,
NS: ctx.Value(ctxKeyNs).(string), NS: ctx.Value(ctxKeyNs).(string),
DB: ctx.Value(ctxKeyDb).(string), DB: ctx.Value(ctxKeyDb).(string),
Expr: []*sql.Field{{Expr: &sql.All{}}}, Expr: []*sql.Field{{Expr: &sql.All{}}},
What: []sql.Expr{val}, What: []sql.Expr{val},
Version: sql.Expr(ver),
Parallel: 1, Parallel: 1,
}) })
@ -442,6 +454,10 @@ func (e *executor) fetchStart(ctx context.Context, val sql.Expr) (int, error) {
func (e *executor) fetchVersion(ctx context.Context, val sql.Expr) (int64, error) { func (e *executor) fetchVersion(ctx context.Context, val sql.Expr) (int64, error) {
if v, ok := val.(int64); ok {
return v, nil
}
v, err := e.fetch(ctx, val, nil) v, err := e.fetch(ctx, val, nil)
if err != nil { if err != nil {
return math.MaxInt64, err return math.MaxInt64, err

View file

@ -24,6 +24,8 @@ import (
func (e *executor) executeSelect(ctx context.Context, stm *sql.SelectStatement) ([]interface{}, error) { func (e *executor) executeSelect(ctx context.Context, stm *sql.SelectStatement) ([]interface{}, error) {
ctx = context.WithValue(ctx, ctxKeyVersion, stm.Version)
var what sql.Exprs var what sql.Exprs
for _, val := range stm.What { for _, val := range stm.What {

View file

@ -46,6 +46,7 @@ const (
ctxKeyAuth = "auth" ctxKeyAuth = "auth"
ctxKeyKind = "kind" ctxKeyKind = "kind"
ctxKeyScope = "scope" ctxKeyScope = "scope"
ctxKeyVersion = "version"
) )
const ( const (