surrealpatch/db/document.go
2021-12-14 08:13:19 +00:00

444 lines
9.5 KiB
Go

// Copyright © 2016 SurrealDB Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package db
import (
"fmt"
"context"
"github.com/surrealdb/surrealdb/kvs"
"github.com/surrealdb/surrealdb/sql"
"github.com/surrealdb/surrealdb/util/data"
"github.com/surrealdb/surrealdb/util/indx"
"github.com/surrealdb/surrealdb/util/keys"
)
type document struct {
i *iterator
id *sql.Thing
enc []byte
key *keys.Thing
val kvs.KV
lck bool
doc *data.Doc
initial *data.Doc
current *data.Doc
changed bool
}
func newDocument(i *iterator, key *keys.Thing, val kvs.KV, doc *data.Doc) (d *document) {
d = documentPool.Get().(*document)
d.i = i
d.id = nil
d.enc = nil
d.key = key
d.val = val
d.doc = doc
d.lck = false
return
}
func (d *document) close() {
documentPool.Put(d)
}
func (d *document) erase() (err error) {
d.changed, d.current = true, data.Consume(nil)
return
}
func (d *document) query(ctx context.Context, stm sql.Statement) (val interface{}, err error) {
defer func() {
if r := recover(); r != nil {
var ok bool
if err, ok = r.(error); !ok {
err = fmt.Errorf("%v", r)
}
}
d.ulock(ctx)
d.close()
}()
switch stm := stm.(type) {
default:
return nil, nil
case *sql.SelectStatement:
return d.runSelect(ctx, stm)
case *sql.CreateStatement:
return d.runCreate(ctx, stm)
case *sql.UpdateStatement:
return d.runUpdate(ctx, stm)
case *sql.DeleteStatement:
return d.runDelete(ctx, stm)
case *sql.RelateStatement:
return d.runRelate(ctx, stm)
case *sql.InsertStatement:
return d.runInsert(ctx, stm)
case *sql.UpsertStatement:
return d.runUpsert(ctx, stm)
}
}
func (d *document) init(ctx context.Context) (err error) {
// A table of records were requested
// so we have the values, but no key
// yet, so we need to decode the KV
// store key into a Thing key.
if d.key == nil && d.val != nil {
d.enc = d.val.Key()
if val, ok := keyCache.Get(d.val.Key()); ok {
d.key = val.(*keys.Thing)
} else {
d.key = &keys.Thing{}
d.key.Decode(d.enc)
keyCache.Set(d.val.Key(), d.key, 0)
}
}
return
}
func (d *document) lock(ctx context.Context) (err error) {
if d.key != nil {
d.lck = true
d.i.e.lock.Lock(ctx, d.key)
}
return
}
func (d *document) ulock(ctx context.Context) (err error) {
if d.key != nil && d.lck {
d.lck = false
d.i.e.lock.Unlock(ctx, d.key)
}
return
}
func (d *document) setup(ctx context.Context) (err error) {
// A specific record has been requested
// and we have a key, but no value has
// been loaded yet, so the record needs
// to be loaded from the KV store.
if d.key != nil && d.val == nil {
d.enc = d.key.Encode()
d.val, err = d.i.e.tx.Get(ctx, d.i.versn, d.enc)
if err != nil {
return
}
}
// A subquery or data param has been
// loaded, and we might not have a key
// or a value, so let's load the data
// into a document, so that we can
// maniuplate the virtual document.
if d.doc != nil {
d.initial = d.doc
d.current = d.doc
}
// The requested record has been loaded
// from the KV store (and not from a
// subquery or data variable), but does
// not exist. So we'll create a document
// for processing any record changes.
if d.doc == nil && d.val != nil && d.val.Exi() == false {
d.initial = data.New()
d.current = data.New()
}
// The requested record has been loaded
// from the KV store (and not from a
// subquery or data variable). So we'll
// load the KV data into a document for
// processing any record changes.
if d.doc == nil && d.val != nil && d.val.Exi() == true {
if val, ok := valCache.Get(d.val.Val()); ok {
d.initial = val.(*data.Doc)
d.current = d.initial
} else {
d.initial = data.New().Decode(d.val.Val())
d.current = d.initial
valCache.Set(d.val.Val(), d.current, 0)
}
}
// Finally if we are dealing with a record
// which is not data from the result of a
// subquery, then generate the ID.
if d.key != nil {
d.id = sql.NewThing(d.key.TB, d.key.ID)
}
return
}
func (d *document) forced(ctx context.Context) bool {
if val := ctx.Value(ctxKeyForce); val != nil {
return val.(bool)
}
return false
}
func (d *document) hasChanged(ctx context.Context) bool {
return d.initial.Same(d.current) == false
}
func (d *document) shouldDrop(ctx context.Context) (bool, error) {
// Check whether it is specified
// that the table should drop
// writes, and if so, then return.
tb, err := d.i.e.tx.GetTB(ctx, d.key.NS, d.key.DB, d.key.TB)
if err != nil {
return false, err
}
return tb.Drop, err
}
func (d *document) shouldVersn(ctx context.Context) (bool, error) {
// Check whether it is specified
// that the table should keep
// all document versions.
tb, err := d.i.e.tx.GetTB(ctx, d.key.NS, d.key.DB, d.key.TB)
if err != nil {
return false, err
}
return tb.Vers, err
}
func (d *document) storeThing(ctx context.Context) (err error) {
defer d.ulock(ctx)
// Check that the record has been
// changed, and if not, return.
if !d.changed {
return
}
// Check that the table should
// drop data being written.
if ok, err := d.shouldDrop(ctx); ok {
return err
}
// Write the value to the data
// layer and return any errors.
if ok, err := d.shouldVersn(ctx); err != nil {
return err
} else if ok == true {
_, err = d.i.e.tx.Put(ctx, d.i.e.time.UnixNano(), d.enc, d.current.Encode())
} else if ok == false {
_, err = d.i.e.tx.Put(ctx, 0, d.enc, d.current.Encode())
}
return
}
func (d *document) purgeThing(ctx context.Context) (err error) {
defer d.ulock(ctx)
// Check that the table should
// drop data being written.
if ok, err := d.shouldDrop(ctx); ok {
return err
}
// Reset the item by writing a
// nil value to the storage.
if ok, err := d.shouldVersn(ctx); err != nil {
return err
} else if ok == true {
_, err = d.i.e.tx.Put(ctx, d.i.e.time.UnixNano(), d.enc, nil)
} else if ok == false {
_, err = d.i.e.tx.Clr(ctx, d.enc)
}
return
}
func (d *document) storeIndex(ctx context.Context) (err error) {
// Check if this query has been run
// in forced mode, or return.
forced := d.forced(ctx)
// Check that the rcord has been
// changed, and if not, return.
if !forced && !d.changed {
return
}
// Check that the table should
// drop data being written.
if ok, err := d.shouldDrop(ctx); ok {
return err
}
// Get the index values specified
// for this table, loop through
// them, and compute the changes.
ixs, err := d.i.e.tx.AllIX(ctx, d.key.NS, d.key.DB, d.key.TB)
if err != nil {
return err
}
for _, ix := range ixs {
del := indx.Build(ix.Cols, d.initial)
add := indx.Build(ix.Cols, d.current)
if !forced {
del, add = indx.Diff(del, add)
}
if ix.Uniq == true {
for _, f := range del {
enfd := data.Consume(f).Encode()
didx := &keys.Index{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.VA, FD: enfd}
d.i.e.tx.DelC(ctx, 0, didx.Encode(), d.id.Bytes())
}
for _, f := range add {
enfd := data.Consume(f).Encode()
aidx := &keys.Index{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.VA, FD: enfd}
if _, err = d.i.e.tx.PutC(ctx, 0, aidx.Encode(), d.id.Bytes(), nil); err != nil {
return &IndexError{tb: d.key.TB, name: ix.Name, cols: ix.Cols, vals: f}
}
}
}
if ix.Uniq == false {
for _, f := range del {
enfd := data.Consume(f).Encode()
didx := &keys.Point{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.VA, FD: enfd, ID: d.key.ID}
d.i.e.tx.DelC(ctx, 0, didx.Encode(), d.id.Bytes())
}
for _, f := range add {
enfd := data.Consume(f).Encode()
aidx := &keys.Point{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.VA, FD: enfd, ID: d.key.ID}
if _, err = d.i.e.tx.PutC(ctx, 0, aidx.Encode(), d.id.Bytes(), nil); err != nil {
return &IndexError{tb: d.key.TB, name: ix.Name, cols: ix.Cols, vals: f}
}
}
}
}
return
}
func (d *document) purgeIndex(ctx context.Context) (err error) {
// Check if this query has been run
// in forced mode, or return.
forced := d.forced(ctx)
// Check that the rcord has been
// changed, and if not, return.
if !forced && !d.changed {
return
}
// Check that the table should
// drop data being written.
if ok, err := d.shouldDrop(ctx); ok {
return err
}
// Get the index values specified
// for this table, loop through
// them, and compute the changes.
ixs, err := d.i.e.tx.AllIX(ctx, d.key.NS, d.key.DB, d.key.TB)
if err != nil {
return err
}
for _, ix := range ixs {
del := indx.Build(ix.Cols, d.initial)
if ix.Uniq == true {
for _, f := range del {
enfd := data.Consume(f).Encode()
didx := &keys.Index{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.VA, FD: enfd}
d.i.e.tx.DelC(ctx, 0, didx.Encode(), d.id.Bytes())
}
}
if ix.Uniq == false {
for _, f := range del {
enfd := data.Consume(f).Encode()
aidx := &keys.Point{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.VA, FD: enfd, ID: d.key.ID}
d.i.e.tx.DelC(ctx, 0, aidx.Encode(), d.id.Bytes())
}
}
}
return
}