Ensure HTTP requests timeout correctly

Closes #1677
This commit is contained in:
Tobie Morgan Hitchcock 2023-03-07 08:32:41 +00:00
parent fa0ec302ca
commit 06d2d88594
4 changed files with 82 additions and 33 deletions

View file

@ -107,6 +107,12 @@ impl<'a> Context<'a> {
self.deadline self.deadline
} }
/// Get the timeout for this operation, if any. This is useful for
/// checking if a long job should be started or not.
pub fn timeout(&self) -> Option<Duration> {
self.deadline.map(|v| v.saturating_duration_since(Instant::now()))
}
/// Check if the context is done. If it returns `None` the operation may /// Check if the context is done. If it returns `None` the operation may
/// proceed, otherwise the operation should be stopped. /// proceed, otherwise the operation should be stopped.
pub fn done(&self) -> Option<Reason> { pub fn done(&self) -> Option<Reason> {

View file

@ -1,3 +1,4 @@
use crate::ctx::Context;
use crate::err::Error; use crate::err::Error;
use crate::sql::value::Value; use crate::sql::value::Value;
@ -61,47 +62,55 @@ fn try_as_opts(
} }
#[cfg(feature = "http")] #[cfg(feature = "http")]
pub async fn head((uri, opts): (Value, Option<Value>)) -> Result<Value, Error> { pub async fn head(ctx: &Context<'_>, (uri, opts): (Value, Option<Value>)) -> Result<Value, Error> {
let uri = try_as_uri("http::head", uri)?; let uri = try_as_uri("http::head", uri)?;
let opts = try_as_opts("http::head", "The second argument should be an object.", opts)?; let opts = try_as_opts("http::head", "The second argument should be an object.", opts)?;
crate::fnc::util::http::head(uri, opts).await crate::fnc::util::http::head(ctx, uri, opts).await
} }
#[cfg(feature = "http")] #[cfg(feature = "http")]
pub async fn get((uri, opts): (Value, Option<Value>)) -> Result<Value, Error> { pub async fn get(ctx: &Context<'_>, (uri, opts): (Value, Option<Value>)) -> Result<Value, Error> {
let uri = try_as_uri("http::get", uri)?; let uri = try_as_uri("http::get", uri)?;
let opts = try_as_opts("http::get", "The second argument should be an object.", opts)?; let opts = try_as_opts("http::get", "The second argument should be an object.", opts)?;
crate::fnc::util::http::get(uri, opts).await crate::fnc::util::http::get(ctx, uri, opts).await
} }
#[cfg(feature = "http")] #[cfg(feature = "http")]
pub async fn put((uri, body, opts): (Value, Option<Value>, Option<Value>)) -> Result<Value, Error> { pub async fn put(
ctx: &Context<'_>,
(uri, body, opts): (Value, Option<Value>, Option<Value>),
) -> Result<Value, Error> {
let uri = try_as_uri("http::put", uri)?; let uri = try_as_uri("http::put", uri)?;
let opts = try_as_opts("http::put", "The third argument should be an object.", opts)?; let opts = try_as_opts("http::put", "The third argument should be an object.", opts)?;
crate::fnc::util::http::put(uri, body.unwrap_or(Value::Null), opts).await crate::fnc::util::http::put(ctx, uri, body.unwrap_or(Value::Null), opts).await
} }
#[cfg(feature = "http")] #[cfg(feature = "http")]
pub async fn post( pub async fn post(
ctx: &Context<'_>,
(uri, body, opts): (Value, Option<Value>, Option<Value>), (uri, body, opts): (Value, Option<Value>, Option<Value>),
) -> Result<Value, Error> { ) -> Result<Value, Error> {
let uri = try_as_uri("http::post", uri)?; let uri = try_as_uri("http::post", uri)?;
let opts = try_as_opts("http::post", "The third argument should be an object.", opts)?; let opts = try_as_opts("http::post", "The third argument should be an object.", opts)?;
crate::fnc::util::http::post(uri, body.unwrap_or(Value::Null), opts).await crate::fnc::util::http::post(ctx, uri, body.unwrap_or(Value::Null), opts).await
} }
#[cfg(feature = "http")] #[cfg(feature = "http")]
pub async fn patch( pub async fn patch(
ctx: &Context<'_>,
(uri, body, opts): (Value, Option<Value>, Option<Value>), (uri, body, opts): (Value, Option<Value>, Option<Value>),
) -> Result<Value, Error> { ) -> Result<Value, Error> {
let uri = try_as_uri("http::patch", uri)?; let uri = try_as_uri("http::patch", uri)?;
let opts = try_as_opts("http::patch", "The third argument should be an object.", opts)?; let opts = try_as_opts("http::patch", "The third argument should be an object.", opts)?;
crate::fnc::util::http::patch(uri, body.unwrap_or(Value::Null), opts).await crate::fnc::util::http::patch(ctx, uri, body.unwrap_or(Value::Null), opts).await
} }
#[cfg(feature = "http")] #[cfg(feature = "http")]
pub async fn delete((uri, opts): (Value, Option<Value>)) -> Result<Value, Error> { pub async fn delete(
ctx: &Context<'_>,
(uri, opts): (Value, Option<Value>),
) -> Result<Value, Error> {
let uri = try_as_uri("http::delete", uri)?; let uri = try_as_uri("http::delete", uri)?;
let opts = try_as_opts("http::delete", "The second argument should be an object.", opts)?; let opts = try_as_opts("http::delete", "The second argument should be an object.", opts)?;
crate::fnc::util::http::delete(uri, opts).await crate::fnc::util::http::delete(ctx, uri, opts).await
} }

View file

@ -227,11 +227,7 @@ pub fn synchronous(ctx: &Context<'_>, name: &str, args: Vec<Value>) -> Result<Va
} }
/// Attempts to run any asynchronous function. /// Attempts to run any asynchronous function.
pub async fn asynchronous( pub async fn asynchronous(ctx: &Context<'_>, name: &str, args: Vec<Value>) -> Result<Value, Error> {
_ctx: &Context<'_>,
name: &str,
args: Vec<Value>,
) -> Result<Value, Error> {
// Wrappers return a function as opposed to a value so that the dispatch! method can always // Wrappers return a function as opposed to a value so that the dispatch! method can always
// perform a function call. // perform a function call.
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@ -260,11 +256,11 @@ pub async fn asynchronous(
"crypto::scrypt::compare" => (cpu_intensive) crypto::scrypt::cmp.await, "crypto::scrypt::compare" => (cpu_intensive) crypto::scrypt::cmp.await,
"crypto::scrypt::generate" => (cpu_intensive) crypto::scrypt::gen.await, "crypto::scrypt::generate" => (cpu_intensive) crypto::scrypt::gen.await,
// //
"http::head" => http::head.await, "http::head" => http::head(ctx).await,
"http::get" => http::get.await, "http::get" => http::get(ctx).await,
"http::put" => http::put.await, "http::put" => http::put(ctx).await,
"http::post" => http::post.await, "http::post" => http::post(ctx).await,
"http::patch" => http::patch.await, "http::patch" => http::patch(ctx).await,
"http::delete" => http::delete.await, "http::delete" => http::delete(ctx).await,
) )
} }

View file

@ -1,3 +1,4 @@
use crate::ctx::Context;
use crate::err::Error; use crate::err::Error;
use crate::sql::json; use crate::sql::json;
use crate::sql::object::Object; use crate::sql::object::Object;
@ -10,7 +11,7 @@ pub(crate) fn uri_is_valid(uri: &str) -> bool {
reqwest::Url::parse(uri).is_ok() reqwest::Url::parse(uri).is_ok()
} }
pub async fn head(uri: Strand, opts: impl Into<Object>) -> Result<Value, Error> { pub async fn head(ctx: &Context<'_>, uri: Strand, opts: impl Into<Object>) -> Result<Value, Error> {
// Set a default client with no timeout // Set a default client with no timeout
let cli = Client::builder().build()?; let cli = Client::builder().build()?;
// Start a new HEAD request // Start a new HEAD request
@ -24,7 +25,10 @@ pub async fn head(uri: Strand, opts: impl Into<Object>) -> Result<Value, Error>
req = req.header(k.as_str(), v.to_strand().as_str()); req = req.header(k.as_str(), v.to_strand().as_str());
} }
// Send the request and wait // Send the request and wait
let res = req.send().await?; let res = match ctx.timeout() {
Some(d) => req.timeout(d).send().await?,
None => req.send().await?,
};
// Check the response status // Check the response status
match res.status() { match res.status() {
s if s.is_success() => Ok(Value::None), s if s.is_success() => Ok(Value::None),
@ -32,7 +36,7 @@ pub async fn head(uri: Strand, opts: impl Into<Object>) -> Result<Value, Error>
} }
} }
pub async fn get(uri: Strand, opts: impl Into<Object>) -> Result<Value, Error> { pub async fn get(ctx: &Context<'_>, uri: Strand, opts: impl Into<Object>) -> Result<Value, Error> {
// Set a default client with no timeout // Set a default client with no timeout
let cli = Client::builder().build()?; let cli = Client::builder().build()?;
// Start a new GET request // Start a new GET request
@ -46,7 +50,10 @@ pub async fn get(uri: Strand, opts: impl Into<Object>) -> Result<Value, Error> {
req = req.header(k.as_str(), v.to_strand().as_str()); req = req.header(k.as_str(), v.to_strand().as_str());
} }
// Send the request and wait // Send the request and wait
let res = req.send().await?; let res = match ctx.timeout() {
Some(d) => req.timeout(d).send().await?,
None => req.send().await?,
};
// Check the response status // Check the response status
match res.status() { match res.status() {
s if s.is_success() => match res.headers().get(CONTENT_TYPE) { s if s.is_success() => match res.headers().get(CONTENT_TYPE) {
@ -69,7 +76,12 @@ pub async fn get(uri: Strand, opts: impl Into<Object>) -> Result<Value, Error> {
} }
} }
pub async fn put(uri: Strand, body: Value, opts: impl Into<Object>) -> Result<Value, Error> { pub async fn put(
ctx: &Context<'_>,
uri: Strand,
body: Value,
opts: impl Into<Object>,
) -> Result<Value, Error> {
// Set a default client with no timeout // Set a default client with no timeout
let cli = Client::builder().build()?; let cli = Client::builder().build()?;
// Start a new GET request // Start a new GET request
@ -87,7 +99,10 @@ pub async fn put(uri: Strand, body: Value, opts: impl Into<Object>) -> Result<Va
req = req.json(&body); req = req.json(&body);
} }
// Send the request and wait // Send the request and wait
let res = req.send().await?; let res = match ctx.timeout() {
Some(d) => req.timeout(d).send().await?,
None => req.send().await?,
};
// Check the response status // Check the response status
match res.status() { match res.status() {
s if s.is_success() => match res.headers().get(CONTENT_TYPE) { s if s.is_success() => match res.headers().get(CONTENT_TYPE) {
@ -110,7 +125,12 @@ pub async fn put(uri: Strand, body: Value, opts: impl Into<Object>) -> Result<Va
} }
} }
pub async fn post(uri: Strand, body: Value, opts: impl Into<Object>) -> Result<Value, Error> { pub async fn post(
ctx: &Context<'_>,
uri: Strand,
body: Value,
opts: impl Into<Object>,
) -> Result<Value, Error> {
// Set a default client with no timeout // Set a default client with no timeout
let cli = Client::builder().build()?; let cli = Client::builder().build()?;
// Start a new GET request // Start a new GET request
@ -128,7 +148,10 @@ pub async fn post(uri: Strand, body: Value, opts: impl Into<Object>) -> Result<V
req = req.json(&body); req = req.json(&body);
} }
// Send the request and wait // Send the request and wait
let res = req.send().await?; let res = match ctx.timeout() {
Some(d) => req.timeout(d).send().await?,
None => req.send().await?,
};
// Check the response status // Check the response status
match res.status() { match res.status() {
s if s.is_success() => match res.headers().get(CONTENT_TYPE) { s if s.is_success() => match res.headers().get(CONTENT_TYPE) {
@ -151,7 +174,12 @@ pub async fn post(uri: Strand, body: Value, opts: impl Into<Object>) -> Result<V
} }
} }
pub async fn patch(uri: Strand, body: Value, opts: impl Into<Object>) -> Result<Value, Error> { pub async fn patch(
ctx: &Context<'_>,
uri: Strand,
body: Value,
opts: impl Into<Object>,
) -> Result<Value, Error> {
// Set a default client with no timeout // Set a default client with no timeout
let cli = Client::builder().build()?; let cli = Client::builder().build()?;
// Start a new GET request // Start a new GET request
@ -169,7 +197,10 @@ pub async fn patch(uri: Strand, body: Value, opts: impl Into<Object>) -> Result<
req = req.json(&body); req = req.json(&body);
} }
// Send the request and wait // Send the request and wait
let res = req.send().await?; let res = match ctx.timeout() {
Some(d) => req.timeout(d).send().await?,
None => req.send().await?,
};
// Check the response status // Check the response status
match res.status() { match res.status() {
s if s.is_success() => match res.headers().get(CONTENT_TYPE) { s if s.is_success() => match res.headers().get(CONTENT_TYPE) {
@ -192,7 +223,11 @@ pub async fn patch(uri: Strand, body: Value, opts: impl Into<Object>) -> Result<
} }
} }
pub async fn delete(uri: Strand, opts: impl Into<Object>) -> Result<Value, Error> { pub async fn delete(
ctx: &Context<'_>,
uri: Strand,
opts: impl Into<Object>,
) -> Result<Value, Error> {
// Set a default client with no timeout // Set a default client with no timeout
let cli = Client::builder().build()?; let cli = Client::builder().build()?;
// Start a new GET request // Start a new GET request
@ -206,7 +241,10 @@ pub async fn delete(uri: Strand, opts: impl Into<Object>) -> Result<Value, Error
req = req.header(k.as_str(), v.to_strand().as_str()); req = req.header(k.as_str(), v.to_strand().as_str());
} }
// Send the request and wait // Send the request and wait
let res = req.send().await?; let res = match ctx.timeout() {
Some(d) => req.timeout(d).send().await?,
None => req.send().await?,
};
// Check the response status // Check the response status
match res.status() { match res.status() {
s if s.is_success() => match res.headers().get(CONTENT_TYPE) { s if s.is_success() => match res.headers().get(CONTENT_TYPE) {