diff --git a/columnar/src/block_accessor.rs b/columnar/src/block_accessor.rs index ed0fe47311..c73e63262e 100644 --- a/columnar/src/block_accessor.rs +++ b/columnar/src/block_accessor.rs @@ -163,6 +163,15 @@ impl self.val_cache.iter().cloned() } + /// Returns the fetched values of the current block as a contiguous slice. + /// + /// This lets reducers (sum/min/max/stats) process the block with a tight, vectorizable + /// loop instead of going through the `iter_vals` iterator. + #[inline] + pub fn vals(&self) -> &[T] { + &self.val_cache + } + #[inline] /// Returns an iterator over the docids and values /// The passed in `docs` slice needs to be the same slice that was passed to `fetch_block` or diff --git a/src/aggregation/metric/extended_stats.rs b/src/aggregation/metric/extended_stats.rs index 1e625d5de6..efc00d7080 100644 --- a/src/aggregation/metric/extended_stats.rs +++ b/src/aggregation/metric/extended_stats.rs @@ -315,6 +315,40 @@ impl IntermediateExtendedStats { self.sum_of_squares_elastic = t; self.update_variance(value); } + + /// Collects a contiguous block of raw column values. + /// + /// The per-value [`Self::collect`] is a strictly serial recurrence (Welford's variance + /// recomputes the mean from the running sum each step). To break that dependency chain we + /// accumulate into `LANES` independent accumulators and combine them with + /// [`Self::merge_fruits`] — the exact Chan parallel-variance combination already used to + /// merge across segments, so the result matches multi-segment aggregation (fp summation + /// order differs, as it already does there). + #[inline] + fn collect_block(&mut self, vals: &[u64], field_type: ColumnType) { + const LANES: usize = 4; + if vals.len() < LANES * 2 { + for &val in vals { + self.collect(f64_from_fastfield_u64(val, field_type)); + } + return; + } + let mut lanes: [IntermediateExtendedStats; LANES] = Default::default(); + let mut chunks = vals.chunks_exact(LANES); + for chunk in chunks.by_ref() { + for lane in 0..LANES { + lanes[lane].collect(f64_from_fastfield_u64(chunk[lane], field_type)); + } + } + for &val in chunks.remainder() { + lanes[0].collect(f64_from_fastfield_u64(val, field_type)); + } + let mut combined = mem::take(&mut lanes[0]); + for lane in 1..LANES { + combined.merge_fruits(mem::take(&mut lanes[lane])); + } + self.merge_fruits(combined); + } } #[derive(Clone, Debug)] @@ -376,10 +410,7 @@ impl SegmentAggregationCollector for SegmentExtendedStatsCollector { agg_data .column_block_accessor .fetch_block_with_missing(docs, &self.accessor, self.missing); - for val in agg_data.column_block_accessor.iter_vals() { - let val1 = f64_from_fastfield_u64(val, self.field_type); - extended_stats.collect(val1); - } + extended_stats.collect_block(agg_data.column_block_accessor.vals(), self.field_type); // store back self.buckets[parent_bucket_id as usize] = extended_stats; diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index eb15427a15..41a0236f6a 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -300,11 +300,11 @@ impl SegmentAggregationCollector &self.accessor, self.missing_u64, ); - collect_stats::( + collect_stats_slice::( &mut self.buckets[parent_bucket_id as usize], - agg_data.column_block_accessor.iter_vals(), + agg_data.column_block_accessor.vals(), self.is_number_or_date_type, - )?; + ); Ok(()) } @@ -357,6 +357,72 @@ impl SegmentAggregationCollector } } +/// Reduces a contiguous block of raw column values into `stats`. +/// +/// Uses `LANES` independent (sum, delta) Kahan accumulators and `LANES` min/max +/// accumulators so the per-element dependency chain of the serial Kahan sum is broken, +/// letting the CPU pipeline the additions and auto-vectorize the min/max. The lanes are +/// merged back into `stats` with the same compensated-sum combination used by +/// [`IntermediateStats::merge_fruits`], so accuracy is preserved (the summation order +/// differs, exactly as it already does across segment merges). +#[inline] +fn collect_stats_slice( + stats: &mut IntermediateStats, + vals: &[u64], + is_number_or_date_type: bool, +) { + if !is_number_or_date_type { + // Non-numeric: only the presence of a value matters (preserve existing behavior). + for _ in 0..vals.len() { + stats.collect(0.0); + } + return; + } + + const LANES: usize = 4; + let mut sum = [0f64; LANES]; + let mut delta = [0f64; LANES]; + let mut min = [f64::MAX; LANES]; + let mut max = [f64::MIN; LANES]; + + let mut chunks = vals.chunks_exact(LANES); + for chunk in chunks.by_ref() { + for lane in 0..LANES { + let val = convert_to_f64::(chunk[lane]); + // Per-lane Kahan summation. + let y = val - delta[lane]; + let t = sum[lane] + y; + delta[lane] = (t - sum[lane]) - y; + sum[lane] = t; + min[lane] = min[lane].min(val); + max[lane] = max[lane].max(val); + } + } + + stats.count += vals.len() as u64; + + // Merge the lanes into `stats`. + for lane in 0..LANES { + let y = sum[lane] - (stats.delta + delta[lane]); + let t = stats.sum + y; + stats.delta = (t - stats.sum) - y; + stats.sum = t; + stats.min = stats.min.min(min[lane]); + stats.max = stats.max.max(max[lane]); + } + + // Tail (fewer than LANES values) — fold directly into `stats` (count already added). + for &raw in chunks.remainder() { + let val = convert_to_f64::(raw); + let y = val - stats.delta; + let t = stats.sum + y; + stats.delta = (t - stats.sum) - y; + stats.sum = t; + stats.min = stats.min.min(val); + stats.max = stats.max.max(val); + } +} + #[inline] fn collect_stats( stats: &mut IntermediateStats, diff --git a/src/query/phrase_query/phrase_scorer.rs b/src/query/phrase_query/phrase_scorer.rs index ff7def9175..66305e511b 100644 --- a/src/query/phrase_query/phrase_scorer.rs +++ b/src/query/phrase_query/phrase_scorer.rs @@ -79,7 +79,21 @@ fn intersection_exists(left: &[u32], right: &[u32]) -> bool { false } +/// When the longer array is at least this many times longer than the shorter one, +/// galloping (binary search) beats the linear two-pointer merge. Below this ratio the +/// cache-friendly sequential scan wins, so we keep it. The threshold is deliberately +/// conservative (measured crossover is lower) to never regress balanced inputs. +const GALLOP_RATIO: usize = 64; + pub(crate) fn intersection_count(left: &[u32], right: &[u32]) -> usize { + let (min_len, max_len) = if left.len() <= right.len() { + (left.len(), right.len()) + } else { + (right.len(), left.len()) + }; + if min_len != 0 && max_len >= min_len * GALLOP_RATIO { + return intersection_count_galloping(left, right); + } let mut left_index = 0; let mut right_index = 0; let mut count = 0; @@ -103,12 +117,113 @@ pub(crate) fn intersection_count(left: &[u32], right: &[u32]) -> usize { count } +/// Searches for `needle` in the sorted slice `hay` using exponential (galloping) search. +/// +/// Returns `(found, advance)` where `advance` is the number of leading elements of `hay` +/// that are `<= needle` (when found) or `< needle` (when not found) — i.e. the amount by +/// which a cursor positioned at the start of `hay` should move forward. This lets a caller +/// walk a small array against a large one in O(m·log(n/m)) total. +#[inline] +fn gallop_find(hay: &[u32], needle: u32) -> (bool, usize) { + if hay.is_empty() { + return (false, 0); + } + // Exponential search for an upper bound on the needle's position. + let mut bound = 1; + while bound < hay.len() && hay[bound] < needle { + bound *= 2; + } + let lo = bound / 2; + let hi = (bound + 1).min(hay.len()); + match hay[lo..hi].binary_search(&needle) { + Ok(pos) => (true, lo + pos + 1), + Err(pos) => (false, lo + pos), + } +} + +/// Counts the elements common to two sorted, duplicate-free arrays, using galloping +/// (exponential + binary search) of the smaller array into the larger one. +/// +/// This is O(m·log(n/m)) where m <= n, versus the O(n+m) of the linear two-pointer +/// [`intersection_count`]. It wins big when the two arrays differ a lot in size (e.g. a +/// rare and a frequent term in a phrase), and is competitive when they are balanced. +fn intersection_count_galloping(left: &[u32], right: &[u32]) -> usize { + let (small, large) = if left.len() <= right.len() { + (left, right) + } else { + (right, left) + }; + let mut count = 0; + // Position in `large` past which all values are < the current needle. + let mut base = 0; + for &needle in small { + if base >= large.len() { + break; + } + let (found, advance) = gallop_find(&large[base..], needle); + base += advance; + count += found as usize; + } + count +} + +/// Like [`intersection`], but uses galloping. Writes the (ascending) intersection into the +/// first elements of `left` and truncates it. Wins when `left` and `right` differ a lot in +/// size; pick it via the same [`GALLOP_RATIO`] guard used by [`intersection`]. +fn intersection_galloping(left: &mut Vec, right: &[u32]) { + let mut count = 0; + let mut base = 0; + if left.len() <= right.len() { + // Walk `left` (small) as needles, searching forward in `right` (large). + // Matches are written back into `left[count]`; since `count <= i` at all times, + // this never overwrites a not-yet-read element. + let mut i = 0; + while i < left.len() { + if base >= right.len() { + break; + } + let needle = left[i]; + let (found, advance) = gallop_find(&right[base..], needle); + base += advance; + if found { + left[count] = needle; + count += 1; + } + i += 1; + } + } else { + // Walk `right` (small) as needles, searching forward in `left` (large). + // `count <= base` holds throughout, so writing `left[count]` is safe. + for &needle in right { + if base >= left.len() { + break; + } + let (found, advance) = gallop_find(&left[base..], needle); + base += advance; + if found { + left[count] = needle; + count += 1; + } + } + } + left.truncate(count); +} + /// Intersect twos sorted arrays `left` and `right` and outputs the /// resulting array in left. /// /// Returns the length of the intersection #[inline] fn intersection(left: &mut Vec, right: &[u32]) { + let (min_len, max_len) = if left.len() <= right.len() { + (left.len(), right.len()) + } else { + (right.len(), left.len()) + }; + if min_len != 0 && max_len >= min_len * GALLOP_RATIO { + intersection_galloping(left, right); + return; + } let mut left_index = 0; let mut right_index = 0; let mut count = 0; @@ -617,6 +732,54 @@ mod tests { test_intersection_sym(&[5, 7], &[1, 5, 10, 12], &[5]); test_intersection_sym(&[1, 5, 6, 9, 10, 12], &[6, 8, 9, 12], &[6, 9, 12]); } + + #[test] + fn test_galloping_matches_scalar() { + use proptest::prelude::*; + // Use a wide size range (including very asymmetric pairs) so both small/large branches + // and the linear reference are exercised. + proptest!(|(mut a in proptest::collection::vec(0u32..300, 0..150), + mut b in proptest::collection::vec(0u32..300, 0..150))| { + a.sort_unstable(); + a.dedup(); + b.sort_unstable(); + b.dedup(); + + // Counting variant. + prop_assert_eq!( + intersection_count_galloping(&a, &b), + intersection_count(&a, &b) + ); + + // Output-writing variant: galloping must produce the same array as the + // linear two-pointer reference. + let mut expected = a.clone(); + two_pointer_intersection_ref(&mut expected, &b); + let mut got = a.clone(); + intersection_galloping(&mut got, &b); + prop_assert_eq!(got, expected); + }); + } + + /// Linear two-pointer reference used only to validate [`intersection_galloping`]. + fn two_pointer_intersection_ref(left: &mut Vec, right: &[u32]) { + let mut li = 0; + let mut ri = 0; + let mut count = 0; + while li < left.len() && ri < right.len() { + match left[li].cmp(&right[ri]) { + std::cmp::Ordering::Less => li += 1, + std::cmp::Ordering::Equal => { + left[count] = left[li]; + count += 1; + li += 1; + ri += 1; + } + std::cmp::Ordering::Greater => ri += 1, + } + } + left.truncate(count); + } #[test] fn test_slop() { // The slop is not symmetric. It does not allow for the phrase to be out of order. @@ -809,4 +972,59 @@ mod bench { intersection_count(&left, &right); }); } + + // Large, balanced, ~50% overlap — the regime where SIMD intersection would pay off. + #[bench] + fn bench_intersection_count_large_balanced(b: &mut Bencher) { + let left: Vec = (0..2048u32).map(|i| i * 2).collect(); + let right: Vec = (0..2048u32).map(|i| i * 3).collect(); + b.iter(|| intersection_count(&left, &right)); + } + + // Large, highly asymmetric — the regime where galloping (binary search) would pay off. + #[bench] + fn bench_intersection_count_large_asymmetric(b: &mut Bencher) { + let left: Vec = (0..4096u32).collect(); + let right: Vec = [3u32, 1000, 2500, 4000].to_vec(); + b.iter(|| intersection_count(&left, &right)); + } + + #[bench] + fn bench_intersection_count_galloping_large_balanced(b: &mut Bencher) { + let left: Vec = (0..2048u32).map(|i| i * 2).collect(); + let right: Vec = (0..2048u32).map(|i| i * 3).collect(); + b.iter(|| intersection_count_galloping(&left, &right)); + } + + #[bench] + fn bench_intersection_count_galloping_large_asymmetric(b: &mut Bencher) { + let left: Vec = (0..4096u32).collect(); + let right: Vec = [3u32, 1000, 2500, 4000].to_vec(); + b.iter(|| intersection_count_galloping(&left, &right)); + } + + // Output-writing `intersection`: balanced (stays two-pointer) vs asymmetric (gallops). + #[bench] + fn bench_intersection_large_balanced(b: &mut Bencher) { + let left_data: Vec = (0..2048u32).map(|i| i * 2).collect(); + let right: Vec = (0..2048u32).map(|i| i * 3).collect(); + let mut left = Vec::with_capacity(left_data.len()); + b.iter(|| { + left.clear(); + left.extend_from_slice(&left_data); + intersection(&mut left, &right); + }); + } + + #[bench] + fn bench_intersection_large_asymmetric(b: &mut Bencher) { + let left_data: Vec = (0..4096u32).collect(); + let right: Vec = [3u32, 1000, 2500, 4000].to_vec(); + let mut left = Vec::with_capacity(left_data.len()); + b.iter(|| { + left.clear(); + left.extend_from_slice(&left_data); + intersection(&mut left, &right); + }); + } } diff --git a/src/termdict/fst_termdict/merger.rs b/src/termdict/fst_termdict/merger.rs index 43147a5aef..7e1ba9eb95 100644 --- a/src/termdict/fst_termdict/merger.rs +++ b/src/termdict/fst_termdict/merger.rs @@ -94,7 +94,7 @@ impl<'a> TermMerger<'a> { #[cfg(all(test, feature = "unstable"))] mod bench { - use rand::distributions::Alphanumeric; + use rand::distr::Alphanumeric; use rand::{rng, Rng}; use test::{self, Bencher};