// Copyright © 2016 Abcum Ltd // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package db import ( "fmt" "context" "github.com/abcum/surreal/kvs" "github.com/abcum/surreal/sql" "github.com/abcum/surreal/util/data" "github.com/abcum/surreal/util/indx" "github.com/abcum/surreal/util/keys" ) type document struct { i *iterator id *sql.Thing enc []byte key *keys.Thing val kvs.KV lck bool doc *data.Doc initial *data.Doc current *data.Doc changed bool } func newDocument(i *iterator, key *keys.Thing, val kvs.KV, doc *data.Doc) (d *document) { d = documentPool.Get().(*document) d.i = i d.id = nil d.enc = nil d.key = key d.val = val d.doc = doc d.lck = false return } func (d *document) close() { documentPool.Put(d) } func (d *document) erase() (err error) { d.changed, d.current = true, data.Consume(nil) return } func (d *document) query(ctx context.Context, stm sql.Statement) (val interface{}, err error) { defer func() { if r := recover(); r != nil { var ok bool if err, ok = r.(error); !ok { err = fmt.Errorf("%v", r) } } d.ulock(ctx) d.close() }() switch stm := stm.(type) { default: return nil, nil case *sql.SelectStatement: return d.runSelect(ctx, stm) case *sql.CreateStatement: return d.runCreate(ctx, stm) case *sql.UpdateStatement: return d.runUpdate(ctx, stm) case *sql.DeleteStatement: return d.runDelete(ctx, stm) case *sql.RelateStatement: return d.runRelate(ctx, stm) case *sql.InsertStatement: return d.runInsert(ctx, stm) case *sql.UpsertStatement: return d.runUpsert(ctx, stm) } } func (d *document) init(ctx context.Context) (err error) { // A table of records were requested // so we have the values, but no key // yet, so we need to decode the KV // store key into a Thing key. if d.key == nil && d.val != nil { d.enc = d.val.Key() if val, ok := keyCache.Get(d.val.Key()); ok { d.key = val.(*keys.Thing) } else { d.key = &keys.Thing{} d.key.Decode(d.enc) keyCache.Set(d.val.Key(), d.key, 0) } } return } func (d *document) lock(ctx context.Context) (err error) { if d.key != nil { d.lck = true d.i.e.lock.Lock(ctx, d.key) } return } func (d *document) ulock(ctx context.Context) (err error) { if d.key != nil && d.lck { d.lck = false d.i.e.lock.Unlock(ctx, d.key) } return } func (d *document) setup(ctx context.Context) (err error) { // A specific record has been requested // and we have a key, but no value has // been loaded yet, so the record needs // to be loaded from the KV store. if d.key != nil && d.val == nil { d.enc = d.key.Encode() d.val, err = d.i.e.tx.Get(ctx, d.i.versn, d.enc) if err != nil { return } } // A subquery or data param has been // loaded, and we might not have a key // or a value, so let's load the data // into a document, so that we can // maniuplate the virtual document. if d.doc != nil { d.initial = d.doc d.current = d.doc } // The requested record has been loaded // from the KV store (and not from a // subquery or data variable), but does // not exist. So we'll create a document // for processing any record changes. if d.doc == nil && d.val != nil && d.val.Exi() == false { d.initial = data.New() d.current = data.New() } // The requested record has been loaded // from the KV store (and not from a // subquery or data variable). So we'll // load the KV data into a document for // processing any record changes. if d.doc == nil && d.val != nil && d.val.Exi() == true { if val, ok := valCache.Get(d.val.Val()); ok { d.initial = val.(*data.Doc) d.current = d.initial } else { d.initial = data.New().Decode(d.val.Val()) d.current = d.initial valCache.Set(d.val.Val(), d.current, 0) } } // Finally if we are dealing with a record // which is not data from the result of a // subquery, then generate the ID. if d.key != nil { d.id = sql.NewThing(d.key.TB, d.key.ID) } return } func (d *document) forced(ctx context.Context) bool { if val := ctx.Value(ctxKeyForce); val != nil { return val.(bool) } return false } func (d *document) hasChanged(ctx context.Context) bool { return d.initial.Same(d.current) == false } func (d *document) shouldDrop(ctx context.Context) (bool, error) { // Check whether it is specified // that the table should drop // writes, and if so, then return. tb, err := d.i.e.tx.GetTB(ctx, d.key.NS, d.key.DB, d.key.TB) if err != nil { return false, err } return tb.Drop, err } func (d *document) shouldVersn(ctx context.Context) (bool, error) { // Check whether it is specified // that the table should keep // all document versions. tb, err := d.i.e.tx.GetTB(ctx, d.key.NS, d.key.DB, d.key.TB) if err != nil { return false, err } return tb.Vers, err } func (d *document) storeThing(ctx context.Context) (err error) { defer d.ulock(ctx) // Check that the record has been // changed, and if not, return. if !d.changed { return } // Check that the table should // drop data being written. if ok, err := d.shouldDrop(ctx); ok { return err } // Write the value to the data // layer and return any errors. if ok, err := d.shouldVersn(ctx); err != nil { return err } else if ok == true { _, err = d.i.e.tx.Put(ctx, d.i.e.time.UnixNano(), d.enc, d.current.Encode()) } else if ok == false { _, err = d.i.e.tx.Put(ctx, 0, d.enc, d.current.Encode()) } return } func (d *document) purgeThing(ctx context.Context) (err error) { defer d.ulock(ctx) // Check that the table should // drop data being written. if ok, err := d.shouldDrop(ctx); ok { return err } // Reset the item by writing a // nil value to the storage. if ok, err := d.shouldVersn(ctx); err != nil { return err } else if ok == true { _, err = d.i.e.tx.Put(ctx, d.i.e.time.UnixNano(), d.enc, nil) } else if ok == false { _, err = d.i.e.tx.Clr(ctx, d.enc) } return } func (d *document) storeIndex(ctx context.Context) (err error) { // Check if this query has been run // in forced mode, or return. forced := d.forced(ctx) // Check that the rcord has been // changed, and if not, return. if !forced && !d.changed { return } // Check that the table should // drop data being written. if ok, err := d.shouldDrop(ctx); ok { return err } // Get the index values specified // for this table, loop through // them, and compute the changes. ixs, err := d.i.e.tx.AllIX(ctx, d.key.NS, d.key.DB, d.key.TB) if err != nil { return err } for _, ix := range ixs { del := indx.Build(ix.Cols, d.initial) add := indx.Build(ix.Cols, d.current) if !forced { del, add = indx.Diff(del, add) } if ix.Uniq == true { for _, f := range del { enfd := data.Consume(f).Encode() didx := &keys.Index{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.VA, FD: enfd} d.i.e.tx.DelC(ctx, d.i.e.time.UnixNano(), didx.Encode(), d.id.Bytes()) } for _, f := range add { enfd := data.Consume(f).Encode() aidx := &keys.Index{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.VA, FD: enfd} if _, err = d.i.e.tx.PutC(ctx, 0, aidx.Encode(), d.id.Bytes(), nil); err != nil { return &IndexError{tb: d.key.TB, name: ix.Name, cols: ix.Cols, vals: f} } } } if ix.Uniq == false { for _, f := range del { enfd := data.Consume(f).Encode() didx := &keys.Point{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.VA, FD: enfd, ID: d.key.ID} d.i.e.tx.DelC(ctx, d.i.e.time.UnixNano(), didx.Encode(), d.id.Bytes()) } for _, f := range add { enfd := data.Consume(f).Encode() aidx := &keys.Point{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.VA, FD: enfd, ID: d.key.ID} if _, err = d.i.e.tx.PutC(ctx, 0, aidx.Encode(), d.id.Bytes(), nil); err != nil { return &IndexError{tb: d.key.TB, name: ix.Name, cols: ix.Cols, vals: f} } } } } return } func (d *document) purgeIndex(ctx context.Context) (err error) { // Check if this query has been run // in forced mode, or return. forced := d.forced(ctx) // Check that the rcord has been // changed, and if not, return. if !forced && !d.changed { return } // Check that the table should // drop data being written. if ok, err := d.shouldDrop(ctx); ok { return err } // Get the index values specified // for this table, loop through // them, and compute the changes. ixs, err := d.i.e.tx.AllIX(ctx, d.key.NS, d.key.DB, d.key.TB) if err != nil { return err } for _, ix := range ixs { del := indx.Build(ix.Cols, d.initial) if ix.Uniq == true { for _, v := range del { key := &keys.Index{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.VA, FD: v} d.i.e.tx.DelC(ctx, 0, key.Encode(), d.id.Bytes()) } } if ix.Uniq == false { for _, v := range del { key := &keys.Point{KV: d.key.KV, NS: d.key.NS, DB: d.key.DB, TB: d.key.TB, IX: ix.Name.VA, FD: v, ID: d.key.ID} d.i.e.tx.DelC(ctx, 0, key.Encode(), d.id.Bytes()) } } } return }