From a3b9362adb0b8a6171b66bb1852e31546ad5728d Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Thu, 9 Feb 2017 11:16:59 +0000 Subject: [PATCH] Switch underlying KV store to rixxdb/dendrodb --- cli/flags.go | 14 +- cli/setup.go | 22 +- cli/start.go | 1 + cnf/cnf.go | 4 + db/create.go | 4 +- db/delete.go | 9 +- db/select.go | 9 +- db/update.go | 9 +- kvs/boltdb/ds.go | 47 ---- kvs/boltdb/kv.go | 42 ---- kvs/boltdb/tx.go | 380 ------------------------------ kvs/db.go | 268 +-------------------- kvs/ds.go | 68 +++++- kvs/kv.go | 6 +- kvs/mysql/ds.go | 47 ---- kvs/mysql/kv.go | 42 ---- kvs/mysql/main.go | 98 -------- kvs/mysql/tx.go | 358 ---------------------------- kvs/pgsql/kv.go | 42 ---- kvs/pgsql/main.go | 72 ------ kvs/pgsql/tx.go | 367 ----------------------------- kvs/{pgsql/ds.go => rixxdb/db.go} | 24 +- kvs/{boltdb => rixxdb}/main.go | 34 ++- kvs/rixxdb/tx.go | 137 +++++++++++ kvs/tx.go | 35 +-- 25 files changed, 295 insertions(+), 1844 deletions(-) delete mode 100644 kvs/boltdb/ds.go delete mode 100644 kvs/boltdb/kv.go delete mode 100644 kvs/boltdb/tx.go delete mode 100644 kvs/mysql/ds.go delete mode 100644 kvs/mysql/kv.go delete mode 100644 kvs/mysql/main.go delete mode 100644 kvs/mysql/tx.go delete mode 100644 kvs/pgsql/kv.go delete mode 100644 kvs/pgsql/main.go delete mode 100644 kvs/pgsql/tx.go rename kvs/{pgsql/ds.go => rixxdb/db.go} (70%) rename kvs/{boltdb => rixxdb}/main.go (62%) create mode 100644 kvs/rixxdb/tx.go diff --git a/cli/flags.go b/cli/flags.go index 2f195398..0c6f1b91 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -15,16 +15,16 @@ package cli var flags = map[string]string{ - "db": `Database configuration path used for storing data. Available backend stores are boltdb, mysql, or pgsql. (default "boltdb://surreal.db").`, + "db": `Database configuration path used for storing data. Available backend stores are memory, rixxdb, or dendrodb. (default "memory").`, "key": `Encryption key to use for intra-cluster communications, and on-disk encryption. For AES-128 encryption use a 16 bit key, for AES-192 encryption use a 24 bit key, and for AES-256 encryption use a 32 bit key.`, "join": `A comma-separated list of addresses to use when a new node is joining an existing cluster. For the first node in a cluster, --join should NOT be specified.`, } var usage = map[string][]string{ "db": { - "--db-path boltdb://surreal.db", - "--db-path mysql://user:pass@127.0.0.1:3306/database", - "--db-path pgsql://user:pass@127.0.0.1:5432/database", + "--db-path memory", + "--db-path rixxdb://surreal.db", + "--db-path dendro://user:pass@192.168.1.100", }, "join": { "--join 10.0.0.1", @@ -33,8 +33,8 @@ var usage = map[string][]string{ "--join 89.13.7.33:33693,example.com:33693", }, "key": { - "--enc 1hg7dbrma8ghe547", - "--enc 1hg7dbrma8ghe5473kghvie6", - "--enc 1hg7dbrma8ghe5473kghvie64jgi3ph4", + "--key 1hg7dbrma8ghe547", + "--key 1hg7dbrma8ghe5473kghvie6", + "--key 1hg7dbrma8ghe5473kghvie64jgi3ph4", }, } diff --git a/cli/setup.go b/cli/setup.go index 2d497887..d9f26899 100644 --- a/cli/setup.go +++ b/cli/setup.go @@ -19,6 +19,7 @@ import ( "os" "regexp" "strings" + "time" "github.com/abcum/surreal/cnf" "github.com/abcum/surreal/log" @@ -34,20 +35,23 @@ func setup() { // Ensure that the default // database options are set - if opts.DB.Base == "" { - opts.DB.Base = "surreal" + if opts.DB.Path == "" { + opts.DB.Path = "memory" } - if opts.DB.Path == "" { - opts.DB.Path = "boltdb://surreal.db" + if opts.DB.Base == "" { + opts.DB.Base = "surreal" } if opts.DB.Code != "" { opts.DB.Key = []byte(opts.DB.Code) } - if ok, _ := regexp.MatchString(`^(boltdb|mysql|pgsql)://(.+)$`, opts.DB.Path); !ok { - log.Fatal("Specify a valid data store configuration path") + if opts.DB.Time != "" { + var err error + if opts.DB.Sync, err = time.ParseDuration(opts.DB.Time); err != nil { + log.Fatal("Specify a valid database sync time frequency") + } } switch len(opts.DB.Key) { @@ -56,6 +60,12 @@ func setup() { log.Fatal("Specify a valid encryption key length. Valid key sizes are 16bit, 24bit, or 32bit.") } + if opts.DB.Path != "memory" { + if ok, _ := regexp.MatchString(`^(s3|gcs|logr|file|rixxdb|dendrodb)://(.+)$`, opts.DB.Path); !ok { + log.Fatal("Specify a valid data store configuration path") + } + } + if strings.HasPrefix(opts.DB.Cert.CA, "-----") { var err error var doc *os.File diff --git a/cli/start.go b/cli/start.go index 49ce9498..691e7ede 100644 --- a/cli/start.go +++ b/cli/start.go @@ -78,6 +78,7 @@ func init() { startCmd.PersistentFlags().StringVar(&opts.DB.Cert.Crt, "db-crt", "", "Path to the certificate file used to connect to the remote database.") startCmd.PersistentFlags().StringVar(&opts.DB.Cert.Key, "db-key", "", "Path to the private key file used to connect to the remote database.") startCmd.PersistentFlags().StringVar(&opts.DB.Path, "db-path", "", flag("db")) + startCmd.PersistentFlags().StringVar(&opts.DB.Time, "db-sync", "0s", "Something here") startCmd.PersistentFlags().StringVarP(&opts.Cluster.Join, "join", "j", "", flag("join")) diff --git a/cnf/cnf.go b/cnf/cnf.go index 02261838..2583ac4f 100644 --- a/cnf/cnf.go +++ b/cnf/cnf.go @@ -14,6 +14,8 @@ package cnf +import "time" + var Settings *Options type Auth struct { @@ -38,6 +40,8 @@ type Options struct { Host string // Surreal host to connect to Port string // Surreal port to connect to Base string // Base key to use in KV stores + Time string // Timeframe for syncing data + Sync time.Duration Cert struct { CA string Crt string diff --git a/db/create.go b/db/create.go index bb9711f1..469a7e06 100644 --- a/db/create.go +++ b/db/create.go @@ -40,7 +40,7 @@ func (e *executor) executeCreateStatement(txn kvs.TX, ast *sql.CreateStatement) case *sql.Thing: key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: what.ID} - kv, _ := txn.Get(key.Encode()) + kv, _ := txn.Get(0, key.Encode()) doc := item.New(kv, txn, key, e.ctx) if ret, err := create(doc, ast); err != nil { return nil, err @@ -50,7 +50,7 @@ func (e *executor) executeCreateStatement(txn kvs.TX, ast *sql.CreateStatement) case *sql.Table: key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: uuid.NewV5(uuid.NewV4().UUID, ast.KV).String()} - kv, _ := txn.Get(key.Encode()) + kv, _ := txn.Get(0, key.Encode()) doc := item.New(kv, txn, key, e.ctx) if ret, err := create(doc, ast); err != nil { return nil, err diff --git a/db/delete.go b/db/delete.go index a263aed7..1a6e5eed 100644 --- a/db/delete.go +++ b/db/delete.go @@ -39,8 +39,8 @@ func (e *executor) executeDeleteStatement(txn kvs.TX, ast *sql.DeleteStatement) case *sql.Thing: key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: what.ID} - kv, _ := txn.Get(key.Encode()) - doc := item.New(kv, txn, key, e.ctx) + kv, _ := txn.Get(0, key.Encode()) + doc := item.New(kv, e.txn, key, e.ctx) if ret, err := delete(doc, ast); err != nil { return nil, err } else if ret != nil { @@ -48,9 +48,8 @@ func (e *executor) executeDeleteStatement(txn kvs.TX, ast *sql.DeleteStatement) } case *sql.Table: - beg := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: keys.Prefix} - end := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: keys.Suffix} - kvs, _ := txn.RGet(beg.Encode(), end.Encode(), 0) + key := &keys.Table{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB} + kvs, _ := txn.GetL(0, key.Encode()) for _, kv := range kvs { doc := item.New(kv, txn, nil, e.ctx) if ret, err := delete(doc, ast); err != nil { diff --git a/db/select.go b/db/select.go index 64ece8e6..f957cf1e 100644 --- a/db/select.go +++ b/db/select.go @@ -33,8 +33,8 @@ func (e *executor) executeSelectStatement(txn kvs.TX, ast *sql.SelectStatement) if what, ok := w.(*sql.Thing); ok { key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: what.ID} - kv, _ := txn.Get(key.Encode()) - doc := item.New(kv, txn, key, e.ctx) + kv, _ := txn.Get(0, key.Encode()) + doc := item.New(kv, e.txn, key, e.ctx) if ret, err := detect(doc, ast); err != nil { return nil, err } else if ret != nil { @@ -43,9 +43,8 @@ func (e *executor) executeSelectStatement(txn kvs.TX, ast *sql.SelectStatement) } if what, ok := w.(*sql.Table); ok { - beg := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: keys.Prefix} - end := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: keys.Suffix} - kvs, _ := txn.RGet(beg.Encode(), end.Encode(), 0) + key := &keys.Table{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB} + kvs, _ := txn.GetL(0, key.Encode()) for _, kv := range kvs { doc := item.New(kv, txn, nil, e.ctx) if ret, err := detect(doc, ast); err != nil { diff --git a/db/update.go b/db/update.go index 3f12c17d..bf49b5fc 100644 --- a/db/update.go +++ b/db/update.go @@ -39,8 +39,8 @@ func (e *executor) executeUpdateStatement(txn kvs.TX, ast *sql.UpdateStatement) case *sql.Thing: key := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: what.ID} - kv, _ := txn.Get(key.Encode()) - doc := item.New(kv, txn, key, e.ctx) + kv, _ := txn.Get(0, key.Encode()) + doc := item.New(kv, e.txn, key, e.ctx) if ret, err := update(doc, ast); err != nil { return nil, err } else if ret != nil { @@ -48,9 +48,8 @@ func (e *executor) executeUpdateStatement(txn kvs.TX, ast *sql.UpdateStatement) } case *sql.Table: - beg := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: keys.Prefix} - end := &keys.Thing{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB, ID: keys.Suffix} - kvs, _ := txn.RGet(beg.Encode(), end.Encode(), 0) + key := &keys.Table{KV: ast.KV, NS: ast.NS, DB: ast.DB, TB: what.TB} + kvs, _ := txn.GetL(0, key.Encode()) for _, kv := range kvs { doc := item.New(kv, txn, nil, e.ctx) if ret, err := update(doc, ast); err != nil { diff --git a/kvs/boltdb/ds.go b/kvs/boltdb/ds.go deleted file mode 100644 index 6a8ba689..00000000 --- a/kvs/boltdb/ds.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright © 2016 Abcum Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package boltdb - -import ( - "github.com/boltdb/bolt" - - "github.com/abcum/surreal/kvs" -) - -type DS struct { - db *bolt.DB - ck []byte -} - -func (ds *DS) Txn(writable bool) (txn kvs.TX, err error) { - - tx, err := ds.db.Begin(writable) - if err != nil { - err = &kvs.DSError{Err: err} - if tx != nil { - tx.Rollback() - } - return - } - - return &TX{ds: ds, tx: tx, bu: tx.Bucket(bucket)}, err - -} - -func (ds *DS) Close() (err error) { - - return ds.db.Close() - -} diff --git a/kvs/boltdb/kv.go b/kvs/boltdb/kv.go deleted file mode 100644 index 5700b097..00000000 --- a/kvs/boltdb/kv.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright © 2016 Abcum Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package boltdb - -// KV represents a database key:value item -type KV struct { - exi bool - key []byte - val []byte -} - -// Exists is true if the key exists -func (kv *KV) Exists() bool { - return kv.exi -} - -// Key returns a byte slice of the key -func (kv *KV) Key() []byte { - return kv.key -} - -// Val returns a byte slice of the value -func (kv *KV) Val() []byte { - return kv.val -} - -// Str returns a string of the value -func (kv *KV) Str() string { - return string(kv.val) -} diff --git a/kvs/boltdb/tx.go b/kvs/boltdb/tx.go deleted file mode 100644 index 65eab515..00000000 --- a/kvs/boltdb/tx.go +++ /dev/null @@ -1,380 +0,0 @@ -// Copyright © 2016 Abcum Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package boltdb - -import ( - "bytes" - "math" - - "github.com/boltdb/bolt" - - "github.com/abcum/surreal/kvs" - "github.com/abcum/surreal/util/cryp" - "github.com/abcum/surreal/util/snap" -) - -// TX is a distributed database transaction. -type TX struct { - ds *DS - do bool - ck []byte - tx *bolt.Tx - bu *bolt.Bucket -} - -// All retrieves all key:value items in the db. -func (tx *TX) All() (kvs []kvs.KV, err error) { - - err = tx.bu.ForEach(func(key, val []byte) (err error) { - - kv, err := get(tx, key, val) - if err != nil { - return - } - - kvs = append(kvs, kv) - - return nil - - }) - - return - -} - -// Get retrieves a single key:value item. -func (tx *TX) Get(key []byte) (kv kvs.KV, err error) { - - val := tx.bu.Get(key) - - return get(tx, key, val) - -} - -// MGet retrieves multiple key:value items. -func (tx *TX) MGet(keys ...[]byte) (kvs []kvs.KV, err error) { - - for _, key := range keys { - - val := tx.bu.Get(key) - - kv, err := get(tx, key, val) - if err != nil { - return nil, err - } - - kvs = append(kvs, kv) - - } - - return - -} - -// PGet retrieves the range of rows which are prefixed with `pre`. -func (tx *TX) PGet(pre []byte) (kvs []kvs.KV, err error) { - - cu := tx.bu.Cursor() - - for key, val := cu.Seek(pre); bytes.HasPrefix(key, pre); key, val = cu.Next() { - - kv, err := get(tx, key, val) - if err != nil { - return nil, err - } - - kvs = append(kvs, kv) - - } - - return - -} - -// RGet retrieves the range of `max` rows between `beg` (inclusive) and -// `end` (exclusive). To return the range in descending order, ensure -// that `end` sorts lower than `beg` in the key value store. -func (tx *TX) RGet(beg, end []byte, max uint64) (kvs []kvs.KV, err error) { - - if max == 0 { - max = math.MaxUint64 - } - - cu := tx.bu.Cursor() - - if bytes.Compare(beg, end) < 1 { - for key, val := cu.Seek(beg); key != nil && max > 0 && bytes.Compare(key, end) < 0; key, val = cu.Next() { - - kv, err := get(tx, key, val) - if err != nil { - return nil, err - } - - kvs = append(kvs, kv) - - max-- - } - } - - if bytes.Compare(beg, end) > 1 { - for key, val := cu.Seek(end); key != nil && max > 0 && bytes.Compare(beg, key) < 0; key, val = cu.Prev() { - - kv, err := get(tx, key, val) - if err != nil { - return nil, err - } - - kvs = append(kvs, kv) - - max-- - } - } - - return - -} - -// Put sets the value for a key. -func (tx *TX) Put(key, val []byte) (err error) { - - if !tx.tx.Writable() { - err = &kvs.TXError{Err: err} - return - } - - if val, err = snap.Encode(val); err != nil { - err = &kvs.DBError{Err: err} - return - } - - if val, err = cryp.Encrypt(tx.ds.ck, val); err != nil { - err = &kvs.CKError{Err: err} - return - } - - if err = tx.bu.Put(key, val); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -// CPut conditionally sets the value for a key if the existing value is equal -// to the expected value. To conditionally set a value only if there is no -// existing entry pass nil for the expected value. -func (tx *TX) CPut(key, val, exp []byte) (err error) { - - if !tx.tx.Writable() { - err = &kvs.TXError{Err: err} - return - } - - now, _ := tx.Get(key) - act := now.(*KV).val - - if !bytes.Equal(act, exp) { - err = &kvs.KVError{Err: err, Key: key, Val: act, Exp: exp} - return - } - - if val, err = snap.Encode(val); err != nil { - err = &kvs.DBError{Err: err} - return - } - - if val, err = cryp.Encrypt(tx.ds.ck, val); err != nil { - err = &kvs.CKError{Err: err} - return - } - - if err = tx.bu.Put(key, val); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -// Del deletes a single key:value item. -func (tx *TX) Del(key []byte) (err error) { - - if !tx.tx.Writable() { - err = &kvs.TXError{Err: err} - return - } - - if err = tx.bu.Delete(key); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -// CDel conditionally deletes a key if the existing value is equal to the -// expected value. -func (tx *TX) CDel(key, exp []byte) (err error) { - - if !tx.tx.Writable() { - err = &kvs.TXError{Err: err} - return - } - - now, _ := tx.Get(key) - act := now.(*KV).val - - if !bytes.Equal(act, exp) { - err = &kvs.KVError{Err: err, Key: key, Val: act, Exp: exp} - return - } - - if err = tx.bu.Delete(key); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -// MDel deletes multiple key:value items. -func (tx *TX) MDel(keys ...[]byte) (err error) { - - if !tx.tx.Writable() { - err = &kvs.TXError{Err: err} - return - } - - for _, key := range keys { - - if err = tx.bu.Delete(key); err != nil { - err = &kvs.DBError{Err: err} - return - } - - } - - return - -} - -// PDel deletes the range of rows which are prefixed with `pre`. -func (tx *TX) PDel(pre []byte) (err error) { - - cu := tx.bu.Cursor() - - for key, _ := cu.Seek(pre); bytes.HasPrefix(key, pre); key, _ = cu.Seek(pre) { - if err = tx.bu.Delete(key); err != nil { - err = &kvs.DBError{Err: err} - return - } - } - - return - -} - -// RDel deletes the range of `max` rows between `beg` (inclusive) and -// `end` (exclusive). To delete the range in descending order, ensure -// that `end` sorts lower than `beg` in the key value store. -func (tx *TX) RDel(beg, end []byte, max uint64) (err error) { - - if max == 0 { - max = math.MaxUint64 - } - - if !tx.tx.Writable() { - err = &kvs.TXError{Err: err} - return - } - - cu := tx.bu.Cursor() - - if bytes.Compare(beg, end) < 1 { - for key, _ := cu.Seek(beg); key != nil && max > 0 && bytes.Compare(key, end) < 0; key, _ = cu.Seek(beg) { - if err = tx.bu.Delete(key); err != nil { - err = &kvs.DBError{Err: err} - return - } - max-- - } - } - - if bytes.Compare(beg, end) > 1 { - for key, _ := cu.Seek(end); key != nil && max > 0 && bytes.Compare(beg, key) < 0; key, _ = cu.Seek(end) { - if err = tx.bu.Delete(key); err != nil { - err = &kvs.DBError{Err: err} - return - } - max-- - } - } - - return - -} - -func (tx *TX) Done() (val bool) { - return tx.do -} - -func (tx *TX) Close() (err error) { - return tx.Rollback() -} - -func (tx *TX) Cancel() (err error) { - return tx.Rollback() -} - -func (tx *TX) Commit() (err error) { - tx.do = true - if tx.tx.Writable() { - return tx.tx.Commit() - } - return tx.tx.Rollback() -} - -func (tx *TX) Rollback() (err error) { - tx.do = true - return tx.tx.Rollback() -} - -func get(tx *TX, key, val []byte) (kv *KV, err error) { - - kv = &KV{ - exi: (val != nil), - key: key, - val: val, - } - - kv.val, err = cryp.Decrypt(tx.ds.ck, kv.val) - if err != nil { - err = &kvs.CKError{Err: err} - return - } - - kv.val, err = snap.Decode(kv.val) - if err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} diff --git a/kvs/db.go b/kvs/db.go index 0989670f..0e76eeee 100644 --- a/kvs/db.go +++ b/kvs/db.go @@ -14,268 +14,8 @@ package kvs -import ( - "strings" - - "github.com/abcum/surreal/cnf" - "github.com/abcum/surreal/util/keys" -) - -var stores = make(map[string]func(*cnf.Options) (DS, error)) - -// DB is a database handle to a single Surreal cluster. -type DB struct { - ds DS -} - -// New sets up the underlying key-value store -func New(opts *cnf.Options) (db *DB, err error) { - - var ds DS - - if strings.HasPrefix(opts.DB.Path, "boltdb://") { - if ds, err = stores["boltdb"](opts); err != nil { - return - } - } - - if strings.HasPrefix(opts.DB.Path, "mysql://") { - if ds, err = stores["mysql"](opts); err != nil { - return - } - } - - if strings.HasPrefix(opts.DB.Path, "pgsql://") { - if ds, err = stores["pgsql"](opts); err != nil { - return - } - } - - db = &DB{ds: ds} - - err = db.enc(opts) - - return - -} - -func (db *DB) enc(opts *cnf.Options) (err error) { - - ck := &keys.CK{KV: opts.DB.Base} - - kv, err := db.Get(ck.Encode()) - if err != nil { - return err - } - - if kv.Exists() == false { - err = db.Put(ck.Encode(), []byte("±")) - } - - if kv.Exists() == true && kv.Str() != "±" { - err = new(CKError) - } - - return - -} - -// All retrieves all key:value items in the db. -func (db *DB) All() (kvs []KV, err error) { - - tx, err := db.Txn(false) - if err != nil { - return - } - - defer tx.Close() - - return tx.All() - -} - -// Get retrieves a single key:value item. -func (db *DB) Get(key []byte) (kv KV, err error) { - - tx, err := db.Txn(false) - if err != nil { - return - } - - defer tx.Close() - - return tx.Get(key) - -} - -// MGet retrieves multiple key:value items. -func (db *DB) MGet(keys ...[]byte) (kvs []KV, err error) { - - tx, err := db.Txn(false) - if err != nil { - return - } - - defer tx.Close() - - return tx.MGet(keys...) - -} - -// PGet retrieves the range of rows which are prefixed with `pre`. -func (db *DB) PGet(pre []byte) (kvs []KV, err error) { - - tx, err := db.Txn(false) - if err != nil { - return - } - - defer tx.Close() - - return tx.PGet(pre) - -} - -// RGet retrieves the range of `max` rows between `beg` (inclusive) and -// `end` (exclusive). To return the range in descending order, ensure -// that `end` sorts lower than `beg` in the key value store. -func (db *DB) RGet(beg, end []byte, max uint64) (kvs []KV, err error) { - - tx, err := db.Txn(false) - if err != nil { - return - } - - defer tx.Close() - - return tx.RGet(beg, end, max) - -} - -// Put sets the value for a key. -func (db *DB) Put(key, val []byte) (err error) { - - tx, err := db.Txn(true) - if err != nil { - return - } - - defer tx.Commit() - - return tx.Put(key, val) - -} - -// CPut conditionally sets the value for a key if the existing value is equal -// to the expected value. To conditionally set a value only if there is no -// existing entry pass nil for the expected value. -func (db *DB) CPut(key, val, exp []byte) (err error) { - - tx, err := db.Txn(true) - if err != nil { - return - } - - defer tx.Commit() - - return tx.CPut(key, val, exp) - -} - -// Del deletes a single key:value item. -func (db *DB) Del(key []byte) (err error) { - - tx, err := db.Txn(true) - if err != nil { - return - } - - defer tx.Commit() - - return tx.Del(key) - -} - -// CDel conditionally deletes a key if the existing value is equal to the -// expected value. -func (db *DB) CDel(key, exp []byte) (err error) { - - tx, err := db.Txn(true) - if err != nil { - return - } - - defer tx.Commit() - - return tx.CDel(key, exp) - -} - -// MDel deletes multiple key:value items. -func (db *DB) MDel(keys ...[]byte) (err error) { - - tx, err := db.Txn(true) - if err != nil { - return - } - - defer tx.Commit() - - return tx.MDel(keys...) - -} - -// PDel deletes the range of rows which are prefixed with `pre`. -func (db *DB) PDel(pre []byte) (err error) { - - tx, err := db.Txn(true) - if err != nil { - return - } - - defer tx.Commit() - - return tx.PDel(pre) - -} - -// RDel deletes the range of `max` rows between `beg` (inclusive) and -// `end` (exclusive). To delete the range in descending order, ensure -// that `end` sorts lower than `beg` in the key value store. -func (db *DB) RDel(beg, end []byte, max uint64) (err error) { - - tx, err := db.Txn(true) - if err != nil { - return - } - - defer tx.Commit() - - return tx.RDel(beg, end, max) - -} - -// Txn executes retryable in the context of a distributed transaction. -// The transaction is automatically aborted if retryable returns any -// error aside from recoverable internal errors, and is automatically -// committed otherwise. The retryable function should have no side -// effects which could cause problems in the event it must be run more -// than once. -func (db *DB) Txn(writable bool) (txn TX, err error) { - - return db.ds.Txn(writable) - -} - -// Close ... -func (db *DB) Close() (err error) { - - return db.ds.Close() - -} - -func Register(name string, constructor func(*cnf.Options) (DS, error)) { - - stores[name] = constructor - +// DB represents a database implementation +type DB interface { + Begin(bool) (TX, error) + Close() error } diff --git a/kvs/ds.go b/kvs/ds.go index d73af024..4a49a307 100644 --- a/kvs/ds.go +++ b/kvs/ds.go @@ -14,8 +14,68 @@ package kvs -// DS represents a datastore implementation -type DS interface { - Txn(bool) (TX, error) - Close() error +import ( + "strings" + + "github.com/abcum/surreal/cnf" +) + +var stores = make(map[string]func(*cnf.Options) (DB, error)) + +// DB represents a backing datastore. +type DS struct { + db DB +} + +// New sets up the underlying key-value store +func New(opts *cnf.Options) (ds *DS, err error) { + + var db DB + + switch { + + case opts.DB.Path == "memory": + db, err = stores["rixxdb"](opts) + case strings.HasPrefix(opts.DB.Path, "s3://"): + db, err = stores["rixxdb"](opts) + case strings.HasPrefix(opts.DB.Path, "gcs://"): + db, err = stores["rixxdb"](opts) + case strings.HasPrefix(opts.DB.Path, "logr://"): + db, err = stores["rixxdb"](opts) + case strings.HasPrefix(opts.DB.Path, "file://"): + db, err = stores["rixxdb"](opts) + case strings.HasPrefix(opts.DB.Path, "rixxdb://"): + db, err = stores["rixxdb"](opts) + case strings.HasPrefix(opts.DB.Path, "dendrodb://"): + db, err = stores["dendro"](opts) + } + + if err != nil { + return + } + + ds = &DS{db: db} + + return + +} + +// Begin ... +func (ds *DS) Begin(writable bool) (txn TX, err error) { + + return ds.db.Begin(writable) + +} + +// Close ... +func (ds *DS) Close() (err error) { + + return ds.db.Close() + +} + +func Register(name string, constructor func(*cnf.Options) (DB, error)) { + + stores[name] = constructor + } diff --git a/kvs/kv.go b/kvs/kv.go index 07f35566..cf5eae0d 100644 --- a/kvs/kv.go +++ b/kvs/kv.go @@ -14,10 +14,10 @@ package kvs -// KV represents a datastore key:value item +// KV represents a database item type KV interface { - Exists() bool + Exi() bool Key() []byte Val() []byte - Str() string + Ver() int64 } diff --git a/kvs/mysql/ds.go b/kvs/mysql/ds.go deleted file mode 100644 index f515852e..00000000 --- a/kvs/mysql/ds.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright © 2016 Abcum Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mysql - -import ( - "database/sql" - - "github.com/abcum/surreal/kvs" -) - -type DS struct { - db *sql.DB - ck []byte -} - -func (ds *DS) Txn(writable bool) (txn kvs.TX, err error) { - - tx, err := ds.db.Begin() - if err != nil { - err = &kvs.DSError{Err: err} - if tx != nil { - tx.Rollback() - } - return - } - - return &TX{ds: ds, tx: tx}, err - -} - -func (ds *DS) Close() (err error) { - - return ds.db.Close() - -} diff --git a/kvs/mysql/kv.go b/kvs/mysql/kv.go deleted file mode 100644 index 181dc8e4..00000000 --- a/kvs/mysql/kv.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright © 2016 Abcum Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mysql - -// KV represents a database key:value item -type KV struct { - exi bool - key []byte - val []byte -} - -// Exists is true if the key exists -func (kv *KV) Exists() bool { - return kv.exi -} - -// Key returns a byte slice of the key -func (kv *KV) Key() []byte { - return kv.key -} - -// Val returns a byte slice of the value -func (kv *KV) Val() []byte { - return kv.val -} - -// Str returns a string of the value -func (kv *KV) Str() string { - return string(kv.val) -} diff --git a/kvs/mysql/main.go b/kvs/mysql/main.go deleted file mode 100644 index ae0bb54c..00000000 --- a/kvs/mysql/main.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright © 2016 Abcum Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mysql - -import ( - "fmt" - "regexp" - - "crypto/tls" - "crypto/x509" - "io/ioutil" - - "database/sql" - "github.com/go-sql-driver/mysql" - - "github.com/abcum/surreal/cnf" - "github.com/abcum/surreal/kvs" -) - -func init() { - kvs.Register("mysql", New) -} - -func New(opts *cnf.Options) (ds kvs.DS, err error) { - - var db *sql.DB - - opts.DB.Path, err = config(opts) - if err != nil { - return - } - - db, err = sql.Open("mysql", opts.DB.Path) - if err != nil { - return - } - - return &DS{db: db, ck: opts.DB.Key}, err - -} - -func config(opts *cnf.Options) (path string, err error) { - - re := regexp.MustCompile(`^mysql://` + - `((?:(?P.*?)(?::(?P.*))?@))?` + - `(?:(?:(?P[^\/]*))?)?` + - `\/(?P.*?)` + - `(?:\?(?P[^\?]*))?$`) - - ma := re.FindStringSubmatch(opts.DB.Path) - - if len(ma) == 0 || ma[4] == "" || ma[5] == "" { - err = fmt.Errorf("Specify a valid data store configuration path. Use the help command for further instructions.") - } - - if opts.DB.Cert.SSL { - pool := x509.NewCertPool() - pem, err := ioutil.ReadFile(opts.DB.Cert.CA) - if err != nil { - err = fmt.Errorf("Could not read file %s", opts.DB.Cert.CA) - } - if ok := pool.AppendCertsFromPEM(pem); !ok { - return "", fmt.Errorf("Could not read file %s", opts.DB.Cert.CA) - } - cert := make([]tls.Certificate, 0, 1) - pair, err := tls.LoadX509KeyPair(opts.DB.Cert.Crt, opts.DB.Cert.Key) - if err != nil { - return "", err - } - cert = append(cert, pair) - mysql.RegisterTLSConfig("custom", &tls.Config{ - RootCAs: pool, - Certificates: cert, - InsecureSkipVerify: true, - }) - } - - if opts.DB.Cert.SSL { - path += fmt.Sprintf("%stcp(%s)/%s?tls=custom", ma[1], ma[4], ma[5]) - } else { - path += fmt.Sprintf("%stcp(%s)/%s", ma[1], ma[4], ma[5]) - } - - return - -} diff --git a/kvs/mysql/tx.go b/kvs/mysql/tx.go deleted file mode 100644 index 15b5c9ed..00000000 --- a/kvs/mysql/tx.go +++ /dev/null @@ -1,358 +0,0 @@ -// Copyright © 2016 Abcum Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mysql - -import ( - "bytes" - "math" - - "database/sql" - - "github.com/abcum/surreal/kvs" - "github.com/abcum/surreal/util/cryp" - "github.com/abcum/surreal/util/snap" -) - -// TX is a distributed database transaction. -type TX struct { - ds *DS - do bool - ck []byte - tx *sql.Tx -} - -// All retrieves all key:value items in the db. -func (tx *TX) All() (kvs []kvs.KV, err error) { - - res, err := tx.tx.Query("SELECT `key`, `val` FROM kv ORDER BY `key` ASC") - if err != nil { - return - } - - defer res.Close() - - for res.Next() { - - var key, val []byte - - err := res.Scan(&key, &val) - if err != nil { - return nil, err - } - - kv, err := get(tx, key, val) - if err != nil { - return nil, err - } - - kvs = append(kvs, kv) - - } - - err = res.Err() - if err != nil { - return nil, err - } - - return - -} - -// Get retrieves a single key:value item. -func (tx *TX) Get(key []byte) (kv kvs.KV, err error) { - - row := tx.tx.QueryRow("SELECT `val` FROM kv WHERE `key` = ?", key) - - var val []byte - - row.Scan(&val) - - return get(tx, key, val) - -} - -// MGet retrieves multiple key:value items. -func (tx *TX) MGet(keys ...[]byte) (kvs []kvs.KV, err error) { - - /* - res, err := tx.tx.Query("SELECT `key`, `val` FROM kv WHERE `key` IN (?)", keys) - if err != nil { - return - } - - defer res.Close() - - for res.Next() { - - var key, val []byte - - err := res.Scan(&key, &val) - if err != nil { - return nil, err - } - - kv, err := get(tx, key, val) - if err != nil { - return nil, err - } - - kvs = append(kvs, kv) - - } - - err = res.Err() - if err != nil { - return nil, err - } - */ - - for _, key := range keys { - kv, _ := tx.Get(key) - kvs = append(kvs, kv) - } - - return - -} - -// PGet retrieves the range of rows which are prefixed with `pre`. -func (tx *TX) PGet(pre []byte) (kvs []kvs.KV, err error) { - - end := append(pre, 0xff) - - return tx.RGet(pre, end, 0) - -} - -// RGet retrieves the range of `max` rows between `beg` (inclusive) and -// `end` (exclusive). To return the range in descending order, ensure -// that `end` sorts lower than `beg` in the key value store. -func (tx *TX) RGet(beg, end []byte, max uint64) (kvs []kvs.KV, err error) { - - if max == 0 { - max = math.MaxUint64 - } - - res, err := tx.tx.Query("SELECT `key`, `val` FROM kv WHERE `key` BETWEEN ? AND ? ORDER BY `key` ASC LIMIT ?", beg, end, max) - if err != nil { - return nil, err - } - - defer res.Close() - - for res.Next() { - - var key, val []byte - - err := res.Scan(&key, &val) - if err != nil { - return nil, err - } - - kv, err := get(tx, key, val) - if err != nil { - return nil, err - } - - kvs = append(kvs, kv) - - } - - err = res.Err() - if err != nil { - return nil, err - } - - return - -} - -// Put sets the value for a key. -func (tx *TX) Put(key, val []byte) (err error) { - - if val, err = snap.Encode(val); err != nil { - err = &kvs.DBError{Err: err} - return - } - - if val, err = cryp.Encrypt(tx.ds.ck, val); err != nil { - err = &kvs.CKError{Err: err} - return - } - - if _, err = tx.tx.Exec("INSERT INTO kv (`key`, `val`) VALUES (?, ?) ON DUPLICATE KEY UPDATE `val` = ?", key, val, val); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -// CPut conditionally sets the value for a key if the existing value is equal -// to the expected value. To conditionally set a value only if there is no -// existing entry pass nil for the expected value. -func (tx *TX) CPut(key, val, exp []byte) (err error) { - - now, _ := tx.Get(key) - act := now.(*KV).val - - if !bytes.Equal(act, exp) { - err = &kvs.KVError{Err: err, Key: key, Val: act, Exp: exp} - return - } - - if val, err = snap.Encode(val); err != nil { - err = &kvs.DBError{Err: err} - return - } - - if val, err = cryp.Encrypt(tx.ds.ck, val); err != nil { - err = &kvs.CKError{Err: err} - return - } - - if _, err = tx.tx.Exec("INSERT INTO kv (`key`, `val`) VALUES (?, ?) ON DUPLICATE KEY UPDATE `val` = ?", key, val, val); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -// Del deletes a single key:value item. -func (tx *TX) Del(key []byte) (err error) { - - if _, err = tx.tx.Exec("DELETE FROM kv WHERE `key` = ?", key); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -// CDel conditionally deletes a key if the existing value is equal to the -// expected value. -func (tx *TX) CDel(key, exp []byte) (err error) { - - now, _ := tx.Get(key) - act := now.(*KV).val - - if !bytes.Equal(act, exp) { - err = &kvs.KVError{Err: err, Key: key, Val: act, Exp: exp} - return - } - - if _, err = tx.tx.Exec("DELETE FROM kv WHERE `key` = ?", key); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -// MDel deletes multiple key:value items. -func (tx *TX) MDel(keys ...[]byte) (err error) { - - /* - if _, err = tx.tx.Exec("DELETE FROM kv WHERE `key` IN (?)", keys); err != nil { - err = &kvs.DBError{Err:err} - return - } - */ - - for _, key := range keys { - err = tx.Del(key) - } - - return - -} - -// PDel deletes the range of rows which are prefixed with `pre`. -func (tx *TX) PDel(pre []byte) (err error) { - - end := append(pre, 0xff) - - return tx.RDel(pre, end, 0) - -} - -// RDel deletes the range of `max` rows between `beg` (inclusive) and -// `end` (exclusive). To delete the range in descending order, ensure -// that `end` sorts lower than `beg` in the key value store. -func (tx *TX) RDel(beg, end []byte, max uint64) (err error) { - - if max == 0 { - max = math.MaxUint64 - } - - if _, err = tx.tx.Exec("DELETE FROM kv WHERE `key` BETWEEN ? AND ? ORDER BY `key` ASC LIMIT ?", beg, end, max); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -func (tx *TX) Done() (val bool) { - return tx.do -} - -func (tx *TX) Close() (err error) { - return tx.Rollback() -} - -func (tx *TX) Cancel() (err error) { - return tx.Rollback() -} - -func (tx *TX) Commit() (err error) { - tx.do = true - return tx.tx.Commit() -} - -func (tx *TX) Rollback() (err error) { - tx.do = true - return tx.tx.Rollback() -} - -func get(tx *TX, key, val []byte) (kv *KV, err error) { - - kv = &KV{ - exi: (val != nil), - key: key, - val: val, - } - - kv.val, err = cryp.Decrypt(tx.ds.ck, kv.val) - if err != nil { - err = &kvs.CKError{Err: err} - return - } - - kv.val, err = snap.Decode(kv.val) - if err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} diff --git a/kvs/pgsql/kv.go b/kvs/pgsql/kv.go deleted file mode 100644 index e1193002..00000000 --- a/kvs/pgsql/kv.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright © 2016 Abcum Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pgsql - -// KV represents a database key:value item -type KV struct { - exi bool - key []byte - val []byte -} - -// Exists is true if the key exists -func (kv *KV) Exists() bool { - return kv.exi -} - -// Key returns a byte slice of the key -func (kv *KV) Key() []byte { - return kv.key -} - -// Val returns a byte slice of the value -func (kv *KV) Val() []byte { - return kv.val -} - -// Str returns a string of the value -func (kv *KV) Str() string { - return string(kv.val) -} diff --git a/kvs/pgsql/main.go b/kvs/pgsql/main.go deleted file mode 100644 index d2098dd3..00000000 --- a/kvs/pgsql/main.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright © 2016 Abcum Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pgsql - -import ( - "fmt" - "regexp" - - "database/sql" - _ "github.com/lib/pq" - - "github.com/abcum/surreal/cnf" - "github.com/abcum/surreal/kvs" -) - -func init() { - kvs.Register("pgsql", New) -} - -func New(opts *cnf.Options) (ds kvs.DS, err error) { - - var db *sql.DB - - opts.DB.Path, err = config(opts) - if err != nil { - return - } - - db, err = sql.Open("postgres", opts.DB.Path) - if err != nil { - return - } - - return &DS{db: db, ck: opts.DB.Key}, err - -} - -func config(opts *cnf.Options) (path string, err error) { - - re := regexp.MustCompile(`^pgsql://` + - `((?:(?P.*?)(?::(?P.*))?@))?` + - `(?:(?:(?P[^\/]*))?)?` + - `\/(?P.*?)` + - `(?:\?(?P[^\?]*))?$`) - - ma := re.FindStringSubmatch(opts.DB.Path) - - if len(ma) == 0 || ma[4] == "" || ma[5] == "" { - err = fmt.Errorf("Specify a valid data store configuration path. Use the help command for further instructions.") - } - - if opts.DB.Cert.SSL { - path += fmt.Sprintf("postgres://%s%s/%s?sslmode=verify-ca&sslrootcert=%s&sslcert=%s&sslkey=%s", ma[1], ma[4], ma[5], opts.DB.Cert.CA, opts.DB.Cert.Crt, opts.DB.Cert.Key) - } else { - path += fmt.Sprintf("postgres://%s%s/%s?sslmode=disable", ma[1], ma[4], ma[5]) - } - - return - -} diff --git a/kvs/pgsql/tx.go b/kvs/pgsql/tx.go deleted file mode 100644 index a6e40111..00000000 --- a/kvs/pgsql/tx.go +++ /dev/null @@ -1,367 +0,0 @@ -// Copyright © 2016 Abcum Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pgsql - -import ( - "bytes" - "math" - - "database/sql" - - "github.com/abcum/surreal/kvs" - "github.com/abcum/surreal/util/cryp" - "github.com/abcum/surreal/util/snap" -) - -// TX is a distributed database transaction. -type TX struct { - ds *DS - do bool - ck []byte - tx *sql.Tx -} - -// All retrieves all key:value items in the db. -func (tx *TX) All() (kvs []kvs.KV, err error) { - - res, err := tx.tx.Query("SELECT key, val FROM kv ORDER BY key ASC") - if err != nil { - return - } - - defer res.Close() - - for res.Next() { - - var key, val []byte - - err := res.Scan(&key, &val) - if err != nil { - return nil, err - } - - kv, err := get(tx, key, val) - if err != nil { - return nil, err - } - - kvs = append(kvs, kv) - - } - - err = res.Err() - if err != nil { - return nil, err - } - - return - -} - -// Get retrieves a single key:value item. -func (tx *TX) Get(key []byte) (kv kvs.KV, err error) { - - row := tx.tx.QueryRow("SELECT val FROM kv WHERE key = $1", key) - - var val []byte - - row.Scan(&val) - - return get(tx, key, val) - -} - -// MGet retrieves multiple key:value items. -func (tx *TX) MGet(keys ...[]byte) (kvs []kvs.KV, err error) { - - /* - res, err := tx.tx.Query("SELECT key, val FROM kv WHERE key IN ($1)", keys) - if err != nil { - return - } - - defer res.Close() - - for res.Next() { - - var key, val []byte - - err := res.Scan(&key, &val) - if err != nil { - return nil, err - } - - kv, err := get(tx, key, val) - if err != nil { - return nil, err - } - - kvs = append(kvs, kv) - - } - - err = res.Err() - if err != nil { - return nil, err - } - */ - - for _, key := range keys { - kv, _ := tx.Get(key) - kvs = append(kvs, kv) - } - - return - -} - -// PGet retrieves the range of rows which are prefixed with `pre`. -func (tx *TX) PGet(pre []byte) (kvs []kvs.KV, err error) { - - end := append(pre, 0xff) - - return tx.RGet(pre, end, 0) - -} - -// RGet retrieves the range of `max` rows between `beg` (inclusive) and -// `end` (exclusive). To return the range in descending order, ensure -// that `end` sorts lower than `beg` in the key value store. -func (tx *TX) RGet(beg, end []byte, max uint64) (items []kvs.KV, err error) { - - if max == 0 { - max = math.MaxUint64 - } - - if max > math.MaxInt64 { - max = math.MaxInt64 - } - - res, err := tx.tx.Query("SELECT key, val FROM kv WHERE key BETWEEN $1 AND $2 ORDER BY key ASC LIMIT $3", beg, end, max) - if err != nil { - err = &kvs.DBError{Err: err} - return nil, err - } - - defer res.Close() - - for res.Next() { - - var key, val []byte - - err := res.Scan(&key, &val) - if err != nil { - return nil, err - } - - kv, err := get(tx, key, val) - if err != nil { - return nil, err - } - - items = append(items, kv) - - } - - err = res.Err() - if err != nil { - return nil, err - } - - return - -} - -// Put sets the value for a key. -func (tx *TX) Put(key, val []byte) (err error) { - - if val, err = snap.Encode(val); err != nil { - err = &kvs.DBError{Err: err} - return - } - - if val, err = cryp.Encrypt(tx.ds.ck, val); err != nil { - err = &kvs.CKError{Err: err} - return - } - - if _, err = tx.tx.Exec("INSERT INTO kv (key, val) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET val = $2", key, val); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -// CPut conditionally sets the value for a key if the existing value is equal -// to the expected value. To conditionally set a value only if there is no -// existing entry pass nil for the expected value. -func (tx *TX) CPut(key, val, exp []byte) (err error) { - - now, _ := tx.Get(key) - act := now.(*KV).val - - if !bytes.Equal(act, exp) { - err = &kvs.KVError{Err: err, Key: key, Val: act, Exp: exp} - return - } - - if val, err = snap.Encode(val); err != nil { - err = &kvs.DBError{Err: err} - return - } - - if val, err = cryp.Encrypt(tx.ds.ck, val); err != nil { - err = &kvs.CKError{Err: err} - return - } - - if _, err = tx.tx.Exec("INSERT INTO kv (key, val) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET val = $2", key, val); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -// Del deletes a single key:value item. -func (tx *TX) Del(key []byte) (err error) { - - if _, err = tx.tx.Exec("DELETE FROM kv WHERE key = $1", key); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -// CDel conditionally deletes a key if the existing value is equal to the -// expected value. -func (tx *TX) CDel(key, exp []byte) (err error) { - - now, _ := tx.Get(key) - act := now.(*KV).val - - if !bytes.Equal(act, exp) { - err = &kvs.KVError{Err: err, Key: key, Val: act, Exp: exp} - return - } - - if _, err = tx.tx.Exec("DELETE FROM kv WHERE key = $1", key); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -// MDel deletes multiple key:value items. -func (tx *TX) MDel(keys ...[]byte) (err error) { - - /* - if _, err = tx.tx.Exec("DELETE FROM kv WHERE key IN ($1)", keys); err != nil { - err = &kvs.DBError{Err:err} - return - } - */ - - for _, key := range keys { - err = tx.Del(key) - } - - return - -} - -// PDel deletes the range of rows which are prefixed with `pre`. -func (tx *TX) PDel(pre []byte) (err error) { - - end := append(pre, 0xff) - - return tx.RDel(pre, end, 0) - -} - -// RDel deletes the range of `max` rows between `beg` (inclusive) and -// `end` (exclusive). To delete the range in descending order, ensure -// that `end` sorts lower than `beg` in the key value store. -func (tx *TX) RDel(beg, end []byte, max uint64) (err error) { - - if max == 0 { - max = math.MaxUint64 - } - - if max > math.MaxInt64 { - max = math.MaxInt64 - } - - if _, err = tx.tx.Exec("DELETE FROM kv WHERE key BETWEEN $1 AND $2 ORDER BY key ASC LIMIT $3", beg, end, max); err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} - -func (tx *TX) Done() (val bool) { - return tx.do -} - -func (tx *TX) Close() (err error) { - return tx.Rollback() -} - -func (tx *TX) Cancel() (err error) { - return tx.Rollback() -} - -func (tx *TX) Commit() (err error) { - tx.do = true - return tx.tx.Commit() -} - -func (tx *TX) Rollback() (err error) { - tx.do = true - return tx.tx.Rollback() -} - -func get(tx *TX, key, val []byte) (kv *KV, err error) { - - kv = &KV{ - exi: (val != nil), - key: key, - val: val, - } - - kv.val, err = cryp.Decrypt(tx.ds.ck, kv.val) - if err != nil { - err = &kvs.CKError{Err: err} - return - } - - kv.val, err = snap.Decode(kv.val) - if err != nil { - err = &kvs.DBError{Err: err} - return - } - - return - -} diff --git a/kvs/pgsql/ds.go b/kvs/rixxdb/db.go similarity index 70% rename from kvs/pgsql/ds.go rename to kvs/rixxdb/db.go index 1a285598..7f8a72b0 100644 --- a/kvs/pgsql/ds.go +++ b/kvs/rixxdb/db.go @@ -12,36 +12,34 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pgsql +package rixxdb import ( - "database/sql" - + "github.com/abcum/rixxdb" "github.com/abcum/surreal/kvs" ) -type DS struct { - db *sql.DB - ck []byte +type DB struct { + pntr *rixxdb.DB } -func (ds *DS) Txn(writable bool) (txn kvs.TX, err error) { +func (db *DB) Begin(writable bool) (txn kvs.TX, err error) { - tx, err := ds.db.Begin() + pntr, err := db.pntr.Begin(writable) if err != nil { err = &kvs.DSError{Err: err} - if tx != nil { - tx.Rollback() + if pntr != nil { + pntr.Cancel() } return } - return &TX{ds: ds, tx: tx}, err + return &TX{pntr: pntr}, err } -func (ds *DS) Close() (err error) { +func (db *DB) Close() (err error) { - return ds.db.Close() + return db.pntr.Close() } diff --git a/kvs/boltdb/main.go b/kvs/rixxdb/main.go similarity index 62% rename from kvs/boltdb/main.go rename to kvs/rixxdb/main.go index 8f5a55b1..866b4dc7 100644 --- a/kvs/boltdb/main.go +++ b/kvs/rixxdb/main.go @@ -12,39 +12,35 @@ // See the License for the specific language governing permissions and // limitations under the License. -package boltdb +package rixxdb import ( "strings" - "github.com/boltdb/bolt" - + "github.com/abcum/rixxdb" "github.com/abcum/surreal/cnf" "github.com/abcum/surreal/kvs" ) -var bucket = []byte("default") - func init() { - kvs.Register("boltdb", New) -} -func New(opts *cnf.Options) (ds kvs.DS, err error) { + kvs.Register("rixxdb", func(opts *cnf.Options) (db kvs.DB, err error) { - var db *bolt.DB + var pntr *rixxdb.DB - path := strings.TrimLeft(opts.DB.Path, "boltdb://") + path := strings.TrimLeft(opts.DB.Path, "rixxdb://") - db, err = bolt.Open(path, 0666, nil) - if err != nil { - return - } + pntr, err = rixxdb.Open(path, &rixxdb.Config{ + SyncPolicy: opts.DB.Sync, + EncryptionKey: opts.DB.Key, + }) + + if err != nil { + return + } + + return &DB{pntr: pntr}, err - db.Update(func(tx *bolt.Tx) error { - tx.CreateBucketIfNotExists(bucket) - return nil }) - return &DS{db: db, ck: opts.DB.Key}, err - } diff --git a/kvs/rixxdb/tx.go b/kvs/rixxdb/tx.go new file mode 100644 index 00000000..26ef6cad --- /dev/null +++ b/kvs/rixxdb/tx.go @@ -0,0 +1,137 @@ +// Copyright © 2016 Abcum Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rixxdb + +import ( + "github.com/abcum/rixxdb" + "github.com/abcum/surreal/kvs" +) + +type TX struct { + pntr *rixxdb.TX +} + +func (tx *TX) Closed() bool { + return tx.pntr.Closed() +} + +func (tx *TX) Cancel() error { + return tx.pntr.Cancel() +} + +func (tx *TX) Commit() error { + return tx.pntr.Commit() +} + +func (tx *TX) Get(ver int64, key []byte) (kvs.KV, error) { + return tx.pntr.Get(ver, key) +} + +func (tx *TX) GetL(ver int64, key []byte) ([]kvs.KV, error) { + all, err := tx.pntr.GetL(ver, key) + out := make([]kvs.KV, len(all)) + for i, v := range all { + out[i] = v + } + return out, err +} + +func (tx *TX) GetP(ver int64, key []byte, max uint64) ([]kvs.KV, error) { + all, err := tx.pntr.GetP(ver, key, max) + out := make([]kvs.KV, len(all)) + for i, v := range all { + out[i] = v + } + return out, err +} + +func (tx *TX) GetR(ver int64, beg []byte, end []byte, max uint64) ([]kvs.KV, error) { + all, err := tx.pntr.GetR(ver, beg, end, max) + out := make([]kvs.KV, len(all)) + for i, v := range all { + out[i] = v + } + return out, err +} + +func (tx *TX) Del(ver int64, key []byte) (kvs.KV, error) { + return tx.pntr.Del(ver, key) +} + +func (tx *TX) DelC(ver int64, key []byte, exp []byte) (kvs.KV, error) { + return tx.pntr.DelC(ver, key, exp) +} + +func (tx *TX) DelL(ver int64, key []byte) ([]kvs.KV, error) { + all, err := tx.pntr.DelL(ver, key) + out := make([]kvs.KV, len(all)) + for i, v := range all { + out[i] = v + } + return out, err +} + +func (tx *TX) DelP(ver int64, key []byte, max uint64) ([]kvs.KV, error) { + all, err := tx.pntr.DelP(ver, key, max) + out := make([]kvs.KV, len(all)) + for i, v := range all { + out[i] = v + } + return out, err +} + +func (tx *TX) DelR(ver int64, beg []byte, end []byte, max uint64) ([]kvs.KV, error) { + all, err := tx.pntr.DelR(ver, beg, end, max) + out := make([]kvs.KV, len(all)) + for i, v := range all { + out[i] = v + } + return out, err +} + +func (tx *TX) Put(ver int64, key []byte, val []byte) (kvs.KV, error) { + return tx.pntr.Put(ver, key, val) +} + +func (tx *TX) PutC(ver int64, key []byte, val []byte, exp []byte) (kvs.KV, error) { + return tx.pntr.PutC(ver, key, val, exp) +} + +func (tx *TX) PutL(ver int64, key []byte, val []byte) ([]kvs.KV, error) { + all, err := tx.pntr.PutL(ver, key, val) + out := make([]kvs.KV, len(all)) + for i, v := range all { + out[i] = v + } + return out, err +} + +func (tx *TX) PutP(ver int64, key []byte, val []byte, max uint64) ([]kvs.KV, error) { + all, err := tx.pntr.PutP(ver, key, val, max) + out := make([]kvs.KV, len(all)) + for i, v := range all { + out[i] = v + } + return out, err +} + +func (tx *TX) PutR(ver int64, key []byte, val []byte, exp []byte, max uint64) ([]kvs.KV, error) { + all, err := tx.pntr.PutR(ver, key, val, exp, max) + out := make([]kvs.KV, len(all)) + for i, v := range all { + out[i] = v + } + return out, err +} diff --git a/kvs/tx.go b/kvs/tx.go index 9b37117d..154ac74b 100644 --- a/kvs/tx.go +++ b/kvs/tx.go @@ -14,23 +14,26 @@ package kvs -// TX represents a datastore transaction +// TX represents a database transaction type TX interface { - All() ([]KV, error) - Get([]byte) (KV, error) - MGet(...[]byte) ([]KV, error) - PGet([]byte) ([]KV, error) - RGet([]byte, []byte, uint64) ([]KV, error) - Put([]byte, []byte) error - CPut([]byte, []byte, []byte) error - Del([]byte) error - CDel([]byte, []byte) error - MDel(...[]byte) error - PDel([]byte) error - RDel([]byte, []byte, uint64) error - Done() bool - Close() error + Closed() bool Cancel() error Commit() error - Rollback() error + + Get(int64, []byte) (KV, error) + GetL(int64, []byte) ([]KV, error) + GetP(int64, []byte, uint64) ([]KV, error) + GetR(int64, []byte, []byte, uint64) ([]KV, error) + + Del(int64, []byte) (KV, error) + DelC(int64, []byte, []byte) (KV, error) + DelL(int64, []byte) ([]KV, error) + DelP(int64, []byte, uint64) ([]KV, error) + DelR(int64, []byte, []byte, uint64) ([]KV, error) + + Put(int64, []byte, []byte) (KV, error) + PutC(int64, []byte, []byte, []byte) (KV, error) + PutL(int64, []byte, []byte) ([]KV, error) + PutP(int64, []byte, []byte, uint64) ([]KV, error) + PutR(int64, []byte, []byte, []byte, uint64) ([]KV, error) }