surrealpatch/db/db.go

251 lines
6.1 KiB
Go
Raw Normal View History

// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package db
import (
"io"
"os"
2017-11-16 20:53:39 +00:00
"context"
2016-07-21 21:49:47 +00:00
2017-11-16 20:53:39 +00:00
"net/http"
"github.com/abcum/fibre"
"github.com/abcum/surreal/cnf"
2016-05-25 11:33:05 +00:00
"github.com/abcum/surreal/kvs"
"github.com/abcum/surreal/sql"
2017-11-16 20:53:39 +00:00
"github.com/abcum/surreal/util/data"
_ "github.com/abcum/surreal/kvs/rixxdb"
)
2019-01-31 10:03:50 +00:00
var KV string
var NIL string
var ENV string
2017-11-16 20:53:39 +00:00
// Response is a response from the database
type Response struct {
2016-10-27 08:33:14 +00:00
Time string `codec:"time,omitempty"`
Status string `codec:"status,omitempty"`
Detail string `codec:"detail,omitempty"`
Result []interface{} `codec:"result,omitempty"`
}
2017-11-16 20:53:39 +00:00
// Dispatch is a dispatch from the database
type Dispatch struct {
Query string `codec:"query,omitempty"`
Action string `codec:"action,omitempty"`
Result interface{} `codec:"result,omitempty"`
}
2019-01-31 10:03:50 +00:00
func init() {
ENV = os.Getenv(varKeyEnv)
}
// Setup sets up the connection with the data layer
func Setup(opts *cnf.Options) (err error) {
2019-01-31 10:03:50 +00:00
KV = cnf.Settings.DB.Base
return
}
// Exit shuts down the connection with the data layer
2019-11-20 13:20:27 +00:00
func Exit(opts *cnf.Options) (err error) {
sockets.Range(func(key, val interface{}) bool {
id, so := key.(string), val.(*socket)
deregister(so.fibre, id)()
return true
})
2019-11-20 13:20:27 +00:00
return
}
// Import loads database operations from a reader.
// This can be used to playback a database snapshot
// into an already running database.
2018-08-20 06:53:11 +00:00
func Sync(rw interface{}) (err error) {
switch v := rw.(type) {
case io.Reader:
2019-11-20 13:20:27 +00:00
return kvs.Import(v)
2018-08-20 06:53:11 +00:00
case io.Writer:
2019-11-20 13:20:27 +00:00
return kvs.Export(v)
2018-08-20 06:53:11 +00:00
default:
return nil
}
}
2018-08-20 06:53:11 +00:00
// Export saves all database operations to a writer.
// This can be used to save a database snapshot
// to a secondary file or stream.
func Export(fib *fibre.Context, ns, db string) (err error) {
return export(fib, ns, db)
}
2017-11-16 20:53:39 +00:00
// Socket registers a websocket for live queries
// returning two callback functions. The first
// function should be called when the websocket
// connects, and the second function should be
// called when the websocket disconnects.
func Socket(fib *fibre.Context, id string) (beg, end func()) {
return register(fib, id), deregister(fib, id)
}
// Execute parses a single sql query, or multiple
// sql queries, and executes them serially against
// the underlying data layer.
2017-11-16 20:53:39 +00:00
func Execute(fib *fibre.Context, txt interface{}, vars map[string]interface{}) (out []*Response, err error) {
// Parse the received SQL batch query strings
// into SQL ASTs, using any immutable preset
// variables if set.
2019-01-31 10:03:50 +00:00
ast, err := sql.Parse(txt)
2016-02-27 12:05:35 +00:00
if err != nil {
return
}
2017-11-16 20:53:39 +00:00
// Process the parsed SQL batch query using
// the predefined query variables.
2017-11-16 20:53:39 +00:00
return Process(fib, ast, vars)
}
// Process takes a parsed set of sql queries and
// executes them serially against the underlying
// data layer.
2017-11-16 20:53:39 +00:00
func Process(fib *fibre.Context, ast *sql.Query, vars map[string]interface{}) (out []*Response, err error) {
2017-11-16 20:53:39 +00:00
// If no preset variables have been defined
// then ensure that the variables is
// instantiated for future use.
if vars == nil {
vars = make(map[string]interface{})
}
2019-01-31 10:03:50 +00:00
// Get the unique id for this connection
// so that we can assign it to the context
// and detect any websocket notifications.
2019-11-20 13:20:27 +00:00
id := fib.Uniq()
2019-01-31 10:03:50 +00:00
// Assign the authentication data to the
// context so that we can log the auth kind
// and the auth variable data to the request.
2019-01-31 10:03:50 +00:00
auth := fib.Get(ctxKeyAuth).(*cnf.Auth)
// Ensure that the specified environment
// variable 'ENV' is available to the
// request, to detect the environment.
2019-01-31 10:03:50 +00:00
vars[varKeyEnv] = ENV
// Ensure that the current authentication
// data is made available as a runtime
// variable to the query layer.
2019-01-31 10:03:50 +00:00
vars[varKeyAuth] = auth.Data
// Ensure that the current authentication
// scope is made available as a runtime
// variable to the query layer.
2019-01-31 10:03:50 +00:00
vars[varKeyScope] = auth.Scope
// Ensure that the session details, such
// as id, ip, and origin, are available on
// the 'conn' object on each query.
vars[varKeySession] = session(fib)
2017-11-16 20:53:39 +00:00
// Create a new context so that we can quit
// all goroutine workers if the http client
// itself is closed before finishing.
ctx, quit := context.WithCancel(fib.Context())
// When this function has finished ensure
// that we cancel this context so that any
// associated resources are released.
defer quit()
// Assign the authentication data to the
// context so that we can log the auth kind
// and the auth variable data to the request.
2017-11-16 20:53:39 +00:00
ctx = context.WithValue(ctx, ctxKeyKind, auth.Kind)
// Add the request variables to the context
// so that we can access them at a later
// stage within the nested callbacks.
ctx = context.WithValue(ctx, ctxKeyVars, data.Consume(vars))
2016-10-27 08:33:14 +00:00
// If the current connection is a normal http
// connection then force quit any running
2017-11-16 20:53:39 +00:00
// queries if the http socket closes.
2017-11-16 20:53:39 +00:00
if _, ok := fib.Response().Writer().(http.CloseNotifier); ok {
2016-10-27 08:33:14 +00:00
2017-11-16 20:53:39 +00:00
exit := fib.Response().CloseNotify()
done := make(chan struct{})
2016-10-27 08:33:14 +00:00
defer close(done)
go func() {
select {
case <-done:
case <-exit:
2017-11-16 20:53:39 +00:00
quit()
2016-10-27 08:33:14 +00:00
}
}()
2016-10-27 08:33:14 +00:00
}
// Create a new query executor with the query
// details, and the current runtime variables
// and execute the queries within.
2019-01-31 10:03:50 +00:00
executor := newExecutor(id, auth.NS, auth.DB)
// Execute the parsed SQL syntax tree in a
// separate goroutine so that we can send
// the output in chunks to the client.
2017-02-14 20:02:18 +00:00
2017-11-16 20:53:39 +00:00
go executor.execute(ctx, ast)
// Wait for all of the processed queries to
// return results, buffer the output, and
// return the output when finished.
2016-10-27 08:33:14 +00:00
for {
select {
2017-11-16 20:53:39 +00:00
case <-ctx.Done():
return nil, fibre.NewHTTPError(504)
2017-11-16 20:53:39 +00:00
case res, open := <-executor.send:
if !open {
return
}
2018-04-24 23:00:36 +00:00
out = append(out, res)
2016-10-18 12:49:46 +00:00
}
}
2016-10-27 08:33:14 +00:00
return
}