surrealpatch/db/socket.go

422 lines
7.8 KiB
Go
Raw Normal View History

2017-11-16 20:53:39 +00:00
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package db
import (
2017-12-08 10:05:21 +00:00
"fmt"
2017-11-16 20:53:39 +00:00
"sync"
"context"
"github.com/abcum/fibre"
"github.com/abcum/surreal/cnf"
2019-11-20 13:20:27 +00:00
"github.com/abcum/surreal/kvs"
2017-11-16 20:53:39 +00:00
"github.com/abcum/surreal/sql"
2019-11-21 09:21:29 +00:00
"github.com/abcum/surreal/txn"
2017-11-16 20:53:39 +00:00
"github.com/abcum/surreal/util/data"
"github.com/abcum/surreal/util/keys"
"github.com/abcum/surreal/util/uuid"
)
type socket struct {
mutex sync.Mutex
fibre *fibre.Context
sends map[string][]interface{}
items map[string][]interface{}
2017-11-16 20:53:39 +00:00
lives map[string]*sql.LiveStatement
}
func clear(id string) {
go func() {
sockets.Range(func(key, val interface{}) bool {
val.(*socket).clear(id + "-bg")
val.(*socket).clear(id)
return true
})
}()
2017-11-16 20:53:39 +00:00
}
func flush(id string) {
go func() {
sockets.Range(func(key, val interface{}) bool {
val.(*socket).flush(id + "-bg")
val.(*socket).flush(id)
return true
})
}()
2017-11-16 20:53:39 +00:00
}
// send delivers, on every socket stored in sockets, the pending
// notifications held under id and under id + "-bg". The work is done
// in a separate goroutine, so this function returns without waiting
// for it.
func send(id string) {
	go func() {
		sockets.Range(func(_, val interface{}) bool {
			sck := val.(*socket)
			sck.send(id + "-bg")
			sck.send(id)
			// Keep iterating over the remaining sockets.
			return true
		})
	}()
}
2019-12-08 13:51:49 +00:00
// TODO remove this when distributed
2019-11-21 09:21:29 +00:00
// We need to remove this when moving
// to a distributed cluster as
// websockets might be managed by an
// alternative server, and should not
// be removed on node startup.
func tidy() error {
ctx := context.Background()
txn, _ := txn.New(ctx, true)
defer txn.Commit()
nss, err := txn.AllNS(ctx)
if err != nil {
return err
}
for _, ns := range nss {
dbs, err := txn.AllDB(ctx, ns.Name.VA)
if err != nil {
return err
}
for _, db := range dbs {
tbs, err := txn.AllTB(ctx, ns.Name.VA, db.Name.VA)
if err != nil {
return err
}
for _, tb := range tbs {
key := &keys.LV{KV: KV, NS: ns.Name.VA, DB: db.Name.VA, TB: tb.Name.VA, LV: keys.Ignore}
if _, err = txn.ClrP(ctx, key.Encode(), 0); err != nil {
return err
}
}
}
}
return nil
}
2019-01-31 10:03:50 +00:00
func (s *socket) ctx() (ctx context.Context) {
2017-11-16 20:53:39 +00:00
ctx = context.Background()
auth := s.fibre.Get(ctxKeyAuth).(*cnf.Auth)
sess := s.fibre.Get(ctxKeyVars).(map[string]interface{})
2017-11-16 20:53:39 +00:00
vars := data.Consume(sess)
2019-01-31 10:03:50 +00:00
vars.Set(ENV, varKeyEnv)
2017-11-16 20:53:39 +00:00
vars.Set(auth.Data, varKeyAuth)
vars.Set(auth.Scope, varKeyScope)
vars.Set(session(s.fibre), varKeySession)
2017-11-16 20:53:39 +00:00
ctx = context.WithValue(ctx, ctxKeyVars, vars)
2019-01-31 10:03:50 +00:00
ctx = context.WithValue(ctx, ctxKeyKind, auth.Kind)
2017-11-16 20:53:39 +00:00
return
}
func (s *socket) queue(id, query, action string, result interface{}) {
2017-11-16 20:53:39 +00:00
s.mutex.Lock()
defer s.mutex.Unlock()
s.items[id] = append(s.items[id], &Dispatch{
2017-11-16 20:53:39 +00:00
Query: query,
Action: action,
Result: result,
})
}
func (s *socket) clear(id string) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
delete(s.items, id)
2017-11-16 20:53:39 +00:00
return
}
func (s *socket) flush(id string) (err error) {
2017-11-16 20:53:39 +00:00
s.mutex.Lock()
defer s.mutex.Unlock()
s.sends[id] = append(s.sends[id], s.items[id]...)
delete(s.items, id)
return
}
func (s *socket) send(id string) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
2017-11-16 20:53:39 +00:00
// If there are no pending message
// notifications for this socket
// then ignore this method call.
if len(s.sends[id]) == 0 {
2017-11-16 20:53:39 +00:00
return nil
}
// Create a new rpc notification
// object so that we can send the
// batch changes in one go.
obj := &fibre.RPCNotification{
Method: "notify",
Params: s.sends[id],
2017-11-16 20:53:39 +00:00
}
// Notify the websocket connection
// y sending an RPCNotification type
// to the notify channel.
2017-11-16 20:53:39 +00:00
s.fibre.Socket().Notify(obj)
2017-11-16 20:53:39 +00:00
// Make sure that we clear all the
// pending message notifications
// for this socket when done.
delete(s.sends, id)
2017-11-16 20:53:39 +00:00
return
}
2017-12-08 10:05:21 +00:00
func (s *socket) check(e *executor, ctx context.Context, ns, db, tb string) (err error) {
2017-11-16 20:53:39 +00:00
2017-12-08 10:05:21 +00:00
var tbv *sql.DefineTableStatement
2017-11-16 20:53:39 +00:00
// If we are authenticated using DB, NS,
// or KV permissions level, then we can
// ignore all permissions checks.
2019-01-31 10:03:50 +00:00
if perm(ctx) < cnf.AuthSC {
2017-11-16 20:53:39 +00:00
return nil
}
// First check that the NS exists, as
// otherwise, the scoped authentication
// request can not do anything.
2019-11-20 13:20:27 +00:00
_, err = e.tx.GetNS(ctx, ns)
2017-11-16 20:53:39 +00:00
if err != nil {
return err
}
// Next check that the DB exists, as
// otherwise, the scoped authentication
// request can not do anything.
2019-11-20 13:20:27 +00:00
_, err = e.tx.GetDB(ctx, ns, db)
2017-11-16 20:53:39 +00:00
if err != nil {
return err
}
// Then check that the TB exists, as
// otherwise, the scoped authentication
// request can not do anything.
2019-11-20 13:20:27 +00:00
tbv, err = e.tx.GetTB(ctx, ns, db, tb)
2017-11-16 20:53:39 +00:00
if err != nil {
return err
}
2018-05-30 12:25:42 +00:00
// If the table has any permissions
// specified, then let's check if this
// query is allowed access to the table.
2017-11-16 20:53:39 +00:00
2017-12-08 10:05:21 +00:00
switch p := tbv.Perms.(type) {
2017-11-16 20:53:39 +00:00
case *sql.PermExpression:
2018-05-30 12:25:42 +00:00
return e.fetchPerms(ctx, p.Select, tbv.Name)
2017-11-16 20:53:39 +00:00
default:
2017-12-08 10:05:21 +00:00
return &PermsError{table: tb}
2017-11-16 20:53:39 +00:00
}
}
func (s *socket) deregister(id string) {
sockets.Delete(id)
2017-11-16 20:53:39 +00:00
ctx := context.Background()
2019-11-20 13:20:27 +00:00
txn, _ := kvs.Begin(ctx, true)
2017-11-16 20:53:39 +00:00
defer txn.Commit()
for id, stm := range s.lives {
2017-12-08 10:05:21 +00:00
for _, w := range stm.What {
switch what := w.(type) {
case *sql.Table:
2019-02-06 08:08:07 +00:00
key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.TB, LV: id}
txn.Clr(ctx, key.Encode())
2017-12-08 10:05:21 +00:00
case *sql.Ident:
2019-02-06 08:08:07 +00:00
key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.VA, LV: id}
txn.Clr(ctx, key.Encode())
2017-12-08 10:05:21 +00:00
}
}
2017-11-16 20:53:39 +00:00
}
}
func (s *socket) executeLive(e *executor, ctx context.Context, stm *sql.LiveStatement) (out []interface{}, err error) {
2019-02-06 08:08:07 +00:00
stm.FB = e.id
stm.NS = e.ns
stm.DB = e.db
2017-11-16 20:53:39 +00:00
s.mutex.Lock()
defer s.mutex.Unlock()
2017-12-08 10:05:21 +00:00
// Generate a new query uuid.
2017-11-16 20:53:39 +00:00
2017-11-27 11:34:59 +00:00
stm.ID = uuid.New().String()
2017-11-16 20:53:39 +00:00
// Store the live query on the socket.
s.lives[stm.ID] = stm
// Return the query id to the user.
out = append(out, stm.ID)
2017-12-08 10:05:21 +00:00
// Store the live query in the database layer.
2019-03-05 11:40:08 +00:00
for key, val := range stm.What {
2017-12-08 10:05:21 +00:00
w, err := e.fetch(ctx, val, nil)
if err != nil {
return nil, err
}
2019-03-05 11:40:08 +00:00
stm.What[key] = w
2017-12-08 10:05:21 +00:00
}
2019-03-05 11:40:08 +00:00
for _, w := range stm.What {
2017-12-08 10:05:21 +00:00
switch what := w.(type) {
default:
return nil, fmt.Errorf("Can not execute LIVE query using value '%v'", what)
case *sql.Table:
2019-02-06 08:08:07 +00:00
key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.TB, LV: stm.ID}
2019-11-20 13:20:27 +00:00
if _, err = e.tx.Put(ctx, 0, key.Encode(), stm.Encode()); err != nil {
2017-12-08 10:05:21 +00:00
return nil, err
}
case *sql.Ident:
2019-02-06 08:08:07 +00:00
key := &keys.LV{KV: KV, NS: stm.NS, DB: stm.DB, TB: what.VA, LV: stm.ID}
2019-11-20 13:20:27 +00:00
if _, err = e.tx.Put(ctx, 0, key.Encode(), stm.Encode()); err != nil {
2017-12-08 10:05:21 +00:00
return nil, err
}
}
}
2017-11-16 20:53:39 +00:00
return
}
func (s *socket) executeKill(e *executor, ctx context.Context, stm *sql.KillStatement) (out []interface{}, err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
2017-12-08 10:05:21 +00:00
// Remove the live query from the database layer.
2019-02-06 08:08:07 +00:00
var what sql.Exprs
for _, val := range stm.What {
2017-12-08 10:05:21 +00:00
w, err := e.fetch(ctx, val, nil)
if err != nil {
return nil, err
}
2019-02-06 08:08:07 +00:00
what = append(what, w)
2017-12-08 10:05:21 +00:00
}
2019-02-06 08:08:07 +00:00
for _, w := range what {
2017-12-08 10:05:21 +00:00
switch what := w.(type) {
default:
return nil, fmt.Errorf("Can not execute KILL query using value '%v'", what)
case string:
if qry, ok := s.lives[what]; ok {
// Delete the live query from the saved queries.
2017-11-16 20:53:39 +00:00
2017-12-08 10:05:21 +00:00
delete(s.lives, qry.ID)
2017-11-16 20:53:39 +00:00
2017-12-08 10:05:21 +00:00
// Delete the live query from the database layer.
2017-11-16 20:53:39 +00:00
2017-12-08 10:05:21 +00:00
for _, w := range qry.What {
2017-11-16 20:53:39 +00:00
2017-12-08 10:05:21 +00:00
switch what := w.(type) {
2017-11-16 20:53:39 +00:00
2017-12-08 10:05:21 +00:00
case *sql.Table:
2019-02-06 08:08:07 +00:00
key := &keys.LV{KV: KV, NS: qry.NS, DB: qry.DB, TB: what.TB, LV: qry.ID}
2019-11-20 13:20:27 +00:00
_, err = e.tx.Clr(ctx, key.Encode())
2017-12-08 10:05:21 +00:00
case *sql.Ident:
2019-02-06 08:08:07 +00:00
key := &keys.LV{KV: KV, NS: qry.NS, DB: qry.DB, TB: what.VA, LV: qry.ID}
2019-11-20 13:20:27 +00:00
_, err = e.tx.Clr(ctx, key.Encode())
2017-12-08 10:05:21 +00:00
}
}
}
}
2017-11-16 20:53:39 +00:00
}
return
}