From 06d2d88594ea8392b47ea769f376b0bf36388f14 Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Tue, 7 Mar 2023 08:32:41 +0000 Subject: [PATCH] Ensure HTTP requests timeout correctly Closes #1677 --- lib/src/ctx/context.rs | 6 ++++ lib/src/fnc/http.rs | 29 +++++++++++------ lib/src/fnc/mod.rs | 18 ++++------- lib/src/fnc/util/http/mod.rs | 62 +++++++++++++++++++++++++++++------- 4 files changed, 82 insertions(+), 33 deletions(-) diff --git a/lib/src/ctx/context.rs b/lib/src/ctx/context.rs index 191947f4..da518544 100644 --- a/lib/src/ctx/context.rs +++ b/lib/src/ctx/context.rs @@ -107,6 +107,12 @@ impl<'a> Context<'a> { self.deadline } + /// Get the timeout for this operation, if any. This is useful for + /// checking if a long job should be started or not. + pub fn timeout(&self) -> Option { + self.deadline.map(|v| v.saturating_duration_since(Instant::now())) + } + /// Check if the context is done. If it returns `None` the operation may /// proceed, otherwise the operation should be stopped. pub fn done(&self) -> Option { diff --git a/lib/src/fnc/http.rs b/lib/src/fnc/http.rs index ad4216e5..5140fc8e 100644 --- a/lib/src/fnc/http.rs +++ b/lib/src/fnc/http.rs @@ -1,3 +1,4 @@ +use crate::ctx::Context; use crate::err::Error; use crate::sql::value::Value; @@ -61,47 +62,55 @@ fn try_as_opts( } #[cfg(feature = "http")] -pub async fn head((uri, opts): (Value, Option)) -> Result { +pub async fn head(ctx: &Context<'_>, (uri, opts): (Value, Option)) -> Result { let uri = try_as_uri("http::head", uri)?; let opts = try_as_opts("http::head", "The second argument should be an object.", opts)?; - crate::fnc::util::http::head(uri, opts).await + crate::fnc::util::http::head(ctx, uri, opts).await } #[cfg(feature = "http")] -pub async fn get((uri, opts): (Value, Option)) -> Result { +pub async fn get(ctx: &Context<'_>, (uri, opts): (Value, Option)) -> Result { let uri = try_as_uri("http::get", uri)?; let opts = try_as_opts("http::get", "The second argument should be an object.", opts)?; - crate::fnc::util::http::get(uri, opts).await + crate::fnc::util::http::get(ctx, uri, opts).await } #[cfg(feature = "http")] -pub async fn put((uri, body, opts): (Value, Option, Option)) -> Result { +pub async fn put( + ctx: &Context<'_>, + (uri, body, opts): (Value, Option, Option), +) -> Result { let uri = try_as_uri("http::put", uri)?; let opts = try_as_opts("http::put", "The third argument should be an object.", opts)?; - crate::fnc::util::http::put(uri, body.unwrap_or(Value::Null), opts).await + crate::fnc::util::http::put(ctx, uri, body.unwrap_or(Value::Null), opts).await } #[cfg(feature = "http")] pub async fn post( + ctx: &Context<'_>, (uri, body, opts): (Value, Option, Option), ) -> Result { let uri = try_as_uri("http::post", uri)?; let opts = try_as_opts("http::post", "The third argument should be an object.", opts)?; - crate::fnc::util::http::post(uri, body.unwrap_or(Value::Null), opts).await + crate::fnc::util::http::post(ctx, uri, body.unwrap_or(Value::Null), opts).await } #[cfg(feature = "http")] pub async fn patch( + ctx: &Context<'_>, (uri, body, opts): (Value, Option, Option), ) -> Result { let uri = try_as_uri("http::patch", uri)?; let opts = try_as_opts("http::patch", "The third argument should be an object.", opts)?; - crate::fnc::util::http::patch(uri, body.unwrap_or(Value::Null), opts).await + crate::fnc::util::http::patch(ctx, uri, body.unwrap_or(Value::Null), opts).await } #[cfg(feature = "http")] -pub async fn delete((uri, opts): (Value, Option)) -> Result { +pub async fn delete( + ctx: &Context<'_>, + (uri, opts): (Value, Option), +) -> Result { let uri = try_as_uri("http::delete", uri)?; let opts = try_as_opts("http::delete", "The second argument should be an object.", opts)?; - crate::fnc::util::http::delete(uri, opts).await + crate::fnc::util::http::delete(ctx, uri, opts).await } diff --git a/lib/src/fnc/mod.rs b/lib/src/fnc/mod.rs index 7e6c7f76..136fd9a8 100644 --- a/lib/src/fnc/mod.rs +++ b/lib/src/fnc/mod.rs @@ -227,11 +227,7 @@ pub fn synchronous(ctx: &Context<'_>, name: &str, args: Vec) -> Result, - name: &str, - args: Vec, -) -> Result { +pub async fn asynchronous(ctx: &Context<'_>, name: &str, args: Vec) -> Result { // Wrappers return a function as opposed to a value so that the dispatch! method can always // perform a function call. #[cfg(not(target_arch = "wasm32"))] @@ -260,11 +256,11 @@ pub async fn asynchronous( "crypto::scrypt::compare" => (cpu_intensive) crypto::scrypt::cmp.await, "crypto::scrypt::generate" => (cpu_intensive) crypto::scrypt::gen.await, // - "http::head" => http::head.await, - "http::get" => http::get.await, - "http::put" => http::put.await, - "http::post" => http::post.await, - "http::patch" => http::patch.await, - "http::delete" => http::delete.await, + "http::head" => http::head(ctx).await, + "http::get" => http::get(ctx).await, + "http::put" => http::put(ctx).await, + "http::post" => http::post(ctx).await, + "http::patch" => http::patch(ctx).await, + "http::delete" => http::delete(ctx).await, ) } diff --git a/lib/src/fnc/util/http/mod.rs b/lib/src/fnc/util/http/mod.rs index 6bd78d14..26729512 100644 --- a/lib/src/fnc/util/http/mod.rs +++ b/lib/src/fnc/util/http/mod.rs @@ -1,3 +1,4 @@ +use crate::ctx::Context; use crate::err::Error; use crate::sql::json; use crate::sql::object::Object; @@ -10,7 +11,7 @@ pub(crate) fn uri_is_valid(uri: &str) -> bool { reqwest::Url::parse(uri).is_ok() } -pub async fn head(uri: Strand, opts: impl Into) -> Result { +pub async fn head(ctx: &Context<'_>, uri: Strand, opts: impl Into) -> Result { // Set a default client with no timeout let cli = Client::builder().build()?; // Start a new HEAD request @@ -24,7 +25,10 @@ pub async fn head(uri: Strand, opts: impl Into) -> Result req = req.header(k.as_str(), v.to_strand().as_str()); } // Send the request and wait - let res = req.send().await?; + let res = match ctx.timeout() { + Some(d) => req.timeout(d).send().await?, + None => req.send().await?, + }; // Check the response status match res.status() { s if s.is_success() => Ok(Value::None), @@ -32,7 +36,7 @@ pub async fn head(uri: Strand, opts: impl Into) -> Result } } -pub async fn get(uri: Strand, opts: impl Into) -> Result { +pub async fn get(ctx: &Context<'_>, uri: Strand, opts: impl Into) -> Result { // Set a default client with no timeout let cli = Client::builder().build()?; // Start a new GET request @@ -46,7 +50,10 @@ pub async fn get(uri: Strand, opts: impl Into) -> Result { req = req.header(k.as_str(), v.to_strand().as_str()); } // Send the request and wait - let res = req.send().await?; + let res = match ctx.timeout() { + Some(d) => req.timeout(d).send().await?, + None => req.send().await?, + }; // Check the response status match res.status() { s if s.is_success() => match res.headers().get(CONTENT_TYPE) { @@ -69,7 +76,12 @@ pub async fn get(uri: Strand, opts: impl Into) -> Result { } } -pub async fn put(uri: Strand, body: Value, opts: impl Into) -> Result { +pub async fn put( + ctx: &Context<'_>, + uri: Strand, + body: Value, + opts: impl Into, +) -> Result { // Set a default client with no timeout let cli = Client::builder().build()?; // Start a new GET request @@ -87,7 +99,10 @@ pub async fn put(uri: Strand, body: Value, opts: impl Into) -> Result req.timeout(d).send().await?, + None => req.send().await?, + }; // Check the response status match res.status() { s if s.is_success() => match res.headers().get(CONTENT_TYPE) { @@ -110,7 +125,12 @@ pub async fn put(uri: Strand, body: Value, opts: impl Into) -> Result) -> Result { +pub async fn post( + ctx: &Context<'_>, + uri: Strand, + body: Value, + opts: impl Into, +) -> Result { // Set a default client with no timeout let cli = Client::builder().build()?; // Start a new GET request @@ -128,7 +148,10 @@ pub async fn post(uri: Strand, body: Value, opts: impl Into) -> Result req.timeout(d).send().await?, + None => req.send().await?, + }; // Check the response status match res.status() { s if s.is_success() => match res.headers().get(CONTENT_TYPE) { @@ -151,7 +174,12 @@ pub async fn post(uri: Strand, body: Value, opts: impl Into) -> Result) -> Result { +pub async fn patch( + ctx: &Context<'_>, + uri: Strand, + body: Value, + opts: impl Into, +) -> Result { // Set a default client with no timeout let cli = Client::builder().build()?; // Start a new GET request @@ -169,7 +197,10 @@ pub async fn patch(uri: Strand, body: Value, opts: impl Into) -> Result< req = req.json(&body); } // Send the request and wait - let res = req.send().await?; + let res = match ctx.timeout() { + Some(d) => req.timeout(d).send().await?, + None => req.send().await?, + }; // Check the response status match res.status() { s if s.is_success() => match res.headers().get(CONTENT_TYPE) { @@ -192,7 +223,11 @@ pub async fn patch(uri: Strand, body: Value, opts: impl Into) -> Result< } } -pub async fn delete(uri: Strand, opts: impl Into) -> Result { +pub async fn delete( + ctx: &Context<'_>, + uri: Strand, + opts: impl Into, +) -> Result { // Set a default client with no timeout let cli = Client::builder().build()?; // Start a new GET request @@ -206,7 +241,10 @@ pub async fn delete(uri: Strand, opts: impl Into) -> Result req.timeout(d).send().await?, + None => req.send().await?, + }; // Check the response status match res.status() { s if s.is_success() => match res.headers().get(CONTENT_TYPE) {