From 36e7d8ed3ab1b2cff42d7be6f1d12998ffa8d094 Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Wed, 31 Jan 2018 09:15:29 +0000 Subject: [PATCH] Flush websocket notifications correctly Websocket notifications were cleared/flushed regardless of whether individual statements were successful or not. Now notifications are shifted onto the stack, or removed if the statement is unsuccessful. Once the full query has been processed, all pending notifications are flushed to all websockets (ignoring the current connection frin which th query originated). --- db/db.go | 31 ++++++++++++++++++++++++++++++- db/executor.go | 34 +++++++++++++++++++++++----------- db/live.go | 3 ++- db/lives.go | 20 +++++++++++++++++--- db/socket.go | 46 +++++++++++++++++++++++++++++++++------------- 5 files changed, 105 insertions(+), 29 deletions(-) diff --git a/db/db.go b/db/db.go index 50d993bc..18f75d0b 100644 --- a/db/db.go +++ b/db/db.go @@ -28,6 +28,7 @@ import ( "github.com/abcum/surreal/sql" "github.com/abcum/surreal/util/data" + "github.com/abcum/surreal/util/uuid" "cloud.google.com/go/trace" @@ -112,6 +113,14 @@ func Execute(fib *fibre.Context, txt interface{}, vars map[string]interface{}) ( vars = make(map[string]interface{}) } + // Ensure that we have a unique id assigned + // to this fibre connection, as we need it + // to detect unique websocket notifications. + + if fib.Get(ctxKeyId) == nil { + fib.Set(ctxKeyId, uuid.New().String()) + } + // Ensure that the IP address of the // user signing in is available so that // it can be used within signin queries. @@ -165,6 +174,14 @@ func Process(fib *fibre.Context, ast *sql.Query, vars map[string]interface{}) (o vars = make(map[string]interface{}) } + // Ensure that we have a unique id assigned + // to this fibre connection, as we need it + // to detect unique websocket notifications. + + if fib.Get(ctxKeyId) == nil { + fib.Set(ctxKeyId, uuid.New().String()) + } + // Create a new context so that we can quit // all goroutine workers if the http client // itself is closed before finishing. @@ -177,11 +194,17 @@ func Process(fib *fibre.Context, ast *sql.Query, vars map[string]interface{}) (o defer quit() + // Get the unique id for this connection + // so that we can assign it to the context + // and detect any websocket notifications. + + id := fib.Get(ctxKeyId).(string) + // Assign the fibre request context id to // the context so that we can log the id // together with the request. - ctx = context.WithValue(ctx, ctxKeyId, fib.Get("id")) + ctx = context.WithValue(ctx, ctxKeyId, id) // Assign the authentication data to the // context so that we can log the auth kind @@ -225,6 +248,12 @@ func Process(fib *fibre.Context, ast *sql.Query, vars map[string]interface{}) (o go executor.execute(ctx, ast) + // Ensure that we flush all websocket events + // once the query has been fully processed + // whilst ignoring this connection itself. + + defer flush(id) + // Wait for all of the processed queries to // return results, buffer the output, and // return the output when finished. diff --git a/db/executor.go b/db/executor.go index 26a9a004..9c554779 100644 --- a/db/executor.go +++ b/db/executor.go @@ -55,6 +55,12 @@ func (e *executor) execute(ctx context.Context, ast *sql.Query) { var buf []*Response var res []interface{} + // Get the fibre context ID so that we can use + // it to clear or flush websocket notification + // changes linked to this context. + + id := ctx.Value(ctxKeyId).(string) + // Ensure that the executor is added back into // the executor pool when the executor has // finished processing the request. @@ -74,7 +80,7 @@ func (e *executor) execute(ctx context.Context, ast *sql.Query) { defer func() { if e.dbo.TX != nil { e.dbo.Cancel() - clear() + clear(id) } }() @@ -145,18 +151,18 @@ func (e *executor) execute(ctx context.Context, ast *sql.Query) { case *sql.CancelStatement: err, buf = e.cancel(buf, err, e.send) if err != nil { - clear() + clear(id) } else { - clear() + clear(id) } trc.Finish() continue case *sql.CommitStatement: err, buf = e.commit(buf, err, e.send) if err != nil { - clear() + clear(id) } else { - flush() + flush(id) } trc.Finish() continue @@ -262,6 +268,12 @@ func (e *executor) operate(ctx context.Context, stm sql.Statement) (res []interf } } + // Get the fibre context ID so that we can use + // it to clear or flush websocket notification + // changes linked to this context. + + id := ctx.Value(ctxKeyId).(string) + // Execute the defined statement, receiving the // result set, and any errors which occured // while processing the query. @@ -357,7 +369,7 @@ func (e *executor) operate(ctx context.Context, stm sql.Statement) (res []interf e.dbo.Cancel() e.dbo.Reset() - clear() + clear(id) default: @@ -379,7 +391,7 @@ func (e *executor) operate(ctx context.Context, stm sql.Statement) (res []interf if err != nil { e.dbo.Cancel() - clear() + clear(id) return } @@ -389,15 +401,15 @@ func (e *executor) operate(ctx context.Context, stm sql.Statement) (res []interf if !trw { if err = e.dbo.Cancel(); err != nil { - clear() + clear(id) } else { - clear() + clear(id) } } else { if err = e.dbo.Commit(); err != nil { - clear() + clear(id) } else { - flush() + shift(id) } } } diff --git a/db/live.go b/db/live.go index 5b6c9f77..abbda4ff 100644 --- a/db/live.go +++ b/db/live.go @@ -39,7 +39,8 @@ func register(fib *fibre.Context, id string) func() { sockets[id] = &socket{ fibre: fib, - waits: make([]interface{}, 0), + holds: make(map[string][]interface{}), + waits: make(map[string][]interface{}), lives: make(map[string]*sql.LiveStatement), } diff --git a/db/lives.go b/db/lives.go index 981af65c..be7d1f79 100644 --- a/db/lives.go +++ b/db/lives.go @@ -25,6 +25,12 @@ import ( // this table, and executes them in name order. func (d *document) lives(ctx context.Context, when method) (err error) { + // Get the ID of the current fibre + // connection so that we can check + // against the ID of live queries. + + id := ctx.Value(ctxKeyId).(string) + // If this document has not changed // then there is no need to update // any registered live queries. @@ -54,6 +60,14 @@ func (d *document) lives(ctx context.Context, when method) (err error) { ctx = con.ctx(d.ns, d.db) + // Check whether the change was made by + // the same connection as the live query, + // and if it is then don't notify changes. + + if id == lv.FB { + continue + } + // Check whether this live query has the // necessary permissions to view this // document, or continue to the next query. @@ -120,11 +134,11 @@ func (d *document) lives(ctx context.Context, when method) (err error) { switch when { case _CREATE: - con.queue(lv.ID, "CREATE", doc.Data()) + con.queue(id, lv.ID, "CREATE", doc.Data()) case _UPDATE: - con.queue(lv.ID, "UPDATE", doc.Data()) + con.queue(id, lv.ID, "UPDATE", doc.Data()) case _DELETE: - con.queue(lv.ID, "DELETE", d.id) + con.queue(id, lv.ID, "DELETE", d.id) } } diff --git a/db/socket.go b/db/socket.go index ff08b796..2f57b829 100644 --- a/db/socket.go +++ b/db/socket.go @@ -32,19 +32,26 @@ import ( type socket struct { mutex sync.Mutex fibre *fibre.Context - waits []interface{} + holds map[string][]interface{} + waits map[string][]interface{} lives map[string]*sql.LiveStatement } -func clear() { +func clear(id string) { for _, s := range sockets { - s.clear() + s.clear(id) } } -func flush() { +func shift(id string) { for _, s := range sockets { - s.flush() + s.shift(id) + } +} + +func flush(id string) { + for _, s := range sockets { + s.flush(id) } } @@ -70,12 +77,12 @@ func (s *socket) ctx(ns, db string) (ctx context.Context) { } -func (s *socket) queue(query, action string, result interface{}) { +func (s *socket) queue(id, query, action string, result interface{}) { s.mutex.Lock() defer s.mutex.Unlock() - s.waits = append(s.waits, &Dispatch{ + s.holds[id] = append(s.holds[id], &Dispatch{ Query: query, Action: action, Result: result, @@ -83,18 +90,31 @@ func (s *socket) queue(query, action string, result interface{}) { } -func (s *socket) clear() (err error) { +func (s *socket) clear(id string) (err error) { s.mutex.Lock() defer s.mutex.Unlock() - s.waits = nil + s.holds[id] = nil return } -func (s *socket) flush() (err error) { +func (s *socket) shift(id string) (err error) { + + s.mutex.Lock() + defer s.mutex.Unlock() + + s.waits[id] = s.holds[id] + + s.holds[id] = nil + + return + +} + +func (s *socket) flush(id string) (err error) { s.mutex.Lock() defer s.mutex.Unlock() @@ -103,7 +123,7 @@ func (s *socket) flush() (err error) { // notifications for this socket // then ignore this method call. - if len(s.waits) == 0 { + if len(s.waits[id]) == 0 { return nil } @@ -113,7 +133,7 @@ func (s *socket) flush() (err error) { obj := &fibre.RPCNotification{ Method: "notify", - Params: s.waits, + Params: s.waits[id], } // Check the websocket subprotocol @@ -137,7 +157,7 @@ func (s *socket) flush() (err error) { // pending message notifications // for this socket when done. - s.waits = nil + s.waits[id] = nil return