Add support for mysql as a backend kv store

This commit is contained in:
Tobie Morgan Hitchcock 2018-02-11 00:16:28 +00:00
parent 84b72e25d2
commit ccf57df085
9 changed files with 800 additions and 1 deletions

View file

@ -56,7 +56,7 @@ func setup() {
}
if opts.DB.Path != "memory" {
if ok, _ := regexp.MatchString(`^(s3|gcs|logr|file|rixxdb|dendrodb)://(.+)$`, opts.DB.Path); !ok {
if ok, _ := regexp.MatchString(`^(s3|gcs|logr|file|mysql|rixxdb|dendrodb)://(.+)$`, opts.DB.Path); !ok {
log.Fatalf("Invalid path %s. Specify a valid data store configuration path", opts.DB.Path)
}
}

View file

@ -32,6 +32,7 @@ import (
"cloud.google.com/go/trace"
_ "github.com/abcum/surreal/kvs/mysql"
_ "github.com/abcum/surreal/kvs/rixxdb"
)

View file

@ -45,6 +45,8 @@ func New(opts *cnf.Options) (ds *DS, err error) {
db, err = stores["rixxdb"](opts)
case strings.HasPrefix(opts.DB.Path, "file://"):
db, err = stores["rixxdb"](opts)
case strings.HasPrefix(opts.DB.Path, "mysql://"):
db, err = stores["mysql"](opts)
case strings.HasPrefix(opts.DB.Path, "rixxdb://"):
db, err = stores["rixxdb"](opts)
case strings.HasPrefix(opts.DB.Path, "dendrodb://"):

57
kvs/mysql/db.go Normal file
View file

@ -0,0 +1,57 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
import (
"io"
"context"
"database/sql"
"github.com/abcum/surreal/kvs"
)
type DB struct {
pntr *sql.DB
}
func (db *DB) Begin(ctx context.Context, writable bool) (txn kvs.TX, err error) {
var pntr *sql.Tx
if pntr, err = db.pntr.BeginTx(ctx, db.opt(writable)); err != nil {
err = &kvs.DBError{Err: err}
return
}
return &TX{pntr: pntr}, err
}
func (db *DB) Import(r io.Reader) (err error) {
return nil
}
func (db *DB) Export(w io.Writer) (err error) {
return nil
}
func (db *DB) Close() (err error) {
return db.pntr.Close()
}
func (db *DB) opt(writable bool) *sql.TxOptions {
return &sql.TxOptions{
ReadOnly: !writable,
Isolation: sql.LevelReadCommitted,
}
}

42
kvs/mysql/kv.go Normal file
View file

@ -0,0 +1,42 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
// KV represents a row stored in the database.
type KV struct {
ver uint64
key []byte
val []byte
}
// Exi returns whether this key-value item actually exists.
func (kv *KV) Exi() bool {
return kv.val != nil
}
// Key returns the key for the underlying key-value item.
func (kv *KV) Key() []byte {
return kv.key
}
// Val returns the value for the underlying key-value item.
func (kv *KV) Val() []byte {
return kv.val
}
// Ver returns the version for the underlying key-value item.
func (kv *KV) Ver() uint64 {
return kv.ver
}

49
kvs/mysql/main.go Normal file
View file

@ -0,0 +1,49 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
import (
"strings"
"database/sql"
"github.com/abcum/surreal/cnf"
"github.com/abcum/surreal/kvs"
"github.com/go-sql-driver/mysql"
"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy"
)
func init() {
mysql.RegisterDial("cloudsql", proxy.Dial)
kvs.Register("mysql", func(opts *cnf.Options) (db kvs.DB, err error) {
var pntr *sql.DB
path := strings.TrimPrefix(opts.DB.Path, "mysql://")
pntr, err = sql.Open("mysql", path)
if err != nil {
return
}
return &DB{pntr: pntr}, err
})
}

130
kvs/mysql/sql.go Normal file
View file

@ -0,0 +1,130 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
const sqlClr = `
DELETE FROM kv
WHERE k=?
`
const sqlClrP = `
DELETE FROM kv
WHERE k LIKE CONCAT(?, '%')
LIMIT ?
`
const sqlClrR = `
DELETE FROM kv
WHERE k>=? AND k<?
LIMIT ?
`
const sqlGet = `
SELECT * FROM kv
WHERE t<=? AND k=?
ORDER BY t DESC
LIMIT 1
`
const sqlGetP = `
SELECT q1.t, q1.k, v
FROM kv q1
JOIN (
SELECT k, MAX(t) AS t
FROM kv
WHERE t<=? AND k LIKE CONCAT(?, '%')
GROUP BY k
) AS q2
ON q1.t = q2.t AND q1.k = q2.k
ORDER BY q1.k
LIMIT ?
`
const sqlGetR = `
SELECT q1.t, q1.k, v
FROM kv q1
JOIN (
SELECT k, MAX(t) AS t
FROM kv
WHERE t<=? AND k>=? AND k<?
GROUP BY k
) AS q2
ON q1.t = q2.t AND q1.k = q2.k
ORDER BY q1.k
LIMIT ?
`
const sqlDel = `
DELETE FROM kv
WHERE t<=? AND k=?
ORDER BY t DESC
LIMIT 1
`
const sqlDelC = `
DELETE FROM kv
WHERE t<=? AND k=? AND v=?
ORDER BY t DESC
LIMIT 1
`
const sqlDelP = `
DELETE q1 FROM kv
JOIN (
SELECT k, MAX(t) AS t
FROM kv
WHERE t<=? AND k LIKE CONCAT(?, '%')
GROUP BY k
) AS q2
ON q1.t = q2.t AND q1.k = q2.k
ORDER BY q1.k
LIMIT ?
`
const sqlDelR = `
DELETE q1 FROM kv
JOIN (
SELECT k, MAX(t) AS t
FROM kv
WHERE t<=? AND k>=? AND k<?
GROUP BY k
) AS q2
ON q1.t = q2.t AND q1.k = q2.k
ORDER BY q1.k
LIMIT ?
`
const sqlPut = `
INSERT INTO kv
(t, k, v)
VALUES
(?, ?, ?)
ON DUPLICATE KEY UPDATE v=?
`
const sqlPutN = `
INSERT INTO kv
(t, k, v)
VALUES
(?, ?, ?)
`
const sqlPutC = `
UPDATE kv
SET v=?
WHERE t<=? AND k=? AND v=?
ORDER BY t DESC
LIMIT 1
`

418
kvs/mysql/tx.go Normal file
View file

@ -0,0 +1,418 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
import (
"math"
"sync"
"database/sql"
"github.com/abcum/surreal/cnf"
"github.com/abcum/surreal/kvs"
)
type TX struct {
done bool
pntr *sql.Tx
lock sync.Mutex
stmt struct {
clr *sql.Stmt
clrP *sql.Stmt
clrR *sql.Stmt
get *sql.Stmt
getP *sql.Stmt
getR *sql.Stmt
del *sql.Stmt
delC *sql.Stmt
delP *sql.Stmt
delR *sql.Stmt
put *sql.Stmt
putN *sql.Stmt
putC *sql.Stmt
}
}
const maximum = math.MaxUint64
func dec(src []byte) (dst []byte, err error) {
if dst, err = decrypt(cnf.Settings.DB.Key, src); err != nil {
return nil, &kvs.DBError{}
}
return
}
func enc(src []byte) (dst []byte, err error) {
if dst, err = encrypt(cnf.Settings.DB.Key, src); err != nil {
return nil, &kvs.DBError{}
}
return
}
func one(res *sql.Rows, err error) (kvs.KV, error) {
switch err {
case nil:
break
default:
return nil, &kvs.DBError{}
}
defer res.Close()
var out = &KV{}
for res.Next() {
err = res.Scan(&out.ver, &out.key, &out.val)
if err != nil {
return nil, &kvs.DBError{}
}
out.val, err = dec(out.val)
if err != nil {
return nil, &kvs.DBError{}
}
}
if err = res.Err(); err != nil {
return nil, &kvs.DBError{}
}
return out, err
}
func many(res *sql.Rows, err error) ([]kvs.KV, error) {
switch err {
case nil:
break
default:
return nil, &kvs.DBError{}
}
defer res.Close()
var out []kvs.KV
for res.Next() {
kv := &KV{}
err = res.Scan(&kv.ver, &kv.key, &kv.val)
if err != nil {
return nil, &kvs.DBError{}
}
kv.val, err = dec(kv.val)
if err != nil {
return nil, &kvs.DBError{}
}
out = append(out, kv)
}
if err = res.Err(); err != nil {
return nil, &kvs.DBError{}
}
return out, err
}
func (tx *TX) Closed() bool {
return tx.done
}
func (tx *TX) Cancel() error {
tx.done = true
return tx.pntr.Rollback()
}
func (tx *TX) Commit() error {
tx.done = true
return tx.pntr.Commit()
}
func (tx *TX) Clr(key []byte) (kvs.KV, error) {
var err error
var res *sql.Rows
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.clr == nil {
tx.stmt.clr, _ = tx.pntr.Prepare(sqlClr)
}
res, err = tx.stmt.clr.Query(key)
return one(res, err)
}
func (tx *TX) ClrP(key []byte, max uint64) ([]kvs.KV, error) {
var err error
var res *sql.Rows
if max == 0 {
max = maximum
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.clrP == nil {
tx.stmt.clrP, _ = tx.pntr.Prepare(sqlClrP)
}
res, err = tx.stmt.clrP.Query(key, max)
return many(res, err)
}
func (tx *TX) ClrR(beg []byte, end []byte, max uint64) ([]kvs.KV, error) {
var err error
var res *sql.Rows
if max == 0 {
max = maximum
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.clrR == nil {
tx.stmt.clrR, _ = tx.pntr.Prepare(sqlClrR)
}
res, err = tx.stmt.clrR.Query(beg, end, max)
return many(res, err)
}
func (tx *TX) Get(ver int64, key []byte) (kvs.KV, error) {
var err error
var res *sql.Rows
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.get == nil {
tx.stmt.get, _ = tx.pntr.Prepare(sqlGet)
}
res, err = tx.stmt.get.Query(ver, key)
return one(res, err)
}
func (tx *TX) GetP(ver int64, key []byte, max uint64) ([]kvs.KV, error) {
var err error
var res *sql.Rows
if max == 0 {
max = maximum
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.getP == nil {
tx.stmt.getP, _ = tx.pntr.Prepare(sqlGetP)
}
res, err = tx.stmt.getP.Query(ver, key, max)
return many(res, err)
}
func (tx *TX) GetR(ver int64, beg []byte, end []byte, max uint64) ([]kvs.KV, error) {
var err error
var res *sql.Rows
if max == 0 {
max = maximum
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.getR == nil {
tx.stmt.getR, _ = tx.pntr.Prepare(sqlGetR)
}
res, err = tx.stmt.getR.Query(ver, beg, end, max)
return many(res, err)
}
func (tx *TX) Del(ver int64, key []byte) (kvs.KV, error) {
var err error
var res *sql.Rows
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.del == nil {
tx.stmt.del, _ = tx.pntr.Prepare(sqlDel)
}
res, err = tx.stmt.del.Query(ver, key)
return one(res, err)
}
func (tx *TX) DelC(ver int64, key []byte, exp []byte) (kvs.KV, error) {
var err error
var res *sql.Rows
exp, err = enc(exp)
if err != nil {
return nil, err
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.delC == nil {
tx.stmt.delC, _ = tx.pntr.Prepare(sqlDelC)
}
res, err = tx.stmt.delC.Query(ver, key, exp)
return one(res, err)
}
func (tx *TX) DelP(ver int64, key []byte, max uint64) ([]kvs.KV, error) {
var err error
var res *sql.Rows
if max == 0 {
max = maximum
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.delP == nil {
tx.stmt.delP, _ = tx.pntr.Prepare(sqlDelP)
}
res, err = tx.stmt.delP.Query(ver, key, max)
return many(res, err)
}
func (tx *TX) DelR(ver int64, beg []byte, end []byte, max uint64) ([]kvs.KV, error) {
var err error
var res *sql.Rows
if max == 0 {
max = maximum
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.delR == nil {
tx.stmt.delR, _ = tx.pntr.Prepare(sqlDelR)
}
res, err = tx.stmt.delR.Query(ver, beg, end, max)
return many(res, err)
}
func (tx *TX) Put(ver int64, key []byte, val []byte) (kvs.KV, error) {
var err error
var res *sql.Rows
val, err = enc(val)
if err != nil {
return nil, err
}
tx.lock.Lock()
defer tx.lock.Unlock()
if tx.stmt.put == nil {
tx.stmt.put, _ = tx.pntr.Prepare(sqlPut)
}
res, err = tx.stmt.put.Query(ver, key, val, val)
return one(res, err)
}
func (tx *TX) PutC(ver int64, key []byte, val []byte, exp []byte) (kvs.KV, error) {
var err error
var res *sql.Rows
val, err = enc(val)
if err != nil {
return nil, err
}
exp, err = enc(exp)
if err != nil {
return nil, err
}
tx.lock.Lock()
defer tx.lock.Unlock()
switch exp {
case nil:
if tx.stmt.putN == nil {
tx.stmt.putN, _ = tx.pntr.Prepare(sqlPutN)
}
res, err = tx.stmt.putN.Query(ver, key, val)
return one(res, err)
default:
if tx.stmt.putC == nil {
tx.stmt.putC, _ = tx.pntr.Prepare(sqlPutC)
}
res, err = tx.stmt.putC.Query(val, ver, key, exp)
return one(res, err)
}
}

100
kvs/mysql/util.go Normal file
View file

@ -0,0 +1,100 @@
// Copyright © 2016 Abcum Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"errors"
)
var chars = []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789")
func encrypt(key []byte, src []byte) (dst []byte, err error) {
if key == nil || len(key) == 0 || len(src) == 0 {
return src, nil
}
// Initiate AES
block, _ := aes.NewCipher(key)
// Initiate cipher
cipher, _ := cipher.NewGCM(block)
// Initiate nonce
nonce := random(12)
dst = cipher.Seal(nil, nonce, src, nil)
dst = append(nonce[:], dst[:]...)
return
}
func decrypt(key []byte, src []byte) (dst []byte, err error) {
if key == nil || len(key) == 0 || len(src) == 0 {
return src, nil
}
// Corrupt
if len(src) < 12 {
return src, errors.New("Invalid data")
}
// Initiate AES
block, _ := aes.NewCipher(key)
// Initiate cipher
cipher, _ := cipher.NewGCM(block)
return cipher.Open(nil, src[:12], src[12:], nil)
}
func random(l int) []byte {
if l == 0 {
return nil
}
i := 0
t := len(chars)
m := 255 - (256 % t)
b := make([]byte, l)
r := make([]byte, l+(l/4))
for {
rand.Read(r)
for _, rb := range r {
c := int(rb)
if c > m {
continue
}
b[i] = chars[c%t]
i++
if i == l {
return b
}
}
}
}