Fix live queries

This commit is contained in:
Tobie Morgan Hitchcock 2019-02-06 08:08:07 +00:00
parent bd8c6fddb2
commit ae7b6ca0e4
3 changed files with 24 additions and 24 deletions

View file

@ -29,11 +29,7 @@ var sockets sync.Map
func register(fib *fibre.Context, id string) func() { func register(fib *fibre.Context, id string) func() {
return func() { return func() {
auth := fib.Get(varKeyAuth).(*cnf.Auth)
sockets.Store(id, &socket{ sockets.Store(id, &socket{
ns: auth.NS,
db: auth.DB,
fibre: fib, fibre: fib,
items: make(map[string][]interface{}), items: make(map[string][]interface{}),
lives: make(map[string]*sql.LiveStatement), lives: make(map[string]*sql.LiveStatement),
@ -54,13 +50,11 @@ func deregister(fib *fibre.Context, id string) func() {
func (e *executor) executeLive(ctx context.Context, stm *sql.LiveStatement) (out []interface{}, err error) { func (e *executor) executeLive(ctx context.Context, stm *sql.LiveStatement) (out []interface{}, err error) {
stm.FB = e.id
if err := e.access(ctx, cnf.AuthNO); err != nil { if err := e.access(ctx, cnf.AuthNO); err != nil {
return nil, err return nil, err
} }
if sck, ok := sockets.Load(stm.FB); ok { if sck, ok := sockets.Load(e.id); ok {
return sck.(*socket).executeLive(e, ctx, stm) return sck.(*socket).executeLive(e, ctx, stm)
} }
@ -70,13 +64,11 @@ func (e *executor) executeLive(ctx context.Context, stm *sql.LiveStatement) (out
func (e *executor) executeKill(ctx context.Context, stm *sql.KillStatement) (out []interface{}, err error) { func (e *executor) executeKill(ctx context.Context, stm *sql.KillStatement) (out []interface{}, err error) {
stm.FB = e.id
if err := e.access(ctx, cnf.AuthNO); err != nil { if err := e.access(ctx, cnf.AuthNO); err != nil {
return nil, err return nil, err
} }
if sck, ok := sockets.Load(stm.FB); ok { if sck, ok := sockets.Load(e.id); ok {
return sck.(*socket).executeKill(e, ctx, stm) return sck.(*socket).executeKill(e, ctx, stm)
} }

View file

@ -30,8 +30,6 @@ import (
) )
type socket struct { type socket struct {
ns string
db string
mutex sync.Mutex mutex sync.Mutex
fibre *fibre.Context fibre *fibre.Context
items map[string][]interface{} items map[string][]interface{}
@ -206,12 +204,12 @@ func (s *socket) deregister(id string) {
case *sql.Table: case *sql.Table:
key := &keys.LV{KV: KV, NS: s.ns, DB: s.db, TB: what.TB, LV: id} key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.TB, LV: id}
txn.Clr(ctx, key.Encode()) txn.Clr(ctx, key.Encode())
case *sql.Ident: case *sql.Ident:
key := &keys.LV{KV: KV, NS: s.ns, DB: s.db, TB: what.VA, LV: id} key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.VA, LV: id}
txn.Clr(ctx, key.Encode()) txn.Clr(ctx, key.Encode())
} }
@ -224,6 +222,10 @@ func (s *socket) deregister(id string) {
func (s *socket) executeLive(e *executor, ctx context.Context, stm *sql.LiveStatement) (out []interface{}, err error) { func (s *socket) executeLive(e *executor, ctx context.Context, stm *sql.LiveStatement) (out []interface{}, err error) {
stm.FB = e.id
stm.NS = e.ns
stm.DB = e.db
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
@ -241,15 +243,17 @@ func (s *socket) executeLive(e *executor, ctx context.Context, stm *sql.LiveStat
// Store the live query in the database layer. // Store the live query in the database layer.
for key, val := range stm.What { var what sql.Exprs
for _, val := range stm.What {
w, err := e.fetch(ctx, val, nil) w, err := e.fetch(ctx, val, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
stm.What[key] = w what = append(what, w)
} }
for _, w := range stm.What { for _, w := range what {
switch what := w.(type) { switch what := w.(type) {
@ -258,14 +262,14 @@ func (s *socket) executeLive(e *executor, ctx context.Context, stm *sql.LiveStat
case *sql.Table: case *sql.Table:
key := &keys.LV{KV: KV, NS: s.ns, DB: s.db, TB: what.TB, LV: stm.ID} key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.TB, LV: stm.ID}
if _, err = e.dbo.Put(ctx, 0, key.Encode(), stm.Encode()); err != nil { if _, err = e.dbo.Put(ctx, 0, key.Encode(), stm.Encode()); err != nil {
return nil, err return nil, err
} }
case *sql.Ident: case *sql.Ident:
key := &keys.LV{KV: KV, NS: s.ns, DB: s.db, TB: what.VA, LV: stm.ID} key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.VA, LV: stm.ID}
if _, err = e.dbo.Put(ctx, 0, key.Encode(), stm.Encode()); err != nil { if _, err = e.dbo.Put(ctx, 0, key.Encode(), stm.Encode()); err != nil {
return nil, err return nil, err
} }
@ -285,15 +289,17 @@ func (s *socket) executeKill(e *executor, ctx context.Context, stm *sql.KillStat
// Remove the live query from the database layer. // Remove the live query from the database layer.
for key, val := range stm.What { var what sql.Exprs
for _, val := range stm.What {
w, err := e.fetch(ctx, val, nil) w, err := e.fetch(ctx, val, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
stm.What[key] = w what = append(what, w)
} }
for _, w := range stm.What { for _, w := range what {
switch what := w.(type) { switch what := w.(type) {
@ -315,11 +321,11 @@ func (s *socket) executeKill(e *executor, ctx context.Context, stm *sql.KillStat
switch what := w.(type) { switch what := w.(type) {
case *sql.Table: case *sql.Table:
key := &keys.LV{KV: KV, NS: s.ns, DB: s.db, TB: what.TB, LV: qry.ID} key := &keys.LV{KV: KV, NS: qry.NS, DB: qry.DB, TB: what.TB, LV: qry.ID}
_, err = e.dbo.Clr(ctx, key.Encode()) _, err = e.dbo.Clr(ctx, key.Encode())
case *sql.Ident: case *sql.Ident:
key := &keys.LV{KV: KV, NS: s.ns, DB: s.db, TB: what.VA, LV: qry.ID} key := &keys.LV{KV: KV, NS: qry.NS, DB: qry.DB, TB: what.VA, LV: qry.ID}
_, err = e.dbo.Clr(ctx, key.Encode()) _, err = e.dbo.Clr(ctx, key.Encode())
} }

View file

@ -125,6 +125,8 @@ type LetStatement struct {
type LiveStatement struct { type LiveStatement struct {
ID string ID string
FB string FB string
NS string
DB string
Diff bool Diff bool
Expr Fields Expr Fields
What Exprs What Exprs