Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 92 additions & 139 deletions crates/common/src/streaming_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
//! - UTF-8 boundary handling

use error_stack::{Report, ResultExt};
use std::cell::RefCell;
use std::io::{self, Read, Write};
use std::rc::Rc;

use crate::error::TrustedServerError;

Expand Down Expand Up @@ -189,39 +191,10 @@
use flate2::write::GzEncoder;
use flate2::Compression;

// Decompress input
let mut decoder = GzDecoder::new(input);
let mut decompressed = Vec::new();
decoder
.read_to_end(&mut decompressed)
.change_context(TrustedServerError::Proxy {
message: "Failed to decompress gzip".to_string(),
})?;

log::info!("Decompressed size: {} bytes", decompressed.len());
let decoder = GzDecoder::new(input);
let encoder = GzEncoder::new(output, Compression::default());

// Process the decompressed content
let processed = self
.processor
.process_chunk(&decompressed, true)
.change_context(TrustedServerError::Proxy {
message: "Failed to process content".to_string(),
})?;

log::info!("Processed size: {} bytes", processed.len());

// Recompress the output
let mut encoder = GzEncoder::new(output, Compression::default());
encoder
.write_all(&processed)
.change_context(TrustedServerError::Proxy {
message: "Failed to write to gzip encoder".to_string(),
})?;
encoder.finish().change_context(TrustedServerError::Proxy {
message: "Failed to finish gzip encoder".to_string(),
})?;

Ok(())
self.process_through_compression(decoder, encoder)
}

/// Decompress input, process content, and write uncompressed output.
Expand Down Expand Up @@ -393,81 +366,73 @@
}
}

/// Adapter to use `lol_html` `HtmlRewriter` as a `StreamProcessor`
/// Important: Due to `lol_html`'s ownership model, we must accumulate input
/// and process it all at once when the stream ends. This is a limitation
/// of the `lol_html` library's API design.
/// Output sink that writes lol_html output chunks into a shared `Rc<RefCell<Vec<u8>>>` buffer.

Check failure on line 369 in crates/common/src/streaming_processor.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

item in documentation is missing backticks
struct RcVecSink(Rc<RefCell<Vec<u8>>>);

impl lol_html::OutputSink for RcVecSink {
fn handle_chunk(&mut self, chunk: &[u8]) {
self.0.borrow_mut().extend_from_slice(chunk);
}
}

/// Adapter to use `lol_html` `HtmlRewriter` as a `StreamProcessor`.
///
/// Uses lol_html's incremental streaming API: each incoming chunk is written to

Check failure on line 380 in crates/common/src/streaming_processor.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

item in documentation is missing backticks
/// the rewriter immediately, and whatever output lol_html has ready is drained

Check failure on line 381 in crates/common/src/streaming_processor.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

item in documentation is missing backticks
/// and returned. This avoids buffering the full document before processing begins.
pub struct HtmlRewriterAdapter {
settings: lol_html::Settings<'static, 'static>,
accumulated_input: Vec<u8>,
rewriter: Option<lol_html::HtmlRewriter<'static, RcVecSink>>,
output: Rc<RefCell<Vec<u8>>>,
}

impl HtmlRewriterAdapter {
/// Create a new HTML rewriter adapter
/// Create a new HTML rewriter adapter.
#[must_use]
pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self {
let output = Rc::new(RefCell::new(Vec::new()));
let rewriter = lol_html::HtmlRewriter::new(settings, RcVecSink(Rc::clone(&output)));
Self {
settings,
accumulated_input: Vec::new(),
rewriter: Some(rewriter),
output,
}
}
}

impl StreamProcessor for HtmlRewriterAdapter {
fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result<Vec<u8>, io::Error> {
// Accumulate input chunks
self.accumulated_input.extend_from_slice(chunk);

if !chunk.is_empty() {
log::debug!(
"Buffering chunk: {} bytes, total buffered: {} bytes",
chunk.len(),
self.accumulated_input.len()
);
if let Some(rewriter) = &mut self.rewriter {
if !chunk.is_empty() {
rewriter.write(chunk).map_err(|e| {
log::error!("Failed to write HTML chunk: {}", e);
io::Error::other(format!("HTML processing failed: {}", e))
})?;
}
}

// Only process when we have all the input
if is_last {
log::info!(
"Processing complete document: {} bytes",
self.accumulated_input.len()
);

// Process all accumulated input at once
let mut output = Vec::new();

// Create rewriter with output sink
let mut rewriter = lol_html::HtmlRewriter::new(
std::mem::take(&mut self.settings),
|chunk: &[u8]| {
output.extend_from_slice(chunk);
},
);

// Process the entire document
rewriter.write(&self.accumulated_input).map_err(|e| {
log::error!("Failed to process HTML: {}", e);
io::Error::other(format!("HTML processing failed: {}", e))
})?;

// Finalize the rewriter
rewriter.end().map_err(|e| {
log::error!("Failed to finalize: {}", e);
io::Error::other(format!("HTML finalization failed: {}", e))
})?;

log::debug!("Output size: {} bytes", output.len());
self.accumulated_input.clear();
Ok(output)
} else {
// Return empty until we have all input
// This is a limitation of lol_html's API
Ok(Vec::new())
if let Some(rewriter) = self.rewriter.take() {
rewriter.end().map_err(|e| {
log::error!("Failed to finalize HTML rewriter: {}", e);
io::Error::other(format!("HTML finalization failed: {}", e))
})?;
}
}

// Drain whatever lol_html produced for this chunk and return it.
let result = std::mem::take(&mut *self.output.borrow_mut());
log::debug!(
"HtmlRewriterAdapter::process_chunk: input={} bytes, output={} bytes, is_last={}",
chunk.len(),
result.len(),
is_last
);
Ok(result)
}

fn reset(&mut self) {
self.accumulated_input.clear();
// The rewriter is consumed after end(); a new HtmlRewriterAdapter should
// be created per document. Clear any remaining output buffer.
self.output.borrow_mut().clear();
}
}

Expand Down Expand Up @@ -534,7 +499,7 @@
}

#[test]
fn test_html_rewriter_adapter_accumulates_until_last() {
fn test_html_rewriter_adapter_streams_incrementally() {
use lol_html::{element, Settings};

// Create a simple HTML rewriter that replaces text
Expand All @@ -548,30 +513,32 @@

let mut adapter = HtmlRewriterAdapter::new(settings);

// Test that intermediate chunks return empty
// Collect all output across chunks; the rewriter may emit partial output at any point.
let mut full_output = Vec::new();

let chunk1 = b"<html><body>";
let result1 = adapter
.process_chunk(chunk1, false)
.expect("should process chunk1");
assert_eq!(result1.len(), 0, "Should return empty for non-last chunk");
full_output.extend(
adapter
.process_chunk(chunk1, false)
.expect("should process chunk1"),
);

let chunk2 = b"<p>original</p>";
let result2 = adapter
.process_chunk(chunk2, false)
.expect("should process chunk2");
assert_eq!(result2.len(), 0, "Should return empty for non-last chunk");
full_output.extend(
adapter
.process_chunk(chunk2, false)
.expect("should process chunk2"),
);

// Test that last chunk processes everything
let chunk3 = b"</body></html>";
let result3 = adapter
.process_chunk(chunk3, true)
.expect("should process final chunk");
assert!(
!result3.is_empty(),
"Should return processed content for last chunk"
full_output.extend(
adapter
.process_chunk(chunk3, true)
.expect("should process final chunk"),
);

let output = String::from_utf8(result3).expect("output should be valid UTF-8");
assert!(!full_output.is_empty(), "Should have produced output");
let output = String::from_utf8(full_output).expect("output should be valid UTF-8");
assert!(output.contains("replaced"), "Should have replaced content");
assert!(output.contains("<html>"), "Should have complete HTML");
}
Expand All @@ -590,60 +557,46 @@
}
large_html.push_str("</body></html>");

// Process in chunks
// Process in chunks, collecting all output.
let chunk_size = 1024;
let bytes = large_html.as_bytes();
let mut chunks = bytes.chunks(chunk_size);
let mut last_chunk = chunks.next().unwrap_or(&[]);
let chunks: Vec<_> = bytes.chunks(chunk_size).collect();
let last_idx = chunks.len().saturating_sub(1);

for chunk in chunks {
let mut full_output = Vec::new();
for (i, chunk) in chunks.iter().enumerate() {
let is_last = i == last_idx;
let result = adapter
.process_chunk(last_chunk, false)
.expect("should process intermediate chunk");
assert_eq!(result.len(), 0, "Intermediate chunks should return empty");
last_chunk = chunk;
.process_chunk(chunk, is_last)
.expect("should process chunk");
full_output.extend(result);
}

// Process last chunk
let result = adapter
.process_chunk(last_chunk, true)
.expect("should process last chunk");
assert!(!result.is_empty(), "Last chunk should return content");

let output = String::from_utf8(result).expect("output should be valid UTF-8");
assert!(!full_output.is_empty(), "Should have produced output");
let output = String::from_utf8(full_output).expect("output should be valid UTF-8");
assert!(
output.contains("Paragraph 999"),
"Should contain all content"
);
}

#[test]
fn test_html_rewriter_adapter_reset() {
fn test_html_rewriter_adapter_reset_clears_output_buffer() {
use lol_html::Settings;

// reset() is a no-op on the rewriter itself (a new adapter is needed per document),
// but it must clear any pending bytes in the output buffer.
let settings = Settings::default();
let mut adapter = HtmlRewriterAdapter::new(settings);

// Process some content
adapter
.process_chunk(b"<html>", false)
.expect("should process html tag");
adapter
.process_chunk(b"<body>test</body>", false)
.expect("should process body");
// Write a full document so the rewriter is finished.
let _ = adapter
.process_chunk(b"<html><body><p>test</p></body></html>", true)
.expect("should process complete document");

// Reset should clear accumulated input
// reset() should not panic and should leave the buffer empty.
adapter.reset();

// After reset, adapter should be ready for new input
let result = adapter
.process_chunk(b"<p>new</p>", true)
.expect("should process new content after reset");
let output = String::from_utf8(result).expect("output should be valid UTF-8");
assert_eq!(
output, "<p>new</p>",
"Should only contain new input after reset"
);
// No assertion on a subsequent process_chunk — the rewriter is consumed.
}

#[test]
Expand Down
Loading