Improve live query notification delivery

This commit is contained in:
Tobie Morgan Hitchcock 2018-04-14 19:20:15 +01:00
parent 677767d85b
commit bc31140308
4 changed files with 9 additions and 35 deletions

View file

@ -243,12 +243,6 @@ func Process(fib *fibre.Context, ast *sql.Query, vars map[string]interface{}) (o
go executor.execute(ctx, ast) go executor.execute(ctx, ast)
// Ensure that we flush all websocket events
// once the query has been fully processed
// whilst ignoring this connection itself.
defer flush(id)
// Wait for all of the processed queries to // Wait for all of the processed queries to
// return results, buffer the output, and // return results, buffer the output, and
// return the output when finished. // return the output when finished.

View file

@ -400,9 +400,10 @@ func (e *executor) operate(ctx context.Context, stm sql.Statement) (res []interf
if err = e.dbo.Commit(); err != nil { if err = e.dbo.Commit(); err != nil {
clear(id) clear(id)
} else { } else {
shift(id) flush(id)
} }
} }
} }
} }

View file

@ -39,8 +39,7 @@ func register(fib *fibre.Context, id string) func() {
sockets[id] = &socket{ sockets[id] = &socket{
fibre: fib, fibre: fib,
holds: make(map[string][]interface{}), items: make(map[string][]interface{}),
waits: make(map[string][]interface{}),
lives: make(map[string]*sql.LiveStatement), lives: make(map[string]*sql.LiveStatement),
} }

View file

@ -32,8 +32,7 @@ import (
type socket struct { type socket struct {
mutex sync.Mutex mutex sync.Mutex
fibre *fibre.Context fibre *fibre.Context
holds map[string][]interface{} items map[string][]interface{}
waits map[string][]interface{}
lives map[string]*sql.LiveStatement lives map[string]*sql.LiveStatement
} }
@ -43,12 +42,6 @@ func clear(id string) {
} }
} }
func shift(id string) {
for _, s := range sockets {
s.shift(id)
}
}
func flush(id string) { func flush(id string) {
for _, s := range sockets { for _, s := range sockets {
s.flush(id) s.flush(id)
@ -82,7 +75,7 @@ func (s *socket) queue(id, query, action string, result interface{}) {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
s.holds[id] = append(s.holds[id], &Dispatch{ s.items[id] = append(s.items[id], &Dispatch{
Query: query, Query: query,
Action: action, Action: action,
Result: result, Result: result,
@ -95,20 +88,7 @@ func (s *socket) clear(id string) (err error) {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
s.holds[id] = nil s.items[id] = nil
return
}
func (s *socket) shift(id string) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.waits[id] = s.holds[id]
s.holds[id] = nil
return return
@ -123,7 +103,7 @@ func (s *socket) flush(id string) (err error) {
// notifications for this socket // notifications for this socket
// then ignore this method call. // then ignore this method call.
if len(s.waits[id]) == 0 { if len(s.items[id]) == 0 {
return nil return nil
} }
@ -133,7 +113,7 @@ func (s *socket) flush(id string) (err error) {
obj := &fibre.RPCNotification{ obj := &fibre.RPCNotification{
Method: "notify", Method: "notify",
Params: s.waits[id], Params: s.items[id],
} }
// Check the websocket subprotocol // Check the websocket subprotocol
@ -157,7 +137,7 @@ func (s *socket) flush(id string) (err error) {
// pending message notifications // pending message notifications
// for this socket when done. // for this socket when done.
s.waits[id] = nil s.items[id] = nil
return return