diff --git a/Cargo.lock b/Cargo.lock index 7d724c5f..5d791f30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2449,7 +2449,6 @@ dependencies = [ "async-executor", "async-recursion", "bigdecimal", - "bytes", "chrono", "dmp", "echodb", diff --git a/lib/Cargo.toml b/lib/Cargo.toml index bcbd923f..674af9a1 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -17,7 +17,6 @@ kv-yokudb = [] argon2 = "0.4.0" async-recursion = "1.0.0" bigdecimal = { version = "0.3.0", features = ["serde", "string-only"] } -bytes = "1.1.0" channel = { version = "1.6.1", package = "async-channel" } chrono = { version = "0.4.19", features = ["serde"] } derive = { version = "0.1.2", package = "surrealdb-derive" } diff --git a/lib/src/cnf/mod.rs b/lib/src/cnf/mod.rs index c29d158c..7176ce75 100644 --- a/lib/src/cnf/mod.rs +++ b/lib/src/cnf/mod.rs @@ -1,5 +1,5 @@ -// Specifies how many concurrent jobs can be buffered in the worker channel. #[cfg(feature = "parallel")] +// Specifies how many concurrent jobs can be buffered in the worker channel. pub const MAX_CONCURRENT_TASKS: usize = 64; // Specifies how many subqueries will be processed recursively before the query fails. diff --git a/lib/src/dbs/channel.rs b/lib/src/dbs/channel.rs index d1923278..8ce15723 100644 --- a/lib/src/dbs/channel.rs +++ b/lib/src/dbs/channel.rs @@ -14,6 +14,8 @@ use async_recursion::async_recursion; use channel::Sender; impl Value { + #[cfg_attr(feature = "parallel", async_recursion)] + #[cfg_attr(not(feature = "parallel"), async_recursion(?Send))] pub(crate) async fn channel( self, ctx: &Context<'_>, @@ -36,7 +38,8 @@ impl Value { } impl Array { - #[async_recursion] + #[cfg_attr(feature = "parallel", async_recursion)] + #[cfg_attr(not(feature = "parallel"), async_recursion(?Send))] pub(crate) async fn process( self, ctx: &Context<'_>, diff --git a/lib/src/dbs/iterator.rs b/lib/src/dbs/iterator.rs index 286cbd88..c89e9dd9 100644 --- a/lib/src/dbs/iterator.rs +++ b/lib/src/dbs/iterator.rs @@ -1,4 +1,3 @@ -use crate::cnf::MAX_CONCURRENT_TASKS; use crate::ctx::Canceller; use crate::ctx::Context; use crate::dbs::Options; @@ -13,8 +12,6 @@ use crate::sql::part::Part; use crate::sql::table::Table; use crate::sql::thing::Thing; use crate::sql::value::Value; -use executor::Executor; -use futures::join; use std::cmp::Ordering; use std::collections::BTreeMap; use std::mem; @@ -355,13 +352,13 @@ impl Iterator { // Run statements in parallel true => { // Create a new executor - let exe = Executor::new(); + let exe = executor::Executor::new(); // Take all of the iterator values let vals = mem::take(&mut self.readies); // Create a channel to shutdown let (end, exit) = channel::bounded::<()>(1); // Create an unbounded channel - let (chn, docs) = channel::bounded(MAX_CONCURRENT_TASKS); + let (chn, docs) = channel::bounded(crate::cnf::MAX_CONCURRENT_TASKS); // Create an async closure for prepared values let adocs = async { // Process all prepared values @@ -374,7 +371,7 @@ impl Iterator { drop(chn); }; // Create an unbounded channel - let (chn, vals) = channel::bounded(MAX_CONCURRENT_TASKS); + let (chn, vals) = channel::bounded(crate::cnf::MAX_CONCURRENT_TASKS); // Create an async closure for received values let avals = async { // Process all received values @@ -398,7 +395,7 @@ impl Iterator { // Run all executor tasks let fut = exe.run(exit.recv()); // Wait for all closures - let res = join!(adocs, avals, aproc, fut); + let res = futures::join!(adocs, avals, aproc, fut); // Consume executor error let _ = res.3; // Everything processed ok diff --git a/lib/src/err/mod.rs b/lib/src/err/mod.rs index 0434c700..8464001e 100644 --- a/lib/src/err/mod.rs +++ b/lib/src/err/mod.rs @@ -206,7 +206,7 @@ pub enum Error { }, /// There was an error processing a value in parallel - #[error("There was an error processing a value in parallel ")] + #[error("There was an error processing a value in parallel")] Channel(String), /// Represents an underlying error with Serde encoding / decoding @@ -255,8 +255,8 @@ impl From for Error { } } -impl From> for Error { - fn from(e: channel::SendError) -> Error { +impl From>> for Error { + fn from(e: channel::SendError>) -> Error { Error::Channel(e.to_string()) } } diff --git a/lib/src/exe/mod.rs b/lib/src/exe/mod.rs deleted file mode 100644 index 2961a65f..00000000 --- a/lib/src/exe/mod.rs +++ /dev/null @@ -1,17 +0,0 @@ -use executor::{Executor, Task}; -use once_cell::sync::Lazy; -use std::future::Future; -use std::panic::catch_unwind; - -pub fn spawn(future: impl Future + Send + 'static) -> Task { - static GLOBAL: Lazy> = Lazy::new(|| { - std::thread::spawn(|| { - catch_unwind(|| { - futures::executor::block_on(GLOBAL.run(futures::future::pending::<()>())) - }) - .ok(); - }); - Executor::new() - }); - GLOBAL.spawn(future) -} diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index a06038a1..84f64605 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -7,12 +7,9 @@ use crate::dbs::Response; use crate::dbs::Session; use crate::dbs::Variables; use crate::err::Error; -use crate::key::thing; use crate::sql; use crate::sql::query::Query; -use crate::sql::thing::Thing; -use bytes::Bytes; -use channel::Receiver; +use channel::Sender; /// The underlying datastore instance which stores the dataset. pub struct Datastore { @@ -195,176 +192,12 @@ impl Datastore { } /// Performs a full database export as SQL - pub async fn export(&self, ns: String, db: String) -> Result, Error> { + pub async fn export(&self, ns: String, db: String, chn: Sender>) -> Result<(), Error> { // Start a new transaction let mut txn = self.transaction(false, false).await?; - // Create a new channel - let (chn, rcv) = channel::bounded(10); - // Spawn the export - crate::exe::spawn(async move { - // Output OPTIONS - { - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("-- OPTION")).await?; - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("")).await?; - chn.send(output!("OPTION IMPORT;")).await?; - chn.send(output!("")).await?; - } - // Output LOGINS - { - let dls = txn.all_dl(&ns, &db).await?; - if !dls.is_empty() { - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("-- LOGINS")).await?; - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("")).await?; - for dl in dls { - chn.send(output!(format!("{};", dl))).await?; - } - chn.send(output!("")).await?; - } - } - // Output TOKENS - { - let dts = txn.all_dt(&ns, &db).await?; - if !dts.is_empty() { - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("-- TOKENS")).await?; - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("")).await?; - for dt in dts { - chn.send(output!(format!("{};", dt))).await?; - } - chn.send(output!("")).await?; - } - } - // Output SCOPES - { - let scs = txn.all_sc(&ns, &db).await?; - if !scs.is_empty() { - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("-- SCOPES")).await?; - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("")).await?; - for sc in scs { - chn.send(output!(format!("{};", sc))).await?; - } - chn.send(output!("")).await?; - } - } - // Output TABLES - { - let tbs = txn.all_tb(&ns, &db).await?; - if !tbs.is_empty() { - for tb in &tbs { - // Output TABLE - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!(format!("-- TABLE: {}", tb.name))).await?; - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("")).await?; - chn.send(output!(format!("{};", tb))).await?; - chn.send(output!("")).await?; - // Output FIELDS - { - let fds = txn.all_fd(&ns, &db, &tb.name).await?; - if !fds.is_empty() { - for fd in &fds { - chn.send(output!(format!("{};", fd))).await?; - } - chn.send(output!("")).await?; - } - } - // Output INDEXS - let ixs = txn.all_fd(&ns, &db, &tb.name).await?; - if !ixs.is_empty() { - for ix in &ixs { - chn.send(output!(format!("{};", ix))).await?; - } - chn.send(output!("")).await?; - } - // Output EVENTS - let evs = txn.all_ev(&ns, &db, &tb.name).await?; - if !evs.is_empty() { - for ev in &evs { - chn.send(output!(format!("{};", ev))).await?; - } - chn.send(output!("")).await?; - } - } - // Start transaction - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("-- TRANSACTION")).await?; - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("")).await?; - chn.send(output!("BEGIN TRANSACTION;")).await?; - chn.send(output!("")).await?; - // Output TABLE data - for tb in &tbs { - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!(format!("-- TABLE DATA: {}", tb.name))).await?; - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("")).await?; - // Fetch records - let beg = thing::prefix(&ns, &db, &tb.name); - let end = thing::suffix(&ns, &db, &tb.name); - let mut nxt: Option> = None; - loop { - let res = match nxt { - None => { - let min = beg.clone(); - let max = end.clone(); - txn.scan(min..max, 1000).await? - } - Some(ref mut beg) => { - beg.push(0x00); - let min = beg.clone(); - let max = end.clone(); - txn.scan(min..max, 1000).await? - } - }; - if !res.is_empty() { - // Get total results - let n = res.len(); - // Exit when settled - if n == 0 { - break; - } - // Loop over results - for (i, (k, v)) in res.into_iter().enumerate() { - // Ready the next - if n == i + 1 { - nxt = Some(k.clone()); - } - // Parse the key-value - let k: crate::key::thing::Thing = (&k).into(); - let v: crate::sql::value::Value = (&v).into(); - let t = Thing::from((k.tb, k.id)); - // Write record - chn.send(output!(format!("UPDATE {} CONTENT {};", t, v))) - .await?; - } - continue; - } - break; - } - chn.send(output!("")).await?; - } - // Commit transaction - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("-- TRANSACTION")).await?; - chn.send(output!("-- ------------------------------")).await?; - chn.send(output!("")).await?; - chn.send(output!("COMMIT TRANSACTION;")).await?; - chn.send(output!("")).await?; - } - }; - // Everything exported - Ok::<(), Error>(()) - // Task done - }) - .detach(); - // Send back the receiver - Ok(rcv) + // Process the export + txn.export(&ns, &db, chn).await?; + // Everythign ok + Ok(()) } } diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index 5570a9bd..cede5410 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -3,7 +3,10 @@ use super::kv::Convert; use super::Key; use super::Val; use crate::err::Error; +use crate::key::thing; use crate::sql; +use crate::sql::thing::Thing; +use channel::Sender; use sql::statements::DefineDatabaseStatement; use sql::statements::DefineEventStatement; use sql::statements::DefineFieldStatement; @@ -750,4 +753,165 @@ impl Transaction { .await; Ok(()) } + /// Writes the full database contents as binary SQL. + pub async fn export(&mut self, ns: &str, db: &str, chn: Sender>) -> Result<(), Error> { + // Output OPTIONS + { + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("-- OPTION")).await?; + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("")).await?; + chn.send(bytes!("OPTION IMPORT;")).await?; + chn.send(bytes!("")).await?; + } + // Output LOGINS + { + let dls = self.all_dl(ns, db).await?; + if !dls.is_empty() { + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("-- LOGINS")).await?; + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("")).await?; + for dl in dls { + chn.send(bytes!(format!("{};", dl))).await?; + } + chn.send(bytes!("")).await?; + } + } + // Output TOKENS + { + let dts = self.all_dt(ns, db).await?; + if !dts.is_empty() { + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("-- TOKENS")).await?; + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("")).await?; + for dt in dts { + chn.send(bytes!(format!("{};", dt))).await?; + } + chn.send(bytes!("")).await?; + } + } + // Output SCOPES + { + let scs = self.all_sc(ns, db).await?; + if !scs.is_empty() { + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("-- SCOPES")).await?; + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("")).await?; + for sc in scs { + chn.send(bytes!(format!("{};", sc))).await?; + } + chn.send(bytes!("")).await?; + } + } + // Output TABLES + { + let tbs = self.all_tb(ns, db).await?; + if !tbs.is_empty() { + for tb in &tbs { + // Output TABLE + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!(format!("-- TABLE: {}", tb.name))).await?; + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("")).await?; + chn.send(bytes!(format!("{};", tb))).await?; + chn.send(bytes!("")).await?; + // Output FIELDS + { + let fds = self.all_fd(ns, db, &tb.name).await?; + if !fds.is_empty() { + for fd in &fds { + chn.send(bytes!(format!("{};", fd))).await?; + } + chn.send(bytes!("")).await?; + } + } + // Output INDEXS + let ixs = self.all_fd(ns, db, &tb.name).await?; + if !ixs.is_empty() { + for ix in &ixs { + chn.send(bytes!(format!("{};", ix))).await?; + } + chn.send(bytes!("")).await?; + } + // Output EVENTS + let evs = self.all_ev(ns, db, &tb.name).await?; + if !evs.is_empty() { + for ev in &evs { + chn.send(bytes!(format!("{};", ev))).await?; + } + chn.send(bytes!("")).await?; + } + } + // Start transaction + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("-- TRANSACTION")).await?; + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("")).await?; + chn.send(bytes!("BEGIN TRANSACTION;")).await?; + chn.send(bytes!("")).await?; + // Output TABLE data + for tb in &tbs { + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!(format!("-- TABLE DATA: {}", tb.name))).await?; + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("")).await?; + // Fetch records + let beg = thing::prefix(ns, db, &tb.name); + let end = thing::suffix(ns, db, &tb.name); + let mut nxt: Option> = None; + loop { + let res = match nxt { + None => { + let min = beg.clone(); + let max = end.clone(); + self.scan(min..max, 1000).await? + } + Some(ref mut beg) => { + beg.push(0x00); + let min = beg.clone(); + let max = end.clone(); + self.scan(min..max, 1000).await? + } + }; + if !res.is_empty() { + // Get total results + let n = res.len(); + // Exit when settled + if n == 0 { + break; + } + // Loop over results + for (i, (k, v)) in res.into_iter().enumerate() { + // Ready the next + if n == i + 1 { + nxt = Some(k.clone()); + } + // Parse the key-value + let k: crate::key::thing::Thing = (&k).into(); + let v: crate::sql::value::Value = (&v).into(); + let t = Thing::from((k.tb, k.id)); + // Write record + chn.send(bytes!(format!("UPDATE {} CONTENT {};", t, v))).await?; + } + continue; + } + break; + } + chn.send(bytes!("")).await?; + } + // Commit transaction + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("-- TRANSACTION")).await?; + chn.send(bytes!("-- ------------------------------")).await?; + chn.send(bytes!("")).await?; + chn.send(bytes!("COMMIT TRANSACTION;")).await?; + chn.send(bytes!("")).await?; + } + } + // Everything exported + Ok(()) + } } diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 0243ddeb..b317298b 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -19,7 +19,6 @@ mod ctx; mod dbs; mod doc; mod err; -mod exe; mod fnc; mod key; mod kvs; @@ -38,4 +37,8 @@ pub use kvs::Transaction; pub use kvs::Val; // Re-exports -pub use channel::Receiver; +pub mod channel { + pub use channel::bounded as new; + pub use channel::Receiver; + pub use channel::Sender; +} diff --git a/lib/src/mac/mod.rs b/lib/src/mac/mod.rs index 9e169717..0b9e1bb8 100644 --- a/lib/src/mac/mod.rs +++ b/lib/src/mac/mod.rs @@ -1,6 +1,6 @@ -macro_rules! output { +macro_rules! bytes { ($expression:expr) => { - bytes::Bytes::from(format!("{}\n", $expression)) + format!("{}\n", $expression).into_bytes() }; } diff --git a/src/net/export.rs b/src/net/export.rs index ae3fd874..5995a3bf 100644 --- a/src/net/export.rs +++ b/src/net/export.rs @@ -1,6 +1,7 @@ use crate::err::Error; use crate::net::session; use crate::net::DB; +use bytes::Bytes; use hyper::body::Body; use surrealdb::Session; use warp::Filter; @@ -25,21 +26,18 @@ async fn handler(session: Session) -> Result let dbv = session.db.clone().unwrap(); // Create a chunked response let (mut chn, bdy) = Body::channel(); + // Create a new bounded channel + let (snd, rcv) = surrealdb::channel::new(1); // Spawn a new database export - match db.export(nsv, dbv).await { - Ok(rcv) => { - // Process all processed values - tokio::spawn(async move { - while let Ok(v) = rcv.recv().await { - let _ = chn.send_data(v).await; - } - }); - // Return the chunked body - Ok(warp::reply::Response::new(bdy)) + tokio::spawn(db.export(nsv, dbv, snd)); + // Process all processed values + tokio::spawn(async move { + while let Ok(v) = rcv.recv().await { + let _ = chn.send_data(Bytes::from(v)).await; } - // There was en error with the export - _ => Err(warp::reject::custom(Error::InvalidAuth)), - } + }); + // Return the chunked body + Ok(warp::reply::Response::new(bdy)) } // There was an error with permissions _ => Err(warp::reject::custom(Error::InvalidAuth)),