diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 2acc11c2..bcbd923f 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -7,7 +7,7 @@ authors = ["Tobie Morgan Hitchcock "] [features] default = ["parallel", "kv-tikv", "kv-echodb", "kv-yokudb"] -parallel = ["tokio"] +parallel = [] kv-tikv = ["tikv"] kv-echodb = ["echodb"] kv-indxdb = ["indxdb"] @@ -47,7 +47,6 @@ sha2 = "0.10.2" slug = "0.1.4" thiserror = "1.0.31" tikv = { version = "0.1.0", package = "tikv-client", optional = true } -tokio = { version = "1.18.1", features = ["sync"], optional = true } url = "2.2.2" utf-8 = "0.7.6" uuid = { version = "1.0.0", features = ["serde", "v4"] } diff --git a/lib/src/err/mod.rs b/lib/src/err/mod.rs index 210d563a..0434c700 100644 --- a/lib/src/err/mod.rs +++ b/lib/src/err/mod.rs @@ -7,9 +7,6 @@ use storekey::decode::Error as DecodeError; use storekey::encode::Error as EncodeError; use thiserror::Error; -#[cfg(feature = "parallel")] -use tokio::sync::mpsc::error::SendError as TokioError; - /// An error originating from the SurrealDB client library. #[derive(Error, Debug)] pub enum Error { @@ -252,15 +249,14 @@ impl From for Error { } } -#[cfg(feature = "parallel")] -impl From> for Error { - fn from(e: TokioError) -> Error { +impl From for Error { + fn from(e: channel::RecvError) -> Error { Error::Channel(e.to_string()) } } -impl From for Error { - fn from(e: channel::RecvError) -> Error { +impl From> for Error { + fn from(e: channel::SendError) -> Error { Error::Channel(e.to_string()) } } diff --git a/lib/src/exe/mod.rs b/lib/src/exe/mod.rs new file mode 100644 index 00000000..2961a65f --- /dev/null +++ b/lib/src/exe/mod.rs @@ -0,0 +1,17 @@ +use executor::{Executor, Task}; +use once_cell::sync::Lazy; +use std::future::Future; +use std::panic::catch_unwind; + +pub fn spawn(future: impl Future + Send + 'static) -> Task { + static GLOBAL: Lazy> = Lazy::new(|| { + std::thread::spawn(|| { + catch_unwind(|| { + futures::executor::block_on(GLOBAL.run(futures::future::pending::<()>())) + }) + .ok(); + }); + Executor::new() + }); + GLOBAL.spawn(future) +} diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index ed72b6a9..fc2ec984 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -12,7 +12,7 @@ use crate::sql; use crate::sql::query::Query; use crate::sql::thing::Thing; use bytes::Bytes; -use tokio::sync::mpsc::Sender; +use channel::Receiver; /// The underlying datastore instance which stores the dataset. pub struct Datastore { @@ -199,166 +199,176 @@ impl Datastore { } /// Performs a full database export as SQL - pub async fn export(&self, ns: String, db: String, chn: Sender) -> Result<(), Error> { + pub async fn export(&self, ns: String, db: String) -> Result, Error> { // Start a new transaction let mut txn = self.transaction(false, false).await?; - // Output OPTIONS - { - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("-- OPTION")).await?; - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("")).await?; - chn.send(output!("OPTION IMPORT;")).await?; - chn.send(output!("")).await?; - } - // Output LOGINS - { - let dls = txn.all_dl(&ns, &db).await?; - if !dls.is_empty() { + // Create a new channel + let (chn, rcv) = channel::bounded(10); + // Spawn the export + crate::exe::spawn(async move { + // Output OPTIONS + { chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("-- LOGINS")).await?; + chn.send(output!("-- OPTION")).await?; chn.send(output!("-- ------------------------------")).await?; chn.send(output!("")).await?; - for dl in dls { - chn.send(output!(format!("{};", dl))).await?; - } + chn.send(output!("OPTION IMPORT;")).await?; chn.send(output!("")).await?; } - } - // Output TOKENS - { - let dts = txn.all_dt(&ns, &db).await?; - if !dts.is_empty() { - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("-- TOKENS")).await?; - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("")).await?; - for dt in dts { - chn.send(output!(format!("{};", dt))).await?; - } - chn.send(output!("")).await?; - } - } - // Output SCOPES - { - let scs = txn.all_sc(&ns, &db).await?; - if !scs.is_empty() { - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("-- SCOPES")).await?; - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("")).await?; - for sc in scs { - chn.send(output!(format!("{};", sc))).await?; - } - chn.send(output!("")).await?; - } - } - // Output TABLES - { - let tbs = txn.all_tb(&ns, &db).await?; - if !tbs.is_empty() { - for tb in &tbs { - // Output TABLE + // Output LOGINS + { + let dls = txn.all_dl(&ns, &db).await?; + if !dls.is_empty() { chn.send(output!("-- ------------------------------")).await?; - chn.send(output!(format!("-- TABLE: {}", tb.name))).await?; + chn.send(output!("-- LOGINS")).await?; chn.send(output!("-- ------------------------------")).await?; chn.send(output!("")).await?; - chn.send(output!(format!("{};", tb))).await?; + for dl in dls { + chn.send(output!(format!("{};", dl))).await?; + } chn.send(output!("")).await?; - // Output FIELDS - { - let fds = txn.all_fd(&ns, &db, &tb.name).await?; - if !fds.is_empty() { - for fd in &fds { - chn.send(output!(format!("{};", fd))).await?; + } + } + // Output TOKENS + { + let dts = txn.all_dt(&ns, &db).await?; + if !dts.is_empty() { + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("-- TOKENS")).await?; + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("")).await?; + for dt in dts { + chn.send(output!(format!("{};", dt))).await?; + } + chn.send(output!("")).await?; + } + } + // Output SCOPES + { + let scs = txn.all_sc(&ns, &db).await?; + if !scs.is_empty() { + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("-- SCOPES")).await?; + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("")).await?; + for sc in scs { + chn.send(output!(format!("{};", sc))).await?; + } + chn.send(output!("")).await?; + } + } + // Output TABLES + { + let tbs = txn.all_tb(&ns, &db).await?; + if !tbs.is_empty() { + for tb in &tbs { + // Output TABLE + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!(format!("-- TABLE: {}", tb.name))).await?; + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("")).await?; + chn.send(output!(format!("{};", tb))).await?; + chn.send(output!("")).await?; + // Output FIELDS + { + let fds = txn.all_fd(&ns, &db, &tb.name).await?; + if !fds.is_empty() { + for fd in &fds { + chn.send(output!(format!("{};", fd))).await?; + } + chn.send(output!("")).await?; + } + } + // Output INDEXS + let ixs = txn.all_fd(&ns, &db, &tb.name).await?; + if !ixs.is_empty() { + for ix in &ixs { + chn.send(output!(format!("{};", ix))).await?; + } + chn.send(output!("")).await?; + } + // Output EVENTS + let evs = txn.all_ev(&ns, &db, &tb.name).await?; + if !evs.is_empty() { + for ev in &evs { + chn.send(output!(format!("{};", ev))).await?; } chn.send(output!("")).await?; } } - // Output INDEXS - let ixs = txn.all_fd(&ns, &db, &tb.name).await?; - if !ixs.is_empty() { - for ix in &ixs { - chn.send(output!(format!("{};", ix))).await?; - } - chn.send(output!("")).await?; - } - // Output EVENTS - let evs = txn.all_ev(&ns, &db, &tb.name).await?; - if !evs.is_empty() { - for ev in &evs { - chn.send(output!(format!("{};", ev))).await?; - } - chn.send(output!("")).await?; - } - } - // Start transaction - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("-- TRANSACTION")).await?; - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("")).await?; - chn.send(output!("BEGIN TRANSACTION;")).await?; - chn.send(output!("")).await?; - // Output TABLE data - for tb in &tbs { + // Start transaction chn.send(output!("-- ------------------------------")).await?; - chn.send(output!(format!("-- TABLE DATA: {}", tb.name))).await?; + chn.send(output!("-- TRANSACTION")).await?; chn.send(output!("-- ------------------------------")).await?; chn.send(output!("")).await?; - // Fetch records - let beg = thing::prefix(&ns, &db, &tb.name); - let end = thing::suffix(&ns, &db, &tb.name); - let mut nxt: Option> = None; - loop { - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - txn.scan(min..max, 1000).await? - } - Some(ref mut beg) => { - beg.push(0x00); - let min = beg.clone(); - let max = end.clone(); - txn.scan(min..max, 1000).await? - } - }; - if !res.is_empty() { - // Get total results - let n = res.len(); - // Exit when settled - if n == 0 { - break; - } - // Loop over results - for (i, (k, v)) in res.into_iter().enumerate() { - // Ready the next - if n == i + 1 { - nxt = Some(k.clone()); + chn.send(output!("BEGIN TRANSACTION;")).await?; + chn.send(output!("")).await?; + // Output TABLE data + for tb in &tbs { + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!(format!("-- TABLE DATA: {}", tb.name))).await?; + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("")).await?; + // Fetch records + let beg = thing::prefix(&ns, &db, &tb.name); + let end = thing::suffix(&ns, &db, &tb.name); + let mut nxt: Option> = None; + loop { + let res = match nxt { + None => { + let min = beg.clone(); + let max = end.clone(); + txn.scan(min..max, 1000).await? } - // Parse the key-value - let k: crate::key::thing::Thing = (&k).into(); - let v: crate::sql::value::Value = (&v).into(); - let t = Thing::from((k.tb, k.id)); - // Write record - chn.send(output!(format!("UPDATE {} CONTENT {};", t, v))).await?; + Some(ref mut beg) => { + beg.push(0x00); + let min = beg.clone(); + let max = end.clone(); + txn.scan(min..max, 1000).await? + } + }; + if !res.is_empty() { + // Get total results + let n = res.len(); + // Exit when settled + if n == 0 { + break; + } + // Loop over results + for (i, (k, v)) in res.into_iter().enumerate() { + // Ready the next + if n == i + 1 { + nxt = Some(k.clone()); + } + // Parse the key-value + let k: crate::key::thing::Thing = (&k).into(); + let v: crate::sql::value::Value = (&v).into(); + let t = Thing::from((k.tb, k.id)); + // Write record + chn.send(output!(format!("UPDATE {} CONTENT {};", t, v))) + .await?; + } + continue; } - continue; + break; } - break; + chn.send(output!("")).await?; } + // Commit transaction + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("-- TRANSACTION")).await?; + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("")).await?; + chn.send(output!("COMMIT TRANSACTION;")).await?; chn.send(output!("")).await?; } - // Commit transaction - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("-- TRANSACTION")).await?; - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("")).await?; - chn.send(output!("COMMIT TRANSACTION;")).await?; - chn.send(output!("")).await?; - } - } - // Everything fine - Ok(()) + }; + // Everything exported + Ok::<(), Error>(()) + // Task done + }) + .detach(); + // Send back the receiver + Ok(rcv) } } diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 2cbb2151..0243ddeb 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -19,12 +19,15 @@ mod ctx; mod dbs; mod doc; mod err; +mod exe; mod fnc; mod key; mod kvs; +// SQL pub mod sql; +// Exports pub use dbs::Auth; pub use dbs::Response; pub use dbs::Session; @@ -33,3 +36,6 @@ pub use kvs::Datastore; pub use kvs::Key; pub use kvs::Transaction; pub use kvs::Val; + +// Re-exports +pub use channel::Receiver; diff --git a/src/net/export.rs b/src/net/export.rs index 200ec531..ae3fd874 100644 --- a/src/net/export.rs +++ b/src/net/export.rs @@ -25,19 +25,23 @@ async fn handler(session: Session) -> Result let dbv = session.db.clone().unwrap(); // Create a chunked response let (mut chn, bdy) = Body::channel(); - // Initiate a new async channel - let (snd, mut rcv) = tokio::sync::mpsc::channel(100); // Spawn a new database export - tokio::spawn(db.export(nsv, dbv, snd)); - // Process all processed values - tokio::spawn(async move { - while let Some(v) = rcv.recv().await { - let _ = chn.send_data(v).await; + match db.export(nsv, dbv).await { + Ok(rcv) => { + // Process all processed values + tokio::spawn(async move { + while let Ok(v) = rcv.recv().await { + let _ = chn.send_data(v).await; + } + }); + // Return the chunked body + Ok(warp::reply::Response::new(bdy)) } - }); - // Return the chunked body - Ok(warp::reply::Response::new(bdy)) + // There was en error with the export + _ => Err(warp::reject::custom(Error::InvalidAuth)), + } } + // There was an error with permissions _ => Err(warp::reject::custom(Error::InvalidAuth)), } }