Add request context to query executor
This commit is contained in:
parent
73c56e4df5
commit
07c034bb75
8 changed files with 72 additions and 58 deletions
|
@ -26,7 +26,7 @@ func (e *executor) executeCreateStatement(ast *sql.CreateStatement) (out []inter
|
|||
|
||||
for k, w := range ast.What {
|
||||
if what, ok := w.(*sql.Param); ok {
|
||||
ast.What[k] = e.ctx.Get(what.ID).Data()
|
||||
ast.What[k] = e.Get(what.ID)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
45
db/db.go
45
db/db.go
|
@ -17,7 +17,6 @@ package db
|
|||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"net/http"
|
||||
|
@ -30,7 +29,6 @@ import (
|
|||
"github.com/abcum/surreal/log"
|
||||
"github.com/abcum/surreal/mem"
|
||||
"github.com/abcum/surreal/sql"
|
||||
"github.com/abcum/surreal/util/data"
|
||||
|
||||
_ "github.com/abcum/surreal/kvs/rixxdb"
|
||||
// _ "github.com/abcum/surreal/kvs/dendro"
|
||||
|
@ -38,47 +36,6 @@ import (
|
|||
|
||||
var QueryNotExecuted = fmt.Errorf("Query not executed")
|
||||
|
||||
var pool sync.Pool
|
||||
|
||||
func init() {
|
||||
|
||||
pool.New = func() interface{} {
|
||||
return newExecutor(new(sql.Query), make(map[string]interface{}))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type executor struct {
|
||||
txn kvs.TX
|
||||
ctx *data.Doc
|
||||
ast *sql.Query
|
||||
mem *mem.Store
|
||||
}
|
||||
|
||||
func newExecutor(ast *sql.Query, vars map[string]interface{}) *executor {
|
||||
return &executor{ast: ast, ctx: data.Consume(vars)}
|
||||
}
|
||||
|
||||
func (e *executor) Reset(ast *sql.Query, vars map[string]interface{}) {
|
||||
e.ast, e.ctx = ast, data.Consume(vars)
|
||||
}
|
||||
|
||||
func (e *executor) Txn() kvs.TX {
|
||||
return e.txn
|
||||
}
|
||||
|
||||
func (e *executor) Mem() *mem.Store {
|
||||
return e.mem
|
||||
}
|
||||
|
||||
func (e *executor) Set(key string, val interface{}) {
|
||||
e.ctx.Set(val, key)
|
||||
}
|
||||
|
||||
func (e *executor) Get(key string) (val interface{}) {
|
||||
return e.ctx.Get(key).Data()
|
||||
}
|
||||
|
||||
type Response struct {
|
||||
Time string `codec:"time,omitempty"`
|
||||
Status string `codec:"status,omitempty"`
|
||||
|
@ -207,7 +164,7 @@ func Process(ctx *fibre.Context, ast *sql.Query, vars map[string]interface{}) (o
|
|||
|
||||
defer pool.Put(exec)
|
||||
|
||||
exec.Reset(ast, vars)
|
||||
exec.Reset(ast, ctx, vars)
|
||||
|
||||
go exec.execute(quit, recv)
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ func (e *executor) executeDeleteStatement(ast *sql.DeleteStatement) (out []inter
|
|||
|
||||
for k, w := range ast.What {
|
||||
if what, ok := w.(*sql.Param); ok {
|
||||
ast.What[k] = e.ctx.Get(what.ID).Data()
|
||||
ast.What[k] = e.Get(what.ID)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
57
db/exec.go
Normal file
57
db/exec.go
Normal file
|
@ -0,0 +1,57 @@
|
|||
// Copyright © 2016 Abcum Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package db
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/abcum/fibre"
|
||||
"github.com/abcum/surreal/kvs"
|
||||
"github.com/abcum/surreal/mem"
|
||||
"github.com/abcum/surreal/sql"
|
||||
"github.com/abcum/surreal/util/data"
|
||||
)
|
||||
|
||||
var pool sync.Pool
|
||||
|
||||
func init() {
|
||||
|
||||
pool.New = func() interface{} {
|
||||
return &executor{}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type executor struct {
|
||||
txn kvs.TX
|
||||
ctx *data.Doc
|
||||
ast *sql.Query
|
||||
mem *mem.Store
|
||||
web *fibre.Context
|
||||
}
|
||||
|
||||
func (e *executor) Reset(ast *sql.Query, web *fibre.Context, vars map[string]interface{}) {
|
||||
e.ast = ast
|
||||
e.web = web
|
||||
e.ctx = data.Consume(vars)
|
||||
}
|
||||
|
||||
func (e *executor) Set(key string, val interface{}) {
|
||||
e.ctx.Set(val, key)
|
||||
}
|
||||
|
||||
func (e *executor) Get(key string) (val interface{}) {
|
||||
return e.ctx.Get(key).Data()
|
||||
}
|
18
db/info.go
18
db/info.go
|
@ -36,17 +36,17 @@ func (e *executor) executeInfoStatement(ast *sql.InfoStatement) (out []interface
|
|||
|
||||
func (e *executor) executeInfoNSStatement(ast *sql.InfoStatement) (out []interface{}, err error) {
|
||||
|
||||
db, err := e.Mem().AllDB(ast.NS)
|
||||
db, err := e.mem.AllDB(ast.NS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nt, err := e.Mem().AllNT(ast.NS)
|
||||
nt, err := e.mem.AllNT(ast.NS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nu, err := e.Mem().AllNU(ast.NS)
|
||||
nu, err := e.mem.AllNU(ast.NS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -76,17 +76,17 @@ func (e *executor) executeInfoNSStatement(ast *sql.InfoStatement) (out []interfa
|
|||
|
||||
func (e *executor) executeInfoDBStatement(ast *sql.InfoStatement) (out []interface{}, err error) {
|
||||
|
||||
tb, err := e.Mem().AllTB(ast.NS, ast.DB)
|
||||
tb, err := e.mem.AllTB(ast.NS, ast.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dt, err := e.Mem().AllDT(ast.NS, ast.DB)
|
||||
dt, err := e.mem.AllDT(ast.NS, ast.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
du, err := e.Mem().AllDU(ast.NS, ast.DB)
|
||||
du, err := e.mem.AllDU(ast.NS, ast.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -116,17 +116,17 @@ func (e *executor) executeInfoDBStatement(ast *sql.InfoStatement) (out []interfa
|
|||
|
||||
func (e *executor) executeInfoTBStatement(ast *sql.InfoStatement) (out []interface{}, err error) {
|
||||
|
||||
tb, err := e.Mem().GetTB(ast.NS, ast.DB, ast.What)
|
||||
tb, err := e.mem.GetTB(ast.NS, ast.DB, ast.What)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fd, err := e.Mem().AllFD(ast.NS, ast.DB, ast.What)
|
||||
fd, err := e.mem.AllFD(ast.NS, ast.DB, ast.What)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ix, err := e.Mem().AllIX(ast.NS, ast.DB, ast.What)
|
||||
ix, err := e.mem.AllIX(ast.NS, ast.DB, ast.What)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ func (e *executor) executeReturnStatement(ast *sql.ReturnStatement) (out []inter
|
|||
case *sql.Empty:
|
||||
// Ignore
|
||||
case *sql.Param:
|
||||
out = append(out, e.ctx.Get(what.ID).Data())
|
||||
out = append(out, e.Get(what.ID))
|
||||
}
|
||||
|
||||
return
|
||||
|
|
|
@ -24,7 +24,7 @@ func (e *executor) executeSelectStatement(ast *sql.SelectStatement) (out []inter
|
|||
|
||||
for k, w := range ast.What {
|
||||
if what, ok := w.(*sql.Param); ok {
|
||||
ast.What[k] = e.ctx.Get(what.ID).Data()
|
||||
ast.What[k] = e.Get(what.ID)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ func (e *executor) executeUpdateStatement(ast *sql.UpdateStatement) (out []inter
|
|||
|
||||
for k, w := range ast.What {
|
||||
if what, ok := w.(*sql.Param); ok {
|
||||
ast.What[k] = e.ctx.Get(what.ID).Data()
|
||||
ast.What[k] = e.Get(what.ID)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue