Use a dedicated executor thread for CPU-intensive functions (#186)
This commit is contained in:
parent
de7d9299fd
commit
b3ec3b4d8e
3 changed files with 53 additions and 12 deletions
19
lib/src/exe/mod.rs
Normal file
19
lib/src/exe/mod.rs
Normal file
|
@ -0,0 +1,19 @@
|
|||
#![cfg(feature = "parallel")]
|
||||
|
||||
use executor::{Executor, Task};
|
||||
use once_cell::sync::Lazy;
|
||||
use std::future::Future;
|
||||
use std::panic::catch_unwind;
|
||||
|
||||
pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
|
||||
static GLOBAL: Lazy<Executor<'_>> = Lazy::new(|| {
|
||||
std::thread::spawn(|| {
|
||||
catch_unwind(|| {
|
||||
futures::executor::block_on(GLOBAL.run(futures::future::pending::<()>()))
|
||||
})
|
||||
.ok();
|
||||
});
|
||||
Executor::new()
|
||||
});
|
||||
GLOBAL.spawn(future)
|
||||
}
|
|
@ -25,21 +25,42 @@ pub mod util;
|
|||
|
||||
/// Attempts to run any function.
|
||||
pub async fn run(ctx: &Context<'_>, name: &str, args: Vec<Value>) -> Result<Value, Error> {
|
||||
// Wrappers return a function as opposed to a value so that the dispatch! method can always
|
||||
// perform a function call.
|
||||
#[cfg(feature = "parallel")]
|
||||
fn cpu_intensive<R: Send + 'static>(
|
||||
function: impl FnOnce() -> R + Send + 'static,
|
||||
) -> impl FnOnce() -> executor::Task<R> {
|
||||
|| crate::exe::spawn(async move { function() })
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "parallel"))]
|
||||
fn cpu_intensive<R: Send + 'static>(
|
||||
function: impl FnOnce() -> R + Send + 'static,
|
||||
) -> impl FnOnce() -> std::future::Ready<R> {
|
||||
|| std::future::ready(function())
|
||||
}
|
||||
|
||||
macro_rules! dispatch {
|
||||
($name: ident, $args: ident, $($function_name: literal => $($function_path: ident)::+ $(($ctx_arg: expr))* $(.$await:tt)*,)+) => {
|
||||
($name: ident, $args: ident, $($function_name: literal => $(($wrapper: tt))* $($function_path: ident)::+ $(($ctx_arg: expr))* $(.$await:tt)*,)+) => {
|
||||
{
|
||||
match $name {
|
||||
$($function_name => $($function_path)::+($($ctx_arg,)* args::FromArgs::from_args($name, $args)?)$(.$await)*,)+
|
||||
$($function_name => {
|
||||
let args = args::FromArgs::from_args($name, $args)?;
|
||||
#[allow(clippy::redundant_closure_call)]
|
||||
$($wrapper)*(|| $($function_path)::+($($ctx_arg,)* args))()$(.$await)*
|
||||
},)+
|
||||
_ => unreachable!()
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Each function is specified by its name (a string literal) followed by its path. The path
|
||||
// may be followed by one parenthesized argument, e.g. ctx, which is passed to the function
|
||||
// before the remainder of the arguments. The path may be followed by `.await` to signify that
|
||||
// it is `async`.
|
||||
// it is `async`. Finally, the path may be prefixed by a parenthesized wrapper function e.g.
|
||||
// `cpu_intensive`.
|
||||
dispatch!(
|
||||
name,
|
||||
args,
|
||||
|
@ -60,14 +81,14 @@ pub async fn run(ctx: &Context<'_>, name: &str, args: Vec<Value>) -> Result<Valu
|
|||
"crypto::sha1" => crypto::sha1,
|
||||
"crypto::sha256" => crypto::sha256,
|
||||
"crypto::sha512" => crypto::sha512,
|
||||
"crypto::argon2::compare" => crypto::argon2::cmp,
|
||||
"crypto::argon2::generate" => crypto::argon2::gen,
|
||||
"crypto::bcrypt::compare" => crypto::bcrypt::cmp,
|
||||
"crypto::bcrypt::generate" => crypto::bcrypt::gen,
|
||||
"crypto::pbkdf2::compare" => crypto::pbkdf2::cmp,
|
||||
"crypto::pbkdf2::generate" => crypto::pbkdf2::gen,
|
||||
"crypto::scrypt::compare" => crypto::scrypt::cmp,
|
||||
"crypto::scrypt::generate" => crypto::scrypt::gen,
|
||||
"crypto::argon2::compare" => (cpu_intensive) crypto::argon2::cmp.await,
|
||||
"crypto::argon2::generate" => (cpu_intensive) crypto::argon2::gen.await,
|
||||
"crypto::bcrypt::compare" => (cpu_intensive) crypto::bcrypt::cmp.await,
|
||||
"crypto::bcrypt::generate" => (cpu_intensive) crypto::bcrypt::gen.await,
|
||||
"crypto::pbkdf2::compare" => (cpu_intensive) crypto::pbkdf2::cmp.await,
|
||||
"crypto::pbkdf2::generate" => (cpu_intensive) crypto::pbkdf2::gen.await,
|
||||
"crypto::scrypt::compare" => (cpu_intensive) crypto::scrypt::cmp.await,
|
||||
"crypto::scrypt::generate" => (cpu_intensive) crypto::scrypt::gen.await,
|
||||
//
|
||||
"geo::area" => geo::area,
|
||||
"geo::bearing" => geo::bearing,
|
||||
|
|
|
@ -18,6 +18,7 @@ mod ctx;
|
|||
mod dbs;
|
||||
mod doc;
|
||||
mod err;
|
||||
mod exe;
|
||||
mod fnc;
|
||||
mod key;
|
||||
mod kvs;
|
||||
|
|
Loading…
Reference in a new issue