Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions columnar/src/block_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
self.val_cache.iter().cloned()
}

/// Returns the fetched values of the current block as a contiguous slice.
///
/// This lets reducers (sum/min/max/stats) process the block with a tight, vectorizable
/// loop instead of going through the `iter_vals` iterator.
#[inline]
pub fn vals(&self) -> &[T] {
&self.val_cache
}

#[inline]
/// Returns an iterator over the docids and values
/// The passed in `docs` slice needs to be the same slice that was passed to `fetch_block` or
Expand Down
39 changes: 35 additions & 4 deletions src/aggregation/metric/extended_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,40 @@ impl IntermediateExtendedStats {
self.sum_of_squares_elastic = t;
self.update_variance(value);
}

/// Collects a contiguous block of raw column values.
///
/// The per-value [`Self::collect`] is a strictly serial recurrence (Welford's variance
/// recomputes the mean from the running sum each step). To break that dependency chain we
/// accumulate into `LANES` independent accumulators and combine them with
/// [`Self::merge_fruits`] — the exact Chan parallel-variance combination already used to
/// merge across segments, so the result matches multi-segment aggregation (fp summation
/// order differs, as it already does there).
#[inline]
fn collect_block(&mut self, vals: &[u64], field_type: ColumnType) {
const LANES: usize = 4;
if vals.len() < LANES * 2 {
for &val in vals {
self.collect(f64_from_fastfield_u64(val, field_type));
}
return;
}
let mut lanes: [IntermediateExtendedStats; LANES] = Default::default();
let mut chunks = vals.chunks_exact(LANES);
for chunk in chunks.by_ref() {
for lane in 0..LANES {
lanes[lane].collect(f64_from_fastfield_u64(chunk[lane], field_type));
}
}
for &val in chunks.remainder() {
lanes[0].collect(f64_from_fastfield_u64(val, field_type));
}
let mut combined = mem::take(&mut lanes[0]);
for lane in 1..LANES {
combined.merge_fruits(mem::take(&mut lanes[lane]));
}
self.merge_fruits(combined);
}
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -376,10 +410,7 @@ impl SegmentAggregationCollector for SegmentExtendedStatsCollector {
agg_data
.column_block_accessor
.fetch_block_with_missing(docs, &self.accessor, self.missing);
for val in agg_data.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, self.field_type);
extended_stats.collect(val1);
}
extended_stats.collect_block(agg_data.column_block_accessor.vals(), self.field_type);

// store back
self.buckets[parent_bucket_id as usize] = extended_stats;
Expand Down
72 changes: 69 additions & 3 deletions src/aggregation/metric/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,11 @@ impl<const COLUMN_TYPE_ID: u8> SegmentAggregationCollector
&self.accessor,
self.missing_u64,
);
collect_stats::<COLUMN_TYPE_ID>(
collect_stats_slice::<COLUMN_TYPE_ID>(
&mut self.buckets[parent_bucket_id as usize],
agg_data.column_block_accessor.iter_vals(),
agg_data.column_block_accessor.vals(),
self.is_number_or_date_type,
)?;
);

Ok(())
}
Expand Down Expand Up @@ -357,6 +357,72 @@ impl<const COLUMN_TYPE_ID: u8> SegmentAggregationCollector
}
}

/// Reduces a contiguous block of raw column values into `stats`.
///
/// Uses `LANES` independent (sum, delta) Kahan accumulators and `LANES` min/max
/// accumulators so the per-element dependency chain of the serial Kahan sum is broken,
/// letting the CPU pipeline the additions and auto-vectorize the min/max. The lanes are
/// merged back into `stats` with the same compensated-sum combination used by
/// [`IntermediateStats::merge_fruits`], so accuracy is preserved (the summation order
/// differs, exactly as it already does across segment merges).
#[inline]
fn collect_stats_slice<const COLUMN_TYPE_ID: u8>(
stats: &mut IntermediateStats,
vals: &[u64],
is_number_or_date_type: bool,
) {
if !is_number_or_date_type {
// Non-numeric: only the presence of a value matters (preserve existing behavior).
for _ in 0..vals.len() {
stats.collect(0.0);
}
return;
}

const LANES: usize = 4;
let mut sum = [0f64; LANES];
let mut delta = [0f64; LANES];
let mut min = [f64::MAX; LANES];
let mut max = [f64::MIN; LANES];

let mut chunks = vals.chunks_exact(LANES);
for chunk in chunks.by_ref() {
for lane in 0..LANES {
let val = convert_to_f64::<COLUMN_TYPE_ID>(chunk[lane]);
// Per-lane Kahan summation.
let y = val - delta[lane];
let t = sum[lane] + y;
delta[lane] = (t - sum[lane]) - y;
sum[lane] = t;
min[lane] = min[lane].min(val);
max[lane] = max[lane].max(val);
}
}

stats.count += vals.len() as u64;

// Merge the lanes into `stats`.
for lane in 0..LANES {
let y = sum[lane] - (stats.delta + delta[lane]);
let t = stats.sum + y;
stats.delta = (t - stats.sum) - y;
stats.sum = t;
stats.min = stats.min.min(min[lane]);
stats.max = stats.max.max(max[lane]);
}

// Tail (fewer than LANES values) — fold directly into `stats` (count already added).
for &raw in chunks.remainder() {
let val = convert_to_f64::<COLUMN_TYPE_ID>(raw);
let y = val - stats.delta;
let t = stats.sum + y;
stats.delta = (t - stats.sum) - y;
stats.sum = t;
stats.min = stats.min.min(val);
stats.max = stats.max.max(val);
}
}

#[inline]
fn collect_stats<const COLUMN_TYPE_ID: u8>(
stats: &mut IntermediateStats,
Expand Down
Loading
Loading