diff --git a/crates/headroom-core/src/transforms/smart_crusher/compaction/compactor.rs b/crates/headroom-core/src/transforms/smart_crusher/compaction/compactor.rs index 9c21e0a93..496188175 100644 --- a/crates/headroom-core/src/transforms/smart_crusher/compaction/compactor.rs +++ b/crates/headroom-core/src/transforms/smart_crusher/compaction/compactor.rs @@ -40,12 +40,14 @@ //! [`CellClass::Opaque`]: super::classifier::CellClass::Opaque use std::collections::BTreeMap; +use std::sync::Arc; use serde_json::Value; use sha2::{Digest, Sha256}; use super::classifier::{classify_cell, CellClass, ClassifyConfig}; use super::ir::{Bucket, CellValue, Compaction, FieldSpec, Row, Schema}; +use crate::ccr::CcrStore; /// Config for the compactor. #[derive(Debug, Clone)] @@ -95,7 +97,37 @@ impl Default for CompactConfig { } /// Top-level compaction entry point. +/// +/// Opaque blobs become CCR pointers, but the original payload is **not** +/// stored — callers that need `<>` markers to resolve on +/// retrieval must use [`compact_with_store`] instead. pub fn compact(items: &[Value], cfg: &CompactConfig) -> Compaction { + compact_inner(items, cfg, None) +} + +/// Like [`compact`], but stash every opaque-blob payload in `store` under +/// the same 12-char hash that ends up in its `<>` marker, so +/// the runtime can serve the original back on a `headroom_retrieve` call / +/// `GET /v1/retrieve/{hash}`. Mirrors the contract already honored by the +/// opaque-string path in [`super::walker::emit_opaque_ccr_marker`]. +/// +/// The IR (and therefore the rendered marker text) is identical whether or +/// not a store is supplied — the store only adds a side-effecting write, so +/// `compact(items, cfg)` and `compact_with_store(items, cfg, Some(store))` +/// return the same [`Compaction`]. +pub fn compact_with_store( + items: &[Value], + cfg: &CompactConfig, + store: Option<&Arc>, +) -> Compaction { + compact_inner(items, cfg, store) +} + +fn compact_inner( + items: &[Value], + cfg: &CompactConfig, + store: Option<&Arc>, +) -> Compaction { if items.len() < cfg.min_items { return Compaction::Untouched(Value::Array(items.to_vec())); } @@ -117,14 +149,14 @@ pub fn compact(items: &[Value], cfg: &CompactConfig) -> Compaction { if core_ratio < cfg.heterogeneous_core_ratio { if let Some(disc) = detect_discriminator(items, &key_freqs, cfg) { - return bucket_by(items, &disc, cfg); + return bucket_by(items, &disc, cfg, store); } // No clean discriminator — fall through to a sparse Table // rather than refusing. A sparse table is still better than // letting the lossy path drop fields wholesale. } - build_homogeneous_table(items, &key_freqs, cfg) + build_homogeneous_table(items, &key_freqs, cfg, store) } fn compute_key_freqs(items: &[Value]) -> BTreeMap { @@ -143,6 +175,7 @@ fn build_homogeneous_table( items: &[Value], key_freqs: &BTreeMap, cfg: &CompactConfig, + store: Option<&Arc>, ) -> Compaction { // Order: descending frequency, then alphabetical for stability. let mut keys: Vec<(&String, &usize)> = key_freqs.iter().collect(); @@ -165,7 +198,7 @@ fn build_homogeneous_table( let mut rows: Vec = items .iter() - .map(|item| build_row(item, &ordered_keys, cfg)) + .map(|item| build_row(item, &ordered_keys, cfg, store)) .collect(); flatten_uniform_nested(&mut field_specs, &mut rows, cfg); @@ -179,7 +212,12 @@ fn build_homogeneous_table( } } -fn build_row(item: &Value, ordered_keys: &[String], cfg: &CompactConfig) -> Row { +fn build_row( + item: &Value, + ordered_keys: &[String], + cfg: &CompactConfig, + store: Option<&Arc>, +) -> Row { let obj = match item.as_object() { Some(o) => o, None => return Row::new(vec![]), @@ -188,13 +226,13 @@ fn build_row(item: &Value, ordered_keys: &[String], cfg: &CompactConfig) -> Row .iter() .map(|k| match obj.get(k) { None => CellValue::Missing, - Some(v) => cell_from_value(v, cfg), + Some(v) => cell_from_value(v, cfg, store), }) .collect(); Row::new(cells) } -fn cell_from_value(v: &Value, cfg: &CompactConfig) -> CellValue { +fn cell_from_value(v: &Value, cfg: &CompactConfig, store: Option<&Arc>) -> CellValue { match classify_cell(v, &cfg.classify) { CellClass::Scalar => CellValue::Scalar(v.clone()), CellClass::JsonObject => CellValue::Scalar(v.clone()), // flatten pass may promote @@ -202,7 +240,7 @@ fn cell_from_value(v: &Value, cfg: &CompactConfig) -> CellValue { // Recurse if the inner array is array-of-objects; else scalar. if let Value::Array(items) = v { if items.iter().all(|i| matches!(i, Value::Object(_))) && items.len() >= 2 { - return CellValue::Nested(Box::new(compact(items, cfg))); + return CellValue::Nested(Box::new(compact_inner(items, cfg, store))); } } CellValue::Scalar(v.clone()) @@ -212,19 +250,28 @@ fn cell_from_value(v: &Value, cfg: &CompactConfig) -> CellValue { // store the parsed value as a Scalar (un-escapes for free). if let Value::Array(items) = &parsed { if items.iter().all(|i| matches!(i, Value::Object(_))) && items.len() >= 2 { - return CellValue::Nested(Box::new(compact(items, cfg))); + return CellValue::Nested(Box::new(compact_inner(items, cfg, store))); } } CellValue::Scalar(parsed) } CellClass::Opaque(kind) => { - let bytes = match v { - Value::String(s) => s.as_bytes(), + let s = match v { + Value::String(s) => s, _ => return CellValue::Scalar(v.clone()), }; + let ccr_hash = hash_opaque(s.as_bytes()); + // Stash the original so `GET /v1/retrieve/{hash}` and the + // `headroom_retrieve` tool can serve it back — mirrors + // `walker::emit_opaque_ccr_marker`. Without this write the + // marker points at a key that was never stored and retrieval + // 404s (issue #1083). + if let Some(store) = store { + store.put(&ccr_hash, s); + } CellValue::OpaqueRef { - ccr_hash: hash_opaque(bytes), - byte_size: bytes.len(), + ccr_hash, + byte_size: s.len(), kind, } } @@ -451,7 +498,12 @@ fn detect_discriminator( best.map(|(k, _)| k) } -fn bucket_by(items: &[Value], discriminator: &str, cfg: &CompactConfig) -> Compaction { +fn bucket_by( + items: &[Value], + discriminator: &str, + cfg: &CompactConfig, + store: Option<&Arc>, +) -> Compaction { let mut groups: BTreeMap> = BTreeMap::new(); for item in items { let key = item @@ -465,7 +517,7 @@ fn bucket_by(items: &[Value], discriminator: &str, cfg: &CompactConfig) -> Compa let buckets: Vec = groups .into_iter() .map(|(key, group_items)| { - let inner = compact(&group_items, cfg); + let inner = compact_inner(&group_items, cfg, store); match inner { Compaction::Table { schema, rows, .. } => Bucket { key: Value::String(key), @@ -730,4 +782,69 @@ mod tests { assert_ne!(h1, h3); assert_eq!(h1.len(), 12); } + + #[test] + fn opaque_payload_is_stored_under_marker_hash() { + use crate::ccr::{CcrStore, InMemoryCcrStore}; + use std::sync::Arc; + + // Same blob the `opaque_cell_becomes_ccr_ref` test uses — known to + // classify as Opaque, so the OpaqueRef / `<>` path runs. + let big = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=".repeat(8); + let items = vec![ + json!({"id": 1, "blob": big.clone()}), + json!({"id": 2, "blob": big.clone()}), + ]; + + let store: Arc = Arc::new(InMemoryCcrStore::new()); + let c = compact_with_store(&items, &cfg(), Some(&store)); + + // Pull the hash the rendered marker will carry out of the IR. + let hash = match &c { + Compaction::Table { rows, schema, .. } => { + let blob_idx = schema + .fields + .iter() + .position(|f| f.name == "blob") + .expect("blob col"); + match &rows[0].0[blob_idx] { + CellValue::OpaqueRef { ccr_hash, .. } => ccr_hash.clone(), + other => panic!("expected OpaqueRef, got {other:?}"), + } + } + other => panic!("expected Table, got {other:?}"), + }; + + // Issue #1083: the original payload must be retrievable under the + // exact hash the marker carries (before the fix the store was empty). + assert_eq!(store.get(&hash).as_deref(), Some(big.as_str())); + // Lock the key<->marker contract: stored key == hash_opaque(payload). + assert_eq!(hash, hash_opaque(big.as_bytes())); + } + + #[test] + fn store_presence_does_not_change_the_ir() { + use crate::ccr::{CcrStore, InMemoryCcrStore}; + use std::sync::Arc; + + let big = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=".repeat(8); + let items = vec![ + json!({"id": 1, "blob": big.clone()}), + json!({"id": 2, "blob": big.clone()}), + ]; + + let without = compact(&items, &cfg()); + let store: Arc = Arc::new(InMemoryCcrStore::new()); + let with = compact_with_store(&items, &cfg(), Some(&store)); + + // Marker text is store-independent — only a side-effecting write is + // added, so the two IRs render identically. Compare a deterministic + // formatter's output rather than Debug formatting, which is not a + // stable contract. + use super::super::{Formatter, JsonFormatter}; + let fmt = JsonFormatter::new(); + assert_eq!(fmt.format(&without), fmt.format(&with)); + // ...and that write actually happened. + assert!(!store.is_empty()); + } } diff --git a/crates/headroom-core/src/transforms/smart_crusher/compaction/mod.rs b/crates/headroom-core/src/transforms/smart_crusher/compaction/mod.rs index b3ce01e8c..3fe57693e 100644 --- a/crates/headroom-core/src/transforms/smart_crusher/compaction/mod.rs +++ b/crates/headroom-core/src/transforms/smart_crusher/compaction/mod.rs @@ -30,7 +30,7 @@ pub mod ir; pub mod walker; pub use classifier::{classify_cell, CellClass, ClassifyConfig}; -pub use compactor::{compact, CompactConfig}; +pub use compactor::{compact, compact_with_store, CompactConfig}; pub use formatter::{CsvSchemaFormatter, Formatter, JsonFormatter, MarkdownKvFormatter}; pub use ir::{Bucket, CellValue, Compaction, FieldSpec, OpaqueKind, Row, Schema}; pub use walker::{ @@ -115,6 +115,24 @@ impl CompactionStage { let rendered = self.formatter.format(&c); (c, rendered) } + + /// Like [`Self::run`], but stash every opaque-blob payload into `store` + /// under the same hash the rendered `<>` marker carries, + /// so `GET /v1/retrieve/{hash}` and the `headroom_retrieve` tool can + /// serve the original back. `SmartCrusher::crush_array`'s lossless + /// branch passes the proxy's CCR store here; previously it called + /// [`Self::run`], which rendered markers whose payload was never stored + /// (issue #1083). When `store` is `None`, behaves exactly like + /// [`Self::run`]. + pub fn run_with_store( + &self, + items: &[serde_json::Value], + store: Option<&std::sync::Arc>, + ) -> (Compaction, String) { + let c = compact_with_store(items, &self.config, store); + let rendered = self.formatter.format(&c); + (c, rendered) + } } impl std::fmt::Debug for CompactionStage { diff --git a/crates/headroom-core/src/transforms/smart_crusher/crusher.rs b/crates/headroom-core/src/transforms/smart_crusher/crusher.rs index 456271b67..0fce342c6 100644 --- a/crates/headroom-core/src/transforms/smart_crusher/crusher.rs +++ b/crates/headroom-core/src/transforms/smart_crusher/crusher.rs @@ -660,7 +660,11 @@ impl SmartCrusher { // ship it — nothing dropped, no CCR retrieval needed. // Otherwise fall through to the lossy path. if let Some(stage) = &self.compaction { - let (c, rendered) = stage.run(items); + // Thread the CCR store so opaque-blob `<>` markers + // emitted by lossless:table compaction are actually retrievable + // (issue #1083); the row-drop lossy path below stores its own + // payload separately. + let (c, rendered) = stage.run_with_store(items, self.ccr_store.as_ref()); if c.was_compacted() { let input_bytes = estimate_array_bytes(&item_strings); let savings_ratio = if input_bytes > 0 {