surrealpatch/db/socket.go
2018-04-27 00:04:36 +01:00

373 lines
7.4 KiB
Go

// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package db
import (
"fmt"
"sync"
"context"
"github.com/abcum/fibre"
"github.com/abcum/surreal/cnf"
"github.com/abcum/surreal/sql"
"github.com/abcum/surreal/util/data"
"github.com/abcum/surreal/util/keys"
"github.com/abcum/surreal/util/uuid"
)
// socket holds the state kept for a single websocket connection: the
// notifications waiting to be delivered to it, and the live queries
// which have been registered through it.
type socket struct {
	// mutex is held by the methods which modify items or lives.
	mutex sync.Mutex
	// fibre is the fibre context used to read connection details
	// and to send notifications back over the websocket.
	fibre *fibre.Context
	// items holds the queued notification payloads, keyed by the
	// id given to queue, until they are flushed or cleared.
	items map[string][]interface{}
	// lives holds the registered live statements, keyed by the
	// id generated for each statement.
	lives map[string]*sql.LiveStatement
}
func clear(id string) {
go func() {
sockets.Range(func(key, val interface{}) bool {
val.(*socket).clear(id)
return true
})
}()
}
func flush(id string) {
go func() {
sockets.Range(func(key, val interface{}) bool {
val.(*socket).flush(id)
return true
})
}()
}
// ctx builds a fresh background context for running a query on behalf
// of this socket. The context carries the given namespace and database,
// the 'keep' value and authentication details read from the connection,
// and a set of runtime variables holding the auth data, the auth scope,
// the connection origin, and the connection ip address.
//
// The value stored on the connection under ctxKeyAuth must be a
// *cnf.Auth, otherwise the type assertion panics.
func (s *socket) ctx(ns, db string) (ctx context.Context) {

	// Read the connection level values.

	keep := s.fibre.Get(ctxKeyKeep)
	auth := s.fibre.Get(ctxKeyAuth).(*cnf.Auth)

	// Prepare the variables exposed to queries.

	vars := data.New()
	vars.Set(auth.Data, varKeyAuth)
	vars.Set(auth.Scope, varKeyScope)
	vars.Set(s.fibre.Origin(), varKeyOrigin)
	vars.Set(s.fibre.IP().String(), varKeyIp)

	// Layer every value onto a background context.

	entries := []struct{ key, val interface{} }{
		{ctxKeyNs, ns},
		{ctxKeyDb, db},
		{ctxKeyKeep, keep},
		{ctxKeyAuth, auth.Data},
		{ctxKeyKind, auth.Kind},
		{ctxKeyVars, vars},
	}

	ctx = context.Background()

	for _, entry := range entries {
		ctx = context.WithValue(ctx, entry.key, entry.val)
	}

	return ctx

}
// queue appends a pending notification to the list stored under the
// given id. The notification records the query, the action, and the
// result, and stays queued until the id is flushed or cleared.
func (s *socket) queue(id, query, action string, result interface{}) {

	msg := &Dispatch{
		Query:  query,
		Action: action,
		Result: result,
	}

	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.items[id] = append(s.items[id], msg)

}
// clear drops every notification queued on this socket under the
// given id, without sending anything. It always returns nil.
func (s *socket) clear(id string) (err error) {

	s.mutex.Lock()
	defer s.mutex.Unlock()

	delete(s.items, id)

	return nil

}
// flush sends every notification queued on this socket under the
// given id as a single "notify" RPC notification, and then removes
// the queued notifications. It always returns nil.
func (s *socket) flush(id string) (err error) {

	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Nothing has been queued under this
	// id, so there is nothing to send.

	pending := s.items[id]

	if len(pending) == 0 {
		return nil
	}

	// Send the whole batch in one go, by
	// passing a single RPCNotification
	// to the websocket connection.

	s.fibre.Socket().Notify(&fibre.RPCNotification{
		Method: "notify",
		Params: pending,
	})

	// The batch has been handed over, so
	// forget the queued notifications.

	delete(s.items, id)

	return nil

}
// check decides whether the connection described by ctx may register
// a live query on the table tb within the namespace ns and database db.
//
// It returns nil when the authentication kind stored in ctx is below
// cnf.AuthSC. Otherwise it returns the lookup error if the namespace,
// database, or table can not be loaded, and a *PermsError if the table
// permissions are not a *sql.PermExpression or if the select permission
// evaluates to the boolean false.
func (s *socket) check(e *executor, ctx context.Context, ns, db, tb string) (err error) {

	// Connections authenticated at a level
	// below scope level are not subject to
	// any table permissions.

	if ctx.Value(ctxKeyKind).(cnf.Kind) < cnf.AuthSC {
		return nil
	}

	// The namespace, the database, and the
	// table must all exist, as a scoped
	// connection is unable to create them.

	if _, err = e.dbo.GetNS(ns); err != nil {
		return err
	}

	if _, err = e.dbo.GetDB(ns, db); err != nil {
		return err
	}

	var tbv *sql.DefineTableStatement

	if tbv, err = e.dbo.GetTB(ns, db, tb); err != nil {
		return err
	}

	// Only an explicit permission expression
	// can grant access to a scoped connection.

	perm, ok := tbv.Perms.(*sql.PermExpression)

	if !ok {
		return &PermsError{table: tb}
	}

	// Evaluate the select permission with the
	// authentication kind set to DB level, so
	// that no embedded permissions are checked
	// while the expression itself is processed.

	var val interface{}

	val, err = e.fetch(context.WithValue(ctx, ctxKeyKind, cnf.AuthDB), perm.Select, ign)

	// An 'ident failed' error means that the
	// expression references a document field,
	// so access has to be decided for each
	// record individually and not here.

	if err == queryIdentFailed {
		return nil
	}

	// NOTE(review): any other error from the
	// fetch is not returned to the caller;
	// confirm that this is intentional.

	if allowed, isBool := val.(bool); isBool && !allowed {
		return &PermsError{table: tb}
	}

	return nil

}
// deregister removes the socket stored under the given id from sockets,
// and then clears from the database the live query key of every table
// targeted by every live statement registered on this socket. The keys
// are cleared within a single transaction, started here with the second
// argument set to true, and committed when the method returns.
//
// If the transaction can not be started then no keys are cleared.
func (s *socket) deregister(id string) {

	sockets.Delete(id)

	// Without a transaction there is nothing
	// further which can be cleaned up, and
	// using the failed transaction is unsafe.

	txn, err := db.Begin(context.Background(), true)
	if err != nil {
		return
	}

	defer txn.Commit()

	// NOTE(review): s.lives is read here without
	// holding s.mutex, unlike in executeLive and
	// executeKill; confirm that no query can run
	// on this socket while it is deregistering.

	for lid, stm := range s.lives {

		for _, w := range stm.What {

			var tb string

			switch what := w.(type) {
			case *sql.Table:
				tb = what.TB
			case *sql.Ident:
				tb = what.ID
			default:
				// Other values have no live query key.
				continue
			}

			key := &keys.LV{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: tb, LV: lid}

			txn.Clr(key.Encode())

		}

	}

}
// executeLive registers the live statement stm for this socket. It
// assigns a newly generated uuid to stm.ID, replaces every entry of
// stm.What with its fetched value, and then, for each targeted table,
// runs the permission check and writes the encoded statement to the
// database under the live query key for that table.
//
// On success it returns a slice containing only the new query id. It
// returns an error if a target can not be fetched, if a fetched target
// is neither a *sql.Table nor a *sql.Ident, if the permission check
// fails, or if the write fails.
//
// The statement is only stored on the socket once every step has
// succeeded, so that a failed LIVE query does not leave a stale
// entry behind on the socket.
func (s *socket) executeLive(e *executor, ctx context.Context, stm *sql.LiveStatement) (out []interface{}, err error) {

	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Generate a new query uuid.

	stm.ID = uuid.New().String()

	// Resolve each of the query targets.

	for key, val := range stm.What {
		w, err := e.fetch(ctx, val, nil)
		if err != nil {
			return nil, err
		}
		stm.What[key] = w
	}

	// Store the live query in the database layer.

	for _, w := range stm.What {

		var tb string

		switch what := w.(type) {
		default:
			return nil, fmt.Errorf("Can not execute LIVE query using value '%v'", what)
		case *sql.Table:
			tb = what.TB
		case *sql.Ident:
			tb = what.ID
		}

		if err = s.check(e, ctx, stm.NS, stm.DB, tb); err != nil {
			return nil, err
		}

		key := &keys.LV{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: tb, LV: stm.ID}

		if _, err = e.dbo.Put(0, key.Encode(), stm.Encode()); err != nil {
			return nil, err
		}

	}

	// Store the live query on the socket.

	s.lives[stm.ID] = stm

	// Return the query id to the user.

	return []interface{}{stm.ID}, nil

}
// executeKill removes the live queries identified by stm. It replaces
// every entry of stm.What with its fetched value, each of which must be
// a string query id. For every id which is registered on this socket,
// it clears the live query key of each targeted table from the database
// and then removes the query from the socket. Ids which are not
// registered on this socket are ignored.
//
// It never returns any output values. It returns an error if a target
// can not be fetched, if a fetched target is not a string, or as soon
// as clearing a key fails. When clearing fails the query stays
// registered on the socket, so the socket does not lose track of
// keys which may still exist in the database.
func (s *socket) executeKill(e *executor, ctx context.Context, stm *sql.KillStatement) (out []interface{}, err error) {

	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Resolve each of the query ids.

	for key, val := range stm.What {
		w, err := e.fetch(ctx, val, nil)
		if err != nil {
			return nil, err
		}
		stm.What[key] = w
	}

	for _, w := range stm.What {

		switch what := w.(type) {

		default:
			return nil, fmt.Errorf("Can not execute KILL query using value '%v'", what)

		case string:

			qry, ok := s.lives[what]
			if !ok {
				continue
			}

			// Delete the live query from the database layer.

			for _, t := range qry.What {

				var tb string

				switch target := t.(type) {
				case *sql.Table:
					tb = target.TB
				case *sql.Ident:
					tb = target.ID
				default:
					// Other values have no live query key.
					continue
				}

				key := &keys.LV{KV: qry.KV, NS: qry.NS, DB: qry.DB, TB: tb, LV: qry.ID}

				if _, err = e.dbo.Clr(key.Encode()); err != nil {
					return nil, err
				}

			}

			// Delete the live query from the saved queries.

			delete(s.lives, qry.ID)

		}

	}

	return nil, nil

}