Inject datastore instance instead of globally instantiated instance

This commit is contained in:
Tobie Morgan Hitchcock 2022-02-17 07:39:40 +00:00
parent b98986fa12
commit 10a76ec3a1
12 changed files with 128 additions and 96 deletions

View file

@ -1,4 +1,3 @@
use crate::kvs;
use crate::err::Error; use crate::err::Error;
use crate::net; use crate::net;
use clap; use clap;
@ -22,10 +21,8 @@ pub fn init(matches: &clap::ArgMatches) -> Result<(), Error> {
let path = matches.value_of("path").unwrap(); let path = matches.value_of("path").unwrap();
// Parse the server binding address // Parse the server binding address
let bind = matches.value_of("bind").unwrap(); let bind = matches.value_of("bind").unwrap();
// Start up the kvs storage
kvs::init(path)?;
// Start up the web server // Start up the web server
net::init(bind)?; net::init(bind, path)?;
// Don't error when done // Don't error when done
Ok(()) Ok(())
} }

View file

@ -5,16 +5,22 @@ use crate::dbs::Responses;
use crate::dbs::Session; use crate::dbs::Session;
use crate::dbs::Variables; use crate::dbs::Variables;
use crate::err::Error; use crate::err::Error;
use crate::kvs::Store;
use crate::sql; use crate::sql;
use crate::sql::query::Query; use crate::sql::query::Query;
use hyper::body::Sender; use hyper::body::Sender;
use std::sync::Arc; use std::sync::Arc;
pub async fn execute(txt: &str, session: Session, vars: Variables) -> Result<Responses, Error> { pub async fn execute(
db: Store,
txt: &str,
session: Session,
vars: Variables,
) -> Result<Responses, Error> {
// Create a new query options // Create a new query options
let mut opt = Options::default(); let mut opt = Options::default();
// Create a new query executor // Create a new query executor
let mut exe = Executor::new(); let mut exe = Executor::new(db);
// Create a new execution context // Create a new execution context
let ctx = session.context(); let ctx = session.context();
// Attach the defined variables // Attach the defined variables
@ -27,11 +33,16 @@ pub async fn execute(txt: &str, session: Session, vars: Variables) -> Result<Res
exe.execute(ctx, opt, ast).await exe.execute(ctx, opt, ast).await
} }
pub async fn process(ast: Query, session: Session, vars: Variables) -> Result<Responses, Error> { pub async fn process(
db: Store,
ast: Query,
session: Session,
vars: Variables,
) -> Result<Responses, Error> {
// Create a new query options // Create a new query options
let mut opt = Options::default(); let mut opt = Options::default();
// Create a new query executor // Create a new query executor
let mut exe = Executor::new(); let mut exe = Executor::new(db);
// Store session info on context // Store session info on context
let ctx = session.context(); let ctx = session.context();
// Attach the defined variables // Attach the defined variables
@ -42,11 +53,11 @@ pub async fn process(ast: Query, session: Session, vars: Variables) -> Result<Re
exe.execute(ctx, opt, ast).await exe.execute(ctx, opt, ast).await
} }
pub async fn export(session: Session, sender: Sender) -> Result<(), Error> { pub async fn export(db: Store, session: Session, sender: Sender) -> Result<(), Error> {
// Create a new query options // Create a new query options
let mut opt = Options::default(); let mut opt = Options::default();
// Create a new query executor // Create a new query executor
let mut exe = Executor::new(); let mut exe = Executor::new(db);
// Create a new execution context // Create a new execution context
let ctx = session.context(); let ctx = session.context();
// Process database export // Process database export

View file

@ -3,9 +3,9 @@ use crate::dbs::response::{Response, Responses, Status};
use crate::dbs::Auth; use crate::dbs::Auth;
use crate::dbs::Options; use crate::dbs::Options;
use crate::dbs::Runtime; use crate::dbs::Runtime;
use crate::dbs::Transaction;
use crate::err::Error; use crate::err::Error;
use crate::kvs::transaction; use crate::kvs::Store;
use crate::kvs::Transaction;
use crate::sql::query::Query; use crate::sql::query::Query;
use crate::sql::statement::Statement; use crate::sql::statement::Statement;
use crate::sql::value::Value; use crate::sql::value::Value;
@ -13,30 +13,32 @@ use futures::lock::Mutex;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
#[derive(Default)]
pub struct Executor { pub struct Executor {
txn: Option<Arc<Mutex<Transaction>>>, pub(super) dbs: Store,
err: Option<Error>, pub(super) err: Option<Error>,
pub(super) txn: Option<Transaction>,
} }
impl Executor { impl Executor {
pub fn new() -> Executor { pub fn new(dbs: Store) -> Executor {
Executor { Executor {
..Executor::default() dbs,
txn: None,
err: None,
} }
} }
fn txn(&self) -> Arc<Mutex<Transaction>> { fn txn(&self) -> Transaction {
match &self.txn { match self.txn.as_ref() {
Some(txn) => txn.clone(), Some(txn) => txn.clone(),
None => unreachable!(), None => unreachable!(),
} }
} }
async fn begin(&mut self) -> bool { async fn begin(&mut self) -> bool {
match &self.txn { match self.txn.as_ref() {
Some(_) => false, Some(_) => false,
None => match transaction(true, false).await { None => match self.dbs.transaction(true, false).await {
Ok(v) => { Ok(v) => {
self.txn = Some(Arc::new(Mutex::new(v))); self.txn = Some(Arc::new(Mutex::new(v)));
true true
@ -51,7 +53,7 @@ impl Executor {
async fn commit(&mut self, local: bool) { async fn commit(&mut self, local: bool) {
if local { if local {
match &self.txn { match self.txn.as_ref() {
Some(txn) => match &self.err { Some(txn) => match &self.err {
Some(_) => { Some(_) => {
let txn = txn.clone(); let txn = txn.clone();
@ -77,7 +79,7 @@ impl Executor {
async fn cancel(&mut self, local: bool) { async fn cancel(&mut self, local: bool) {
if local { if local {
match &self.txn { match self.txn.as_ref() {
Some(txn) => { Some(txn) => {
let txn = txn.clone(); let txn = txn.clone();
let mut txn = txn.lock().await; let mut txn = txn.lock().await;

View file

@ -2,7 +2,6 @@ use crate::dbs::Executor;
use crate::dbs::Options; use crate::dbs::Options;
use crate::dbs::Runtime; use crate::dbs::Runtime;
use crate::err::Error; use crate::err::Error;
use crate::kvs::transaction;
use bytes::Bytes; use bytes::Bytes;
use hyper::body::Sender; use hyper::body::Sender;
@ -20,7 +19,7 @@ impl Executor {
mut chn: Sender, mut chn: Sender,
) -> Result<(), Error> { ) -> Result<(), Error> {
// Start a new transaction // Start a new transaction
let txn = transaction(false, false).await?; let txn = self.dbs.transaction(false, false).await?;
// Output OPTIONS // Output OPTIONS
chn.send_data(output!("-- ------------------------------")).await?; chn.send_data(output!("-- ------------------------------")).await?;
chn.send_data(output!("-- OPTION")).await?; chn.send_data(output!("-- OPTION")).await?;

View file

@ -18,7 +18,7 @@ pub struct Transaction {
impl Datastore { impl Datastore {
// Open a new database // Open a new database
pub fn new(_path: &str) -> Result<Datastore, Error> { pub async fn new(_path: &str) -> Result<Datastore, Error> {
Ok(Datastore { Ok(Datastore {
db: echodb::db::new(), db: echodb::db::new(),
}) })

View file

@ -18,7 +18,7 @@ pub struct Transaction {
impl Datastore { impl Datastore {
// Open a new database // Open a new database
pub fn new() -> Result<Datastore, Error> { pub async fn new() -> Result<Datastore, Error> {
Ok(Datastore { Ok(Datastore {
db: echodb::db::new(), db: echodb::db::new(),
}) })

View file

@ -8,7 +8,9 @@ pub use self::kv::*;
pub use self::tx::*; pub use self::tx::*;
use crate::err::Error; use crate::err::Error;
use once_cell::sync::OnceCell; use std::sync::Arc;
pub type Store = Arc<Datastore>;
pub enum Datastore { pub enum Datastore {
Mock, Mock,
@ -24,52 +26,53 @@ pub enum Transaction {
TiKV(tikv::Transaction), TiKV(tikv::Transaction),
} }
static DB: OnceCell<Datastore> = OnceCell::new(); impl Datastore {
// Create a new datastore
pub fn init(path: &str) -> Result<(), Error> { pub async fn new(path: &str) -> Result<Self, Error> {
// Instantiate the database endpoint match path {
match path { "memory" => {
"memory" => { info!("Starting kvs store in {}", path);
info!("Starting kvs store in {}", path); mem::Datastore::new().await.map(Datastore::Mem)
let ds = mem::Datastore::new()?; }
let _ = DB.set(Datastore::Mem(ds)); // Parse and initiate an File database
Ok(()) #[cfg(not(target_arch = "wasm32"))]
s if s.starts_with("file:") => {
info!("Starting kvs store at {}", path);
let s = s.trim_start_matches("tikv://");
file::Datastore::new(s).await.map(Datastore::File)
}
// Parse and initiate an TiKV database
#[cfg(not(target_arch = "wasm32"))]
s if s.starts_with("tikv:") => {
info!("Starting kvs store at {}", path);
let s = s.trim_start_matches("tikv://");
tikv::Datastore::new(s).await.map(Datastore::TiKV)
}
// The datastore path is not valid
_ => unreachable!(),
} }
s if s.starts_with("file:") => {
info!("Starting kvs store at {}", path);
let s = s.trim_start_matches("file://");
let ds = file::Datastore::new(s)?;
let _ = DB.set(Datastore::File(ds));
Ok(())
}
s if s.starts_with("tikv:") => {
info!("Starting kvs store at {}", path);
let s = s.trim_start_matches("tikv://");
let ds = tikv::Datastore::new(s)?;
let _ = DB.set(Datastore::TiKV(ds));
Ok(())
}
_ => unreachable!(),
} }
} // Create a new transaction
pub async fn transaction(&self, write: bool, lock: bool) -> Result<Transaction, Error> {
pub async fn transaction<'a>(write: bool, lock: bool) -> Result<Transaction, Error> { match self {
match DB.get().unwrap() { Datastore::Mock => {
Datastore::Mock => { let tx = Transaction::Mock;
let tx = Transaction::Mock; Ok(tx)
Ok(tx) }
} Datastore::Mem(v) => {
Datastore::Mem(v) => { let tx = v.transaction(write, lock).await?;
let tx = v.transaction(write, lock).await?; Ok(Transaction::Mem(tx))
Ok(Transaction::Mem(tx)) }
} #[cfg(not(target_arch = "wasm32"))]
Datastore::File(v) => { Datastore::File(v) => {
let tx = v.transaction(write, lock).await?; let tx = v.transaction(write, lock).await?;
Ok(Transaction::File(tx)) Ok(Transaction::File(tx))
} }
Datastore::TiKV(v) => { #[cfg(not(target_arch = "wasm32"))]
let tx = v.transaction(write, lock).await?; Datastore::TiKV(v) => {
Ok(Transaction::TiKV(tx)) let tx = v.transaction(write, lock).await?;
Ok(Transaction::TiKV(tx))
}
} }
} }
} }

View file

@ -18,11 +18,9 @@ pub struct Transaction {
impl Datastore { impl Datastore {
// Open a new database // Open a new database
pub fn new(path: &str) -> Result<Datastore, Error> { pub async fn new(path: &str) -> Result<Datastore, Error> {
let db = tikv::TransactionClient::new(vec![path]);
let db = futures::executor::block_on(db)?;
Ok(Datastore { Ok(Datastore {
db, db: tikv::TransactionClient::new(vec![path]).await?,
}) })
} }
// Start a new transaction // Start a new transaction

View file

@ -1,6 +1,7 @@
use crate::dbs::export; use crate::dbs::export;
use crate::dbs::Session; use crate::dbs::Session;
use crate::net::conf; use crate::net::conf;
use crate::net::DB;
use hyper::body::Body; use hyper::body::Body;
use warp::Filter; use warp::Filter;
@ -16,7 +17,8 @@ pub fn config() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejecti
} }
async fn handler(session: Session) -> Result<impl warp::Reply, warp::Rejection> { async fn handler(session: Session) -> Result<impl warp::Reply, warp::Rejection> {
let db = DB.get().unwrap().clone();
let (chn, body) = Body::channel(); let (chn, body) = Body::channel();
tokio::spawn(export(session, chn)); tokio::spawn(export(db, session, chn));
Ok(warp::reply::Response::new(body)) Ok(warp::reply::Response::new(body))
} }

View file

@ -3,6 +3,7 @@ use crate::err::Error;
use crate::net::conf; use crate::net::conf;
use crate::net::head; use crate::net::head;
use crate::net::output; use crate::net::output;
use crate::net::DB;
use crate::sql::value::Value; use crate::sql::value::Value;
use bytes::Bytes; use bytes::Bytes;
use serde::Deserialize; use serde::Deserialize;
@ -105,6 +106,7 @@ async fn select_all(
table: String, table: String,
query: Query, query: Query,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
let db = DB.get().unwrap().clone();
let sql = format!( let sql = format!(
"SELECT * FROM type::table($table) LIMIT {l} START {s}", "SELECT * FROM type::table($table) LIMIT {l} START {s}",
l = query.limit.unwrap_or(String::from("100")), l = query.limit.unwrap_or(String::from("100")),
@ -113,7 +115,7 @@ async fn select_all(
let vars = hmap! { let vars = hmap! {
String::from("table") => Value::from(table), String::from("table") => Value::from(table),
}; };
match crate::dbs::execute(sql.as_str(), session, Some(vars)).await { match crate::dbs::execute(db, sql.as_str(), session, Some(vars)).await {
Ok(ref res) => match output.as_ref() { Ok(ref res) => match output.as_ref() {
"application/json" => Ok(output::json(res)), "application/json" => Ok(output::json(res)),
"application/cbor" => Ok(output::cbor(res)), "application/cbor" => Ok(output::cbor(res)),
@ -130,6 +132,7 @@ async fn create_all(
table: String, table: String,
body: Bytes, body: Bytes,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
let db = DB.get().unwrap().clone();
let data = str::from_utf8(&body).unwrap(); let data = str::from_utf8(&body).unwrap();
match crate::sql::value::json(data) { match crate::sql::value::json(data) {
Ok((_, data)) => { Ok((_, data)) => {
@ -138,7 +141,7 @@ async fn create_all(
String::from("table") => Value::from(table), String::from("table") => Value::from(table),
String::from("data") => Value::from(data), String::from("data") => Value::from(data),
}; };
match crate::dbs::execute(sql, session, Some(vars)).await { match crate::dbs::execute(db, sql, session, Some(vars)).await {
Ok(res) => match output.as_ref() { Ok(res) => match output.as_ref() {
"application/json" => Ok(output::json(&res)), "application/json" => Ok(output::json(&res)),
"application/cbor" => Ok(output::cbor(&res)), "application/cbor" => Ok(output::cbor(&res)),
@ -157,11 +160,12 @@ async fn delete_all(
output: String, output: String,
table: String, table: String,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
let db = DB.get().unwrap().clone();
let sql = "DELETE type::table($table)"; let sql = "DELETE type::table($table)";
let vars = hmap! { let vars = hmap! {
String::from("table") => Value::from(table), String::from("table") => Value::from(table),
}; };
match crate::dbs::execute(sql, session, Some(vars)).await { match crate::dbs::execute(db, sql, session, Some(vars)).await {
Ok(res) => match output.as_ref() { Ok(res) => match output.as_ref() {
"application/json" => Ok(output::json(&res)), "application/json" => Ok(output::json(&res)),
"application/cbor" => Ok(output::cbor(&res)), "application/cbor" => Ok(output::cbor(&res)),
@ -182,12 +186,13 @@ async fn select_one(
table: String, table: String,
id: String, id: String,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
let db = DB.get().unwrap().clone();
let sql = "SELECT * FROM type::thing($table, $id)"; let sql = "SELECT * FROM type::thing($table, $id)";
let vars = hmap! { let vars = hmap! {
String::from("table") => Value::from(table), String::from("table") => Value::from(table),
String::from("id") => Value::from(id), String::from("id") => Value::from(id),
}; };
match crate::dbs::execute(sql, session, Some(vars)).await { match crate::dbs::execute(db, sql, session, Some(vars)).await {
Ok(res) => match output.as_ref() { Ok(res) => match output.as_ref() {
"application/json" => Ok(output::json(&res)), "application/json" => Ok(output::json(&res)),
"application/cbor" => Ok(output::cbor(&res)), "application/cbor" => Ok(output::cbor(&res)),
@ -205,6 +210,7 @@ async fn create_one(
id: String, id: String,
body: Bytes, body: Bytes,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
let db = DB.get().unwrap().clone();
let data = str::from_utf8(&body).unwrap(); let data = str::from_utf8(&body).unwrap();
match crate::sql::value::json(data) { match crate::sql::value::json(data) {
Ok((_, data)) => { Ok((_, data)) => {
@ -214,7 +220,7 @@ async fn create_one(
String::from("id") => Value::from(id), String::from("id") => Value::from(id),
String::from("data") => Value::from(data), String::from("data") => Value::from(data),
}; };
match crate::dbs::execute(sql, session, Some(vars)).await { match crate::dbs::execute(db, sql, session, Some(vars)).await {
Ok(res) => match output.as_ref() { Ok(res) => match output.as_ref() {
"application/json" => Ok(output::json(&res)), "application/json" => Ok(output::json(&res)),
"application/cbor" => Ok(output::cbor(&res)), "application/cbor" => Ok(output::cbor(&res)),
@ -235,6 +241,7 @@ async fn update_one(
id: String, id: String,
body: Bytes, body: Bytes,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
let db = DB.get().unwrap().clone();
let data = str::from_utf8(&body).unwrap(); let data = str::from_utf8(&body).unwrap();
match crate::sql::value::json(data) { match crate::sql::value::json(data) {
Ok((_, data)) => { Ok((_, data)) => {
@ -244,7 +251,7 @@ async fn update_one(
String::from("id") => Value::from(id), String::from("id") => Value::from(id),
String::from("data") => Value::from(data), String::from("data") => Value::from(data),
}; };
match crate::dbs::execute(sql, session, Some(vars)).await { match crate::dbs::execute(db, sql, session, Some(vars)).await {
Ok(res) => match output.as_ref() { Ok(res) => match output.as_ref() {
"application/json" => Ok(output::json(&res)), "application/json" => Ok(output::json(&res)),
"application/cbor" => Ok(output::cbor(&res)), "application/cbor" => Ok(output::cbor(&res)),
@ -265,6 +272,7 @@ async fn modify_one(
id: String, id: String,
body: Bytes, body: Bytes,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
let db = DB.get().unwrap().clone();
let data = str::from_utf8(&body).unwrap(); let data = str::from_utf8(&body).unwrap();
match crate::sql::value::json(data) { match crate::sql::value::json(data) {
Ok((_, data)) => { Ok((_, data)) => {
@ -274,7 +282,7 @@ async fn modify_one(
String::from("id") => Value::from(id), String::from("id") => Value::from(id),
String::from("data") => Value::from(data), String::from("data") => Value::from(data),
}; };
match crate::dbs::execute(sql, session, Some(vars)).await { match crate::dbs::execute(db, sql, session, Some(vars)).await {
Ok(res) => match output.as_ref() { Ok(res) => match output.as_ref() {
"application/json" => Ok(output::json(&res)), "application/json" => Ok(output::json(&res)),
"application/cbor" => Ok(output::cbor(&res)), "application/cbor" => Ok(output::cbor(&res)),
@ -294,12 +302,13 @@ async fn delete_one(
table: String, table: String,
id: String, id: String,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
let db = DB.get().unwrap().clone();
let sql = "DELETE type::thing($table, $id)"; let sql = "DELETE type::thing($table, $id)";
let vars = hmap! { let vars = hmap! {
String::from("table") => Value::from(table), String::from("table") => Value::from(table),
String::from("id") => Value::from(id), String::from("id") => Value::from(id),
}; };
match crate::dbs::execute(sql, session, Some(vars)).await { match crate::dbs::execute(db, sql, session, Some(vars)).await {
Ok(res) => match output.as_ref() { Ok(res) => match output.as_ref() {
"application/json" => Ok(output::json(&res)), "application/json" => Ok(output::json(&res)),
"application/cbor" => Ok(output::cbor(&res)), "application/cbor" => Ok(output::cbor(&res)),

View file

@ -15,17 +15,26 @@ mod sync;
mod version; mod version;
use crate::err::Error; use crate::err::Error;
use crate::kvs::Datastore;
use once_cell::sync::OnceCell;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
use warp::Filter; use warp::Filter;
const ID: &'static str = "Request-Id"; const ID: &'static str = "Request-Id";
#[tokio::main] static DB: OnceCell<Arc<Datastore>> = OnceCell::new();
pub async fn init(bind: &str) -> Result<(), Error> {
//
let adr: SocketAddr = bind.parse().expect("Unable to parse socket address");
#[tokio::main]
pub async fn init(bind: &str, path: &str) -> Result<(), Error> {
// Parse the desired binding socket address
let adr: SocketAddr = bind.parse().expect("Unable to parse socket address");
// Parse and setup desired datastore
let dbs = Datastore::new(path).await.expect("Unable to parse datastore path");
// Store database instance
let _ = DB.set(Arc::new(dbs));
// Setup web routes
let net = root::config() let net = root::config()
// Version endpoint // Version endpoint
.or(version::config()) .or(version::config())
@ -49,18 +58,18 @@ pub async fn init(bind: &str) -> Result<(), Error> {
.recover(fail::recover) .recover(fail::recover)
// End routes setup // End routes setup
; ;
// Enable response compression
let net = net.with(warp::compression::gzip()); let net = net.with(warp::compression::gzip());
// Specify a generic version header
let net = net.with(head::server());
let net = net.with(head::version()); let net = net.with(head::version());
// Specify a generic server header
let net = net.with(head::server());
// Specify an ID for each request
let net = net.map(|reply| { let net = net.map(|reply| {
let val = Uuid::new_v4().to_string(); let val = Uuid::new_v4().to_string();
warp::reply::with_header(reply, ID, val) warp::reply::with_header(reply, ID, val)
}); });
// Log all requests to the console
let net = net.with(log::write()); let net = net.with(log::write());
info!("Starting web server on {}", adr); info!("Starting web server on {}", adr);

View file

@ -2,6 +2,7 @@ use crate::dbs::Session;
use crate::net::conf; use crate::net::conf;
use crate::net::head; use crate::net::head;
use crate::net::output; use crate::net::output;
use crate::net::DB;
use bytes::Bytes; use bytes::Bytes;
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use warp::Filter; use warp::Filter;
@ -40,8 +41,9 @@ async fn handler(
output: String, output: String,
sql: Bytes, sql: Bytes,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
let db = DB.get().unwrap().clone();
let sql = std::str::from_utf8(&sql).unwrap(); let sql = std::str::from_utf8(&sql).unwrap();
match crate::dbs::execute(sql, session, None).await { match crate::dbs::execute(db, sql, session, None).await {
Ok(res) => match output.as_ref() { Ok(res) => match output.as_ref() {
"application/json" => Ok(output::json(&res)), "application/json" => Ok(output::json(&res)),
"application/cbor" => Ok(output::cbor(&res)), "application/cbor" => Ok(output::cbor(&res)),