Remove mysql kvs storage backend

This commit is contained in:
Tobie Morgan Hitchcock 2018-07-24 11:18:17 +01:00
parent 2932bac73e
commit 48df950be5
11 changed files with 1 additions and 912 deletions

View file

@ -58,7 +58,7 @@ func setup() {
}
if opts.DB.Path != "memory" {
if ok, _ := regexp.MatchString(`^(s3|gcs|logr|file|mysql|dendrodb)://(.+)$`, opts.DB.Path); !ok {
if ok, _ := regexp.MatchString(`^(s3|gcs|logr|file|dendrodb)://(.+)$`, opts.DB.Path); !ok {
log.Fatalf("Invalid path %s. Specify a valid data store configuration path", opts.DB.Path)
}
}

View file

@ -31,7 +31,6 @@ import (
"github.com/abcum/surreal/util/data"
"github.com/abcum/surreal/util/uuid"
_ "github.com/abcum/surreal/kvs/mysql"
_ "github.com/abcum/surreal/kvs/rixxdb"
)

View file

@ -10,8 +10,6 @@ import:
version: ^3.2.0
- package: github.com/elithrar/simple-scrypt
version: ^1.3.0
- package: github.com/go-sql-driver/mysql
version: ^1.3.0
- package: github.com/gorilla/websocket
version: ^1.2.0
- package: github.com/hashicorp/serf

View file

@ -45,8 +45,6 @@ func New(opts *cnf.Options) (ds *DS, err error) {
db, err = stores["rixxdb"](opts)
case strings.HasPrefix(opts.DB.Path, "file://"):
db, err = stores["rixxdb"](opts)
case strings.HasPrefix(opts.DB.Path, "mysql://"):
db, err = stores["mysql"](opts)
case strings.HasPrefix(opts.DB.Path, "rixxdb://"):
db, err = stores["rixxdb"](opts)
case strings.HasPrefix(opts.DB.Path, "dendrodb://"):

View file

@ -1,52 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
import (
"io"
"context"
"database/sql"
"github.com/abcum/surreal/kvs"
"github.com/abcum/surreal/log"
)
type DB struct {
pntr *sql.DB
}
func (db *DB) Begin(ctx context.Context, writable bool) (txn kvs.TX, err error) {
var pntr *sql.Tx
if pntr, err = db.pntr.BeginTx(ctx, &sql.TxOptions{}); err != nil {
log.WithPrefix("kvs").Errorln(err)
err = &kvs.DBError{Err: err}
return
}
return &TX{pntr: pntr}, err
}
func (db *DB) Import(r io.Reader) (err error) {
return nil
}
func (db *DB) Export(w io.Writer) (err error) {
return nil
}
func (db *DB) Close() (err error) {
return db.pntr.Close()
}

View file

@ -1,22 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
import (
"errors"
)
// ErrKvNotExpectedValue occurs when conditionally putting or deleting a key-value item.
var ErrTxNotExpectedValue = errors.New("KV val is not expected value")

View file

@ -1,42 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
// KV represents a row stored in the database.
type KV struct {
ver uint64
key []byte
val []byte
}
// Exi returns whether this key-value item actually exists.
func (kv *KV) Exi() bool {
return kv.val != nil
}
// Key returns the key for the underlying key-value item.
func (kv *KV) Key() []byte {
return kv.key
}
// Val returns the value for the underlying key-value item.
func (kv *KV) Val() []byte {
return kv.val
}
// Ver returns the version for the underlying key-value item.
func (kv *KV) Ver() uint64 {
return kv.ver
}

View file

@ -1,95 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
import (
"time"
"strings"
"crypto/tls"
"crypto/x509"
"database/sql"
"github.com/abcum/surreal/cnf"
"github.com/abcum/surreal/kvs"
"github.com/abcum/surreal/log"
"github.com/go-sql-driver/mysql"
)
func init() {
kvs.Register("mysql", func(opts *cnf.Options) (db kvs.DB, err error) {
var pntr *sql.DB
path := strings.TrimPrefix(opts.DB.Path, "mysql://")
if cnf.Settings.DB.Cert.SSL {
cas := x509.NewCertPool()
all := make([]tls.Certificate, 0, 1)
car := []byte(cnf.Settings.DB.Cert.CA)
crt := []byte(cnf.Settings.DB.Cert.Crt)
key := []byte(cnf.Settings.DB.Cert.Key)
if ok := cas.AppendCertsFromPEM(car); !ok {
log.WithPrefix("kvs").Errorln("Failed to append CA file.")
}
par, err := tls.X509KeyPair(crt, key)
if err != nil {
log.WithPrefix("kvs").Errorln(err)
}
mysql.RegisterTLSConfig("default", &tls.Config{
InsecureSkipVerify: true,
RootCAs: cas,
Certificates: append(all, par),
})
}
pntr, err = sql.Open("mysql", path)
if err != nil {
log.WithPrefix("kvs").Errorln(err)
return
}
// Set the maximum connection lifetime
pntr.SetConnMaxLifetime(1 * time.Hour)
// Output logs to the default logger
mysql.SetLogger(log.Instance())
// Set the max number of idle connections
pntr.SetMaxIdleConns(350)
// Set the max number of open connections
pntr.SetMaxOpenConns(350)
// Return the database pointer
return &DB{pntr: pntr}, err
})
}

View file

@ -1,115 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
const sqlClr = `
DELETE FROM kv
WHERE k=?
`
const sqlClrP = `
DELETE FROM kv
WHERE k LIKE CONCAT(?, '%')
LIMIT ?
`
const sqlClrR = `
DELETE FROM kv
WHERE k>=? AND k<?
LIMIT ?
`
const sqlGet = `
SELECT * FROM kv
WHERE t<=? AND k=?
ORDER BY t DESC
LIMIT 1
`
const sqlGetP = `
SELECT q1.t, q1.k, v
FROM kv q1
JOIN (
SELECT k, MAX(t) AS t
FROM kv
WHERE t<=? AND k LIKE CONCAT(?, '%')
GROUP BY k
) AS q2
ON q1.t = q2.t AND q1.k = q2.k
ORDER BY q1.k
LIMIT ?
`
const sqlGetR = `
SELECT q1.t, q1.k, v
FROM kv q1
JOIN (
SELECT k, MAX(t) AS t
FROM kv
WHERE t<=? AND k>=? AND k<?
GROUP BY k
) AS q2
ON q1.t = q2.t AND q1.k = q2.k
ORDER BY q1.k
LIMIT ?
`
const sqlDel = `
DELETE FROM kv
WHERE t<=? AND k=?
ORDER BY t DESC
LIMIT 1
`
const sqlDelP = `
DELETE q1 FROM kv
JOIN (
SELECT k, MAX(t) AS t
FROM kv
WHERE t<=? AND k LIKE CONCAT(?, '%')
GROUP BY k
) AS q2
ON q1.t = q2.t AND q1.k = q2.k
ORDER BY q1.k
LIMIT ?
`
const sqlDelR = `
DELETE q1 FROM kv
JOIN (
SELECT k, MAX(t) AS t
FROM kv
WHERE t<=? AND k>=? AND k<?
GROUP BY k
) AS q2
ON q1.t = q2.t AND q1.k = q2.k
ORDER BY q1.k
LIMIT ?
`
const sqlPut = `
INSERT INTO kv
(t, k, v)
VALUES
(?, ?, ?)
ON DUPLICATE KEY UPDATE v=?
`
const sqlPutN = `
INSERT INTO kv
(t, k, v)
VALUES
(?, ?, ?)
`

View file

@ -1,457 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
import (
"math"
"sync"
"context"
"database/sql"
"github.com/abcum/surreal/cnf"
"github.com/abcum/surreal/kvs"
)
type TX struct {
done bool
pntr *sql.Tx
lock sync.Mutex
stmt struct {
clr *sql.Stmt
clrP *sql.Stmt
clrR *sql.Stmt
get *sql.Stmt
getP *sql.Stmt
getR *sql.Stmt
del *sql.Stmt
delP *sql.Stmt
delR *sql.Stmt
put *sql.Stmt
putN *sql.Stmt
}
}
const maximum = math.MaxUint64
func dec(src []byte) (dst []byte, err error) {
if dst, err = decrypt(cnf.Settings.DB.Key, src); err != nil {
return nil, &kvs.DBError{}
}
return
}
func enc(src []byte) (dst []byte, err error) {
if dst, err = encrypt(cnf.Settings.DB.Key, src); err != nil {
return nil, &kvs.DBError{}
}
return
}
func one(res *sql.Rows, err error) (kvs.KV, error) {
switch err {
case nil:
break
default:
return nil, &kvs.DBError{}
}
defer res.Close()
var out = &KV{}
for res.Next() {
err = res.Scan(&out.ver, &out.key, &out.val)
if err != nil {
return nil, &kvs.DBError{}
}
out.val, err = dec(out.val)
if err != nil {
return nil, &kvs.DBError{}
}
}
if err = res.Err(); err != nil {
return nil, &kvs.DBError{}
}
return out, err
}
func many(res *sql.Rows, err error) ([]kvs.KV, error) {
switch err {
case nil:
break
default:
return nil, &kvs.DBError{}
}
defer res.Close()
var out []kvs.KV
for res.Next() {
kv := &KV{}
err = res.Scan(&kv.ver, &kv.key, &kv.val)
if err != nil {
return nil, &kvs.DBError{}
}
kv.val, err = dec(kv.val)
if err != nil {
return nil, &kvs.DBError{}
}
if kv.val != nil {
out = append(out, kv)
}
}
if err = res.Err(); err != nil {
return nil, &kvs.DBError{}
}
return out, err
}
func (tx *TX) Closed() bool {
return tx.done
}
func (tx *TX) Cancel() error {
tx.done = true
return tx.pntr.Rollback()
}
func (tx *TX) Commit() error {
tx.done = true
return tx.pntr.Commit()
}
func (tx *TX) Clr(ctx context.Context, key []byte) (kvs.KV, error) {
var err error
var res *sql.Rows
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.clr == nil {
tx.stmt.clr, _ = tx.pntr.PrepareContext(ctx, sqlClr)
}
res, err = tx.stmt.clr.QueryContext(ctx, key)
return one(res, err)
}
func (tx *TX) ClrP(ctx context.Context, key []byte, max uint64) ([]kvs.KV, error) {
var err error
var res *sql.Rows
if max == 0 {
max = maximum
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.clrP == nil {
tx.stmt.clrP, _ = tx.pntr.PrepareContext(ctx, sqlClrP)
}
res, err = tx.stmt.clrP.QueryContext(ctx, key, max)
return many(res, err)
}
func (tx *TX) ClrR(ctx context.Context, beg []byte, end []byte, max uint64) ([]kvs.KV, error) {
var err error
var res *sql.Rows
if max == 0 {
max = maximum
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.clrR == nil {
tx.stmt.clrR, _ = tx.pntr.PrepareContext(ctx, sqlClrR)
}
res, err = tx.stmt.clrR.QueryContext(ctx, beg, end, max)
return many(res, err)
}
func (tx *TX) Get(ctx context.Context, ver int64, key []byte) (kvs.KV, error) {
var err error
var res *sql.Rows
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.get == nil {
tx.stmt.get, _ = tx.pntr.PrepareContext(ctx, sqlGet)
}
res, err = tx.stmt.get.QueryContext(ctx, ver, key)
return one(res, err)
}
func (tx *TX) GetP(ctx context.Context, ver int64, key []byte, max uint64) ([]kvs.KV, error) {
var err error
var res *sql.Rows
if max == 0 {
max = maximum
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.getP == nil {
tx.stmt.getP, _ = tx.pntr.PrepareContext(ctx, sqlGetP)
}
res, err = tx.stmt.getP.QueryContext(ctx, ver, key, max)
return many(res, err)
}
func (tx *TX) GetR(ctx context.Context, ver int64, beg []byte, end []byte, max uint64) ([]kvs.KV, error) {
var err error
var res *sql.Rows
if max == 0 {
max = maximum
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.getR == nil {
tx.stmt.getR, _ = tx.pntr.PrepareContext(ctx, sqlGetR)
}
res, err = tx.stmt.getR.QueryContext(ctx, ver, beg, end, max)
return many(res, err)
}
func (tx *TX) Del(ctx context.Context, ver int64, key []byte) (kvs.KV, error) {
var err error
var res *sql.Rows
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.del == nil {
tx.stmt.del, _ = tx.pntr.PrepareContext(ctx, sqlDel)
}
res, err = tx.stmt.del.QueryContext(ctx, ver, key)
return one(res, err)
}
func (tx *TX) DelC(ctx context.Context, ver int64, key []byte, exp []byte) (kvs.KV, error) {
var err error
var now kvs.KV
var res *sql.Rows
tx.lock.Lock()
defer tx.lock.Unlock()
// Get the item at the key
if tx.stmt.get == nil {
tx.stmt.get, _ = tx.pntr.PrepareContext(ctx, sqlGet)
}
res, err = tx.stmt.get.QueryContext(ctx, ver, key)
if err != nil {
return nil, err
}
now, err = one(res, err)
if err != nil {
return nil, err
}
// Check if the values match
if !alter(now.Val(), exp) {
return nil, ErrTxNotExpectedValue
}
// If they match then delete
if tx.stmt.del == nil {
tx.stmt.del, _ = tx.pntr.PrepareContext(ctx, sqlDel)
}
res, err = tx.stmt.del.QueryContext(ctx, ver, key)
return one(res, err)
}
func (tx *TX) DelP(ctx context.Context, ver int64, key []byte, max uint64) ([]kvs.KV, error) {
var err error
var res *sql.Rows
if max == 0 {
max = maximum
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.delP == nil {
tx.stmt.delP, _ = tx.pntr.PrepareContext(ctx, sqlDelP)
}
res, err = tx.stmt.delP.QueryContext(ctx, ver, key, max)
return many(res, err)
}
func (tx *TX) DelR(ctx context.Context, ver int64, beg []byte, end []byte, max uint64) ([]kvs.KV, error) {
var err error
var res *sql.Rows
if max == 0 {
max = maximum
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.delR == nil {
tx.stmt.delR, _ = tx.pntr.PrepareContext(ctx, sqlDelR)
}
res, err = tx.stmt.delR.QueryContext(ctx, ver, beg, end, max)
return many(res, err)
}
func (tx *TX) Put(ctx context.Context, ver int64, key []byte, val []byte) (kvs.KV, error) {
var err error
var res *sql.Rows
val, err = enc(val)
if err != nil {
return nil, err
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.put == nil {
tx.stmt.put, _ = tx.pntr.PrepareContext(ctx, sqlPut)
}
res, err = tx.stmt.put.QueryContext(ctx, ver, key, val, val)
return one(res, err)
}
func (tx *TX) PutC(ctx context.Context, ver int64, key []byte, val []byte, exp []byte) (kvs.KV, error) {
var err error
var now kvs.KV
var res *sql.Rows
val, err = enc(val)
if err != nil {
return nil, err
}
switch exp {
case nil:
if tx.stmt.putN == nil {
tx.stmt.putN, _ = tx.pntr.PrepareContext(ctx, sqlPutN)
}
res, err = tx.stmt.putN.QueryContext(ctx, ver, key, val)
return one(res, err)
default:
// Get the item at the key
if tx.stmt.get == nil {
tx.stmt.get, _ = tx.pntr.PrepareContext(ctx, sqlGet)
}
res, err = tx.stmt.get.QueryContext(ctx, ver, key)
if err != nil {
return nil, err
}
now, err = one(res, err)
if err != nil {
return nil, err
}
// Check if the values match
if !check(now.Val(), exp) {
return nil, ErrTxNotExpectedValue
}
// If they match then delete
if tx.stmt.del == nil {
tx.stmt.put, _ = tx.pntr.PrepareContext(ctx, sqlPut)
}
res, err = tx.stmt.put.QueryContext(ctx, ver, key, val, val)
return one(res, err)
}
}

View file

@ -1,123 +0,0 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"errors"
)
var chars = []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789")
// Used to see if we can conditionally put a value. We can only put
// a value if the value is the same, or if both items are nil.
func check(a, b []byte) bool {
if a != nil && b != nil {
return bytes.Equal(a, b)
} else if a == nil && b == nil {
return true
}
return false
}
// Used to see if we can conditionally del a value. We can only del
// a value if the value is the same, and neither item is nil.
func alter(a, b []byte) bool {
if a != nil && b != nil {
return bytes.Equal(a, b)
} else if a == nil && b == nil {
return false
}
return false
}
func encrypt(key []byte, src []byte) (dst []byte, err error) {
if key == nil || len(key) == 0 || len(src) == 0 {
return src, nil
}
// Initiate AES
block, _ := aes.NewCipher(key)
// Initiate cipher
cipher, _ := cipher.NewGCM(block)
// Initiate nonce
nonce := random(12)
dst = cipher.Seal(nil, nonce, src, nil)
dst = append(nonce[:], dst[:]...)
return
}
func decrypt(key []byte, src []byte) (dst []byte, err error) {
if key == nil || len(key) == 0 || len(src) == 0 {
return src, nil
}
// Corrupt
if len(src) < 12 {
return src, errors.New("Invalid data")
}
// Initiate AES
block, _ := aes.NewCipher(key)
// Initiate cipher
cipher, _ := cipher.NewGCM(block)
return cipher.Open(nil, src[:12], src[12:], nil)
}
func random(l int) []byte {
if l == 0 {
return nil
}
i := 0
t := len(chars)
m := 255 - (256 % t)
b := make([]byte, l)
r := make([]byte, l+(l/4))
for {
rand.Read(r)
for _, rb := range r {
c := int(rb)
if c > m {
continue
}
b[i] = chars[c%t]
i++
if i == l {
return b
}
}
}
}