From bc2f7fdafa82683c1f4cbc629c917327b6aaff3b Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Mon, 23 Oct 2023 14:37:42 +0100 Subject: [PATCH] Feat: enable compression on the HTTP connector (#2867) --- .github/workflows/ci.yml | 4 ++-- Cargo.lock | 20 ++++++++++++++++++++ Cargo.toml | 4 ++-- src/net/mod.rs | 3 +++ tests/http_integration.rs | 30 ++++++++++++++++++++++++++++++ 5 files changed, 57 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3bf4ffdc..2a635b39 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -94,7 +94,7 @@ jobs: clippy: name: Check clippy - runs-on: ubuntu-latest + runs-on: ubuntu-latest-16-cores steps: - name: Install stable toolchain @@ -217,7 +217,7 @@ jobs: test: name: Test workspace - runs-on: ubuntu-latest + runs-on: ubuntu-latest-16-cores steps: - name: Install stable toolchain diff --git a/Cargo.lock b/Cargo.lock index b96061cc..04fea5bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -434,6 +434,22 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-compression" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b74f44609f0f91493e3082d3734d98497e094777144380ea4db9f9905dd5b6" +dependencies = [ + "brotli", + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio 1.32.0", + "zstd", + "zstd-safe", +] + [[package]] name = "async-executor" version = "1.5.1" @@ -4214,6 +4230,7 @@ version = "0.11.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" dependencies = [ + "async-compression", "base64 0.21.2", "bytes 1.4.0", "encoding_rs", @@ -6053,6 +6070,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55ae70283aba8d2a8b411c695c437fe25b8b5e44e23e780662002fc72fb47a82" dependencies = [ + "async-compression", "base64 0.21.2", "bitflags 2.4.0", "bytes 1.4.0", @@ -6063,6 +6081,8 @@ dependencies = [ "http-range-header", "mime", "pin-project-lite", + "tokio 1.32.0", + "tokio-util", "tower", "tower-layer", "tower-service", diff --git a/Cargo.toml b/Cargo.toml index 9741b30b..f22eab24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,7 +53,7 @@ opentelemetry = { version = "0.19", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.12.0", features = ["metrics"] } pin-project-lite = "0.2.12" rand = "0.8.5" -reqwest = { version = "0.11.18", default-features = false, features = ["blocking"] } +reqwest = { version = "0.11.18", default-features = false, features = ["blocking", "gzip"] } rustyline = { version = "11.0.0", features = ["derive"] } serde = { version = "1.0.183", features = ["derive"] } serde_cbor = "0.11.2" @@ -65,7 +65,7 @@ thiserror = "1.0.44" tokio = { version = "1.31.0", features = ["macros", "signal"] } tokio-util = { version = "0.7.8", features = ["io"] } tower = "0.4.13" -tower-http = { version = "0.4.3", features = ["trace", "sensitive-headers", "auth", "request-id", "util", "catch-panic", "cors", "set-header", "limit", "add-extension"] } +tower-http = { version = "0.4.3", features = ["trace", "sensitive-headers", "auth", "request-id", "util", "catch-panic", "cors", "set-header", "limit", "add-extension", "compression-full"] } tracing = "0.1" tracing-futures = { version = "0.2.5", features = ["tokio"], default-features = false } tracing-opentelemetry = "0.19.0" diff --git a/src/net/mod.rs b/src/net/mod.rs index 93e84a38..d8a1a25e 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -29,6 +29,7 @@ use tokio_util::sync::CancellationToken; use tower::ServiceBuilder; use tower_http::add_extension::AddExtensionLayer; use tower_http::auth::AsyncRequireAuthorizationLayer; +use tower_http::compression::CompressionLayer; use tower_http::cors::{Any, CorsLayer}; use tower_http::request_id::MakeRequestUuid; use tower_http::sensitive_headers::{ @@ -75,6 +76,7 @@ pub async fn init(ct: CancellationToken) -> Result<(), Error> { .catch_panic() .set_x_request_id(MakeRequestUuid) .propagate_x_request_id() + .layer(CompressionLayer::new()) .layer(AddExtensionLayer::new(app_state)) .layer(middleware::from_fn(client_ip::client_ip_middleware)) .layer(SetSensitiveRequestHeadersLayer::from_shared(Arc::clone(&headers))) @@ -102,6 +104,7 @@ pub async fn init(ct: CancellationToken) -> Result<(), Error> { ]) .allow_headers([ http::header::ACCEPT, + http::header::ACCEPT_ENCODING, http::header::AUTHORIZATION, http::header::CONTENT_TYPE, http::header::ORIGIN, diff --git a/tests/http_integration.rs b/tests/http_integration.rs index dfacec4c..f2153f7f 100644 --- a/tests/http_integration.rs +++ b/tests/http_integration.rs @@ -438,6 +438,7 @@ mod http_integration { headers.insert("NS", "N".parse()?); headers.insert("DB", "D".parse()?); headers.insert(header::ACCEPT, "application/json".parse()?); + let client = reqwest::Client::builder() .connect_timeout(Duration::from_millis(10)) .default_headers(headers) @@ -540,6 +541,35 @@ mod http_integration { Ok(()) } + #[test(tokio::test)] + async fn sql_endpoint_with_compression() -> Result<(), Box> { + let (addr, _server) = common::start_server_with_defaults().await.unwrap(); + let url = &format!("http://{addr}/sql"); + + // Prepare HTTP client + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert("NS", "N".parse()?); + headers.insert("DB", "D".parse()?); + headers.insert(header::ACCEPT, "application/json".parse()?); + headers.insert(header::ACCEPT_ENCODING, "gzip".parse()?); + + let client = reqwest::Client::builder() + .connect_timeout(Duration::from_millis(10)) + .gzip(false) // So that the content-encoding header is not removed by Reqwest + .default_headers(headers.clone()) + .build()?; + + // Check that the content is gzip encoded + { + let res = + client.post(url).basic_auth(USER, Some(PASS)).body("CREATE foo").send().await?; + assert_eq!(res.status(), 200); + assert_eq!(res.headers()["content-encoding"], "gzip"); + } + + Ok(()) + } + #[test(tokio::test)] async fn sync_endpoint() -> Result<(), Box> { let (addr, _server) = common::start_server_with_defaults().await.unwrap();