Feat: enable compression on the HTTP connector (#2867)

This commit is contained in:
Emmanuel Keller 2023-10-23 14:37:42 +01:00 committed by GitHub
parent 4c1ceac6eb
commit bc2f7fdafa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 57 additions and 4 deletions

View file

@ -94,7 +94,7 @@ jobs:
clippy: clippy:
name: Check clippy name: Check clippy
runs-on: ubuntu-latest runs-on: ubuntu-latest-16-cores
steps: steps:
- name: Install stable toolchain - name: Install stable toolchain
@ -217,7 +217,7 @@ jobs:
test: test:
name: Test workspace name: Test workspace
runs-on: ubuntu-latest runs-on: ubuntu-latest-16-cores
steps: steps:
- name: Install stable toolchain - name: Install stable toolchain

20
Cargo.lock generated
View file

@ -434,6 +434,22 @@ dependencies = [
"futures-core", "futures-core",
] ]
[[package]]
name = "async-compression"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b74f44609f0f91493e3082d3734d98497e094777144380ea4db9f9905dd5b6"
dependencies = [
"brotli",
"flate2",
"futures-core",
"memchr",
"pin-project-lite",
"tokio 1.32.0",
"zstd",
"zstd-safe",
]
[[package]] [[package]]
name = "async-executor" name = "async-executor"
version = "1.5.1" version = "1.5.1"
@ -4214,6 +4230,7 @@ version = "0.11.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1"
dependencies = [ dependencies = [
"async-compression",
"base64 0.21.2", "base64 0.21.2",
"bytes 1.4.0", "bytes 1.4.0",
"encoding_rs", "encoding_rs",
@ -6053,6 +6070,7 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55ae70283aba8d2a8b411c695c437fe25b8b5e44e23e780662002fc72fb47a82" checksum = "55ae70283aba8d2a8b411c695c437fe25b8b5e44e23e780662002fc72fb47a82"
dependencies = [ dependencies = [
"async-compression",
"base64 0.21.2", "base64 0.21.2",
"bitflags 2.4.0", "bitflags 2.4.0",
"bytes 1.4.0", "bytes 1.4.0",
@ -6063,6 +6081,8 @@ dependencies = [
"http-range-header", "http-range-header",
"mime", "mime",
"pin-project-lite", "pin-project-lite",
"tokio 1.32.0",
"tokio-util",
"tower", "tower",
"tower-layer", "tower-layer",
"tower-service", "tower-service",

View file

@ -53,7 +53,7 @@ opentelemetry = { version = "0.19", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.12.0", features = ["metrics"] } opentelemetry-otlp = { version = "0.12.0", features = ["metrics"] }
pin-project-lite = "0.2.12" pin-project-lite = "0.2.12"
rand = "0.8.5" rand = "0.8.5"
reqwest = { version = "0.11.18", default-features = false, features = ["blocking"] } reqwest = { version = "0.11.18", default-features = false, features = ["blocking", "gzip"] }
rustyline = { version = "11.0.0", features = ["derive"] } rustyline = { version = "11.0.0", features = ["derive"] }
serde = { version = "1.0.183", features = ["derive"] } serde = { version = "1.0.183", features = ["derive"] }
serde_cbor = "0.11.2" serde_cbor = "0.11.2"
@ -65,7 +65,7 @@ thiserror = "1.0.44"
tokio = { version = "1.31.0", features = ["macros", "signal"] } tokio = { version = "1.31.0", features = ["macros", "signal"] }
tokio-util = { version = "0.7.8", features = ["io"] } tokio-util = { version = "0.7.8", features = ["io"] }
tower = "0.4.13" tower = "0.4.13"
tower-http = { version = "0.4.3", features = ["trace", "sensitive-headers", "auth", "request-id", "util", "catch-panic", "cors", "set-header", "limit", "add-extension"] } tower-http = { version = "0.4.3", features = ["trace", "sensitive-headers", "auth", "request-id", "util", "catch-panic", "cors", "set-header", "limit", "add-extension", "compression-full"] }
tracing = "0.1" tracing = "0.1"
tracing-futures = { version = "0.2.5", features = ["tokio"], default-features = false } tracing-futures = { version = "0.2.5", features = ["tokio"], default-features = false }
tracing-opentelemetry = "0.19.0" tracing-opentelemetry = "0.19.0"

View file

@ -29,6 +29,7 @@ use tokio_util::sync::CancellationToken;
use tower::ServiceBuilder; use tower::ServiceBuilder;
use tower_http::add_extension::AddExtensionLayer; use tower_http::add_extension::AddExtensionLayer;
use tower_http::auth::AsyncRequireAuthorizationLayer; use tower_http::auth::AsyncRequireAuthorizationLayer;
use tower_http::compression::CompressionLayer;
use tower_http::cors::{Any, CorsLayer}; use tower_http::cors::{Any, CorsLayer};
use tower_http::request_id::MakeRequestUuid; use tower_http::request_id::MakeRequestUuid;
use tower_http::sensitive_headers::{ use tower_http::sensitive_headers::{
@ -75,6 +76,7 @@ pub async fn init(ct: CancellationToken) -> Result<(), Error> {
.catch_panic() .catch_panic()
.set_x_request_id(MakeRequestUuid) .set_x_request_id(MakeRequestUuid)
.propagate_x_request_id() .propagate_x_request_id()
.layer(CompressionLayer::new())
.layer(AddExtensionLayer::new(app_state)) .layer(AddExtensionLayer::new(app_state))
.layer(middleware::from_fn(client_ip::client_ip_middleware)) .layer(middleware::from_fn(client_ip::client_ip_middleware))
.layer(SetSensitiveRequestHeadersLayer::from_shared(Arc::clone(&headers))) .layer(SetSensitiveRequestHeadersLayer::from_shared(Arc::clone(&headers)))
@ -102,6 +104,7 @@ pub async fn init(ct: CancellationToken) -> Result<(), Error> {
]) ])
.allow_headers([ .allow_headers([
http::header::ACCEPT, http::header::ACCEPT,
http::header::ACCEPT_ENCODING,
http::header::AUTHORIZATION, http::header::AUTHORIZATION,
http::header::CONTENT_TYPE, http::header::CONTENT_TYPE,
http::header::ORIGIN, http::header::ORIGIN,

View file

@ -438,6 +438,7 @@ mod http_integration {
headers.insert("NS", "N".parse()?); headers.insert("NS", "N".parse()?);
headers.insert("DB", "D".parse()?); headers.insert("DB", "D".parse()?);
headers.insert(header::ACCEPT, "application/json".parse()?); headers.insert(header::ACCEPT, "application/json".parse()?);
let client = reqwest::Client::builder() let client = reqwest::Client::builder()
.connect_timeout(Duration::from_millis(10)) .connect_timeout(Duration::from_millis(10))
.default_headers(headers) .default_headers(headers)
@ -540,6 +541,35 @@ mod http_integration {
Ok(()) Ok(())
} }
#[test(tokio::test)]
async fn sql_endpoint_with_compression() -> Result<(), Box<dyn std::error::Error>> {
let (addr, _server) = common::start_server_with_defaults().await.unwrap();
let url = &format!("http://{addr}/sql");
// Prepare HTTP client
let mut headers = reqwest::header::HeaderMap::new();
headers.insert("NS", "N".parse()?);
headers.insert("DB", "D".parse()?);
headers.insert(header::ACCEPT, "application/json".parse()?);
headers.insert(header::ACCEPT_ENCODING, "gzip".parse()?);
let client = reqwest::Client::builder()
.connect_timeout(Duration::from_millis(10))
.gzip(false) // So that the content-encoding header is not removed by Reqwest
.default_headers(headers.clone())
.build()?;
// Check that the content is gzip encoded
{
let res =
client.post(url).basic_auth(USER, Some(PASS)).body("CREATE foo").send().await?;
assert_eq!(res.status(), 200);
assert_eq!(res.headers()["content-encoding"], "gzip");
}
Ok(())
}
#[test(tokio::test)] #[test(tokio::test)]
async fn sync_endpoint() -> Result<(), Box<dyn std::error::Error>> { async fn sync_endpoint() -> Result<(), Box<dyn std::error::Error>> {
let (addr, _server) = common::start_server_with_defaults().await.unwrap(); let (addr, _server) = common::start_server_with_defaults().await.unwrap();