Skip to content

Commit d7e958d

Browse files
committed
feat: support zstd response encoding based on Accept-Encoding header
Respect client Accept-Encoding header per RFC 7231: - If client sends Accept-Encoding: zstd → respond with zstd compression - If client sends Accept-Encoding: gzip → respond with gzip (backward compat) - Quality values respected (e.g. "zstd;q=1.0, gzip;q=0.5" → zstd) - At equal q, prefer zstd over gzip - Default to gzip when no Accept-Encoding header present Related: subsquid/squid-sdk#456
1 parent fbd871d commit d7e958d

9 files changed

Lines changed: 283 additions & 20 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/hotblocks/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ bytes = { workspace = true }
1111
chrono = { workspace = true, features = ["std"] }
1212
clap = { workspace = true, features = ["derive"] }
1313
flate2 = { workspace = true }
14+
zstd = "0.13"
1415
futures = { workspace = true }
1516
ouroboros = { workspace = true }
1617
prometheus-client = { workspace = true }

crates/hotblocks/src/api.rs

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,16 @@ impl IntoResponse for ResponseWithMetadata {
160160
async fn stream(
161161
Extension(app): Extension<AppRef>,
162162
Path(dataset_id): Path<DatasetId>,
163-
Json(query): Json<Query>,
163+
req: Request,
164164
) -> impl IntoResponse {
165-
let response = stream_internal(app, dataset_id, query, false).await;
165+
let (encoding, query) = match extract_encoding_and_body::<Query>(req).await {
166+
Ok(v) => v,
167+
Err(e) => return ResponseWithMetadata::new()
168+
.with_dataset_id(dataset_id)
169+
.with_endpoint("/stream")
170+
.with_response(|| e),
171+
};
172+
let response = stream_internal(app, dataset_id, query, false, encoding).await;
166173
ResponseWithMetadata::new()
167174
.with_dataset_id(dataset_id)
168175
.with_endpoint("/stream")
@@ -172,20 +179,43 @@ async fn stream(
172179
async fn finalized_stream(
173180
Extension(app): Extension<AppRef>,
174181
Path(dataset_id): Path<DatasetId>,
175-
Json(query): Json<Query>,
182+
req: Request,
176183
) -> impl IntoResponse {
177-
let response = stream_internal(app, dataset_id, query, true).await;
184+
let (encoding, query) = match extract_encoding_and_body::<Query>(req).await {
185+
Ok(v) => v,
186+
Err(e) => return ResponseWithMetadata::new()
187+
.with_dataset_id(dataset_id)
188+
.with_endpoint("/finalized_stream")
189+
.with_response(|| e),
190+
};
191+
let response = stream_internal(app, dataset_id, query, true, encoding).await;
178192
ResponseWithMetadata::new()
179193
.with_dataset_id(dataset_id)
180194
.with_endpoint("/finalized_stream")
181195
.with_response(|| response)
182196
}
183197

198+
use crate::encoding::ContentEncoding;
199+
200+
async fn extract_encoding_and_body<T: serde::de::DeserializeOwned>(req: Request) -> Result<(ContentEncoding, T), Response> {
201+
let accept = req.headers()
202+
.get("accept-encoding")
203+
.and_then(|v| v.to_str().ok());
204+
let encoding = ContentEncoding::from_accept_encoding(accept)
205+
.unwrap_or(ContentEncoding::Gzip);
206+
let body = axum::body::to_bytes(req.into_body(), 1024 * 1024).await
207+
.map_err(|_| text!(StatusCode::BAD_REQUEST, "failed to read body"))?;
208+
let query: T = serde_json::from_slice(&body)
209+
.map_err(|e| text!(StatusCode::BAD_REQUEST, "{}", e))?;
210+
Ok((encoding, query))
211+
}
212+
184213
async fn stream_internal(
185214
app: AppRef,
186215
dataset_id: DatasetId,
187216
query: Query,
188217
finalized: bool,
218+
encoding: ContentEncoding,
189219
) -> Response {
190220
let dataset = get_dataset!(app, dataset_id);
191221

@@ -194,17 +224,17 @@ async fn stream_internal(
194224
}
195225

196226
let query_result = if finalized {
197-
app.query_service.query_finalized(&dataset, query).await
227+
app.query_service.query_finalized(&dataset, query, encoding).await
198228
} else {
199-
app.query_service.query(&dataset, query).await
229+
app.query_service.query(&dataset, query, encoding).await
200230
};
201231

202232
match query_result {
203233
Ok(stream) => {
204234
let mut res = Response::builder()
205235
.status(200)
206236
.header("content-type", "text/plain")
207-
.header("content-encoding", "gzip");
237+
.header("content-encoding", encoding.as_str());
208238

209239
if let Some(finalized_head) = stream.finalized_head() {
210240
if finalized {

crates/hotblocks/src/encoding.rs

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2+
pub enum ContentEncoding {
3+
Gzip,
4+
Zstd,
5+
}
6+
7+
impl ContentEncoding {
8+
pub fn as_str(&self) -> &'static str {
9+
match self {
10+
ContentEncoding::Gzip => "gzip",
11+
ContentEncoding::Zstd => "zstd",
12+
}
13+
}
14+
15+
/// Parse Accept-Encoding header respecting quality values.
16+
/// See <https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.4>
17+
/// At equal q, prefers zstd over gzip.
18+
/// Returns None if neither gzip nor zstd is accepted.
19+
pub fn from_accept_encoding(header: Option<&str>) -> Option<Self> {
20+
let header = header?;
21+
22+
let mut best: Option<Self> = None;
23+
let mut best_q: f32 = -1.0;
24+
25+
for part in header.split(',') {
26+
let part = part.trim();
27+
let (name, q) = if let Some((name, params)) = part.split_once(';') {
28+
let q = params
29+
.split(';')
30+
.find_map(|p| p.trim().strip_prefix("q="))
31+
.and_then(|v| v.trim().parse::<f32>().ok())
32+
.unwrap_or(1.0);
33+
(name.trim(), q)
34+
} else {
35+
(part, 1.0)
36+
};
37+
38+
if q <= 0.0 {
39+
continue;
40+
}
41+
42+
let encoding = match name {
43+
"zstd" => Some(ContentEncoding::Zstd),
44+
"gzip" => Some(ContentEncoding::Gzip),
45+
_ => None,
46+
};
47+
48+
if let Some(enc) = encoding {
49+
if q > best_q || (q == best_q && enc == ContentEncoding::Zstd) {
50+
best_q = q;
51+
best = Some(enc);
52+
}
53+
}
54+
}
55+
56+
best
57+
}
58+
}
59+
60+
#[cfg(test)]
61+
mod tests {
62+
use super::*;
63+
64+
#[test]
65+
fn none_header() {
66+
assert_eq!(ContentEncoding::from_accept_encoding(None), None);
67+
}
68+
69+
#[test]
70+
fn empty_header() {
71+
assert_eq!(ContentEncoding::from_accept_encoding(Some("")), None);
72+
}
73+
74+
#[test]
75+
fn only_zstd() {
76+
assert_eq!(
77+
ContentEncoding::from_accept_encoding(Some("zstd")),
78+
Some(ContentEncoding::Zstd)
79+
);
80+
}
81+
82+
#[test]
83+
fn only_gzip() {
84+
assert_eq!(
85+
ContentEncoding::from_accept_encoding(Some("gzip")),
86+
Some(ContentEncoding::Gzip)
87+
);
88+
}
89+
90+
#[test]
91+
fn zstd_and_gzip_equal_q_prefers_zstd() {
92+
assert_eq!(
93+
ContentEncoding::from_accept_encoding(Some("gzip, zstd")),
94+
Some(ContentEncoding::Zstd)
95+
);
96+
assert_eq!(
97+
ContentEncoding::from_accept_encoding(Some("zstd, gzip")),
98+
Some(ContentEncoding::Zstd)
99+
);
100+
}
101+
102+
#[test]
103+
fn gzip_higher_q() {
104+
assert_eq!(
105+
ContentEncoding::from_accept_encoding(Some("gzip;q=1.0, zstd;q=0.5")),
106+
Some(ContentEncoding::Gzip)
107+
);
108+
}
109+
110+
#[test]
111+
fn zstd_higher_q() {
112+
assert_eq!(
113+
ContentEncoding::from_accept_encoding(Some("gzip;q=0.5, zstd;q=1.0")),
114+
Some(ContentEncoding::Zstd)
115+
);
116+
}
117+
118+
#[test]
119+
fn unsupported_encodings_only() {
120+
assert_eq!(
121+
ContentEncoding::from_accept_encoding(Some("deflate, br")),
122+
None
123+
);
124+
}
125+
126+
#[test]
127+
fn mixed_with_unsupported() {
128+
assert_eq!(
129+
ContentEncoding::from_accept_encoding(Some("br, deflate, gzip;q=0.8")),
130+
Some(ContentEncoding::Gzip)
131+
);
132+
}
133+
134+
#[test]
135+
fn q_zero_excluded() {
136+
assert_eq!(
137+
ContentEncoding::from_accept_encoding(Some("gzip;q=0, zstd")),
138+
Some(ContentEncoding::Zstd)
139+
);
140+
}
141+
142+
#[test]
143+
fn q_zero_both() {
144+
assert_eq!(
145+
ContentEncoding::from_accept_encoding(Some("gzip;q=0, zstd;q=0")),
146+
None
147+
);
148+
}
149+
150+
#[test]
151+
fn whitespace_handling() {
152+
assert_eq!(
153+
ContentEncoding::from_accept_encoding(Some(" gzip ; q=0.5 , zstd ; q=0.9 ")),
154+
Some(ContentEncoding::Zstd)
155+
);
156+
}
157+
158+
#[test]
159+
fn wildcard_ignored() {
160+
// We don't handle *, just pick from what we support
161+
assert_eq!(
162+
ContentEncoding::from_accept_encoding(Some("*, gzip;q=0.5")),
163+
Some(ContentEncoding::Gzip)
164+
);
165+
}
166+
}

crates/hotblocks/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod cli;
33
mod data_service;
44
mod dataset_config;
55
mod dataset_controller;
6+
mod encoding;
67
mod errors;
78
mod metrics;
89
mod query;

crates/hotblocks/src/query/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ mod static_snapshot;
66

77
pub use executor::QueryExecutorCollector;
88
pub use response::*;
9+
910
pub use service::*;

crates/hotblocks/src/query/response.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ impl QueryResponse {
8181
query: Query,
8282
only_finalized: bool,
8383
time_limit: Option<Duration>,
84+
encoding: crate::encoding::ContentEncoding,
8485
) -> anyhow::Result<Self> {
8586
let Some(slot) = executor.get_slot() else {
8687
bail!(Busy)
@@ -90,7 +91,7 @@ impl QueryResponse {
9091
let mut runner = slot
9192
.run(move |slot| -> anyhow::Result<_> {
9293
let mut runner =
93-
RunningQuery::new(db, dataset_id, &query, only_finalized).map(Box::new)?;
94+
RunningQuery::new(db, dataset_id, &query, only_finalized, encoding).map(Box::new)?;
9495
next_run(&mut runner, slot)?;
9596
Ok(runner)
9697
})

0 commit comments

Comments
 (0)