From fa1061b3a5b94920e77632d1907f85a1a2c4ce2f Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Tue, 24 Apr 2018 15:57:41 +0100 Subject: [PATCH] Use query version time from executor not document --- db/document.go | 11 ++++------- db/executor.go | 7 +++++++ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/db/document.go b/db/document.go index 3535632e..2b1fda2d 100644 --- a/db/document.go +++ b/db/document.go @@ -15,7 +15,6 @@ package db import ( - "time" "context" @@ -36,7 +35,6 @@ type document struct { id *sql.Thing key *keys.Thing val kvs.KV - now int64 doc *data.Doc initial *data.Doc current *data.Doc @@ -69,7 +67,6 @@ func newDocument(i *iterator, key *keys.Thing, val kvs.KV, doc *data.Doc) (d *do d.val = val d.doc = doc - d.now = time.Now().UnixNano() return @@ -316,7 +313,7 @@ func (d *document) storeThing() (err error) { // Write the value to the data // layer and return any errors. - _, err = d.i.e.dbo.Put(d.now, d.key.Encode(), d.current.Encode()) + _, err = d.i.e.dbo.Put(d.i.e.time, d.key.Encode(), d.current.Encode()) return @@ -334,7 +331,7 @@ func (d *document) purgeThing() (err error) { // Reset the item by writing a // nil value to the storage. - _, err = d.i.e.dbo.Put(d.now, d.key.Encode(), nil) + _, err = d.i.e.dbo.Put(d.i.e.time, d.key.Encode(), nil) return @@ -400,7 +397,7 @@ func (d *document) storeIndex() (err error) { if ix.Uniq == true { for _, v := range del { didx := &keys.Index{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.ID, FD: v} - d.i.e.dbo.DelC(d.now, didx.Encode(), d.id.Bytes()) + d.i.e.dbo.DelC(d.i.e.time, didx.Encode(), d.id.Bytes()) } for _, v := range add { aidx := &keys.Index{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.ID, FD: v} @@ -413,7 +410,7 @@ func (d *document) storeIndex() (err error) { if ix.Uniq == false { for _, v := range del { didx := &keys.Point{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.ID, FD: v, ID: d.key.ID} - d.i.e.dbo.DelC(d.now, didx.Encode(), d.id.Bytes()) + d.i.e.dbo.DelC(d.i.e.time, didx.Encode(), d.id.Bytes()) } for _, v := range add { aidx := &keys.Point{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.ID, FD: v, ID: d.key.ID} diff --git a/db/executor.go b/db/executor.go index 0f0e8519..92c89518 100644 --- a/db/executor.go +++ b/db/executor.go @@ -29,6 +29,7 @@ import ( type executor struct { dbo *mem.Cache + time int64 send chan *Response } @@ -256,6 +257,12 @@ func (e *executor) operate(ctx context.Context, stm sql.Statement) (res []interf } } + // Specify a new time for the current executor + // iteration, so that all subqueries and async + // events are saved with the same version time. + + e.time = time.Now().UnixNano() + // Get the fibre context ID so that we can use // it to clear or flush websocket notification // changes linked to this context.