Improve live query performance and simplicity

This commit is contained in:
Tobie Morgan Hitchcock 2018-04-27 00:04:36 +01:00
parent 5d5fdc296f
commit 9b03178dfd
3 changed files with 75 additions and 71 deletions

View file

@ -28,7 +28,7 @@ var sockets sync.Map
func register(fib *fibre.Context, id string) func() {
return func() {
sockets.LoadOrStore(id, &socket{
sockets.Store(id, &socket{
fibre: fib,
items: make(map[string][]interface{}),
lives: make(map[string]*sql.LiveStatement),

View file

@ -47,83 +47,87 @@ func (d *document) lives(ctx context.Context, when method) (err error) {
return err
}
if len(lvs) > 0 {
// Loop over the currently running
// live queries so that we can pass
// change notifications to the socket.
for _, lv := range lvs {
for _, lv := range lvs {
if sck, ok := sockets.Load(lv.FB); ok {
// Check whether the change was made by
// the same connection as the live query,
// and if it is then don't notify changes.
var out interface{}
if id == lv.FB {
continue
}
// Create a new context for this socket
// which has the correct connection
// variables, and auth levels.
// Load the socket which owns the live
// query so that we can check the socket
// permissions, and send the notifications.
ctx = sck.(*socket).ctx(d.ns, d.db)
if sck, ok := sockets.Load(lv.FB); ok {
// Check whether the change was made by
// the same connection as the live query,
// and if it is then don't notify changes.
var out interface{}
if id == lv.FB {
continue
// Create a new context for this socket
// which has the correct connection
// variables, and auth levels.
ctx = sck.(*socket).ctx(d.ns, d.db)
// Check whether this live query has the
// necessary permissions to view this
// document, or continue to the next query.
ok, err = d.grant(ctx, when)
if err != nil {
continue
} else if !ok {
continue
}
// Check whether this document matches the
// filter conditions for the live query and
// if not, then continue to the next query.
ok, err = d.check(ctx, lv.Cond)
if err != nil {
continue
} else if !ok {
continue
}
switch lv.Diff {
// If the live query has specified to only
// receive diff changes, then there will be
// no projected fields for this query.
case true:
out, _ = d.yield(ctx, lv, sql.DIFF)
// If the query has projected fields which it
// wants to receive, then let's fetch these
// fields, and return them to the socket.
case false:
out, _ = d.yield(ctx, lv, sql.ILLEGAL)
}
switch when {
case _DELETE:
sck.(*socket).queue(id, lv.ID, "DELETE", d.id)
case _CREATE:
if out != nil {
sck.(*socket).queue(id, lv.ID, "CREATE", out)
}
// Check whether this live query has the
// necessary permissions to view this
// document, or continue to the next query.
ok, err = d.grant(ctx, when)
if err != nil {
continue
} else if !ok {
continue
case _UPDATE:
if out != nil {
sck.(*socket).queue(id, lv.ID, "UPDATE", out)
}
// Check whether this document matches the
// filter conditions for the live query and
// if not, then continue to the next query.
ok, err = d.check(ctx, lv.Cond)
if err != nil {
continue
} else if !ok {
continue
}
switch lv.Diff {
// If the live query has specified to only
// receive diff changes, then there will be
// no projected fields for this query.
case true:
out, _ = d.yield(ctx, lv, sql.DIFF)
// If the query has projected fields which it
// wants to receive, then let's fetch these
// fields, and return them to the socket.
case false:
out, _ = d.yield(ctx, lv, sql.ILLEGAL)
}
switch when {
case _DELETE:
sck.(*socket).queue(id, lv.ID, "DELETE", d.id)
case _CREATE:
if out != nil {
sck.(*socket).queue(id, lv.ID, "CREATE", out)
}
case _UPDATE:
if out != nil {
sck.(*socket).queue(id, lv.ID, "UPDATE", out)
}
}
}
}

View file

@ -97,7 +97,7 @@ func (s *socket) clear(id string) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.items[id] = nil
delete(s.items, id)
return
@ -135,7 +135,7 @@ func (s *socket) flush(id string) (err error) {
// pending message notifications
// for this socket when done.
s.items[id] = nil
delete(s.items, id)
return