From ae7b6ca0e4c1e7229e1df5ca83e7820bb17a3a14 Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Wed, 6 Feb 2019 08:08:07 +0000 Subject: [PATCH] Fix live queries --- db/live.go | 12 ++---------- db/socket.go | 34 ++++++++++++++++++++-------------- sql/ast.go | 2 ++ 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/db/live.go b/db/live.go index 6411060a..807c6aff 100644 --- a/db/live.go +++ b/db/live.go @@ -29,11 +29,7 @@ var sockets sync.Map func register(fib *fibre.Context, id string) func() { return func() { - auth := fib.Get(varKeyAuth).(*cnf.Auth) - sockets.Store(id, &socket{ - ns: auth.NS, - db: auth.DB, fibre: fib, items: make(map[string][]interface{}), lives: make(map[string]*sql.LiveStatement), @@ -54,13 +50,11 @@ func deregister(fib *fibre.Context, id string) func() { func (e *executor) executeLive(ctx context.Context, stm *sql.LiveStatement) (out []interface{}, err error) { - stm.FB = e.id - if err := e.access(ctx, cnf.AuthNO); err != nil { return nil, err } - if sck, ok := sockets.Load(stm.FB); ok { + if sck, ok := sockets.Load(e.id); ok { return sck.(*socket).executeLive(e, ctx, stm) } @@ -70,13 +64,11 @@ func (e *executor) executeLive(ctx context.Context, stm *sql.LiveStatement) (out func (e *executor) executeKill(ctx context.Context, stm *sql.KillStatement) (out []interface{}, err error) { - stm.FB = e.id - if err := e.access(ctx, cnf.AuthNO); err != nil { return nil, err } - if sck, ok := sockets.Load(stm.FB); ok { + if sck, ok := sockets.Load(e.id); ok { return sck.(*socket).executeKill(e, ctx, stm) } diff --git a/db/socket.go b/db/socket.go index a57c9429..9530670d 100644 --- a/db/socket.go +++ b/db/socket.go @@ -30,8 +30,6 @@ import ( ) type socket struct { - ns string - db string mutex sync.Mutex fibre *fibre.Context items map[string][]interface{} @@ -206,12 +204,12 @@ func (s *socket) deregister(id string) { case *sql.Table: - key := &keys.LV{KV: KV, NS: s.ns, DB: s.db, TB: what.TB, LV: id} + key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.TB, LV: id} txn.Clr(ctx, key.Encode()) case *sql.Ident: - key := &keys.LV{KV: KV, NS: s.ns, DB: s.db, TB: what.VA, LV: id} + key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.VA, LV: id} txn.Clr(ctx, key.Encode()) } @@ -224,6 +222,10 @@ func (s *socket) deregister(id string) { func (s *socket) executeLive(e *executor, ctx context.Context, stm *sql.LiveStatement) (out []interface{}, err error) { + stm.FB = e.id + stm.NS = e.ns + stm.DB = e.db + s.mutex.Lock() defer s.mutex.Unlock() @@ -241,15 +243,17 @@ func (s *socket) executeLive(e *executor, ctx context.Context, stm *sql.LiveStat // Store the live query in the database layer. - for key, val := range stm.What { + var what sql.Exprs + + for _, val := range stm.What { w, err := e.fetch(ctx, val, nil) if err != nil { return nil, err } - stm.What[key] = w + what = append(what, w) } - for _, w := range stm.What { + for _, w := range what { switch what := w.(type) { @@ -258,14 +262,14 @@ func (s *socket) executeLive(e *executor, ctx context.Context, stm *sql.LiveStat case *sql.Table: - key := &keys.LV{KV: KV, NS: s.ns, DB: s.db, TB: what.TB, LV: stm.ID} + key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.TB, LV: stm.ID} if _, err = e.dbo.Put(ctx, 0, key.Encode(), stm.Encode()); err != nil { return nil, err } case *sql.Ident: - key := &keys.LV{KV: KV, NS: s.ns, DB: s.db, TB: what.VA, LV: stm.ID} + key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.VA, LV: stm.ID} if _, err = e.dbo.Put(ctx, 0, key.Encode(), stm.Encode()); err != nil { return nil, err } @@ -285,15 +289,17 @@ func (s *socket) executeKill(e *executor, ctx context.Context, stm *sql.KillStat // Remove the live query from the database layer. - for key, val := range stm.What { + var what sql.Exprs + + for _, val := range stm.What { w, err := e.fetch(ctx, val, nil) if err != nil { return nil, err } - stm.What[key] = w + what = append(what, w) } - for _, w := range stm.What { + for _, w := range what { switch what := w.(type) { @@ -315,11 +321,11 @@ func (s *socket) executeKill(e *executor, ctx context.Context, stm *sql.KillStat switch what := w.(type) { case *sql.Table: - key := &keys.LV{KV: KV, NS: s.ns, DB: s.db, TB: what.TB, LV: qry.ID} + key := &keys.LV{KV: KV, NS: qry.NS, DB: qry.DB, TB: what.TB, LV: qry.ID} _, err = e.dbo.Clr(ctx, key.Encode()) case *sql.Ident: - key := &keys.LV{KV: KV, NS: s.ns, DB: s.db, TB: what.VA, LV: qry.ID} + key := &keys.LV{KV: KV, NS: qry.NS, DB: qry.DB, TB: what.VA, LV: qry.ID} _, err = e.dbo.Clr(ctx, key.Encode()) } diff --git a/sql/ast.go b/sql/ast.go index 26dff949..31abce70 100644 --- a/sql/ast.go +++ b/sql/ast.go @@ -125,6 +125,8 @@ type LetStatement struct { type LiveStatement struct { ID string FB string + NS string + DB string Diff bool Expr Fields What Exprs