Ensure library works in multi-threaded and single-threaded environments

This commit is contained in:
Tobie Morgan Hitchcock 2022-05-15 20:38:46 +01:00
parent 15fc4a0126
commit 52e8954ae4
12 changed files with 200 additions and 221 deletions

1
Cargo.lock generated
View file

@ -2449,7 +2449,6 @@ dependencies = [
"async-executor", "async-executor",
"async-recursion", "async-recursion",
"bigdecimal", "bigdecimal",
"bytes",
"chrono", "chrono",
"dmp", "dmp",
"echodb", "echodb",

View file

@ -17,7 +17,6 @@ kv-yokudb = []
argon2 = "0.4.0" argon2 = "0.4.0"
async-recursion = "1.0.0" async-recursion = "1.0.0"
bigdecimal = { version = "0.3.0", features = ["serde", "string-only"] } bigdecimal = { version = "0.3.0", features = ["serde", "string-only"] }
bytes = "1.1.0"
channel = { version = "1.6.1", package = "async-channel" } channel = { version = "1.6.1", package = "async-channel" }
chrono = { version = "0.4.19", features = ["serde"] } chrono = { version = "0.4.19", features = ["serde"] }
derive = { version = "0.1.2", package = "surrealdb-derive" } derive = { version = "0.1.2", package = "surrealdb-derive" }

View file

@ -1,5 +1,5 @@
// Specifies how many concurrent jobs can be buffered in the worker channel.
#[cfg(feature = "parallel")] #[cfg(feature = "parallel")]
// Specifies how many concurrent jobs can be buffered in the worker channel.
pub const MAX_CONCURRENT_TASKS: usize = 64; pub const MAX_CONCURRENT_TASKS: usize = 64;
// Specifies how many subqueries will be processed recursively before the query fails. // Specifies how many subqueries will be processed recursively before the query fails.

View file

@ -14,6 +14,8 @@ use async_recursion::async_recursion;
use channel::Sender; use channel::Sender;
impl Value { impl Value {
#[cfg_attr(feature = "parallel", async_recursion)]
#[cfg_attr(not(feature = "parallel"), async_recursion(?Send))]
pub(crate) async fn channel( pub(crate) async fn channel(
self, self,
ctx: &Context<'_>, ctx: &Context<'_>,
@ -36,7 +38,8 @@ impl Value {
} }
impl Array { impl Array {
#[async_recursion] #[cfg_attr(feature = "parallel", async_recursion)]
#[cfg_attr(not(feature = "parallel"), async_recursion(?Send))]
pub(crate) async fn process( pub(crate) async fn process(
self, self,
ctx: &Context<'_>, ctx: &Context<'_>,

View file

@ -1,4 +1,3 @@
use crate::cnf::MAX_CONCURRENT_TASKS;
use crate::ctx::Canceller; use crate::ctx::Canceller;
use crate::ctx::Context; use crate::ctx::Context;
use crate::dbs::Options; use crate::dbs::Options;
@ -13,8 +12,6 @@ use crate::sql::part::Part;
use crate::sql::table::Table; use crate::sql::table::Table;
use crate::sql::thing::Thing; use crate::sql::thing::Thing;
use crate::sql::value::Value; use crate::sql::value::Value;
use executor::Executor;
use futures::join;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::mem; use std::mem;
@ -355,13 +352,13 @@ impl Iterator {
// Run statements in parallel // Run statements in parallel
true => { true => {
// Create a new executor // Create a new executor
let exe = Executor::new(); let exe = executor::Executor::new();
// Take all of the iterator values // Take all of the iterator values
let vals = mem::take(&mut self.readies); let vals = mem::take(&mut self.readies);
// Create a channel to shutdown // Create a channel to shutdown
let (end, exit) = channel::bounded::<()>(1); let (end, exit) = channel::bounded::<()>(1);
// Create an unbounded channel // Create an unbounded channel
let (chn, docs) = channel::bounded(MAX_CONCURRENT_TASKS); let (chn, docs) = channel::bounded(crate::cnf::MAX_CONCURRENT_TASKS);
// Create an async closure for prepared values // Create an async closure for prepared values
let adocs = async { let adocs = async {
// Process all prepared values // Process all prepared values
@ -374,7 +371,7 @@ impl Iterator {
drop(chn); drop(chn);
}; };
// Create an unbounded channel // Create an unbounded channel
let (chn, vals) = channel::bounded(MAX_CONCURRENT_TASKS); let (chn, vals) = channel::bounded(crate::cnf::MAX_CONCURRENT_TASKS);
// Create an async closure for received values // Create an async closure for received values
let avals = async { let avals = async {
// Process all received values // Process all received values
@ -398,7 +395,7 @@ impl Iterator {
// Run all executor tasks // Run all executor tasks
let fut = exe.run(exit.recv()); let fut = exe.run(exit.recv());
// Wait for all closures // Wait for all closures
let res = join!(adocs, avals, aproc, fut); let res = futures::join!(adocs, avals, aproc, fut);
// Consume executor error // Consume executor error
let _ = res.3; let _ = res.3;
// Everything processed ok // Everything processed ok

View file

@ -206,7 +206,7 @@ pub enum Error {
}, },
/// There was an error processing a value in parallel /// There was an error processing a value in parallel
#[error("There was an error processing a value in parallel ")] #[error("There was an error processing a value in parallel")]
Channel(String), Channel(String),
/// Represents an underlying error with Serde encoding / decoding /// Represents an underlying error with Serde encoding / decoding
@ -255,8 +255,8 @@ impl From<channel::RecvError> for Error {
} }
} }
impl From<channel::SendError<bytes::Bytes>> for Error { impl From<channel::SendError<Vec<u8>>> for Error {
fn from(e: channel::SendError<bytes::Bytes>) -> Error { fn from(e: channel::SendError<Vec<u8>>) -> Error {
Error::Channel(e.to_string()) Error::Channel(e.to_string())
} }
} }

View file

@ -1,17 +0,0 @@
use executor::{Executor, Task};
use once_cell::sync::Lazy;
use std::future::Future;
use std::panic::catch_unwind;
pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
static GLOBAL: Lazy<Executor<'_>> = Lazy::new(|| {
std::thread::spawn(|| {
catch_unwind(|| {
futures::executor::block_on(GLOBAL.run(futures::future::pending::<()>()))
})
.ok();
});
Executor::new()
});
GLOBAL.spawn(future)
}

View file

@ -7,12 +7,9 @@ use crate::dbs::Response;
use crate::dbs::Session; use crate::dbs::Session;
use crate::dbs::Variables; use crate::dbs::Variables;
use crate::err::Error; use crate::err::Error;
use crate::key::thing;
use crate::sql; use crate::sql;
use crate::sql::query::Query; use crate::sql::query::Query;
use crate::sql::thing::Thing; use channel::Sender;
use bytes::Bytes;
use channel::Receiver;
/// The underlying datastore instance which stores the dataset. /// The underlying datastore instance which stores the dataset.
pub struct Datastore { pub struct Datastore {
@ -195,176 +192,12 @@ impl Datastore {
} }
/// Performs a full database export as SQL /// Performs a full database export as SQL
pub async fn export(&self, ns: String, db: String) -> Result<Receiver<Bytes>, Error> { pub async fn export(&self, ns: String, db: String, chn: Sender<Vec<u8>>) -> Result<(), Error> {
// Start a new transaction // Start a new transaction
let mut txn = self.transaction(false, false).await?; let mut txn = self.transaction(false, false).await?;
// Create a new channel // Process the export
let (chn, rcv) = channel::bounded(10); txn.export(&ns, &db, chn).await?;
// Spawn the export // Everythign ok
crate::exe::spawn(async move { Ok(())
// Output OPTIONS
{
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("-- OPTION")).await?;
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("")).await?;
chn.send(output!("OPTION IMPORT;")).await?;
chn.send(output!("")).await?;
}
// Output LOGINS
{
let dls = txn.all_dl(&ns, &db).await?;
if !dls.is_empty() {
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("-- LOGINS")).await?;
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("")).await?;
for dl in dls {
chn.send(output!(format!("{};", dl))).await?;
}
chn.send(output!("")).await?;
}
}
// Output TOKENS
{
let dts = txn.all_dt(&ns, &db).await?;
if !dts.is_empty() {
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("-- TOKENS")).await?;
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("")).await?;
for dt in dts {
chn.send(output!(format!("{};", dt))).await?;
}
chn.send(output!("")).await?;
}
}
// Output SCOPES
{
let scs = txn.all_sc(&ns, &db).await?;
if !scs.is_empty() {
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("-- SCOPES")).await?;
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("")).await?;
for sc in scs {
chn.send(output!(format!("{};", sc))).await?;
}
chn.send(output!("")).await?;
}
}
// Output TABLES
{
let tbs = txn.all_tb(&ns, &db).await?;
if !tbs.is_empty() {
for tb in &tbs {
// Output TABLE
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!(format!("-- TABLE: {}", tb.name))).await?;
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("")).await?;
chn.send(output!(format!("{};", tb))).await?;
chn.send(output!("")).await?;
// Output FIELDS
{
let fds = txn.all_fd(&ns, &db, &tb.name).await?;
if !fds.is_empty() {
for fd in &fds {
chn.send(output!(format!("{};", fd))).await?;
}
chn.send(output!("")).await?;
}
}
// Output INDEXS
let ixs = txn.all_fd(&ns, &db, &tb.name).await?;
if !ixs.is_empty() {
for ix in &ixs {
chn.send(output!(format!("{};", ix))).await?;
}
chn.send(output!("")).await?;
}
// Output EVENTS
let evs = txn.all_ev(&ns, &db, &tb.name).await?;
if !evs.is_empty() {
for ev in &evs {
chn.send(output!(format!("{};", ev))).await?;
}
chn.send(output!("")).await?;
}
}
// Start transaction
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("-- TRANSACTION")).await?;
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("")).await?;
chn.send(output!("BEGIN TRANSACTION;")).await?;
chn.send(output!("")).await?;
// Output TABLE data
for tb in &tbs {
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!(format!("-- TABLE DATA: {}", tb.name))).await?;
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("")).await?;
// Fetch records
let beg = thing::prefix(&ns, &db, &tb.name);
let end = thing::suffix(&ns, &db, &tb.name);
let mut nxt: Option<Vec<u8>> = None;
loop {
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
txn.scan(min..max, 1000).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
txn.scan(min..max, 1000).await?
}
};
if !res.is_empty() {
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (k, v)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
// Parse the key-value
let k: crate::key::thing::Thing = (&k).into();
let v: crate::sql::value::Value = (&v).into();
let t = Thing::from((k.tb, k.id));
// Write record
chn.send(output!(format!("UPDATE {} CONTENT {};", t, v)))
.await?;
}
continue;
}
break;
}
chn.send(output!("")).await?;
}
// Commit transaction
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("-- TRANSACTION")).await?;
chn.send(output!("-- ------------------------------")).await?;
chn.send(output!("")).await?;
chn.send(output!("COMMIT TRANSACTION;")).await?;
chn.send(output!("")).await?;
}
};
// Everything exported
Ok::<(), Error>(())
// Task done
})
.detach();
// Send back the receiver
Ok(rcv)
} }
} }

View file

@ -3,7 +3,10 @@ use super::kv::Convert;
use super::Key; use super::Key;
use super::Val; use super::Val;
use crate::err::Error; use crate::err::Error;
use crate::key::thing;
use crate::sql; use crate::sql;
use crate::sql::thing::Thing;
use channel::Sender;
use sql::statements::DefineDatabaseStatement; use sql::statements::DefineDatabaseStatement;
use sql::statements::DefineEventStatement; use sql::statements::DefineEventStatement;
use sql::statements::DefineFieldStatement; use sql::statements::DefineFieldStatement;
@ -750,4 +753,165 @@ impl Transaction {
.await; .await;
Ok(()) Ok(())
} }
/// Writes the full database contents as binary SQL.
pub async fn export(&mut self, ns: &str, db: &str, chn: Sender<Vec<u8>>) -> Result<(), Error> {
// Output OPTIONS
{
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("-- OPTION")).await?;
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("")).await?;
chn.send(bytes!("OPTION IMPORT;")).await?;
chn.send(bytes!("")).await?;
}
// Output LOGINS
{
let dls = self.all_dl(ns, db).await?;
if !dls.is_empty() {
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("-- LOGINS")).await?;
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("")).await?;
for dl in dls {
chn.send(bytes!(format!("{};", dl))).await?;
}
chn.send(bytes!("")).await?;
}
}
// Output TOKENS
{
let dts = self.all_dt(ns, db).await?;
if !dts.is_empty() {
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("-- TOKENS")).await?;
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("")).await?;
for dt in dts {
chn.send(bytes!(format!("{};", dt))).await?;
}
chn.send(bytes!("")).await?;
}
}
// Output SCOPES
{
let scs = self.all_sc(ns, db).await?;
if !scs.is_empty() {
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("-- SCOPES")).await?;
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("")).await?;
for sc in scs {
chn.send(bytes!(format!("{};", sc))).await?;
}
chn.send(bytes!("")).await?;
}
}
// Output TABLES
{
let tbs = self.all_tb(ns, db).await?;
if !tbs.is_empty() {
for tb in &tbs {
// Output TABLE
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!(format!("-- TABLE: {}", tb.name))).await?;
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("")).await?;
chn.send(bytes!(format!("{};", tb))).await?;
chn.send(bytes!("")).await?;
// Output FIELDS
{
let fds = self.all_fd(ns, db, &tb.name).await?;
if !fds.is_empty() {
for fd in &fds {
chn.send(bytes!(format!("{};", fd))).await?;
}
chn.send(bytes!("")).await?;
}
}
// Output INDEXS
let ixs = self.all_fd(ns, db, &tb.name).await?;
if !ixs.is_empty() {
for ix in &ixs {
chn.send(bytes!(format!("{};", ix))).await?;
}
chn.send(bytes!("")).await?;
}
// Output EVENTS
let evs = self.all_ev(ns, db, &tb.name).await?;
if !evs.is_empty() {
for ev in &evs {
chn.send(bytes!(format!("{};", ev))).await?;
}
chn.send(bytes!("")).await?;
}
}
// Start transaction
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("-- TRANSACTION")).await?;
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("")).await?;
chn.send(bytes!("BEGIN TRANSACTION;")).await?;
chn.send(bytes!("")).await?;
// Output TABLE data
for tb in &tbs {
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!(format!("-- TABLE DATA: {}", tb.name))).await?;
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("")).await?;
// Fetch records
let beg = thing::prefix(ns, db, &tb.name);
let end = thing::suffix(ns, db, &tb.name);
let mut nxt: Option<Vec<u8>> = None;
loop {
let res = match nxt {
None => {
let min = beg.clone();
let max = end.clone();
self.scan(min..max, 1000).await?
}
Some(ref mut beg) => {
beg.push(0x00);
let min = beg.clone();
let max = end.clone();
self.scan(min..max, 1000).await?
}
};
if !res.is_empty() {
// Get total results
let n = res.len();
// Exit when settled
if n == 0 {
break;
}
// Loop over results
for (i, (k, v)) in res.into_iter().enumerate() {
// Ready the next
if n == i + 1 {
nxt = Some(k.clone());
}
// Parse the key-value
let k: crate::key::thing::Thing = (&k).into();
let v: crate::sql::value::Value = (&v).into();
let t = Thing::from((k.tb, k.id));
// Write record
chn.send(bytes!(format!("UPDATE {} CONTENT {};", t, v))).await?;
}
continue;
}
break;
}
chn.send(bytes!("")).await?;
}
// Commit transaction
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("-- TRANSACTION")).await?;
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("")).await?;
chn.send(bytes!("COMMIT TRANSACTION;")).await?;
chn.send(bytes!("")).await?;
}
}
// Everything exported
Ok(())
}
} }

View file

@ -19,7 +19,6 @@ mod ctx;
mod dbs; mod dbs;
mod doc; mod doc;
mod err; mod err;
mod exe;
mod fnc; mod fnc;
mod key; mod key;
mod kvs; mod kvs;
@ -38,4 +37,8 @@ pub use kvs::Transaction;
pub use kvs::Val; pub use kvs::Val;
// Re-exports // Re-exports
pub use channel::Receiver; pub mod channel {
pub use channel::bounded as new;
pub use channel::Receiver;
pub use channel::Sender;
}

View file

@ -1,6 +1,6 @@
macro_rules! output { macro_rules! bytes {
($expression:expr) => { ($expression:expr) => {
bytes::Bytes::from(format!("{}\n", $expression)) format!("{}\n", $expression).into_bytes()
}; };
} }

View file

@ -1,6 +1,7 @@
use crate::err::Error; use crate::err::Error;
use crate::net::session; use crate::net::session;
use crate::net::DB; use crate::net::DB;
use bytes::Bytes;
use hyper::body::Body; use hyper::body::Body;
use surrealdb::Session; use surrealdb::Session;
use warp::Filter; use warp::Filter;
@ -25,22 +26,19 @@ async fn handler(session: Session) -> Result<impl warp::Reply, warp::Rejection>
let dbv = session.db.clone().unwrap(); let dbv = session.db.clone().unwrap();
// Create a chunked response // Create a chunked response
let (mut chn, bdy) = Body::channel(); let (mut chn, bdy) = Body::channel();
// Create a new bounded channel
let (snd, rcv) = surrealdb::channel::new(1);
// Spawn a new database export // Spawn a new database export
match db.export(nsv, dbv).await { tokio::spawn(db.export(nsv, dbv, snd));
Ok(rcv) => {
// Process all processed values // Process all processed values
tokio::spawn(async move { tokio::spawn(async move {
while let Ok(v) = rcv.recv().await { while let Ok(v) = rcv.recv().await {
let _ = chn.send_data(v).await; let _ = chn.send_data(Bytes::from(v)).await;
} }
}); });
// Return the chunked body // Return the chunked body
Ok(warp::reply::Response::new(bdy)) Ok(warp::reply::Response::new(bdy))
} }
// There was en error with the export
_ => Err(warp::reject::custom(Error::InvalidAuth)),
}
}
// There was an error with permissions // There was an error with permissions
_ => Err(warp::reject::custom(Error::InvalidAuth)), _ => Err(warp::reject::custom(Error::InvalidAuth)),
} }