diff --git a/src/cli/start.rs b/src/cli/start.rs index 6146f866..aba78a07 100644 --- a/src/cli/start.rs +++ b/src/cli/start.rs @@ -1,4 +1,3 @@ -use crate::kvs; use crate::err::Error; use crate::net; use clap; @@ -22,10 +21,8 @@ pub fn init(matches: &clap::ArgMatches) -> Result<(), Error> { let path = matches.value_of("path").unwrap(); // Parse the server binding address let bind = matches.value_of("bind").unwrap(); - // Start up the kvs storage - kvs::init(path)?; // Start up the web server - net::init(bind)?; + net::init(bind, path)?; // Don't error when done Ok(()) } diff --git a/src/dbs/dbs.rs b/src/dbs/dbs.rs index 214fa1dd..4cf7cfa3 100644 --- a/src/dbs/dbs.rs +++ b/src/dbs/dbs.rs @@ -5,16 +5,22 @@ use crate::dbs::Responses; use crate::dbs::Session; use crate::dbs::Variables; use crate::err::Error; +use crate::kvs::Store; use crate::sql; use crate::sql::query::Query; use hyper::body::Sender; use std::sync::Arc; -pub async fn execute(txt: &str, session: Session, vars: Variables) -> Result { +pub async fn execute( + db: Store, + txt: &str, + session: Session, + vars: Variables, +) -> Result { // Create a new query options let mut opt = Options::default(); // Create a new query executor - let mut exe = Executor::new(); + let mut exe = Executor::new(db); // Create a new execution context let ctx = session.context(); // Attach the defined variables @@ -27,11 +33,16 @@ pub async fn execute(txt: &str, session: Session, vars: Variables) -> Result Result { +pub async fn process( + db: Store, + ast: Query, + session: Session, + vars: Variables, +) -> Result { // Create a new query options let mut opt = Options::default(); // Create a new query executor - let mut exe = Executor::new(); + let mut exe = Executor::new(db); // Store session info on context let ctx = session.context(); // Attach the defined variables @@ -42,11 +53,11 @@ pub async fn process(ast: Query, session: Session, vars: Variables) -> Result Result<(), Error> { +pub async fn export(db: Store, session: Session, sender: Sender) -> Result<(), Error> { // Create a new query options let mut opt = Options::default(); // Create a new query executor - let mut exe = Executor::new(); + let mut exe = Executor::new(db); // Create a new execution context let ctx = session.context(); // Process database export diff --git a/src/dbs/executor.rs b/src/dbs/executor.rs index 53e4148b..20bada31 100644 --- a/src/dbs/executor.rs +++ b/src/dbs/executor.rs @@ -3,9 +3,9 @@ use crate::dbs::response::{Response, Responses, Status}; use crate::dbs::Auth; use crate::dbs::Options; use crate::dbs::Runtime; +use crate::dbs::Transaction; use crate::err::Error; -use crate::kvs::transaction; -use crate::kvs::Transaction; +use crate::kvs::Store; use crate::sql::query::Query; use crate::sql::statement::Statement; use crate::sql::value::Value; @@ -13,30 +13,32 @@ use futures::lock::Mutex; use std::sync::Arc; use std::time::Instant; -#[derive(Default)] pub struct Executor { - txn: Option>>, - err: Option, + pub(super) dbs: Store, + pub(super) err: Option, + pub(super) txn: Option, } impl Executor { - pub fn new() -> Executor { + pub fn new(dbs: Store) -> Executor { Executor { - ..Executor::default() + dbs, + txn: None, + err: None, } } - fn txn(&self) -> Arc> { - match &self.txn { + fn txn(&self) -> Transaction { + match self.txn.as_ref() { Some(txn) => txn.clone(), None => unreachable!(), } } async fn begin(&mut self) -> bool { - match &self.txn { + match self.txn.as_ref() { Some(_) => false, - None => match transaction(true, false).await { + None => match self.dbs.transaction(true, false).await { Ok(v) => { self.txn = Some(Arc::new(Mutex::new(v))); true @@ -51,7 +53,7 @@ impl Executor { async fn commit(&mut self, local: bool) { if local { - match &self.txn { + match self.txn.as_ref() { Some(txn) => match &self.err { Some(_) => { let txn = txn.clone(); @@ -77,7 +79,7 @@ impl Executor { async fn cancel(&mut self, local: bool) { if local { - match &self.txn { + match self.txn.as_ref() { Some(txn) => { let txn = txn.clone(); let mut txn = txn.lock().await; diff --git a/src/dbs/export.rs b/src/dbs/export.rs index 5eb97cbc..9ae38a77 100644 --- a/src/dbs/export.rs +++ b/src/dbs/export.rs @@ -2,7 +2,6 @@ use crate::dbs::Executor; use crate::dbs::Options; use crate::dbs::Runtime; use crate::err::Error; -use crate::kvs::transaction; use bytes::Bytes; use hyper::body::Sender; @@ -20,7 +19,7 @@ impl Executor { mut chn: Sender, ) -> Result<(), Error> { // Start a new transaction - let txn = transaction(false, false).await?; + let txn = self.dbs.transaction(false, false).await?; // Output OPTIONS chn.send_data(output!("-- ------------------------------")).await?; chn.send_data(output!("-- OPTION")).await?; diff --git a/src/kvs/file/mod.rs b/src/kvs/file/mod.rs index a42c72cb..2360a460 100644 --- a/src/kvs/file/mod.rs +++ b/src/kvs/file/mod.rs @@ -18,7 +18,7 @@ pub struct Transaction { impl Datastore { // Open a new database - pub fn new(_path: &str) -> Result { + pub async fn new(_path: &str) -> Result { Ok(Datastore { db: echodb::db::new(), }) diff --git a/src/kvs/mem/mod.rs b/src/kvs/mem/mod.rs index 894e5ca5..92de760f 100644 --- a/src/kvs/mem/mod.rs +++ b/src/kvs/mem/mod.rs @@ -18,7 +18,7 @@ pub struct Transaction { impl Datastore { // Open a new database - pub fn new() -> Result { + pub async fn new() -> Result { Ok(Datastore { db: echodb::db::new(), }) diff --git a/src/kvs/mod.rs b/src/kvs/mod.rs index 8a5c849f..3577a64e 100644 --- a/src/kvs/mod.rs +++ b/src/kvs/mod.rs @@ -8,7 +8,9 @@ pub use self::kv::*; pub use self::tx::*; use crate::err::Error; -use once_cell::sync::OnceCell; +use std::sync::Arc; + +pub type Store = Arc; pub enum Datastore { Mock, @@ -24,52 +26,53 @@ pub enum Transaction { TiKV(tikv::Transaction), } -static DB: OnceCell = OnceCell::new(); - -pub fn init(path: &str) -> Result<(), Error> { - // Instantiate the database endpoint - match path { - "memory" => { - info!("Starting kvs store in {}", path); - let ds = mem::Datastore::new()?; - let _ = DB.set(Datastore::Mem(ds)); - Ok(()) +impl Datastore { + // Create a new datastore + pub async fn new(path: &str) -> Result { + match path { + "memory" => { + info!("Starting kvs store in {}", path); + mem::Datastore::new().await.map(Datastore::Mem) + } + // Parse and initiate an File database + #[cfg(not(target_arch = "wasm32"))] + s if s.starts_with("file:") => { + info!("Starting kvs store at {}", path); + let s = s.trim_start_matches("tikv://"); + file::Datastore::new(s).await.map(Datastore::File) + } + // Parse and initiate an TiKV database + #[cfg(not(target_arch = "wasm32"))] + s if s.starts_with("tikv:") => { + info!("Starting kvs store at {}", path); + let s = s.trim_start_matches("tikv://"); + tikv::Datastore::new(s).await.map(Datastore::TiKV) + } + // The datastore path is not valid + _ => unreachable!(), } - s if s.starts_with("file:") => { - info!("Starting kvs store at {}", path); - let s = s.trim_start_matches("file://"); - let ds = file::Datastore::new(s)?; - let _ = DB.set(Datastore::File(ds)); - Ok(()) - } - s if s.starts_with("tikv:") => { - info!("Starting kvs store at {}", path); - let s = s.trim_start_matches("tikv://"); - let ds = tikv::Datastore::new(s)?; - let _ = DB.set(Datastore::TiKV(ds)); - Ok(()) - } - _ => unreachable!(), } -} - -pub async fn transaction<'a>(write: bool, lock: bool) -> Result { - match DB.get().unwrap() { - Datastore::Mock => { - let tx = Transaction::Mock; - Ok(tx) - } - Datastore::Mem(v) => { - let tx = v.transaction(write, lock).await?; - Ok(Transaction::Mem(tx)) - } - Datastore::File(v) => { - let tx = v.transaction(write, lock).await?; - Ok(Transaction::File(tx)) - } - Datastore::TiKV(v) => { - let tx = v.transaction(write, lock).await?; - Ok(Transaction::TiKV(tx)) + // Create a new transaction + pub async fn transaction(&self, write: bool, lock: bool) -> Result { + match self { + Datastore::Mock => { + let tx = Transaction::Mock; + Ok(tx) + } + Datastore::Mem(v) => { + let tx = v.transaction(write, lock).await?; + Ok(Transaction::Mem(tx)) + } + #[cfg(not(target_arch = "wasm32"))] + Datastore::File(v) => { + let tx = v.transaction(write, lock).await?; + Ok(Transaction::File(tx)) + } + #[cfg(not(target_arch = "wasm32"))] + Datastore::TiKV(v) => { + let tx = v.transaction(write, lock).await?; + Ok(Transaction::TiKV(tx)) + } } } } diff --git a/src/kvs/tikv/mod.rs b/src/kvs/tikv/mod.rs index 95661046..8de0f8b2 100644 --- a/src/kvs/tikv/mod.rs +++ b/src/kvs/tikv/mod.rs @@ -18,11 +18,9 @@ pub struct Transaction { impl Datastore { // Open a new database - pub fn new(path: &str) -> Result { - let db = tikv::TransactionClient::new(vec![path]); - let db = futures::executor::block_on(db)?; + pub async fn new(path: &str) -> Result { Ok(Datastore { - db, + db: tikv::TransactionClient::new(vec![path]).await?, }) } // Start a new transaction diff --git a/src/net/export.rs b/src/net/export.rs index 07788d3c..8a7b2313 100644 --- a/src/net/export.rs +++ b/src/net/export.rs @@ -1,6 +1,7 @@ use crate::dbs::export; use crate::dbs::Session; use crate::net::conf; +use crate::net::DB; use hyper::body::Body; use warp::Filter; @@ -16,7 +17,8 @@ pub fn config() -> impl Filter Result { + let db = DB.get().unwrap().clone(); let (chn, body) = Body::channel(); - tokio::spawn(export(session, chn)); + tokio::spawn(export(db, session, chn)); Ok(warp::reply::Response::new(body)) } diff --git a/src/net/key.rs b/src/net/key.rs index 012aa790..e8431689 100644 --- a/src/net/key.rs +++ b/src/net/key.rs @@ -3,6 +3,7 @@ use crate::err::Error; use crate::net::conf; use crate::net::head; use crate::net::output; +use crate::net::DB; use crate::sql::value::Value; use bytes::Bytes; use serde::Deserialize; @@ -105,6 +106,7 @@ async fn select_all( table: String, query: Query, ) -> Result { + let db = DB.get().unwrap().clone(); let sql = format!( "SELECT * FROM type::table($table) LIMIT {l} START {s}", l = query.limit.unwrap_or(String::from("100")), @@ -113,7 +115,7 @@ async fn select_all( let vars = hmap! { String::from("table") => Value::from(table), }; - match crate::dbs::execute(sql.as_str(), session, Some(vars)).await { + match crate::dbs::execute(db, sql.as_str(), session, Some(vars)).await { Ok(ref res) => match output.as_ref() { "application/json" => Ok(output::json(res)), "application/cbor" => Ok(output::cbor(res)), @@ -130,6 +132,7 @@ async fn create_all( table: String, body: Bytes, ) -> Result { + let db = DB.get().unwrap().clone(); let data = str::from_utf8(&body).unwrap(); match crate::sql::value::json(data) { Ok((_, data)) => { @@ -138,7 +141,7 @@ async fn create_all( String::from("table") => Value::from(table), String::from("data") => Value::from(data), }; - match crate::dbs::execute(sql, session, Some(vars)).await { + match crate::dbs::execute(db, sql, session, Some(vars)).await { Ok(res) => match output.as_ref() { "application/json" => Ok(output::json(&res)), "application/cbor" => Ok(output::cbor(&res)), @@ -157,11 +160,12 @@ async fn delete_all( output: String, table: String, ) -> Result { + let db = DB.get().unwrap().clone(); let sql = "DELETE type::table($table)"; let vars = hmap! { String::from("table") => Value::from(table), }; - match crate::dbs::execute(sql, session, Some(vars)).await { + match crate::dbs::execute(db, sql, session, Some(vars)).await { Ok(res) => match output.as_ref() { "application/json" => Ok(output::json(&res)), "application/cbor" => Ok(output::cbor(&res)), @@ -182,12 +186,13 @@ async fn select_one( table: String, id: String, ) -> Result { + let db = DB.get().unwrap().clone(); let sql = "SELECT * FROM type::thing($table, $id)"; let vars = hmap! { String::from("table") => Value::from(table), String::from("id") => Value::from(id), }; - match crate::dbs::execute(sql, session, Some(vars)).await { + match crate::dbs::execute(db, sql, session, Some(vars)).await { Ok(res) => match output.as_ref() { "application/json" => Ok(output::json(&res)), "application/cbor" => Ok(output::cbor(&res)), @@ -205,6 +210,7 @@ async fn create_one( id: String, body: Bytes, ) -> Result { + let db = DB.get().unwrap().clone(); let data = str::from_utf8(&body).unwrap(); match crate::sql::value::json(data) { Ok((_, data)) => { @@ -214,7 +220,7 @@ async fn create_one( String::from("id") => Value::from(id), String::from("data") => Value::from(data), }; - match crate::dbs::execute(sql, session, Some(vars)).await { + match crate::dbs::execute(db, sql, session, Some(vars)).await { Ok(res) => match output.as_ref() { "application/json" => Ok(output::json(&res)), "application/cbor" => Ok(output::cbor(&res)), @@ -235,6 +241,7 @@ async fn update_one( id: String, body: Bytes, ) -> Result { + let db = DB.get().unwrap().clone(); let data = str::from_utf8(&body).unwrap(); match crate::sql::value::json(data) { Ok((_, data)) => { @@ -244,7 +251,7 @@ async fn update_one( String::from("id") => Value::from(id), String::from("data") => Value::from(data), }; - match crate::dbs::execute(sql, session, Some(vars)).await { + match crate::dbs::execute(db, sql, session, Some(vars)).await { Ok(res) => match output.as_ref() { "application/json" => Ok(output::json(&res)), "application/cbor" => Ok(output::cbor(&res)), @@ -265,6 +272,7 @@ async fn modify_one( id: String, body: Bytes, ) -> Result { + let db = DB.get().unwrap().clone(); let data = str::from_utf8(&body).unwrap(); match crate::sql::value::json(data) { Ok((_, data)) => { @@ -274,7 +282,7 @@ async fn modify_one( String::from("id") => Value::from(id), String::from("data") => Value::from(data), }; - match crate::dbs::execute(sql, session, Some(vars)).await { + match crate::dbs::execute(db, sql, session, Some(vars)).await { Ok(res) => match output.as_ref() { "application/json" => Ok(output::json(&res)), "application/cbor" => Ok(output::cbor(&res)), @@ -294,12 +302,13 @@ async fn delete_one( table: String, id: String, ) -> Result { + let db = DB.get().unwrap().clone(); let sql = "DELETE type::thing($table, $id)"; let vars = hmap! { String::from("table") => Value::from(table), String::from("id") => Value::from(id), }; - match crate::dbs::execute(sql, session, Some(vars)).await { + match crate::dbs::execute(db, sql, session, Some(vars)).await { Ok(res) => match output.as_ref() { "application/json" => Ok(output::json(&res)), "application/cbor" => Ok(output::cbor(&res)), diff --git a/src/net/mod.rs b/src/net/mod.rs index 49bf7571..39a380a8 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -15,17 +15,26 @@ mod sync; mod version; use crate::err::Error; +use crate::kvs::Datastore; +use once_cell::sync::OnceCell; use std::net::SocketAddr; +use std::sync::Arc; use uuid::Uuid; use warp::Filter; const ID: &'static str = "Request-Id"; -#[tokio::main] -pub async fn init(bind: &str) -> Result<(), Error> { - // - let adr: SocketAddr = bind.parse().expect("Unable to parse socket address"); +static DB: OnceCell> = OnceCell::new(); +#[tokio::main] +pub async fn init(bind: &str, path: &str) -> Result<(), Error> { + // Parse the desired binding socket address + let adr: SocketAddr = bind.parse().expect("Unable to parse socket address"); + // Parse and setup desired datastore + let dbs = Datastore::new(path).await.expect("Unable to parse datastore path"); + // Store database instance + let _ = DB.set(Arc::new(dbs)); + // Setup web routes let net = root::config() // Version endpoint .or(version::config()) @@ -49,18 +58,18 @@ pub async fn init(bind: &str) -> Result<(), Error> { .recover(fail::recover) // End routes setup ; - + // Enable response compression let net = net.with(warp::compression::gzip()); - - let net = net.with(head::server()); - + // Specify a generic version header let net = net.with(head::version()); - + // Specify a generic server header + let net = net.with(head::server()); + // Specify an ID for each request let net = net.map(|reply| { let val = Uuid::new_v4().to_string(); warp::reply::with_header(reply, ID, val) }); - + // Log all requests to the console let net = net.with(log::write()); info!("Starting web server on {}", adr); diff --git a/src/net/sql.rs b/src/net/sql.rs index aa034fb7..e01ac6ec 100644 --- a/src/net/sql.rs +++ b/src/net/sql.rs @@ -2,6 +2,7 @@ use crate::dbs::Session; use crate::net::conf; use crate::net::head; use crate::net::output; +use crate::net::DB; use bytes::Bytes; use futures::{FutureExt, StreamExt}; use warp::Filter; @@ -40,8 +41,9 @@ async fn handler( output: String, sql: Bytes, ) -> Result { + let db = DB.get().unwrap().clone(); let sql = std::str::from_utf8(&sql).unwrap(); - match crate::dbs::execute(sql, session, None).await { + match crate::dbs::execute(db, sql, session, None).await { Ok(res) => match output.as_ref() { "application/json" => Ok(output::json(&res)), "application/cbor" => Ok(output::cbor(&res)),