diff --git a/db/db.go b/db/db.go index 2c8330d3..303cf927 100644 --- a/db/db.go +++ b/db/db.go @@ -246,6 +246,12 @@ func Process(fib *fibre.Context, ast *sql.Query, vars map[string]interface{}) (o go executor.execute(ctx, ast) + // Send all flushed asynchronous messages + // to the websocket connections which are + // listening to the queries. + + defer send(id) + // Wait for all of the processed queries to // return results, buffer the output, and // return the output when finished. diff --git a/db/live.go b/db/live.go index 807c6aff..13f90101 100644 --- a/db/live.go +++ b/db/live.go @@ -31,6 +31,7 @@ func register(fib *fibre.Context, id string) func() { sockets.Store(id, &socket{ fibre: fib, + sends: make(map[string][]interface{}), items: make(map[string][]interface{}), lives: make(map[string]*sql.LiveStatement), }) diff --git a/db/socket.go b/db/socket.go index 529d746f..b79de3fb 100644 --- a/db/socket.go +++ b/db/socket.go @@ -34,6 +34,7 @@ import ( type socket struct { mutex sync.Mutex fibre *fibre.Context + sends map[string][]interface{} items map[string][]interface{} lives map[string]*sql.LiveStatement } @@ -58,6 +59,16 @@ func flush(id string) { }() } +func send(id string) { + go func() { + sockets.Range(func(key, val interface{}) bool { + val.(*socket).send(id + "-bg") + val.(*socket).send(id) + return true + }) + }() +} + // TODO remove this when distributed // We need to remove this when moving // to a distributed cluster as @@ -157,11 +168,24 @@ func (s *socket) flush(id string) (err error) { s.mutex.Lock() defer s.mutex.Unlock() + s.sends[id] = append(s.sends[id], s.items[id]...) + + delete(s.items, id) + + return + +} + +func (s *socket) send(id string) (err error) { + + s.mutex.Lock() + defer s.mutex.Unlock() + // If there are no pending message // notifications for this socket // then ignore this method call. - if len(s.items[id]) == 0 { + if len(s.sends[id]) == 0 { return nil } @@ -171,7 +195,7 @@ func (s *socket) flush(id string) (err error) { obj := &fibre.RPCNotification{ Method: "notify", - Params: s.items[id], + Params: s.sends[id], } // Notify the websocket connection @@ -184,7 +208,7 @@ func (s *socket) flush(id string) (err error) { // pending message notifications // for this socket when done. - delete(s.items, id) + delete(s.sends, id) return