Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
//! [`CellClass::Opaque`]: super::classifier::CellClass::Opaque

use std::collections::BTreeMap;
use std::sync::Arc;

use serde_json::Value;
use sha2::{Digest, Sha256};

use super::classifier::{classify_cell, CellClass, ClassifyConfig};
use super::ir::{Bucket, CellValue, Compaction, FieldSpec, Row, Schema};
use crate::ccr::CcrStore;

/// Config for the compactor.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -95,7 +97,37 @@ impl Default for CompactConfig {
}

/// Top-level compaction entry point.
///
/// Thin wrapper that forwards to `compact_inner` with no CCR store
/// (`store = None`), so this call never writes opaque payloads anywhere.
///
/// Opaque blobs become CCR pointers, but the original payload is **not**
/// stored — callers that need `<<ccr:...>>` markers to resolve on
/// retrieval must use [`compact_with_store`] instead.
///
/// When `items` holds fewer than `cfg.min_items` entries the input is
/// handed back unchanged as [`Compaction::Untouched`].
pub fn compact(items: &[Value], cfg: &CompactConfig) -> Compaction {
compact_inner(items, cfg, None)
}

/// Like [`compact`], but stash every opaque-blob payload in `store` under
/// the same 12-char hash that ends up in its `<<ccr:HASH,...>>` marker, so
/// the runtime can serve the original back on a `headroom_retrieve` call /
/// `GET /v1/retrieve/{hash}`. Mirrors the contract already honored by the
/// opaque-string path in [`super::walker::emit_opaque_ccr_marker`].
///
/// The IR (and therefore the rendered marker text) is identical whether or
/// not a store is supplied — the store only adds a side-effecting write, so
/// `compact(items, cfg)` and `compact_with_store(items, cfg, Some(store))`
/// return the same [`Compaction`].
///
/// Passing `store = None` forwards exactly the same arguments to
/// `compact_inner` as [`compact`] does, so no payload is written and the
/// two calls are interchangeable.
pub fn compact_with_store(
items: &[Value],
cfg: &CompactConfig,
store: Option<&Arc<dyn CcrStore>>,
) -> Compaction {
compact_inner(items, cfg, store)
}

fn compact_inner(
items: &[Value],
cfg: &CompactConfig,
store: Option<&Arc<dyn CcrStore>>,
) -> Compaction {
if items.len() < cfg.min_items {
return Compaction::Untouched(Value::Array(items.to_vec()));
}
Expand All @@ -117,14 +149,14 @@ pub fn compact(items: &[Value], cfg: &CompactConfig) -> Compaction {

if core_ratio < cfg.heterogeneous_core_ratio {
if let Some(disc) = detect_discriminator(items, &key_freqs, cfg) {
return bucket_by(items, &disc, cfg);
return bucket_by(items, &disc, cfg, store);
}
// No clean discriminator — fall through to a sparse Table
// rather than refusing. A sparse table is still better than
// letting the lossy path drop fields wholesale.
}

build_homogeneous_table(items, &key_freqs, cfg)
build_homogeneous_table(items, &key_freqs, cfg, store)
}

fn compute_key_freqs(items: &[Value]) -> BTreeMap<String, usize> {
Expand All @@ -143,6 +175,7 @@ fn build_homogeneous_table(
items: &[Value],
key_freqs: &BTreeMap<String, usize>,
cfg: &CompactConfig,
store: Option<&Arc<dyn CcrStore>>,
) -> Compaction {
// Order: descending frequency, then alphabetical for stability.
let mut keys: Vec<(&String, &usize)> = key_freqs.iter().collect();
Expand All @@ -165,7 +198,7 @@ fn build_homogeneous_table(

let mut rows: Vec<Row> = items
.iter()
.map(|item| build_row(item, &ordered_keys, cfg))
.map(|item| build_row(item, &ordered_keys, cfg, store))
.collect();

flatten_uniform_nested(&mut field_specs, &mut rows, cfg);
Expand All @@ -179,7 +212,12 @@ fn build_homogeneous_table(
}
}

fn build_row(item: &Value, ordered_keys: &[String], cfg: &CompactConfig) -> Row {
fn build_row(
item: &Value,
ordered_keys: &[String],
cfg: &CompactConfig,
store: Option<&Arc<dyn CcrStore>>,
) -> Row {
let obj = match item.as_object() {
Some(o) => o,
None => return Row::new(vec![]),
Expand All @@ -188,21 +226,21 @@ fn build_row(item: &Value, ordered_keys: &[String], cfg: &CompactConfig) -> Row
.iter()
.map(|k| match obj.get(k) {
None => CellValue::Missing,
Some(v) => cell_from_value(v, cfg),
Some(v) => cell_from_value(v, cfg, store),
})
.collect();
Row::new(cells)
}

fn cell_from_value(v: &Value, cfg: &CompactConfig) -> CellValue {
fn cell_from_value(v: &Value, cfg: &CompactConfig, store: Option<&Arc<dyn CcrStore>>) -> CellValue {
match classify_cell(v, &cfg.classify) {
CellClass::Scalar => CellValue::Scalar(v.clone()),
CellClass::JsonObject => CellValue::Scalar(v.clone()), // flatten pass may promote
CellClass::JsonArray => {
// Recurse if the inner array is array-of-objects; else scalar.
if let Value::Array(items) = v {
if items.iter().all(|i| matches!(i, Value::Object(_))) && items.len() >= 2 {
return CellValue::Nested(Box::new(compact(items, cfg)));
return CellValue::Nested(Box::new(compact_inner(items, cfg, store)));
}
}
CellValue::Scalar(v.clone())
Expand All @@ -212,19 +250,28 @@ fn cell_from_value(v: &Value, cfg: &CompactConfig) -> CellValue {
// store the parsed value as a Scalar (un-escapes for free).
if let Value::Array(items) = &parsed {
if items.iter().all(|i| matches!(i, Value::Object(_))) && items.len() >= 2 {
return CellValue::Nested(Box::new(compact(items, cfg)));
return CellValue::Nested(Box::new(compact_inner(items, cfg, store)));
}
}
CellValue::Scalar(parsed)
}
CellClass::Opaque(kind) => {
let bytes = match v {
Value::String(s) => s.as_bytes(),
let s = match v {
Value::String(s) => s,
_ => return CellValue::Scalar(v.clone()),
};
let ccr_hash = hash_opaque(s.as_bytes());
// Stash the original so `GET /v1/retrieve/{hash}` and the
// `headroom_retrieve` tool can serve it back — mirrors
// `walker::emit_opaque_ccr_marker`. Without this write the
// marker points at a key that was never stored and retrieval
// 404s (issue #1083).
if let Some(store) = store {
store.put(&ccr_hash, s);
}
CellValue::OpaqueRef {
ccr_hash: hash_opaque(bytes),
byte_size: bytes.len(),
ccr_hash,
byte_size: s.len(),
kind,
}
}
Expand Down Expand Up @@ -451,7 +498,12 @@ fn detect_discriminator(
best.map(|(k, _)| k)
}

fn bucket_by(items: &[Value], discriminator: &str, cfg: &CompactConfig) -> Compaction {
fn bucket_by(
items: &[Value],
discriminator: &str,
cfg: &CompactConfig,
store: Option<&Arc<dyn CcrStore>>,
) -> Compaction {
let mut groups: BTreeMap<String, Vec<Value>> = BTreeMap::new();
for item in items {
let key = item
Expand All @@ -465,7 +517,7 @@ fn bucket_by(items: &[Value], discriminator: &str, cfg: &CompactConfig) -> Compa
let buckets: Vec<Bucket> = groups
.into_iter()
.map(|(key, group_items)| {
let inner = compact(&group_items, cfg);
let inner = compact_inner(&group_items, cfg, store);
match inner {
Compaction::Table { schema, rows, .. } => Bucket {
key: Value::String(key),
Expand Down Expand Up @@ -730,4 +782,69 @@ mod tests {
assert_ne!(h1, h3);
assert_eq!(h1.len(), 12);
}

#[test]
fn opaque_payload_is_stored_under_marker_hash() {
    use crate::ccr::{CcrStore, InMemoryCcrStore};
    use std::sync::Arc;

    // Reuse the blob from the `opaque_cell_becomes_ccr_ref` test: it is
    // known to classify as Opaque, which forces the OpaqueRef /
    // `<<ccr:HASH,...>>` code path.
    let payload = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=".repeat(8);
    let items = vec![
        json!({"id": 1, "blob": payload.clone()}),
        json!({"id": 2, "blob": payload.clone()}),
    ];

    let store: Arc<dyn CcrStore> = Arc::new(InMemoryCcrStore::new());
    let compaction = compact_with_store(&items, &cfg(), Some(&store));

    // Dig the hash that the rendered marker will carry out of the IR:
    // first unwrap the table, then locate the `blob` column, then read
    // the first row's cell.
    let (schema, rows) = match &compaction {
        Compaction::Table { rows, schema, .. } => (schema, rows),
        other => panic!("expected Table, got {other:?}"),
    };
    let blob_col = schema
        .fields
        .iter()
        .position(|field| field.name == "blob")
        .expect("blob col");
    let marker_hash = match &rows[0].0[blob_col] {
        CellValue::OpaqueRef { ccr_hash, .. } => ccr_hash.clone(),
        other => panic!("expected OpaqueRef, got {other:?}"),
    };

    // Issue #1083: the payload has to be retrievable under exactly the
    // hash the marker carries (the store stayed empty before the fix).
    assert_eq!(store.get(&marker_hash).as_deref(), Some(payload.as_str()));
    // Pin the key<->marker contract: stored key == hash_opaque(payload).
    assert_eq!(marker_hash, hash_opaque(payload.as_bytes()));
}

#[test]
fn store_presence_does_not_change_the_ir() {
    use super::super::{Formatter, JsonFormatter};
    use crate::ccr::{CcrStore, InMemoryCcrStore};
    use std::sync::Arc;

    let payload = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=".repeat(8);
    let items = vec![
        json!({"id": 1, "blob": payload.clone()}),
        json!({"id": 2, "blob": payload.clone()}),
    ];

    // Same input, compacted once without a store and once with one.
    let baseline = compact(&items, &cfg());
    let store: Arc<dyn CcrStore> = Arc::new(InMemoryCcrStore::new());
    let stored = compact_with_store(&items, &cfg(), Some(&store));

    // The marker text must not depend on the store: supplying one only
    // adds a side-effecting write. Compare through a deterministic
    // formatter rather than `Debug` output, which is not a stable contract.
    let formatter = JsonFormatter::new();
    let rendered_baseline = formatter.format(&baseline);
    let rendered_stored = formatter.format(&stored);
    assert_eq!(rendered_baseline, rendered_stored);

    // ...and the write really took place.
    assert!(!store.is_empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub mod ir;
pub mod walker;

pub use classifier::{classify_cell, CellClass, ClassifyConfig};
pub use compactor::{compact, CompactConfig};
pub use compactor::{compact, compact_with_store, CompactConfig};
pub use formatter::{CsvSchemaFormatter, Formatter, JsonFormatter, MarkdownKvFormatter};
pub use ir::{Bucket, CellValue, Compaction, FieldSpec, OpaqueKind, Row, Schema};
pub use walker::{
Expand Down Expand Up @@ -115,6 +115,24 @@ impl CompactionStage {
let rendered = self.formatter.format(&c);
(c, rendered)
}

/// Store-aware variant of [`Self::run`].
///
/// Compacts `items` with this stage's config via [`compact_with_store`],
/// renders the result with the stage's formatter, and returns both the IR
/// and the rendered text. Every opaque-blob payload is stashed in `store`
/// under the same hash its rendered `<<ccr:HASH,...>>` marker carries, so
/// `GET /v1/retrieve/{hash}` and the `headroom_retrieve` tool can serve
/// the original back.
///
/// `SmartCrusher`'s lossless branch hands its CCR store to this method;
/// it used to call [`Self::run`], which rendered markers whose payload
/// was never stored (issue #1083).
///
/// With `store = None` this behaves exactly like [`Self::run`].
pub fn run_with_store(
    &self,
    items: &[serde_json::Value],
    store: Option<&std::sync::Arc<dyn crate::ccr::CcrStore>>,
) -> (Compaction, String) {
    let compaction = compact_with_store(items, &self.config, store);
    let text = self.formatter.format(&compaction);
    (compaction, text)
}
}

impl std::fmt::Debug for CompactionStage {
Expand Down
6 changes: 5 additions & 1 deletion crates/headroom-core/src/transforms/smart_crusher/crusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,11 @@ impl SmartCrusher {
// ship it — nothing dropped, no CCR retrieval needed.
// Otherwise fall through to the lossy path.
if let Some(stage) = &self.compaction {
let (c, rendered) = stage.run(items);
// Thread the CCR store so opaque-blob `<<ccr:HASH,...>>` markers
// emitted by lossless:table compaction are actually retrievable
// (issue #1083); the row-drop lossy path below stores its own
// payload separately.
let (c, rendered) = stage.run_with_store(items, self.ccr_store.as_ref());
if c.was_compacted() {
let input_bytes = estimate_array_bytes(&item_strings);
let savings_ratio = if input_bytes > 0 {
Expand Down
Loading