Ensure WebSocket notifications are sent at end of query

This commit is contained in:
Tobie Morgan Hitchcock 2021-06-21 09:13:11 +01:00
parent f5bb7ad255
commit d242fddb26
3 changed files with 34 additions and 3 deletions

View file

@ -246,6 +246,12 @@ func Process(fib *fibre.Context, ast *sql.Query, vars map[string]interface{}) (o
go executor.execute(ctx, ast) go executor.execute(ctx, ast)
// Send all flushed asynchronous messages
// to the websocket connections which are
// listening to the queries.
defer send(id)
// Wait for all of the processed queries to // Wait for all of the processed queries to
// return results, buffer the output, and // return results, buffer the output, and
// return the output when finished. // return the output when finished.

View file

@ -31,6 +31,7 @@ func register(fib *fibre.Context, id string) func() {
sockets.Store(id, &socket{ sockets.Store(id, &socket{
fibre: fib, fibre: fib,
sends: make(map[string][]interface{}),
items: make(map[string][]interface{}), items: make(map[string][]interface{}),
lives: make(map[string]*sql.LiveStatement), lives: make(map[string]*sql.LiveStatement),
}) })

View file

@ -34,6 +34,7 @@ import (
type socket struct { type socket struct {
mutex sync.Mutex mutex sync.Mutex
fibre *fibre.Context fibre *fibre.Context
sends map[string][]interface{}
items map[string][]interface{} items map[string][]interface{}
lives map[string]*sql.LiveStatement lives map[string]*sql.LiveStatement
} }
@ -58,6 +59,16 @@ func flush(id string) {
}() }()
} }
func send(id string) {
go func() {
sockets.Range(func(key, val interface{}) bool {
val.(*socket).send(id + "-bg")
val.(*socket).send(id)
return true
})
}()
}
// TODO remove this when distributed // TODO remove this when distributed
// We need to remove this when moving // We need to remove this when moving
// to a distributed cluster as // to a distributed cluster as
@ -157,11 +168,24 @@ func (s *socket) flush(id string) (err error) {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
s.sends[id] = append(s.sends[id], s.items[id]...)
delete(s.items, id)
return
}
func (s *socket) send(id string) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
// If there are no pending message // If there are no pending message
// notifications for this socket // notifications for this socket
// then ignore this method call. // then ignore this method call.
if len(s.items[id]) == 0 { if len(s.sends[id]) == 0 {
return nil return nil
} }
@ -171,7 +195,7 @@ func (s *socket) flush(id string) (err error) {
obj := &fibre.RPCNotification{ obj := &fibre.RPCNotification{
Method: "notify", Method: "notify",
Params: s.items[id], Params: s.sends[id],
} }
// Notify the websocket connection // Notify the websocket connection
@ -184,7 +208,7 @@ func (s *socket) flush(id string) (err error) {
// pending message notifications // pending message notifications
// for this socket when done. // for this socket when done.
delete(s.items, id) delete(s.sends, id)
return return