Switch underlying KV store to rixxdb/dendrodb

This commit is contained in:
Tobie Morgan Hitchcock 2017-02-09 11:16:59 +00:00
parent 07b5c5ce38
commit a3b9362adb
25 changed files with 295 additions and 1844 deletions

View file

@ -15,16 +15,16 @@
package cli package cli
var flags = map[string]string{ var flags = map[string]string{
"db": `Database configuration path used for storing data. Available backend stores are boltdb, mysql, or pgsql. (default "boltdb://surreal.db").`, "db": `Database configuration path used for storing data. Available backend stores are memory, rixxdb, or dendrodb. (default "memory").`,
"key": `Encryption key to use for intra-cluster communications, and on-disk encryption. For AES-128 encryption use a 16 bit key, for AES-192 encryption use a 24 bit key, and for AES-256 encryption use a 32 bit key.`, "key": `Encryption key to use for intra-cluster communications, and on-disk encryption. For AES-128 encryption use a 16 bit key, for AES-192 encryption use a 24 bit key, and for AES-256 encryption use a 32 bit key.`,
"join": `A comma-separated list of addresses to use when a new node is joining an existing cluster. For the first node in a cluster, --join should NOT be specified.`, "join": `A comma-separated list of addresses to use when a new node is joining an existing cluster. For the first node in a cluster, --join should NOT be specified.`,
} }
var usage = map[string][]string{ var usage = map[string][]string{
"db": { "db": {
"--db-path boltdb://surreal.db", "--db-path memory",
"--db-path mysql://user:pass@127.0.0.1:3306/database", "--db-path rixxdb://surreal.db",
"--db-path pgsql://user:pass@127.0.0.1:5432/database", "--db-path dendro://user:pass@192.168.1.100",
}, },
"join": { "join": {
"--join 10.0.0.1", "--join 10.0.0.1",
@ -33,8 +33,8 @@ var usage = map[string][]string{
"--join 89.13.7.33:33693,example.com:33693", "--join 89.13.7.33:33693,example.com:33693",
}, },
"key": { "key": {
"--enc 1hg7dbrma8ghe547", "--key 1hg7dbrma8ghe547",
"--enc 1hg7dbrma8ghe5473kghvie6", "--key 1hg7dbrma8ghe5473kghvie6",
"--enc 1hg7dbrma8ghe5473kghvie64jgi3ph4", "--key 1hg7dbrma8ghe5473kghvie64jgi3ph4",
}, },
} }

View file

@ -19,6 +19,7 @@ import (
"os" "os"
"regexp" "regexp"
"strings" "strings"
"time"
"github.com/abcum/surreal/cnf" "github.com/abcum/surreal/cnf"
"github.com/abcum/surreal/log" "github.com/abcum/surreal/log"
@ -34,20 +35,23 @@ func setup() {
// Ensure that the default // Ensure that the default
// database options are set // database options are set
if opts.DB.Base == "" { if opts.DB.Path == "" {
opts.DB.Base = "surreal" opts.DB.Path = "memory"
} }
if opts.DB.Path == "" { if opts.DB.Base == "" {
opts.DB.Path = "boltdb://surreal.db" opts.DB.Base = "surreal"
} }
if opts.DB.Code != "" { if opts.DB.Code != "" {
opts.DB.Key = []byte(opts.DB.Code) opts.DB.Key = []byte(opts.DB.Code)
} }
if ok, _ := regexp.MatchString(`^(boltdb|mysql|pgsql)://(.+)$`, opts.DB.Path); !ok { if opts.DB.Time != "" {
log.Fatal("Specify a valid data store configuration path") var err error
if opts.DB.Sync, err = time.ParseDuration(opts.DB.Time); err != nil {
log.Fatal("Specify a valid database sync time frequency")
}
} }
switch len(opts.DB.Key) { switch len(opts.DB.Key) {
@ -56,6 +60,12 @@ func setup() {
log.Fatal("Specify a valid encryption key length. Valid key sizes are 16bit, 24bit, or 32bit.") log.Fatal("Specify a valid encryption key length. Valid key sizes are 16bit, 24bit, or 32bit.")
} }
if opts.DB.Path != "memory" {
if ok, _ := regexp.MatchString(`^(s3|gcs|logr|file|rixxdb|dendrodb)://(.+)$`, opts.DB.Path); !ok {
log.Fatal("Specify a valid data store configuration path")
}
}
if strings.HasPrefix(opts.DB.Cert.CA, "-----") { if strings.HasPrefix(opts.DB.Cert.CA, "-----") {
var err error var err error
var doc *os.File var doc *os.File

View file

@ -78,6 +78,7 @@ func init() {
startCmd.PersistentFlags().StringVar(&opts.DB.Cert.Crt, "db-crt", "", "Path to the certificate file used to connect to the remote database.") startCmd.PersistentFlags().StringVar(&opts.DB.Cert.Crt, "db-crt", "", "Path to the certificate file used to connect to the remote database.")
startCmd.PersistentFlags().StringVar(&opts.DB.Cert.Key, "db-key", "", "Path to the private key file used to connect to the remote database.") startCmd.PersistentFlags().StringVar(&opts.DB.Cert.Key, "db-key", "", "Path to the private key file used to connect to the remote database.")
startCmd.PersistentFlags().StringVar(&opts.DB.Path, "db-path", "", flag("db")) startCmd.PersistentFlags().StringVar(&opts.DB.Path, "db-path", "", flag("db"))
startCmd.PersistentFlags().StringVar(&opts.DB.Time, "db-sync", "0s", "Something here")
startCmd.PersistentFlags().StringVarP(&opts.Cluster.Join, "join", "j", "", flag("join")) startCmd.PersistentFlags().StringVarP(&opts.Cluster.Join, "join", "j", "", flag("join"))

View file

@ -14,6 +14,8 @@
package cnf package cnf
import "time"
var Settings *Options var Settings *Options
type Auth struct { type Auth struct {
@ -38,6 +40,8 @@ type Options struct {
Host string // Surreal host to connect to Host string // Surreal host to connect to
Port string // Surreal port to connect to Port string // Surreal port to connect to
Base string // Base key to use in KV stores Base string // Base key to use in KV stores
Time string // Timeframe for syncing data
Sync time.Duration
Cert struct { Cert struct {
CA string CA string
Crt string Crt string

View file

@ -40,7 +40,7 @@ func (e *executor) executeCreateStatement(txn kvs.TX, ast *sql.CreateStatement)
case *sql.Thing: case *sql.Thing:
key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: what.ID} key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: what.ID}
kv, _ := txn.Get(key.Encode()) kv, _ := txn.Get(0, key.Encode())
doc := item.New(kv, txn, key, e.ctx) doc := item.New(kv, txn, key, e.ctx)
if ret, err := create(doc, ast); err != nil { if ret, err := create(doc, ast); err != nil {
return nil, err return nil, err
@ -50,7 +50,7 @@ func (e *executor) executeCreateStatement(txn kvs.TX, ast *sql.CreateStatement)
case *sql.Table: case *sql.Table:
key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: uuid.NewV5(uuid.NewV4().UUID, ast.KV).String()} key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: uuid.NewV5(uuid.NewV4().UUID, ast.KV).String()}
kv, _ := txn.Get(key.Encode()) kv, _ := txn.Get(0, key.Encode())
doc := item.New(kv, txn, key, e.ctx) doc := item.New(kv, txn, key, e.ctx)
if ret, err := create(doc, ast); err != nil { if ret, err := create(doc, ast); err != nil {
return nil, err return nil, err

View file

@ -39,8 +39,8 @@ func (e *executor) executeDeleteStatement(txn kvs.TX, ast *sql.DeleteStatement)
case *sql.Thing: case *sql.Thing:
key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: what.ID} key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: what.ID}
kv, _ := txn.Get(key.Encode()) kv, _ := txn.Get(0, key.Encode())
doc := item.New(kv, txn, key, e.ctx) doc := item.New(kv, e.txn, key, e.ctx)
if ret, err := delete(doc, ast); err != nil { if ret, err := delete(doc, ast); err != nil {
return nil, err return nil, err
} else if ret != nil { } else if ret != nil {
@ -48,9 +48,8 @@ func (e *executor) executeDeleteStatement(txn kvs.TX, ast *sql.DeleteStatement)
} }
case *sql.Table: case *sql.Table:
beg := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: keys.Prefix} key := &keys.Table{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB}
end := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: keys.Suffix} kvs, _ := txn.GetL(0, key.Encode())
kvs, _ := txn.RGet(beg.Encode(), end.Encode(), 0)
for _, kv := range kvs { for _, kv := range kvs {
doc := item.New(kv, txn, nil, e.ctx) doc := item.New(kv, txn, nil, e.ctx)
if ret, err := delete(doc, ast); err != nil { if ret, err := delete(doc, ast); err != nil {

View file

@ -33,8 +33,8 @@ func (e *executor) executeSelectStatement(txn kvs.TX, ast *sql.SelectStatement)
if what, ok := w.(*sql.Thing); ok { if what, ok := w.(*sql.Thing); ok {
key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: what.ID} key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: what.ID}
kv, _ := txn.Get(key.Encode()) kv, _ := txn.Get(0, key.Encode())
doc := item.New(kv, txn, key, e.ctx) doc := item.New(kv, e.txn, key, e.ctx)
if ret, err := detect(doc, ast); err != nil { if ret, err := detect(doc, ast); err != nil {
return nil, err return nil, err
} else if ret != nil { } else if ret != nil {
@ -43,9 +43,8 @@ func (e *executor) executeSelectStatement(txn kvs.TX, ast *sql.SelectStatement)
} }
if what, ok := w.(*sql.Table); ok { if what, ok := w.(*sql.Table); ok {
beg := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: keys.Prefix} key := &keys.Table{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB}
end := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: keys.Suffix} kvs, _ := txn.GetL(0, key.Encode())
kvs, _ := txn.RGet(beg.Encode(), end.Encode(), 0)
for _, kv := range kvs { for _, kv := range kvs {
doc := item.New(kv, txn, nil, e.ctx) doc := item.New(kv, txn, nil, e.ctx)
if ret, err := detect(doc, ast); err != nil { if ret, err := detect(doc, ast); err != nil {

View file

@ -39,8 +39,8 @@ func (e *executor) executeUpdateStatement(txn kvs.TX, ast *sql.UpdateStatement)
case *sql.Thing: case *sql.Thing:
key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: what.ID} key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: what.ID}
kv, _ := txn.Get(key.Encode()) kv, _ := txn.Get(0, key.Encode())
doc := item.New(kv, txn, key, e.ctx) doc := item.New(kv, e.txn, key, e.ctx)
if ret, err := update(doc, ast); err != nil { if ret, err := update(doc, ast); err != nil {
return nil, err return nil, err
} else if ret != nil { } else if ret != nil {
@ -48,9 +48,8 @@ func (e *executor) executeUpdateStatement(txn kvs.TX, ast *sql.UpdateStatement)
} }
case *sql.Table: case *sql.Table:
beg := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: keys.Prefix} key := &keys.Table{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB}
end := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: keys.Suffix} kvs, _ := txn.GetL(0, key.Encode())
kvs, _ := txn.RGet(beg.Encode(), end.Encode(), 0)
for _, kv := range kvs { for _, kv := range kvs {
doc := item.New(kv, txn, nil, e.ctx) doc := item.New(kv, txn, nil, e.ctx)
if ret, err := update(doc, ast); err != nil { if ret, err := update(doc, ast); err != nil {

View file

@ -1,47 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package boltdb
import (
"github.com/boltdb/bolt"
"github.com/abcum/surreal/kvs"
)
type DS struct {
db *bolt.DB
ck []byte
}
func (ds *DS) Txn(writable bool) (txn kvs.TX, err error) {
tx, err := ds.db.Begin(writable)
if err != nil {
err = &kvs.DSError{Err: err}
if tx != nil {
tx.Rollback()
}
return
}
return &TX{ds: ds, tx: tx, bu: tx.Bucket(bucket)}, err
}
func (ds *DS) Close() (err error) {
return ds.db.Close()
}

View file

@ -1,42 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package boltdb
// KV represents a database key:value item
type KV struct {
exi bool
key []byte
val []byte
}
// Exists is true if the key exists
func (kv *KV) Exists() bool {
return kv.exi
}
// Key returns a byte slice of the key
func (kv *KV) Key() []byte {
return kv.key
}
// Val returns a byte slice of the value
func (kv *KV) Val() []byte {
return kv.val
}
// Str returns a string of the value
func (kv *KV) Str() string {
return string(kv.val)
}

View file

@ -1,380 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package boltdb
import (
"bytes"
"math"
"github.com/boltdb/bolt"
"github.com/abcum/surreal/kvs"
"github.com/abcum/surreal/util/cryp"
"github.com/abcum/surreal/util/snap"
)
// TX is a distributed database transaction.
type TX struct {
ds *DS
do bool
ck []byte
tx *bolt.Tx
bu *bolt.Bucket
}
// All retrieves all key:value items in the db.
func (tx *TX) All() (kvs []kvs.KV, err error) {
err = tx.bu.ForEach(func(key, val []byte) (err error) {
kv, err := get(tx, key, val)
if err != nil {
return
}
kvs = append(kvs, kv)
return nil
})
return
}
// Get retrieves a single key:value item.
func (tx *TX) Get(key []byte) (kv kvs.KV, err error) {
val := tx.bu.Get(key)
return get(tx, key, val)
}
// MGet retrieves multiple key:value items.
func (tx *TX) MGet(keys ...[]byte) (kvs []kvs.KV, err error) {
for _, key := range keys {
val := tx.bu.Get(key)
kv, err := get(tx, key, val)
if err != nil {
return nil, err
}
kvs = append(kvs, kv)
}
return
}
// PGet retrieves the range of rows which are prefixed with `pre`.
func (tx *TX) PGet(pre []byte) (kvs []kvs.KV, err error) {
cu := tx.bu.Cursor()
for key, val := cu.Seek(pre); bytes.HasPrefix(key, pre); key, val = cu.Next() {
kv, err := get(tx, key, val)
if err != nil {
return nil, err
}
kvs = append(kvs, kv)
}
return
}
// RGet retrieves the range of `max` rows between `beg` (inclusive) and
// `end` (exclusive). To return the range in descending order, ensure
// that `end` sorts lower than `beg` in the key value store.
func (tx *TX) RGet(beg, end []byte, max uint64) (kvs []kvs.KV, err error) {
if max == 0 {
max = math.MaxUint64
}
cu := tx.bu.Cursor()
if bytes.Compare(beg, end) < 1 {
for key, val := cu.Seek(beg); key != nil && max > 0 && bytes.Compare(key, end) < 0; key, val = cu.Next() {
kv, err := get(tx, key, val)
if err != nil {
return nil, err
}
kvs = append(kvs, kv)
max--
}
}
if bytes.Compare(beg, end) > 1 {
for key, val := cu.Seek(end); key != nil && max > 0 && bytes.Compare(beg, key) < 0; key, val = cu.Prev() {
kv, err := get(tx, key, val)
if err != nil {
return nil, err
}
kvs = append(kvs, kv)
max--
}
}
return
}
// Put sets the value for a key.
func (tx *TX) Put(key, val []byte) (err error) {
if !tx.tx.Writable() {
err = &kvs.TXError{Err: err}
return
}
if val, err = snap.Encode(val); err != nil {
err = &kvs.DBError{Err: err}
return
}
if val, err = cryp.Encrypt(tx.ds.ck, val); err != nil {
err = &kvs.CKError{Err: err}
return
}
if err = tx.bu.Put(key, val); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
// CPut conditionally sets the value for a key if the existing value is equal
// to the expected value. To conditionally set a value only if there is no
// existing entry pass nil for the expected value.
func (tx *TX) CPut(key, val, exp []byte) (err error) {
if !tx.tx.Writable() {
err = &kvs.TXError{Err: err}
return
}
now, _ := tx.Get(key)
act := now.(*KV).val
if !bytes.Equal(act, exp) {
err = &kvs.KVError{Err: err, Key: key, Val: act, Exp: exp}
return
}
if val, err = snap.Encode(val); err != nil {
err = &kvs.DBError{Err: err}
return
}
if val, err = cryp.Encrypt(tx.ds.ck, val); err != nil {
err = &kvs.CKError{Err: err}
return
}
if err = tx.bu.Put(key, val); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
// Del deletes a single key:value item.
func (tx *TX) Del(key []byte) (err error) {
if !tx.tx.Writable() {
err = &kvs.TXError{Err: err}
return
}
if err = tx.bu.Delete(key); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
// CDel conditionally deletes a key if the existing value is equal to the
// expected value.
func (tx *TX) CDel(key, exp []byte) (err error) {
if !tx.tx.Writable() {
err = &kvs.TXError{Err: err}
return
}
now, _ := tx.Get(key)
act := now.(*KV).val
if !bytes.Equal(act, exp) {
err = &kvs.KVError{Err: err, Key: key, Val: act, Exp: exp}
return
}
if err = tx.bu.Delete(key); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
// MDel deletes multiple key:value items.
func (tx *TX) MDel(keys ...[]byte) (err error) {
if !tx.tx.Writable() {
err = &kvs.TXError{Err: err}
return
}
for _, key := range keys {
if err = tx.bu.Delete(key); err != nil {
err = &kvs.DBError{Err: err}
return
}
}
return
}
// PDel deletes the range of rows which are prefixed with `pre`.
func (tx *TX) PDel(pre []byte) (err error) {
cu := tx.bu.Cursor()
for key, _ := cu.Seek(pre); bytes.HasPrefix(key, pre); key, _ = cu.Seek(pre) {
if err = tx.bu.Delete(key); err != nil {
err = &kvs.DBError{Err: err}
return
}
}
return
}
// RDel deletes the range of `max` rows between `beg` (inclusive) and
// `end` (exclusive). To delete the range in descending order, ensure
// that `end` sorts lower than `beg` in the key value store.
func (tx *TX) RDel(beg, end []byte, max uint64) (err error) {
if max == 0 {
max = math.MaxUint64
}
if !tx.tx.Writable() {
err = &kvs.TXError{Err: err}
return
}
cu := tx.bu.Cursor()
if bytes.Compare(beg, end) < 1 {
for key, _ := cu.Seek(beg); key != nil && max > 0 && bytes.Compare(key, end) < 0; key, _ = cu.Seek(beg) {
if err = tx.bu.Delete(key); err != nil {
err = &kvs.DBError{Err: err}
return
}
max--
}
}
if bytes.Compare(beg, end) > 1 {
for key, _ := cu.Seek(end); key != nil && max > 0 && bytes.Compare(beg, key) < 0; key, _ = cu.Seek(end) {
if err = tx.bu.Delete(key); err != nil {
err = &kvs.DBError{Err: err}
return
}
max--
}
}
return
}
func (tx *TX) Done() (val bool) {
return tx.do
}
func (tx *TX) Close() (err error) {
return tx.Rollback()
}
func (tx *TX) Cancel() (err error) {
return tx.Rollback()
}
func (tx *TX) Commit() (err error) {
tx.do = true
if tx.tx.Writable() {
return tx.tx.Commit()
}
return tx.tx.Rollback()
}
func (tx *TX) Rollback() (err error) {
tx.do = true
return tx.tx.Rollback()
}
func get(tx *TX, key, val []byte) (kv *KV, err error) {
kv = &KV{
exi: (val != nil),
key: key,
val: val,
}
kv.val, err = cryp.Decrypt(tx.ds.ck, kv.val)
if err != nil {
err = &kvs.CKError{Err: err}
return
}
kv.val, err = snap.Decode(kv.val)
if err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}

268
kvs/db.go
View file

@ -14,268 +14,8 @@
package kvs package kvs
import ( // DB represents a database implementation
"strings" type DB interface {
Begin(bool) (TX, error)
"github.com/abcum/surreal/cnf" Close() error
"github.com/abcum/surreal/util/keys"
)
var stores = make(map[string]func(*cnf.Options) (DS, error))
// DB is a database handle to a single Surreal cluster.
type DB struct {
ds DS
}
// New sets up the underlying key-value store
func New(opts *cnf.Options) (db *DB, err error) {
var ds DS
if strings.HasPrefix(opts.DB.Path, "boltdb://") {
if ds, err = stores["boltdb"](opts); err != nil {
return
}
}
if strings.HasPrefix(opts.DB.Path, "mysql://") {
if ds, err = stores["mysql"](opts); err != nil {
return
}
}
if strings.HasPrefix(opts.DB.Path, "pgsql://") {
if ds, err = stores["pgsql"](opts); err != nil {
return
}
}
db = &DB{ds: ds}
err = db.enc(opts)
return
}
func (db *DB) enc(opts *cnf.Options) (err error) {
ck := &keys.CK{KV: opts.DB.Base}
kv, err := db.Get(ck.Encode())
if err != nil {
return err
}
if kv.Exists() == false {
err = db.Put(ck.Encode(), []byte("±"))
}
if kv.Exists() == true && kv.Str() != "±" {
err = new(CKError)
}
return
}
// All retrieves all key:value items in the db.
func (db *DB) All() (kvs []KV, err error) {
tx, err := db.Txn(false)
if err != nil {
return
}
defer tx.Close()
return tx.All()
}
// Get retrieves a single key:value item.
func (db *DB) Get(key []byte) (kv KV, err error) {
tx, err := db.Txn(false)
if err != nil {
return
}
defer tx.Close()
return tx.Get(key)
}
// MGet retrieves multiple key:value items.
func (db *DB) MGet(keys ...[]byte) (kvs []KV, err error) {
tx, err := db.Txn(false)
if err != nil {
return
}
defer tx.Close()
return tx.MGet(keys...)
}
// PGet retrieves the range of rows which are prefixed with `pre`.
func (db *DB) PGet(pre []byte) (kvs []KV, err error) {
tx, err := db.Txn(false)
if err != nil {
return
}
defer tx.Close()
return tx.PGet(pre)
}
// RGet retrieves the range of `max` rows between `beg` (inclusive) and
// `end` (exclusive). To return the range in descending order, ensure
// that `end` sorts lower than `beg` in the key value store.
func (db *DB) RGet(beg, end []byte, max uint64) (kvs []KV, err error) {
tx, err := db.Txn(false)
if err != nil {
return
}
defer tx.Close()
return tx.RGet(beg, end, max)
}
// Put sets the value for a key.
func (db *DB) Put(key, val []byte) (err error) {
tx, err := db.Txn(true)
if err != nil {
return
}
defer tx.Commit()
return tx.Put(key, val)
}
// CPut conditionally sets the value for a key if the existing value is equal
// to the expected value. To conditionally set a value only if there is no
// existing entry pass nil for the expected value.
func (db *DB) CPut(key, val, exp []byte) (err error) {
tx, err := db.Txn(true)
if err != nil {
return
}
defer tx.Commit()
return tx.CPut(key, val, exp)
}
// Del deletes a single key:value item.
func (db *DB) Del(key []byte) (err error) {
tx, err := db.Txn(true)
if err != nil {
return
}
defer tx.Commit()
return tx.Del(key)
}
// CDel conditionally deletes a key if the existing value is equal to the
// expected value.
func (db *DB) CDel(key, exp []byte) (err error) {
tx, err := db.Txn(true)
if err != nil {
return
}
defer tx.Commit()
return tx.CDel(key, exp)
}
// MDel deletes multiple key:value items.
func (db *DB) MDel(keys ...[]byte) (err error) {
tx, err := db.Txn(true)
if err != nil {
return
}
defer tx.Commit()
return tx.MDel(keys...)
}
// PDel deletes the range of rows which are prefixed with `pre`.
func (db *DB) PDel(pre []byte) (err error) {
tx, err := db.Txn(true)
if err != nil {
return
}
defer tx.Commit()
return tx.PDel(pre)
}
// RDel deletes the range of `max` rows between `beg` (inclusive) and
// `end` (exclusive). To delete the range in descending order, ensure
// that `end` sorts lower than `beg` in the key value store.
func (db *DB) RDel(beg, end []byte, max uint64) (err error) {
tx, err := db.Txn(true)
if err != nil {
return
}
defer tx.Commit()
return tx.RDel(beg, end, max)
}
// Txn executes retryable in the context of a distributed transaction.
// The transaction is automatically aborted if retryable returns any
// error aside from recoverable internal errors, and is automatically
// committed otherwise. The retryable function should have no side
// effects which could cause problems in the event it must be run more
// than once.
func (db *DB) Txn(writable bool) (txn TX, err error) {
return db.ds.Txn(writable)
}
// Close ...
func (db *DB) Close() (err error) {
return db.ds.Close()
}
func Register(name string, constructor func(*cnf.Options) (DS, error)) {
stores[name] = constructor
} }

View file

@ -14,8 +14,68 @@
package kvs package kvs
// DS represents a datastore implementation import (
type DS interface { "strings"
Txn(bool) (TX, error)
Close() error "github.com/abcum/surreal/cnf"
)
var stores = make(map[string]func(*cnf.Options) (DB, error))
// DB represents a backing datastore.
type DS struct {
db DB
}
// New sets up the underlying key-value store
func New(opts *cnf.Options) (ds *DS, err error) {
var db DB
switch {
case opts.DB.Path == "memory":
db, err = stores["rixxdb"](opts)
case strings.HasPrefix(opts.DB.Path, "s3://"):
db, err = stores["rixxdb"](opts)
case strings.HasPrefix(opts.DB.Path, "gcs://"):
db, err = stores["rixxdb"](opts)
case strings.HasPrefix(opts.DB.Path, "logr://"):
db, err = stores["rixxdb"](opts)
case strings.HasPrefix(opts.DB.Path, "file://"):
db, err = stores["rixxdb"](opts)
case strings.HasPrefix(opts.DB.Path, "rixxdb://"):
db, err = stores["rixxdb"](opts)
case strings.HasPrefix(opts.DB.Path, "dendrodb://"):
db, err = stores["dendro"](opts)
}
if err != nil {
return
}
ds = &DS{db: db}
return
}
// Begin ...
func (ds *DS) Begin(writable bool) (txn TX, err error) {
return ds.db.Begin(writable)
}
// Close ...
func (ds *DS) Close() (err error) {
return ds.db.Close()
}
func Register(name string, constructor func(*cnf.Options) (DB, error)) {
stores[name] = constructor
} }

View file

@ -14,10 +14,10 @@
package kvs package kvs
// KV represents a datastore key:value item // KV represents a database item
type KV interface { type KV interface {
Exists() bool Exi() bool
Key() []byte Key() []byte
Val() []byte Val() []byte
Str() string Ver() int64
} }

View file

@ -1,47 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
import (
"database/sql"
"github.com/abcum/surreal/kvs"
)
type DS struct {
db *sql.DB
ck []byte
}
func (ds *DS) Txn(writable bool) (txn kvs.TX, err error) {
tx, err := ds.db.Begin()
if err != nil {
err = &kvs.DSError{Err: err}
if tx != nil {
tx.Rollback()
}
return
}
return &TX{ds: ds, tx: tx}, err
}
func (ds *DS) Close() (err error) {
return ds.db.Close()
}

View file

@ -1,42 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
// KV represents a database key:value item
type KV struct {
exi bool
key []byte
val []byte
}
// Exists is true if the key exists
func (kv *KV) Exists() bool {
return kv.exi
}
// Key returns a byte slice of the key
func (kv *KV) Key() []byte {
return kv.key
}
// Val returns a byte slice of the value
func (kv *KV) Val() []byte {
return kv.val
}
// Str returns a string of the value
func (kv *KV) Str() string {
return string(kv.val)
}

View file

@ -1,98 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
import (
"fmt"
"regexp"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"database/sql"
"github.com/go-sql-driver/mysql"
"github.com/abcum/surreal/cnf"
"github.com/abcum/surreal/kvs"
)
func init() {
kvs.Register("mysql", New)
}
func New(opts *cnf.Options) (ds kvs.DS, err error) {
var db *sql.DB
opts.DB.Path, err = config(opts)
if err != nil {
return
}
db, err = sql.Open("mysql", opts.DB.Path)
if err != nil {
return
}
return &DS{db: db, ck: opts.DB.Key}, err
}
func config(opts *cnf.Options) (path string, err error) {
re := regexp.MustCompile(`^mysql://` +
`((?:(?P<user>.*?)(?::(?P<passwd>.*))?@))?` +
`(?:(?:(?P<addr>[^\/]*))?)?` +
`\/(?P<dbname>.*?)` +
`(?:\?(?P<params>[^\?]*))?$`)
ma := re.FindStringSubmatch(opts.DB.Path)
if len(ma) == 0 || ma[4] == "" || ma[5] == "" {
err = fmt.Errorf("Specify a valid data store configuration path. Use the help command for further instructions.")
}
if opts.DB.Cert.SSL {
pool := x509.NewCertPool()
pem, err := ioutil.ReadFile(opts.DB.Cert.CA)
if err != nil {
err = fmt.Errorf("Could not read file %s", opts.DB.Cert.CA)
}
if ok := pool.AppendCertsFromPEM(pem); !ok {
return "", fmt.Errorf("Could not read file %s", opts.DB.Cert.CA)
}
cert := make([]tls.Certificate, 0, 1)
pair, err := tls.LoadX509KeyPair(opts.DB.Cert.Crt, opts.DB.Cert.Key)
if err != nil {
return "", err
}
cert = append(cert, pair)
mysql.RegisterTLSConfig("custom", &tls.Config{
RootCAs: pool,
Certificates: cert,
InsecureSkipVerify: true,
})
}
if opts.DB.Cert.SSL {
path += fmt.Sprintf("%stcp(%s)/%s?tls=custom", ma[1], ma[4], ma[5])
} else {
path += fmt.Sprintf("%stcp(%s)/%s", ma[1], ma[4], ma[5])
}
return
}

View file

@ -1,358 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
import (
"bytes"
"math"
"database/sql"
"github.com/abcum/surreal/kvs"
"github.com/abcum/surreal/util/cryp"
"github.com/abcum/surreal/util/snap"
)
// TX is a distributed database transaction.
type TX struct {
ds *DS
do bool
ck []byte
tx *sql.Tx
}
// All retrieves all key:value items in the db.
func (tx *TX) All() (kvs []kvs.KV, err error) {
res, err := tx.tx.Query("SELECT `key`, `val` FROM kv ORDER BY `key` ASC")
if err != nil {
return
}
defer res.Close()
for res.Next() {
var key, val []byte
err := res.Scan(&key, &val)
if err != nil {
return nil, err
}
kv, err := get(tx, key, val)
if err != nil {
return nil, err
}
kvs = append(kvs, kv)
}
err = res.Err()
if err != nil {
return nil, err
}
return
}
// Get retrieves a single key:value item.
func (tx *TX) Get(key []byte) (kv kvs.KV, err error) {
row := tx.tx.QueryRow("SELECT `val` FROM kv WHERE `key` = ?", key)
var val []byte
row.Scan(&val)
return get(tx, key, val)
}
// MGet retrieves multiple key:value items.
func (tx *TX) MGet(keys ...[]byte) (kvs []kvs.KV, err error) {
/*
res, err := tx.tx.Query("SELECT `key`, `val` FROM kv WHERE `key` IN (?)", keys)
if err != nil {
return
}
defer res.Close()
for res.Next() {
var key, val []byte
err := res.Scan(&key, &val)
if err != nil {
return nil, err
}
kv, err := get(tx, key, val)
if err != nil {
return nil, err
}
kvs = append(kvs, kv)
}
err = res.Err()
if err != nil {
return nil, err
}
*/
for _, key := range keys {
kv, _ := tx.Get(key)
kvs = append(kvs, kv)
}
return
}
// PGet retrieves the range of rows which are prefixed with `pre`.
func (tx *TX) PGet(pre []byte) (kvs []kvs.KV, err error) {
end := append(pre, 0xff)
return tx.RGet(pre, end, 0)
}
// RGet retrieves the range of `max` rows between `beg` (inclusive) and
// `end` (exclusive). To return the range in descending order, ensure
// that `end` sorts lower than `beg` in the key value store.
func (tx *TX) RGet(beg, end []byte, max uint64) (kvs []kvs.KV, err error) {
if max == 0 {
max = math.MaxUint64
}
res, err := tx.tx.Query("SELECT `key`, `val` FROM kv WHERE `key` BETWEEN ? AND ? ORDER BY `key` ASC LIMIT ?", beg, end, max)
if err != nil {
return nil, err
}
defer res.Close()
for res.Next() {
var key, val []byte
err := res.Scan(&key, &val)
if err != nil {
return nil, err
}
kv, err := get(tx, key, val)
if err != nil {
return nil, err
}
kvs = append(kvs, kv)
}
err = res.Err()
if err != nil {
return nil, err
}
return
}
// Put sets the value for a key.
func (tx *TX) Put(key, val []byte) (err error) {
if val, err = snap.Encode(val); err != nil {
err = &kvs.DBError{Err: err}
return
}
if val, err = cryp.Encrypt(tx.ds.ck, val); err != nil {
err = &kvs.CKError{Err: err}
return
}
if _, err = tx.tx.Exec("INSERT INTO kv (`key`, `val`) VALUES (?, ?) ON DUPLICATE KEY UPDATE `val` = ?", key, val, val); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
// CPut conditionally sets the value for a key if the existing value is equal
// to the expected value. To conditionally set a value only if there is no
// existing entry pass nil for the expected value.
func (tx *TX) CPut(key, val, exp []byte) (err error) {
now, _ := tx.Get(key)
act := now.(*KV).val
if !bytes.Equal(act, exp) {
err = &kvs.KVError{Err: err, Key: key, Val: act, Exp: exp}
return
}
if val, err = snap.Encode(val); err != nil {
err = &kvs.DBError{Err: err}
return
}
if val, err = cryp.Encrypt(tx.ds.ck, val); err != nil {
err = &kvs.CKError{Err: err}
return
}
if _, err = tx.tx.Exec("INSERT INTO kv (`key`, `val`) VALUES (?, ?) ON DUPLICATE KEY UPDATE `val` = ?", key, val, val); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
// Del deletes a single key:value item.
func (tx *TX) Del(key []byte) (err error) {
if _, err = tx.tx.Exec("DELETE FROM kv WHERE `key` = ?", key); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
// CDel conditionally deletes a key if the existing value is equal to the
// expected value.
func (tx *TX) CDel(key, exp []byte) (err error) {
now, _ := tx.Get(key)
act := now.(*KV).val
if !bytes.Equal(act, exp) {
err = &kvs.KVError{Err: err, Key: key, Val: act, Exp: exp}
return
}
if _, err = tx.tx.Exec("DELETE FROM kv WHERE `key` = ?", key); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
// MDel deletes multiple key:value items.
func (tx *TX) MDel(keys ...[]byte) (err error) {
/*
if _, err = tx.tx.Exec("DELETE FROM kv WHERE `key` IN (?)", keys); err != nil {
err = &kvs.DBError{Err:err}
return
}
*/
for _, key := range keys {
err = tx.Del(key)
}
return
}
// PDel deletes the range of rows which are prefixed with `pre`.
func (tx *TX) PDel(pre []byte) (err error) {
end := append(pre, 0xff)
return tx.RDel(pre, end, 0)
}
// RDel deletes the range of `max` rows between `beg` (inclusive) and
// `end` (exclusive). To delete the range in descending order, ensure
// that `end` sorts lower than `beg` in the key value store.
func (tx *TX) RDel(beg, end []byte, max uint64) (err error) {
if max == 0 {
max = math.MaxUint64
}
if _, err = tx.tx.Exec("DELETE FROM kv WHERE `key` BETWEEN ? AND ? ORDER BY `key` ASC LIMIT ?", beg, end, max); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
func (tx *TX) Done() (val bool) {
return tx.do
}
func (tx *TX) Close() (err error) {
return tx.Rollback()
}
func (tx *TX) Cancel() (err error) {
return tx.Rollback()
}
func (tx *TX) Commit() (err error) {
tx.do = true
return tx.tx.Commit()
}
func (tx *TX) Rollback() (err error) {
tx.do = true
return tx.tx.Rollback()
}
func get(tx *TX, key, val []byte) (kv *KV, err error) {
kv = &KV{
exi: (val != nil),
key: key,
val: val,
}
kv.val, err = cryp.Decrypt(tx.ds.ck, kv.val)
if err != nil {
err = &kvs.CKError{Err: err}
return
}
kv.val, err = snap.Decode(kv.val)
if err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}

View file

@ -1,42 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pgsql
// KV represents a database key:value item
type KV struct {
exi bool
key []byte
val []byte
}
// Exists is true if the key exists
func (kv *KV) Exists() bool {
return kv.exi
}
// Key returns a byte slice of the key
func (kv *KV) Key() []byte {
return kv.key
}
// Val returns a byte slice of the value
func (kv *KV) Val() []byte {
return kv.val
}
// Str returns a string of the value
func (kv *KV) Str() string {
return string(kv.val)
}

View file

@ -1,72 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pgsql
import (
"fmt"
"regexp"
"database/sql"
_ "github.com/lib/pq"
"github.com/abcum/surreal/cnf"
"github.com/abcum/surreal/kvs"
)
func init() {
kvs.Register("pgsql", New)
}
func New(opts *cnf.Options) (ds kvs.DS, err error) {
var db *sql.DB
opts.DB.Path, err = config(opts)
if err != nil {
return
}
db, err = sql.Open("postgres", opts.DB.Path)
if err != nil {
return
}
return &DS{db: db, ck: opts.DB.Key}, err
}
func config(opts *cnf.Options) (path string, err error) {
re := regexp.MustCompile(`^pgsql://` +
`((?:(?P<user>.*?)(?::(?P<passwd>.*))?@))?` +
`(?:(?:(?P<addr>[^\/]*))?)?` +
`\/(?P<dbname>.*?)` +
`(?:\?(?P<params>[^\?]*))?$`)
ma := re.FindStringSubmatch(opts.DB.Path)
if len(ma) == 0 || ma[4] == "" || ma[5] == "" {
err = fmt.Errorf("Specify a valid data store configuration path. Use the help command for further instructions.")
}
if opts.DB.Cert.SSL {
path += fmt.Sprintf("postgres://%s%s/%s?sslmode=verify-ca&sslrootcert=%s&sslcert=%s&sslkey=%s", ma[1], ma[4], ma[5], opts.DB.Cert.CA, opts.DB.Cert.Crt, opts.DB.Cert.Key)
} else {
path += fmt.Sprintf("postgres://%s%s/%s?sslmode=disable", ma[1], ma[4], ma[5])
}
return
}

View file

@ -1,367 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pgsql
import (
"bytes"
"math"
"database/sql"
"github.com/abcum/surreal/kvs"
"github.com/abcum/surreal/util/cryp"
"github.com/abcum/surreal/util/snap"
)
// TX is a distributed database transaction.
type TX struct {
ds *DS
do bool
ck []byte
tx *sql.Tx
}
// All retrieves all key:value items in the db.
func (tx *TX) All() (kvs []kvs.KV, err error) {
res, err := tx.tx.Query("SELECT key, val FROM kv ORDER BY key ASC")
if err != nil {
return
}
defer res.Close()
for res.Next() {
var key, val []byte
err := res.Scan(&key, &val)
if err != nil {
return nil, err
}
kv, err := get(tx, key, val)
if err != nil {
return nil, err
}
kvs = append(kvs, kv)
}
err = res.Err()
if err != nil {
return nil, err
}
return
}
// Get retrieves a single key:value item.
func (tx *TX) Get(key []byte) (kv kvs.KV, err error) {
row := tx.tx.QueryRow("SELECT val FROM kv WHERE key = $1", key)
var val []byte
row.Scan(&val)
return get(tx, key, val)
}
// MGet retrieves multiple key:value items.
func (tx *TX) MGet(keys ...[]byte) (kvs []kvs.KV, err error) {
/*
res, err := tx.tx.Query("SELECT key, val FROM kv WHERE key IN ($1)", keys)
if err != nil {
return
}
defer res.Close()
for res.Next() {
var key, val []byte
err := res.Scan(&key, &val)
if err != nil {
return nil, err
}
kv, err := get(tx, key, val)
if err != nil {
return nil, err
}
kvs = append(kvs, kv)
}
err = res.Err()
if err != nil {
return nil, err
}
*/
for _, key := range keys {
kv, _ := tx.Get(key)
kvs = append(kvs, kv)
}
return
}
// PGet retrieves the range of rows which are prefixed with `pre`.
func (tx *TX) PGet(pre []byte) (kvs []kvs.KV, err error) {
end := append(pre, 0xff)
return tx.RGet(pre, end, 0)
}
// RGet retrieves the range of `max` rows between `beg` (inclusive) and
// `end` (exclusive). To return the range in descending order, ensure
// that `end` sorts lower than `beg` in the key value store.
func (tx *TX) RGet(beg, end []byte, max uint64) (items []kvs.KV, err error) {
if max == 0 {
max = math.MaxUint64
}
if max > math.MaxInt64 {
max = math.MaxInt64
}
res, err := tx.tx.Query("SELECT key, val FROM kv WHERE key BETWEEN $1 AND $2 ORDER BY key ASC LIMIT $3", beg, end, max)
if err != nil {
err = &kvs.DBError{Err: err}
return nil, err
}
defer res.Close()
for res.Next() {
var key, val []byte
err := res.Scan(&key, &val)
if err != nil {
return nil, err
}
kv, err := get(tx, key, val)
if err != nil {
return nil, err
}
items = append(items, kv)
}
err = res.Err()
if err != nil {
return nil, err
}
return
}
// Put sets the value for a key.
func (tx *TX) Put(key, val []byte) (err error) {
if val, err = snap.Encode(val); err != nil {
err = &kvs.DBError{Err: err}
return
}
if val, err = cryp.Encrypt(tx.ds.ck, val); err != nil {
err = &kvs.CKError{Err: err}
return
}
if _, err = tx.tx.Exec("INSERT INTO kv (key, val) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET val = $2", key, val); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
// CPut conditionally sets the value for a key if the existing value is equal
// to the expected value. To conditionally set a value only if there is no
// existing entry pass nil for the expected value.
func (tx *TX) CPut(key, val, exp []byte) (err error) {
now, _ := tx.Get(key)
act := now.(*KV).val
if !bytes.Equal(act, exp) {
err = &kvs.KVError{Err: err, Key: key, Val: act, Exp: exp}
return
}
if val, err = snap.Encode(val); err != nil {
err = &kvs.DBError{Err: err}
return
}
if val, err = cryp.Encrypt(tx.ds.ck, val); err != nil {
err = &kvs.CKError{Err: err}
return
}
if _, err = tx.tx.Exec("INSERT INTO kv (key, val) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET val = $2", key, val); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
// Del deletes a single key:value item.
func (tx *TX) Del(key []byte) (err error) {
if _, err = tx.tx.Exec("DELETE FROM kv WHERE key = $1", key); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
// CDel conditionally deletes a key if the existing value is equal to the
// expected value.
func (tx *TX) CDel(key, exp []byte) (err error) {
now, _ := tx.Get(key)
act := now.(*KV).val
if !bytes.Equal(act, exp) {
err = &kvs.KVError{Err: err, Key: key, Val: act, Exp: exp}
return
}
if _, err = tx.tx.Exec("DELETE FROM kv WHERE key = $1", key); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
// MDel deletes multiple key:value items.
func (tx *TX) MDel(keys ...[]byte) (err error) {
/*
if _, err = tx.tx.Exec("DELETE FROM kv WHERE key IN ($1)", keys); err != nil {
err = &kvs.DBError{Err:err}
return
}
*/
for _, key := range keys {
err = tx.Del(key)
}
return
}
// PDel deletes the range of rows which are prefixed with `pre`.
func (tx *TX) PDel(pre []byte) (err error) {
end := append(pre, 0xff)
return tx.RDel(pre, end, 0)
}
// RDel deletes the range of `max` rows between `beg` (inclusive) and
// `end` (exclusive). To delete the range in descending order, ensure
// that `end` sorts lower than `beg` in the key value store.
func (tx *TX) RDel(beg, end []byte, max uint64) (err error) {
if max == 0 {
max = math.MaxUint64
}
if max > math.MaxInt64 {
max = math.MaxInt64
}
if _, err = tx.tx.Exec("DELETE FROM kv WHERE key BETWEEN $1 AND $2 ORDER BY key ASC LIMIT $3", beg, end, max); err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}
func (tx *TX) Done() (val bool) {
return tx.do
}
func (tx *TX) Close() (err error) {
return tx.Rollback()
}
func (tx *TX) Cancel() (err error) {
return tx.Rollback()
}
func (tx *TX) Commit() (err error) {
tx.do = true
return tx.tx.Commit()
}
func (tx *TX) Rollback() (err error) {
tx.do = true
return tx.tx.Rollback()
}
func get(tx *TX, key, val []byte) (kv *KV, err error) {
kv = &KV{
exi: (val != nil),
key: key,
val: val,
}
kv.val, err = cryp.Decrypt(tx.ds.ck, kv.val)
if err != nil {
err = &kvs.CKError{Err: err}
return
}
kv.val, err = snap.Decode(kv.val)
if err != nil {
err = &kvs.DBError{Err: err}
return
}
return
}

View file

@ -12,36 +12,34 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package pgsql package rixxdb
import ( import (
"database/sql" "github.com/abcum/rixxdb"
"github.com/abcum/surreal/kvs" "github.com/abcum/surreal/kvs"
) )
type DS struct { type DB struct {
db *sql.DB pntr *rixxdb.DB
ck []byte
} }
func (ds *DS) Txn(writable bool) (txn kvs.TX, err error) { func (db *DB) Begin(writable bool) (txn kvs.TX, err error) {
tx, err := ds.db.Begin() pntr, err := db.pntr.Begin(writable)
if err != nil { if err != nil {
err = &kvs.DSError{Err: err} err = &kvs.DSError{Err: err}
if tx != nil { if pntr != nil {
tx.Rollback() pntr.Cancel()
} }
return return
} }
return &TX{ds: ds, tx: tx}, err return &TX{pntr: pntr}, err
} }
func (ds *DS) Close() (err error) { func (db *DB) Close() (err error) {
return ds.db.Close() return db.pntr.Close()
} }

View file

@ -12,39 +12,35 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package boltdb package rixxdb
import ( import (
"strings" "strings"
"github.com/boltdb/bolt" "github.com/abcum/rixxdb"
"github.com/abcum/surreal/cnf" "github.com/abcum/surreal/cnf"
"github.com/abcum/surreal/kvs" "github.com/abcum/surreal/kvs"
) )
var bucket = []byte("default")
func init() { func init() {
kvs.Register("boltdb", New)
}
func New(opts *cnf.Options) (ds kvs.DS, err error) { kvs.Register("rixxdb", func(opts *cnf.Options) (db kvs.DB, err error) {
var db *bolt.DB var pntr *rixxdb.DB
path := strings.TrimLeft(opts.DB.Path, "boltdb://") path := strings.TrimLeft(opts.DB.Path, "rixxdb://")
pntr, err = rixxdb.Open(path, &rixxdb.Config{
SyncPolicy: opts.DB.Sync,
EncryptionKey: opts.DB.Key,
})
db, err = bolt.Open(path, 0666, nil)
if err != nil { if err != nil {
return return
} }
db.Update(func(tx *bolt.Tx) error { return &DB{pntr: pntr}, err
tx.CreateBucketIfNotExists(bucket)
return nil
}) })
return &DS{db: db, ck: opts.DB.Key}, err
} }

137
kvs/rixxdb/tx.go Normal file
View file

@ -0,0 +1,137 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rixxdb
import (
"github.com/abcum/rixxdb"
"github.com/abcum/surreal/kvs"
)
type TX struct {
pntr *rixxdb.TX
}
func (tx *TX) Closed() bool {
return tx.pntr.Closed()
}
func (tx *TX) Cancel() error {
return tx.pntr.Cancel()
}
func (tx *TX) Commit() error {
return tx.pntr.Commit()
}
func (tx *TX) Get(ver int64, key []byte) (kvs.KV, error) {
return tx.pntr.Get(ver, key)
}
func (tx *TX) GetL(ver int64, key []byte) ([]kvs.KV, error) {
all, err := tx.pntr.GetL(ver, key)
out := make([]kvs.KV, len(all))
for i, v := range all {
out[i] = v
}
return out, err
}
func (tx *TX) GetP(ver int64, key []byte, max uint64) ([]kvs.KV, error) {
all, err := tx.pntr.GetP(ver, key, max)
out := make([]kvs.KV, len(all))
for i, v := range all {
out[i] = v
}
return out, err
}
func (tx *TX) GetR(ver int64, beg []byte, end []byte, max uint64) ([]kvs.KV, error) {
all, err := tx.pntr.GetR(ver, beg, end, max)
out := make([]kvs.KV, len(all))
for i, v := range all {
out[i] = v
}
return out, err
}
func (tx *TX) Del(ver int64, key []byte) (kvs.KV, error) {
return tx.pntr.Del(ver, key)
}
func (tx *TX) DelC(ver int64, key []byte, exp []byte) (kvs.KV, error) {
return tx.pntr.DelC(ver, key, exp)
}
func (tx *TX) DelL(ver int64, key []byte) ([]kvs.KV, error) {
all, err := tx.pntr.DelL(ver, key)
out := make([]kvs.KV, len(all))
for i, v := range all {
out[i] = v
}
return out, err
}
func (tx *TX) DelP(ver int64, key []byte, max uint64) ([]kvs.KV, error) {
all, err := tx.pntr.DelP(ver, key, max)
out := make([]kvs.KV, len(all))
for i, v := range all {
out[i] = v
}
return out, err
}
func (tx *TX) DelR(ver int64, beg []byte, end []byte, max uint64) ([]kvs.KV, error) {
all, err := tx.pntr.DelR(ver, beg, end, max)
out := make([]kvs.KV, len(all))
for i, v := range all {
out[i] = v
}
return out, err
}
func (tx *TX) Put(ver int64, key []byte, val []byte) (kvs.KV, error) {
return tx.pntr.Put(ver, key, val)
}
func (tx *TX) PutC(ver int64, key []byte, val []byte, exp []byte) (kvs.KV, error) {
return tx.pntr.PutC(ver, key, val, exp)
}
func (tx *TX) PutL(ver int64, key []byte, val []byte) ([]kvs.KV, error) {
all, err := tx.pntr.PutL(ver, key, val)
out := make([]kvs.KV, len(all))
for i, v := range all {
out[i] = v
}
return out, err
}
func (tx *TX) PutP(ver int64, key []byte, val []byte, max uint64) ([]kvs.KV, error) {
all, err := tx.pntr.PutP(ver, key, val, max)
out := make([]kvs.KV, len(all))
for i, v := range all {
out[i] = v
}
return out, err
}
func (tx *TX) PutR(ver int64, key []byte, val []byte, exp []byte, max uint64) ([]kvs.KV, error) {
all, err := tx.pntr.PutR(ver, key, val, exp, max)
out := make([]kvs.KV, len(all))
for i, v := range all {
out[i] = v
}
return out, err
}

View file

@ -14,23 +14,26 @@
package kvs package kvs
// TX represents a datastore transaction // TX represents a database transaction
type TX interface { type TX interface {
All() ([]KV, error) Closed() bool
Get([]byte) (KV, error)
MGet(...[]byte) ([]KV, error)
PGet([]byte) ([]KV, error)
RGet([]byte, []byte, uint64) ([]KV, error)
Put([]byte, []byte) error
CPut([]byte, []byte, []byte) error
Del([]byte) error
CDel([]byte, []byte) error
MDel(...[]byte) error
PDel([]byte) error
RDel([]byte, []byte, uint64) error
Done() bool
Close() error
Cancel() error Cancel() error
Commit() error Commit() error
Rollback() error
Get(int64, []byte) (KV, error)
GetL(int64, []byte) ([]KV, error)
GetP(int64, []byte, uint64) ([]KV, error)
GetR(int64, []byte, []byte, uint64) ([]KV, error)
Del(int64, []byte) (KV, error)
DelC(int64, []byte, []byte) (KV, error)
DelL(int64, []byte) ([]KV, error)
DelP(int64, []byte, uint64) ([]KV, error)
DelR(int64, []byte, []byte, uint64) ([]KV, error)
Put(int64, []byte, []byte) (KV, error)
PutC(int64, []byte, []byte, []byte) (KV, error)
PutL(int64, []byte, []byte) ([]KV, error)
PutP(int64, []byte, []byte, uint64) ([]KV, error)
PutR(int64, []byte, []byte, []byte, uint64) ([]KV, error)
} }