surrealpatch/db/socket.go
2021-06-21 09:13:24 +01:00

421 lines
7.8 KiB
Go

// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package db
import (
"fmt"
"sync"
"context"
"github.com/abcum/fibre"
"github.com/abcum/surreal/cnf"
"github.com/abcum/surreal/kvs"
"github.com/abcum/surreal/sql"
"github.com/abcum/surreal/txn"
"github.com/abcum/surreal/util/data"
"github.com/abcum/surreal/util/keys"
"github.com/abcum/surreal/util/uuid"
)
type socket struct {
mutex sync.Mutex
fibre *fibre.Context
sends map[string][]interface{}
items map[string][]interface{}
lives map[string]*sql.LiveStatement
}
func clear(id string) {
go func() {
sockets.Range(func(key, val interface{}) bool {
val.(*socket).clear(id + "-bg")
val.(*socket).clear(id)
return true
})
}()
}
func flush(id string) {
go func() {
sockets.Range(func(key, val interface{}) bool {
val.(*socket).flush(id + "-bg")
val.(*socket).flush(id)
return true
})
}()
}
func send(id string) {
go func() {
sockets.Range(func(key, val interface{}) bool {
val.(*socket).send(id + "-bg")
val.(*socket).send(id)
return true
})
}()
}
// TODO remove this when distributed
// We need to remove this when moving
// to a distributed cluster as
// websockets might be managed by an
// alternative server, and should not
// be removed on node startup.
func tidy() error {
ctx := context.Background()
txn, _ := txn.New(ctx, true)
defer txn.Commit()
nss, err := txn.AllNS(ctx)
if err != nil {
return err
}
for _, ns := range nss {
dbs, err := txn.AllDB(ctx, ns.Name.VA)
if err != nil {
return err
}
for _, db := range dbs {
tbs, err := txn.AllTB(ctx, ns.Name.VA, db.Name.VA)
if err != nil {
return err
}
for _, tb := range tbs {
key := &keys.LV{KV: KV, NS: ns.Name.VA, DB: db.Name.VA, TB: tb.Name.VA, LV: keys.Ignore}
if _, err = txn.ClrP(ctx, key.Encode(), 0); err != nil {
return err
}
}
}
}
return nil
}
func (s *socket) ctx() (ctx context.Context) {
ctx = context.Background()
auth := s.fibre.Get(ctxKeyAuth).(*cnf.Auth)
sess := s.fibre.Get(ctxKeyVars).(map[string]interface{})
vars := data.Consume(sess)
vars.Set(ENV, varKeyEnv)
vars.Set(auth.Data, varKeyAuth)
vars.Set(auth.Scope, varKeyScope)
vars.Set(session(s.fibre), varKeySession)
ctx = context.WithValue(ctx, ctxKeyVars, vars)
ctx = context.WithValue(ctx, ctxKeyKind, auth.Kind)
return
}
func (s *socket) queue(id, query, action string, result interface{}) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.items[id] = append(s.items[id], &Dispatch{
Query: query,
Action: action,
Result: result,
})
}
func (s *socket) clear(id string) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
delete(s.items, id)
return
}
func (s *socket) flush(id string) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.sends[id] = append(s.sends[id], s.items[id]...)
delete(s.items, id)
return
}
func (s *socket) send(id string) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
// If there are no pending message
// notifications for this socket
// then ignore this method call.
if len(s.sends[id]) == 0 {
return nil
}
// Create a new rpc notification
// object so that we can send the
// batch changes in one go.
obj := &fibre.RPCNotification{
Method: "notify",
Params: s.sends[id],
}
// Notify the websocket connection
// y sending an RPCNotification type
// to the notify channel.
s.fibre.Socket().Notify(obj)
// Make sure that we clear all the
// pending message notifications
// for this socket when done.
delete(s.sends, id)
return
}
func (s *socket) check(e *executor, ctx context.Context, ns, db, tb string) (err error) {
var tbv *sql.DefineTableStatement
// If we are authenticated using DB, NS,
// or KV permissions level, then we can
// ignore all permissions checks.
if perm(ctx) < cnf.AuthSC {
return nil
}
// First check that the NS exists, as
// otherwise, the scoped authentication
// request can not do anything.
_, err = e.tx.GetNS(ctx, ns)
if err != nil {
return err
}
// Next check that the DB exists, as
// otherwise, the scoped authentication
// request can not do anything.
_, err = e.tx.GetDB(ctx, ns, db)
if err != nil {
return err
}
// Then check that the TB exists, as
// otherwise, the scoped authentication
// request can not do anything.
tbv, err = e.tx.GetTB(ctx, ns, db, tb)
if err != nil {
return err
}
// If the table has any permissions
// specified, then let's check if this
// query is allowed access to the table.
switch p := tbv.Perms.(type) {
case *sql.PermExpression:
return e.fetchPerms(ctx, p.Select, tbv.Name)
default:
return &PermsError{table: tb}
}
}
func (s *socket) deregister(id string) {
sockets.Delete(id)
ctx := context.Background()
txn, _ := kvs.Begin(ctx, true)
defer txn.Commit()
for id, stm := range s.lives {
for _, w := range stm.What {
switch what := w.(type) {
case *sql.Table:
key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.TB, LV: id}
txn.Clr(ctx, key.Encode())
case *sql.Ident:
key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.VA, LV: id}
txn.Clr(ctx, key.Encode())
}
}
}
}
func (s *socket) executeLive(e *executor, ctx context.Context, stm *sql.LiveStatement) (out []interface{}, err error) {
stm.FB = e.id
stm.NS = e.ns
stm.DB = e.db
s.mutex.Lock()
defer s.mutex.Unlock()
// Generate a new query uuid.
stm.ID = uuid.New().String()
// Store the live query on the socket.
s.lives[stm.ID] = stm
// Return the query id to the user.
out = append(out, stm.ID)
// Store the live query in the database layer.
for key, val := range stm.What {
w, err := e.fetch(ctx, val, nil)
if err != nil {
return nil, err
}
stm.What[key] = w
}
for _, w := range stm.What {
switch what := w.(type) {
default:
return nil, fmt.Errorf("Can not execute LIVE query using value '%v'", what)
case *sql.Table:
key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.TB, LV: stm.ID}
if _, err = e.tx.Put(ctx, 0, key.Encode(), stm.Encode()); err != nil {
return nil, err
}
case *sql.Ident:
key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.VA, LV: stm.ID}
if _, err = e.tx.Put(ctx, 0, key.Encode(), stm.Encode()); err != nil {
return nil, err
}
}
}
return
}
func (s *socket) executeKill(e *executor, ctx context.Context, stm *sql.KillStatement) (out []interface{}, err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
// Remove the live query from the database layer.
var what sql.Exprs
for _, val := range stm.What {
w, err := e.fetch(ctx, val, nil)
if err != nil {
return nil, err
}
what = append(what, w)
}
for _, w := range what {
switch what := w.(type) {
default:
return nil, fmt.Errorf("Can not execute KILL query using value '%v'", what)
case string:
if qry, ok := s.lives[what]; ok {
// Delete the live query from the saved queries.
delete(s.lives, qry.ID)
// Delete the live query from the database layer.
for _, w := range qry.What {
switch what := w.(type) {
case *sql.Table:
key := &keys.LV{KV: KV, NS: qry.NS, DB: qry.DB, TB: what.TB, LV: qry.ID}
_, err = e.tx.Clr(ctx, key.Encode())
case *sql.Ident:
key := &keys.LV{KV: KV, NS: qry.NS, DB: qry.DB, TB: what.VA, LV: qry.ID}
_, err = e.tx.Clr(ctx, key.Encode())
}
}
}
}
}
return
}