diff --git a/Cargo.lock b/Cargo.lock index 754ce07..f6a4560 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4915,6 +4915,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "zstd", ] [[package]] diff --git a/crates/hotblocks/Cargo.toml b/crates/hotblocks/Cargo.toml index cd9a93c..37667ae 100644 --- a/crates/hotblocks/Cargo.toml +++ b/crates/hotblocks/Cargo.toml @@ -11,6 +11,7 @@ bytes = { workspace = true } chrono = { workspace = true, features = ["std"] } clap = { workspace = true, features = ["derive"] } flate2 = { workspace = true } +zstd = "0.13" futures = { workspace = true } ouroboros = { workspace = true } prometheus-client = { workspace = true } diff --git a/crates/hotblocks/src/api.rs b/crates/hotblocks/src/api.rs index 825ad9c..2a4c68d 100644 --- a/crates/hotblocks/src/api.rs +++ b/crates/hotblocks/src/api.rs @@ -160,9 +160,11 @@ impl IntoResponse for ResponseWithMetadata { async fn stream( Extension(app): Extension, Path(dataset_id): Path, + headers: HeaderMap, Json(query): Json, ) -> impl IntoResponse { - let response = stream_internal(app, dataset_id, query, false).await; + let encoding = encoding_from_headers(&headers); + let response = stream_internal(app, dataset_id, query, false, encoding).await; ResponseWithMetadata::new() .with_dataset_id(dataset_id) .with_endpoint("/stream") @@ -172,20 +174,34 @@ async fn stream( async fn finalized_stream( Extension(app): Extension, Path(dataset_id): Path, + headers: HeaderMap, Json(query): Json, ) -> impl IntoResponse { - let response = stream_internal(app, dataset_id, query, true).await; + let encoding = encoding_from_headers(&headers); + let response = stream_internal(app, dataset_id, query, true, encoding).await; ResponseWithMetadata::new() .with_dataset_id(dataset_id) .with_endpoint("/finalized_stream") .with_response(|| response) } +use axum::http::HeaderMap; +use crate::encoding::ContentEncoding; + +fn encoding_from_headers(headers: &HeaderMap) -> ContentEncoding { + let accept = headers + .get("accept-encoding") + .and_then(|v| v.to_str().ok()); + ContentEncoding::from_accept_encoding(accept) + .unwrap_or(ContentEncoding::Gzip) +} + async fn stream_internal( app: AppRef, dataset_id: DatasetId, query: Query, finalized: bool, + encoding: ContentEncoding, ) -> Response { let dataset = get_dataset!(app, dataset_id); @@ -194,9 +210,9 @@ async fn stream_internal( } let query_result = if finalized { - app.query_service.query_finalized(&dataset, query).await + app.query_service.query_finalized(&dataset, query, encoding).await } else { - app.query_service.query(&dataset, query).await + app.query_service.query(&dataset, query, encoding).await }; match query_result { @@ -204,7 +220,8 @@ async fn stream_internal( let mut res = Response::builder() .status(200) .header("content-type", "text/plain") - .header("content-encoding", "gzip"); + .header("content-encoding", encoding.as_str()) + .header("vary", "Accept-Encoding"); if let Some(finalized_head) = stream.finalized_head() { if finalized { diff --git a/crates/hotblocks/src/encoding.rs b/crates/hotblocks/src/encoding.rs new file mode 100644 index 0000000..f020afa --- /dev/null +++ b/crates/hotblocks/src/encoding.rs @@ -0,0 +1,166 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ContentEncoding { + Gzip, + Zstd, +} + +impl ContentEncoding { + pub fn as_str(&self) -> &'static str { + match self { + ContentEncoding::Gzip => "gzip", + ContentEncoding::Zstd => "zstd", + } + } + + /// Parse Accept-Encoding header respecting quality values. + /// See + /// At equal q, prefers zstd over gzip. + /// Returns None if neither gzip nor zstd is accepted. + pub fn from_accept_encoding(header: Option<&str>) -> Option { + let header = header?; + + let mut best: Option = None; + let mut best_q: f32 = -1.0; + + for part in header.split(',') { + let part = part.trim(); + let (name, q) = if let Some((name, params)) = part.split_once(';') { + let q = params + .split(';') + .find_map(|p| p.trim().strip_prefix("q=")) + .and_then(|v| v.trim().parse::().ok()) + .unwrap_or(1.0); + (name.trim(), q) + } else { + (part, 1.0) + }; + + if q <= 0.0 { + continue; + } + + let encoding = match name { + "zstd" => Some(ContentEncoding::Zstd), + "gzip" => Some(ContentEncoding::Gzip), + _ => None, + }; + + if let Some(enc) = encoding { + if q > best_q || (q == best_q && enc == ContentEncoding::Zstd) { + best_q = q; + best = Some(enc); + } + } + } + + best + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn none_header() { + assert_eq!(ContentEncoding::from_accept_encoding(None), None); + } + + #[test] + fn empty_header() { + assert_eq!(ContentEncoding::from_accept_encoding(Some("")), None); + } + + #[test] + fn only_zstd() { + assert_eq!( + ContentEncoding::from_accept_encoding(Some("zstd")), + Some(ContentEncoding::Zstd) + ); + } + + #[test] + fn only_gzip() { + assert_eq!( + ContentEncoding::from_accept_encoding(Some("gzip")), + Some(ContentEncoding::Gzip) + ); + } + + #[test] + fn zstd_and_gzip_equal_q_prefers_zstd() { + assert_eq!( + ContentEncoding::from_accept_encoding(Some("gzip, zstd")), + Some(ContentEncoding::Zstd) + ); + assert_eq!( + ContentEncoding::from_accept_encoding(Some("zstd, gzip")), + Some(ContentEncoding::Zstd) + ); + } + + #[test] + fn gzip_higher_q() { + assert_eq!( + ContentEncoding::from_accept_encoding(Some("gzip;q=1.0, zstd;q=0.5")), + Some(ContentEncoding::Gzip) + ); + } + + #[test] + fn zstd_higher_q() { + assert_eq!( + ContentEncoding::from_accept_encoding(Some("gzip;q=0.5, zstd;q=1.0")), + Some(ContentEncoding::Zstd) + ); + } + + #[test] + fn unsupported_encodings_only() { + assert_eq!( + ContentEncoding::from_accept_encoding(Some("deflate, br")), + None + ); + } + + #[test] + fn mixed_with_unsupported() { + assert_eq!( + ContentEncoding::from_accept_encoding(Some("br, deflate, gzip;q=0.8")), + Some(ContentEncoding::Gzip) + ); + } + + #[test] + fn q_zero_excluded() { + assert_eq!( + ContentEncoding::from_accept_encoding(Some("gzip;q=0, zstd")), + Some(ContentEncoding::Zstd) + ); + } + + #[test] + fn q_zero_both() { + assert_eq!( + ContentEncoding::from_accept_encoding(Some("gzip;q=0, zstd;q=0")), + None + ); + } + + #[test] + fn whitespace_handling() { + assert_eq!( + ContentEncoding::from_accept_encoding(Some(" gzip ; q=0.5 , zstd ; q=0.9 ")), + Some(ContentEncoding::Zstd) + ); + } + + #[test] + fn wildcard_ignored() { + // We don't handle *, just pick from what we support + assert_eq!( + ContentEncoding::from_accept_encoding(Some("*, gzip;q=0.5")), + Some(ContentEncoding::Gzip) + ); + } +} diff --git a/crates/hotblocks/src/main.rs b/crates/hotblocks/src/main.rs index 3f802ca..fe5c18e 100644 --- a/crates/hotblocks/src/main.rs +++ b/crates/hotblocks/src/main.rs @@ -3,6 +3,7 @@ mod cli; mod data_service; mod dataset_config; mod dataset_controller; +mod encoding; mod errors; mod metrics; mod query; diff --git a/crates/hotblocks/src/query/mod.rs b/crates/hotblocks/src/query/mod.rs index 028f2ff..3cd8f39 100644 --- a/crates/hotblocks/src/query/mod.rs +++ b/crates/hotblocks/src/query/mod.rs @@ -6,4 +6,5 @@ mod static_snapshot; pub use executor::QueryExecutorCollector; pub use response::*; + pub use service::*; diff --git a/crates/hotblocks/src/query/response.rs b/crates/hotblocks/src/query/response.rs index 46bb7f1..b136212 100644 --- a/crates/hotblocks/src/query/response.rs +++ b/crates/hotblocks/src/query/response.rs @@ -81,6 +81,7 @@ impl QueryResponse { query: Query, only_finalized: bool, time_limit: Option, + encoding: crate::encoding::ContentEncoding, ) -> anyhow::Result { let Some(slot) = executor.get_slot() else { bail!(Busy) @@ -90,7 +91,7 @@ impl QueryResponse { let mut runner = slot .run(move |slot| -> anyhow::Result<_> { let mut runner = - RunningQuery::new(db, dataset_id, &query, only_finalized).map(Box::new)?; + RunningQuery::new(db, dataset_id, &query, only_finalized, encoding).map(Box::new)?; next_run(&mut runner, slot)?; Ok(runner) }) diff --git a/crates/hotblocks/src/query/running.rs b/crates/hotblocks/src/query/running.rs index 3476cb1..5dd5ad3 100644 --- a/crates/hotblocks/src/query/running.rs +++ b/crates/hotblocks/src/query/running.rs @@ -7,6 +7,7 @@ use anyhow::{anyhow, bail, ensure}; use bytes::{BufMut, Bytes, BytesMut}; use flate2::Compression; use flate2::write::GzEncoder; +use zstd::stream::write::Encoder as ZstdEncoder; use sqd_primitives::{BlockNumber, BlockRef}; use sqd_query::{JsonLinesWriter, Plan, Query}; use sqd_storage::db::{Chunk as StorageChunk, DatasetId}; @@ -49,6 +50,60 @@ impl RunningQueryStats { } } +use crate::encoding::ContentEncoding; + +enum Compressor { + Gzip(GzEncoder>), + Zstd(ZstdEncoder<'static, bytes::buf::Writer>), +} + +impl Write for Compressor { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + match self { + Compressor::Gzip(e) => e.write(buf), + Compressor::Zstd(e) => e.write(buf), + } + } + + fn flush(&mut self) -> std::io::Result<()> { + match self { + Compressor::Gzip(e) => e.flush(), + Compressor::Zstd(e) => e.flush(), + } + } +} + +impl Compressor { + fn new(encoding: ContentEncoding) -> anyhow::Result { + let writer = BytesMut::new().writer(); + Ok(match encoding { + ContentEncoding::Gzip => Compressor::Gzip(GzEncoder::new(writer, Compression::fast())), + ContentEncoding::Zstd => Compressor::Zstd(ZstdEncoder::new(writer, 1)?), + }) + } + + fn get_ref(&self) -> &BytesMut { + match self { + Compressor::Gzip(e) => e.get_ref().get_ref(), + Compressor::Zstd(e) => e.get_ref().get_ref(), + } + } + + fn get_mut(&mut self) -> &mut BytesMut { + match self { + Compressor::Gzip(e) => e.get_mut().get_mut(), + Compressor::Zstd(e) => e.get_mut().get_mut(), + } + } + + fn finish(self) -> BytesMut { + match self { + Compressor::Gzip(e) => e.finish().expect("IO errors are not possible").into_inner(), + Compressor::Zstd(e) => e.finish().expect("IO errors are not possible").into_inner(), + } + } +} + pub struct RunningQuery { plan: Plan, last_block: Option, @@ -56,7 +111,8 @@ pub struct RunningQuery { next_chunk: Option>, chunk_iterator: StaticChunkIterator, finalized_head: Option, - buf: GzEncoder>, + encoding: ContentEncoding, + buf: Compressor, stats: RunningQueryStats, } @@ -66,6 +122,7 @@ impl RunningQuery { dataset_id: DatasetId, query: &Query, only_finalized: bool, + encoding: ContentEncoding, ) -> anyhow::Result { let snapshot = StaticSnapshot::new(db); @@ -147,7 +204,8 @@ impl RunningQuery { next_chunk: Some(Ok(first_chunk)), chunk_iterator, finalized_head, - buf: GzEncoder::new(BytesMut::new().writer(), Compression::fast()), + encoding, + buf: Compressor::new(encoding)?, stats, }) } @@ -160,20 +218,20 @@ impl RunningQuery { &self.stats } + pub fn encoding(&self) -> ContentEncoding { + self.encoding + } + pub fn buffered_bytes(&self) -> usize { - self.buf.get_ref().get_ref().len() + self.buf.get_ref().len() } pub fn take_buffered_bytes(&mut self) -> Bytes { - self.buf.get_mut().get_mut().split().freeze() + self.buf.get_mut().split().freeze() } pub fn finish(self) -> Bytes { - self.buf - .finish() - .expect("IO errors are not possible") - .into_inner() - .freeze() + self.buf.finish().freeze() } pub fn has_next_chunk(&self) -> bool { diff --git a/crates/hotblocks/src/query/service.rs b/crates/hotblocks/src/query/service.rs index 9c1b3c2..39729fd 100644 --- a/crates/hotblocks/src/query/service.rs +++ b/crates/hotblocks/src/query/service.rs @@ -82,16 +82,18 @@ impl QueryService { &self, dataset: &DatasetController, query: Query, + encoding: crate::encoding::ContentEncoding, ) -> anyhow::Result { - self.query_internal(dataset, query, false).await + self.query_internal(dataset, query, false, encoding).await } pub async fn query_finalized( &self, dataset: &DatasetController, query: Query, + encoding: crate::encoding::ContentEncoding, ) -> anyhow::Result { - self.query_internal(dataset, query, true).await + self.query_internal(dataset, query, true, encoding).await } async fn query_internal( @@ -99,6 +101,7 @@ impl QueryService { dataset: &DatasetController, query: Query, finalized: bool, + encoding: crate::encoding::ContentEncoding, ) -> anyhow::Result { ensure!( dataset.dataset_kind() == DatasetKind::from_query(&query), @@ -154,7 +157,8 @@ impl QueryService { dataset.dataset_id(), query, finalized, - None + None, + encoding, ).await }