From 9b03178dfd45a8b84970d1f51ef747d6c78d5c99 Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Fri, 27 Apr 2018 00:04:36 +0100 Subject: [PATCH] Improve live query performance and simplicity --- db/live.go | 2 +- db/lives.go | 140 ++++++++++++++++++++++++++------------------------- db/socket.go | 4 +- 3 files changed, 75 insertions(+), 71 deletions(-) diff --git a/db/live.go b/db/live.go index b71cb2d4..b41ebd20 100644 --- a/db/live.go +++ b/db/live.go @@ -28,7 +28,7 @@ var sockets sync.Map func register(fib *fibre.Context, id string) func() { return func() { - sockets.LoadOrStore(id, &socket{ + sockets.Store(id, &socket{ fibre: fib, items: make(map[string][]interface{}), lives: make(map[string]*sql.LiveStatement), diff --git a/db/lives.go b/db/lives.go index e9c0455a..57ba65c1 100644 --- a/db/lives.go +++ b/db/lives.go @@ -47,83 +47,87 @@ func (d *document) lives(ctx context.Context, when method) (err error) { return err } - if len(lvs) > 0 { + // Loop over the currently running + // live queries so that we can pass + // change notifications to the socket. - for _, lv := range lvs { + for _, lv := range lvs { - if sck, ok := sockets.Load(lv.FB); ok { + // Check whether the change was made by + // the same connection as the live query, + // and if it is then don't notify changes. - var out interface{} + if id == lv.FB { + continue + } - // Create a new context for this socket - // which has the correct connection - // variables, and auth levels. + // Load the socket which owns the live + // query so that we can check the socket + // permissions, and send the notifications. - ctx = sck.(*socket).ctx(d.ns, d.db) + if sck, ok := sockets.Load(lv.FB); ok { - // Check whether the change was made by - // the same connection as the live query, - // and if it is then don't notify changes. + var out interface{} - if id == lv.FB { - continue + // Create a new context for this socket + // which has the correct connection + // variables, and auth levels. + + ctx = sck.(*socket).ctx(d.ns, d.db) + + // Check whether this live query has the + // necessary permissions to view this + // document, or continue to the next query. + + ok, err = d.grant(ctx, when) + if err != nil { + continue + } else if !ok { + continue + } + + // Check whether this document matches the + // filter conditions for the live query and + // if not, then continue to the next query. + + ok, err = d.check(ctx, lv.Cond) + if err != nil { + continue + } else if !ok { + continue + } + + switch lv.Diff { + + // If the live query has specified to only + // receive diff changes, then there will be + // no projected fields for this query. + + case true: + + out, _ = d.yield(ctx, lv, sql.DIFF) + + // If the query has projected fields which it + // wants to receive, then let's fetch these + // fields, and return them to the socket. + + case false: + + out, _ = d.yield(ctx, lv, sql.ILLEGAL) + + } + + switch when { + case _DELETE: + sck.(*socket).queue(id, lv.ID, "DELETE", d.id) + case _CREATE: + if out != nil { + sck.(*socket).queue(id, lv.ID, "CREATE", out) } - - // Check whether this live query has the - // necessary permissions to view this - // document, or continue to the next query. - - ok, err = d.grant(ctx, when) - if err != nil { - continue - } else if !ok { - continue + case _UPDATE: + if out != nil { + sck.(*socket).queue(id, lv.ID, "UPDATE", out) } - - // Check whether this document matches the - // filter conditions for the live query and - // if not, then continue to the next query. - - ok, err = d.check(ctx, lv.Cond) - if err != nil { - continue - } else if !ok { - continue - } - - switch lv.Diff { - - // If the live query has specified to only - // receive diff changes, then there will be - // no projected fields for this query. - - case true: - - out, _ = d.yield(ctx, lv, sql.DIFF) - - // If the query has projected fields which it - // wants to receive, then let's fetch these - // fields, and return them to the socket. - - case false: - - out, _ = d.yield(ctx, lv, sql.ILLEGAL) - - } - - switch when { - case _DELETE: - sck.(*socket).queue(id, lv.ID, "DELETE", d.id) - case _CREATE: - if out != nil { - sck.(*socket).queue(id, lv.ID, "CREATE", out) - } - case _UPDATE: - if out != nil { - sck.(*socket).queue(id, lv.ID, "UPDATE", out) - } - } - } } diff --git a/db/socket.go b/db/socket.go index eed5b8c6..8f9a9a7e 100644 --- a/db/socket.go +++ b/db/socket.go @@ -97,7 +97,7 @@ func (s *socket) clear(id string) (err error) { s.mutex.Lock() defer s.mutex.Unlock() - s.items[id] = nil + delete(s.items, id) return @@ -135,7 +135,7 @@ func (s *socket) flush(id string) (err error) { // pending message notifications // for this socket when done. - s.items[id] = nil + delete(s.items, id) return