From bc3114030857a534db54ecda5afba6c1e997eb8a Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Sat, 14 Apr 2018 19:20:15 +0100 Subject: [PATCH] Improve live query notification delivery --- db/db.go | 6 ------ db/executor.go | 3 ++- db/live.go | 3 +-- db/socket.go | 32 ++++++-------------------------- 4 files changed, 9 insertions(+), 35 deletions(-) diff --git a/db/db.go b/db/db.go index fd22aa46..10a93418 100644 --- a/db/db.go +++ b/db/db.go @@ -243,12 +243,6 @@ func Process(fib *fibre.Context, ast *sql.Query, vars map[string]interface{}) (o go executor.execute(ctx, ast) - // Ensure that we flush all websocket events - // once the query has been fully processed - // whilst ignoring this connection itself. - - defer flush(id) - // Wait for all of the processed queries to // return results, buffer the output, and // return the output when finished. diff --git a/db/executor.go b/db/executor.go index a6f3c979..6459f4e2 100644 --- a/db/executor.go +++ b/db/executor.go @@ -400,9 +400,10 @@ func (e *executor) operate(ctx context.Context, stm sql.Statement) (res []interf if err = e.dbo.Commit(); err != nil { clear(id) } else { - shift(id) + flush(id) } } + } } diff --git a/db/live.go b/db/live.go index abbda4ff..258bee2b 100644 --- a/db/live.go +++ b/db/live.go @@ -39,8 +39,7 @@ func register(fib *fibre.Context, id string) func() { sockets[id] = &socket{ fibre: fib, - holds: make(map[string][]interface{}), - waits: make(map[string][]interface{}), + items: make(map[string][]interface{}), lives: make(map[string]*sql.LiveStatement), } diff --git a/db/socket.go b/db/socket.go index cd9a4a12..1dc00795 100644 --- a/db/socket.go +++ b/db/socket.go @@ -32,8 +32,7 @@ import ( type socket struct { mutex sync.Mutex fibre *fibre.Context - holds map[string][]interface{} - waits map[string][]interface{} + items map[string][]interface{} lives map[string]*sql.LiveStatement } @@ -43,12 +42,6 @@ func clear(id string) { } } -func shift(id string) { - for _, s := range sockets { - s.shift(id) - } -} - func flush(id string) { for _, s := range sockets { s.flush(id) @@ -82,7 +75,7 @@ func (s *socket) queue(id, query, action string, result interface{}) { s.mutex.Lock() defer s.mutex.Unlock() - s.holds[id] = append(s.holds[id], &Dispatch{ + s.items[id] = append(s.items[id], &Dispatch{ Query: query, Action: action, Result: result, @@ -95,20 +88,7 @@ func (s *socket) clear(id string) (err error) { s.mutex.Lock() defer s.mutex.Unlock() - s.holds[id] = nil - - return - -} - -func (s *socket) shift(id string) (err error) { - - s.mutex.Lock() - defer s.mutex.Unlock() - - s.waits[id] = s.holds[id] - - s.holds[id] = nil + s.items[id] = nil return @@ -123,7 +103,7 @@ func (s *socket) flush(id string) (err error) { // notifications for this socket // then ignore this method call. - if len(s.waits[id]) == 0 { + if len(s.items[id]) == 0 { return nil } @@ -133,7 +113,7 @@ func (s *socket) flush(id string) (err error) { obj := &fibre.RPCNotification{ Method: "notify", - Params: s.waits[id], + Params: s.items[id], } // Check the websocket subprotocol @@ -157,7 +137,7 @@ func (s *socket) flush(id string) (err error) { // pending message notifications // for this socket when done. - s.waits[id] = nil + s.items[id] = nil return