Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/hotblocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ bytes = { workspace = true }
chrono = { workspace = true, features = ["std"] }
clap = { workspace = true, features = ["derive"] }
flate2 = { workspace = true }
zstd = "0.13"
futures = { workspace = true }
ouroboros = { workspace = true }
prometheus-client = { workspace = true }
Expand Down
27 changes: 22 additions & 5 deletions crates/hotblocks/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,11 @@ impl IntoResponse for ResponseWithMetadata {
async fn stream(
Extension(app): Extension<AppRef>,
Path(dataset_id): Path<DatasetId>,
headers: HeaderMap,
Json(query): Json<Query>,
) -> impl IntoResponse {
let response = stream_internal(app, dataset_id, query, false).await;
let encoding = encoding_from_headers(&headers);
let response = stream_internal(app, dataset_id, query, false, encoding).await;
ResponseWithMetadata::new()
.with_dataset_id(dataset_id)
.with_endpoint("/stream")
Expand All @@ -172,20 +174,34 @@ async fn stream(
async fn finalized_stream(
Extension(app): Extension<AppRef>,
Path(dataset_id): Path<DatasetId>,
headers: HeaderMap,
Json(query): Json<Query>,
) -> impl IntoResponse {
let response = stream_internal(app, dataset_id, query, true).await;
let encoding = encoding_from_headers(&headers);
let response = stream_internal(app, dataset_id, query, true, encoding).await;
ResponseWithMetadata::new()
.with_dataset_id(dataset_id)
.with_endpoint("/finalized_stream")
.with_response(|| response)
}

use axum::http::HeaderMap;
use crate::encoding::ContentEncoding;

fn encoding_from_headers(headers: &HeaderMap) -> ContentEncoding {
let accept = headers
.get("accept-encoding")
.and_then(|v| v.to_str().ok());
ContentEncoding::from_accept_encoding(accept)
.unwrap_or(ContentEncoding::Gzip)
}

async fn stream_internal(
app: AppRef,
dataset_id: DatasetId,
query: Query,
finalized: bool,
encoding: ContentEncoding,
) -> Response {
let dataset = get_dataset!(app, dataset_id);

Expand All @@ -194,17 +210,18 @@ async fn stream_internal(
}

let query_result = if finalized {
app.query_service.query_finalized(&dataset, query).await
app.query_service.query_finalized(&dataset, query, encoding).await
} else {
app.query_service.query(&dataset, query).await
app.query_service.query(&dataset, query, encoding).await
};

match query_result {
Ok(stream) => {
let mut res = Response::builder()
.status(200)
.header("content-type", "text/plain")
.header("content-encoding", "gzip");
.header("content-encoding", encoding.as_str())
.header("vary", "Accept-Encoding");

if let Some(finalized_head) = stream.finalized_head() {
if finalized {
Expand Down
166 changes: 166 additions & 0 deletions crates/hotblocks/src/encoding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContentEncoding {
Gzip,
Zstd,
}

impl ContentEncoding {
pub fn as_str(&self) -> &'static str {
match self {
ContentEncoding::Gzip => "gzip",
ContentEncoding::Zstd => "zstd",
}
}

/// Parse Accept-Encoding header respecting quality values.
/// See <https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.4>
/// At equal q, prefers zstd over gzip.
/// Returns None if neither gzip nor zstd is accepted.
pub fn from_accept_encoding(header: Option<&str>) -> Option<Self> {
let header = header?;

let mut best: Option<Self> = None;
let mut best_q: f32 = -1.0;

for part in header.split(',') {
let part = part.trim();
let (name, q) = if let Some((name, params)) = part.split_once(';') {
let q = params
.split(';')
.find_map(|p| p.trim().strip_prefix("q="))
.and_then(|v| v.trim().parse::<f32>().ok())
.unwrap_or(1.0);
(name.trim(), q)
} else {
(part, 1.0)
};

if q <= 0.0 {
continue;
}

let encoding = match name {
"zstd" => Some(ContentEncoding::Zstd),
"gzip" => Some(ContentEncoding::Gzip),
_ => None,
};

if let Some(enc) = encoding {
if q > best_q || (q == best_q && enc == ContentEncoding::Zstd) {
best_q = q;
best = Some(enc);
}
Comment on lines +48 to +52
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tie-breaking uses q == best_q on f32, which can be unreliable due to floating point representation (two decimal strings that should be equal may compare unequal after parsing). To make tie-breaking deterministic, consider parsing q into an integer scale (e.g. 0..=1000) or using an epsilon/ordered representation.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RFC q-values have at most 3 decimal digits. f32 represents all values with ≤3 decimal digits exactly. Not a practical concern.

}
}

best
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn none_header() {
assert_eq!(ContentEncoding::from_accept_encoding(None), None);
}

#[test]
fn empty_header() {
assert_eq!(ContentEncoding::from_accept_encoding(Some("")), None);
}

#[test]
fn only_zstd() {
assert_eq!(
ContentEncoding::from_accept_encoding(Some("zstd")),
Some(ContentEncoding::Zstd)
);
}

#[test]
fn only_gzip() {
assert_eq!(
ContentEncoding::from_accept_encoding(Some("gzip")),
Some(ContentEncoding::Gzip)
);
}

#[test]
fn zstd_and_gzip_equal_q_prefers_zstd() {
assert_eq!(
ContentEncoding::from_accept_encoding(Some("gzip, zstd")),
Some(ContentEncoding::Zstd)
);
assert_eq!(
ContentEncoding::from_accept_encoding(Some("zstd, gzip")),
Some(ContentEncoding::Zstd)
);
}

#[test]
fn gzip_higher_q() {
assert_eq!(
ContentEncoding::from_accept_encoding(Some("gzip;q=1.0, zstd;q=0.5")),
Some(ContentEncoding::Gzip)
);
}

#[test]
fn zstd_higher_q() {
assert_eq!(
ContentEncoding::from_accept_encoding(Some("gzip;q=0.5, zstd;q=1.0")),
Some(ContentEncoding::Zstd)
);
}

#[test]
fn unsupported_encodings_only() {
assert_eq!(
ContentEncoding::from_accept_encoding(Some("deflate, br")),
None
);
}

#[test]
fn mixed_with_unsupported() {
assert_eq!(
ContentEncoding::from_accept_encoding(Some("br, deflate, gzip;q=0.8")),
Some(ContentEncoding::Gzip)
);
}

#[test]
fn q_zero_excluded() {
assert_eq!(
ContentEncoding::from_accept_encoding(Some("gzip;q=0, zstd")),
Some(ContentEncoding::Zstd)
);
}

#[test]
fn q_zero_both() {
assert_eq!(
ContentEncoding::from_accept_encoding(Some("gzip;q=0, zstd;q=0")),
None
);
}

#[test]
fn whitespace_handling() {
assert_eq!(
ContentEncoding::from_accept_encoding(Some(" gzip ; q=0.5 , zstd ; q=0.9 ")),
Some(ContentEncoding::Zstd)
);
}

#[test]
fn wildcard_ignored() {
// We don't handle *, just pick from what we support
assert_eq!(
ContentEncoding::from_accept_encoding(Some("*, gzip;q=0.5")),
Some(ContentEncoding::Gzip)
);
}
}
1 change: 1 addition & 0 deletions crates/hotblocks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod cli;
mod data_service;
mod dataset_config;
mod dataset_controller;
mod encoding;
mod errors;
mod metrics;
mod query;
Expand Down
1 change: 1 addition & 0 deletions crates/hotblocks/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ mod static_snapshot;

pub use executor::QueryExecutorCollector;
pub use response::*;

pub use service::*;
3 changes: 2 additions & 1 deletion crates/hotblocks/src/query/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl QueryResponse {
query: Query,
only_finalized: bool,
time_limit: Option<Duration>,
encoding: crate::encoding::ContentEncoding,
) -> anyhow::Result<Self> {
let Some(slot) = executor.get_slot() else {
bail!(Busy)
Expand All @@ -90,7 +91,7 @@ impl QueryResponse {
let mut runner = slot
.run(move |slot| -> anyhow::Result<_> {
let mut runner =
RunningQuery::new(db, dataset_id, &query, only_finalized).map(Box::new)?;
RunningQuery::new(db, dataset_id, &query, only_finalized, encoding).map(Box::new)?;
next_run(&mut runner, slot)?;
Ok(runner)
})
Expand Down
Loading
Loading