472 lines
10 KiB
Go
472 lines
10 KiB
Go
// Copyright © 2016 Abcum Ltd
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package db
|
|
|
|
import (
|
|
"time"
|
|
|
|
"context"
|
|
|
|
"github.com/abcum/surreal/kvs"
|
|
"github.com/abcum/surreal/sql"
|
|
"github.com/abcum/surreal/util/data"
|
|
"github.com/abcum/surreal/util/diff"
|
|
"github.com/abcum/surreal/util/indx"
|
|
"github.com/abcum/surreal/util/keys"
|
|
)
|
|
|
|
type document struct {
|
|
i *iterator
|
|
ns string
|
|
db string
|
|
tb string
|
|
md map[string]interface{}
|
|
id *sql.Thing
|
|
key *keys.Thing
|
|
val kvs.KV
|
|
now int64
|
|
doc *data.Doc
|
|
initial *data.Doc
|
|
current *data.Doc
|
|
store struct {
|
|
id int
|
|
tb bool
|
|
ev bool
|
|
fd bool
|
|
ix bool
|
|
ft bool
|
|
lv bool
|
|
}
|
|
cache struct {
|
|
tb *sql.DefineTableStatement
|
|
ev []*sql.DefineEventStatement
|
|
fd []*sql.DefineFieldStatement
|
|
ix []*sql.DefineIndexStatement
|
|
ft []*sql.DefineTableStatement
|
|
lv []*sql.LiveStatement
|
|
}
|
|
}
|
|
|
|
func newDocument(i *iterator, key *keys.Thing, val kvs.KV, doc *data.Doc) (d *document) {
|
|
|
|
d = documentPool.Get().(*document)
|
|
|
|
d.i = i
|
|
d.id = nil
|
|
d.key = key
|
|
d.val = val
|
|
d.doc = doc
|
|
|
|
d.now = time.Now().UnixNano()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
func (d *document) close() {
|
|
documentPool.Put(d)
|
|
}
|
|
|
|
func (d *document) clear() {
|
|
d.store.tb = false
|
|
d.store.ev = false
|
|
d.store.fd = false
|
|
d.store.ix = false
|
|
d.store.ft = false
|
|
d.store.lv = false
|
|
}
|
|
|
|
func (d *document) erase() (err error) {
|
|
d.current = data.Consume(nil)
|
|
return
|
|
}
|
|
|
|
func (d *document) getTB() (out *sql.DefineTableStatement, err error) {
|
|
if !d.store.tb {
|
|
d.store.tb = true
|
|
d.cache.tb, err = d.i.e.dbo.GetTB(d.key.NS, d.key.DB, d.key.TB)
|
|
}
|
|
return d.cache.tb, err
|
|
}
|
|
|
|
func (d *document) getEV() (out []*sql.DefineEventStatement, err error) {
|
|
if !d.store.ev {
|
|
d.store.ev = true
|
|
d.cache.ev, err = d.i.e.dbo.AllEV(d.key.NS, d.key.DB, d.key.TB)
|
|
}
|
|
return d.cache.ev, err
|
|
}
|
|
|
|
func (d *document) getFD() (out []*sql.DefineFieldStatement, err error) {
|
|
if !d.store.fd {
|
|
d.store.fd = true
|
|
d.cache.fd, err = d.i.e.dbo.AllFD(d.key.NS, d.key.DB, d.key.TB)
|
|
}
|
|
return d.cache.fd, err
|
|
}
|
|
|
|
func (d *document) getIX() (out []*sql.DefineIndexStatement, err error) {
|
|
if !d.store.ix {
|
|
d.store.ix = true
|
|
d.cache.ix, err = d.i.e.dbo.AllIX(d.key.NS, d.key.DB, d.key.TB)
|
|
}
|
|
return d.cache.ix, err
|
|
}
|
|
|
|
func (d *document) getFT() (out []*sql.DefineTableStatement, err error) {
|
|
if !d.store.ft {
|
|
d.store.ft = true
|
|
d.cache.ft, err = d.i.e.dbo.AllFT(d.key.NS, d.key.DB, d.key.TB)
|
|
}
|
|
return d.cache.ft, err
|
|
}
|
|
|
|
func (d *document) getLV() (out []*sql.LiveStatement, err error) {
|
|
if !d.store.lv {
|
|
d.store.lv = true
|
|
d.cache.lv, err = d.i.e.dbo.AllLV(d.key.NS, d.key.DB, d.key.TB)
|
|
}
|
|
return d.cache.lv, err
|
|
}
|
|
|
|
func (d *document) query(ctx context.Context, stm sql.Statement) (interface{}, error) {
|
|
|
|
switch stm := stm.(type) {
|
|
default:
|
|
return nil, nil
|
|
case *sql.SelectStatement:
|
|
return d.runSelect(ctx, stm)
|
|
case *sql.CreateStatement:
|
|
return d.runCreate(ctx, stm)
|
|
case *sql.UpdateStatement:
|
|
return d.runUpdate(ctx, stm)
|
|
case *sql.DeleteStatement:
|
|
return d.runDelete(ctx, stm)
|
|
case *sql.RelateStatement:
|
|
return d.runRelate(ctx, stm)
|
|
case *sql.InsertStatement:
|
|
return d.runInsert(ctx, stm)
|
|
case *sql.UpsertStatement:
|
|
return d.runUpsert(ctx, stm)
|
|
}
|
|
|
|
}
|
|
|
|
func (d *document) setup() (err error) {
|
|
|
|
// A table of records were requested
|
|
// so we have the values, but no key
|
|
// yet, so we need to decode the KV
|
|
// store key into a Thing key.
|
|
|
|
if d.key == nil && d.val != nil {
|
|
d.key = &keys.Thing{}
|
|
d.key.Decode(d.val.Key())
|
|
}
|
|
|
|
// A specific record has been requested
|
|
// and we have a key, but no value has
|
|
// been loaded yet, so the record needs
|
|
// to be loaded from the KV store.
|
|
|
|
if d.key != nil && d.val == nil {
|
|
d.val, err = d.i.e.dbo.Get(d.i.versn, d.key.Encode())
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
// A subquery or data param has been
|
|
// loaded, and we might not have a key
|
|
// or a value, so let's load the data
|
|
// into a document, so that we can
|
|
// maniuplate the virtual document.
|
|
|
|
if d.doc != nil {
|
|
d.initial = d.doc
|
|
d.current = d.doc.Copy()
|
|
}
|
|
|
|
// The requested record has been loaded
|
|
// from the KV store (and not from a
|
|
// subquery or data variable), but does
|
|
// not exist. So we'll create a document
|
|
// for processing any record changes.
|
|
|
|
if d.doc == nil && d.val != nil && d.val.Exi() == false {
|
|
d.initial = data.New()
|
|
d.current = data.New()
|
|
}
|
|
|
|
// The requested record has been loaded
|
|
// from the KV store (and not from a
|
|
// subquery or data variable). So we'll
|
|
// load the KV data into a document for
|
|
// processing any record changes.
|
|
|
|
if d.doc == nil && d.val != nil && d.val.Exi() == true {
|
|
d.initial = data.New().Decode(d.val.Val())
|
|
d.current = data.New().Decode(d.val.Val())
|
|
}
|
|
|
|
// Finally if we are dealing with a record
|
|
// which is not data from the result of a
|
|
// subquery, then generate the ID from the
|
|
// key and re-calculate any cached data.
|
|
|
|
if d.key != nil {
|
|
|
|
// Check that the cached data for the
|
|
// current document belongs to the same
|
|
// NS, DB, and TB as the pooled document.
|
|
// If it doesn't then reset the cached data.
|
|
|
|
if d.ns != d.key.NS {
|
|
d.ns = d.key.NS
|
|
d.clear()
|
|
}
|
|
|
|
if d.db != d.key.DB {
|
|
d.db = d.key.DB
|
|
d.clear()
|
|
}
|
|
|
|
if d.tb != d.key.TB {
|
|
d.tb = d.key.TB
|
|
d.clear()
|
|
}
|
|
|
|
// Check that the cached data for the
|
|
// current document belongs to the same
|
|
// iterator as the pooled document. If
|
|
// it doesn't then reset the cached data.
|
|
|
|
if d.i.id != d.store.id {
|
|
d.store.id = d.i.id
|
|
d.clear()
|
|
}
|
|
|
|
// Finally, let's specify the ID of the
|
|
// current document, so we can use it
|
|
// for getting and setting data.
|
|
|
|
d.id = sql.NewThing(d.key.TB, d.key.ID)
|
|
|
|
d.md = map[string]interface{}{
|
|
"tb": d.key.TB,
|
|
"id": d.key.ID,
|
|
}
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
func (d *document) changed() bool {
|
|
a, _ := d.initial.Data().(map[string]interface{})
|
|
b, _ := d.current.Data().(map[string]interface{})
|
|
c := diff.Diff(a, b)
|
|
return len(c) > 0
|
|
}
|
|
|
|
func (d *document) shouldDrop() (bool, error) {
|
|
|
|
// Check whether it is specified
|
|
// that the table should drop
|
|
// writes, and if so, then return.
|
|
|
|
tb, err := d.getTB()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return tb.Drop, err
|
|
|
|
}
|
|
|
|
func (d *document) storeThing() (err error) {
|
|
|
|
// Check that the table should
|
|
// drop data being written.
|
|
|
|
if ok, err := d.shouldDrop(); ok {
|
|
return err
|
|
}
|
|
|
|
// Check that the rcord has been
|
|
// changed, and if not, return.
|
|
|
|
if ok := d.changed(); !ok {
|
|
return
|
|
}
|
|
|
|
// Write the value to the data
|
|
// layer and return any errors.
|
|
|
|
_, err = d.i.e.dbo.Put(d.now, d.key.Encode(), d.current.Encode())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
func (d *document) purgeThing() (err error) {
|
|
|
|
// Check that the table should
|
|
// drop data being written.
|
|
|
|
if ok, err := d.shouldDrop(); ok {
|
|
return err
|
|
}
|
|
|
|
// Reset the item by writing a
|
|
// nil value to the storage.
|
|
|
|
_, err = d.i.e.dbo.Put(d.now, d.key.Encode(), nil)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
func (d *document) eraseThing() (err error) {
|
|
|
|
// Check that the table should
|
|
// drop data being written.
|
|
|
|
if ok, err := d.shouldDrop(); ok {
|
|
return err
|
|
}
|
|
|
|
// Delete the item entirely from
|
|
// storage, so no versions exist.
|
|
|
|
_, err = d.i.e.dbo.Clr(d.key.Encode())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
func (d *document) storeIndex() (err error) {
|
|
|
|
// Check that the table should
|
|
// drop data being written.
|
|
|
|
if ok, err := d.shouldDrop(); ok {
|
|
return err
|
|
}
|
|
|
|
// Get the index values specified
|
|
// for this table, loop through
|
|
// them, and compute the changes.
|
|
|
|
ixs, err := d.getIX()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, ix := range ixs {
|
|
|
|
del := indx.Build(ix.Cols, d.initial)
|
|
add := indx.Build(ix.Cols, d.current)
|
|
|
|
// TODO use diffing to speed up indexes
|
|
// We need to use diffing so that only
|
|
// changed values are written to the
|
|
// storage layer. However if an index
|
|
// is redefined, then the diff does not
|
|
// return any changes, and the index is
|
|
// then corrupt. Maybe we could check
|
|
// when the index was created, and check
|
|
// if the d.initial change time is after
|
|
// the index creation time, then perform
|
|
// a diff on the old/new index values.
|
|
|
|
// if d.initial.Get("meta.time") > ix.Time {
|
|
// del, add = indx.Diff(old, now)
|
|
// }
|
|
|
|
if ix.Uniq == true {
|
|
for _, v := range del {
|
|
didx := &keys.Index{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.ID, FD: v}
|
|
d.i.e.dbo.DelC(d.now, didx.Encode(), d.id.Bytes())
|
|
}
|
|
for _, v := range add {
|
|
aidx := &keys.Index{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.ID, FD: v}
|
|
if _, err = d.i.e.dbo.PutC(0, aidx.Encode(), d.id.Bytes(), nil); err != nil {
|
|
return &IndexError{tb: d.key.TB, name: ix.Name, cols: ix.Cols, vals: v}
|
|
}
|
|
}
|
|
}
|
|
|
|
if ix.Uniq == false {
|
|
for _, v := range del {
|
|
didx := &keys.Point{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.ID, FD: v, ID: d.key.ID}
|
|
d.i.e.dbo.DelC(d.now, didx.Encode(), d.id.Bytes())
|
|
}
|
|
for _, v := range add {
|
|
aidx := &keys.Point{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.ID, FD: v, ID: d.key.ID}
|
|
if _, err = d.i.e.dbo.PutC(0, aidx.Encode(), d.id.Bytes(), nil); err != nil {
|
|
return &IndexError{tb: d.key.TB, name: ix.Name, cols: ix.Cols, vals: v}
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
func (d *document) purgeIndex() (err error) {
|
|
|
|
// Check that the table should
|
|
// drop data being written.
|
|
|
|
if ok, err := d.shouldDrop(); ok {
|
|
return err
|
|
}
|
|
|
|
// Get the index values specified
|
|
// for this table, loop through
|
|
// them, and compute the changes.
|
|
|
|
ixs, err := d.getIX()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, ix := range ixs {
|
|
|
|
del := indx.Build(ix.Cols, d.initial)
|
|
|
|
if ix.Uniq == true {
|
|
for _, v := range del {
|
|
key := &keys.Index{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.ID, FD: v}
|
|
d.i.e.dbo.DelC(0, key.Encode(), d.id.Bytes())
|
|
}
|
|
}
|
|
|
|
if ix.Uniq == false {
|
|
for _, v := range del {
|
|
key := &keys.Point{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.ID, FD: v, ID: d.key.ID}
|
|
d.i.e.dbo.DelC(0, key.Encode(), d.id.Bytes())
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|