Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ documentation = "https://docs.rs/deltalake"
repository = "https://github.com/delta-io/delta.rs"

[workspace.dependencies]
delta_kernel = { version = "0.19.0", features = [
delta_kernel = { version = "0.19.2", features = [
"arrow-57",
"default-engine-rustls",
"internal-api",
Expand Down
81 changes: 44 additions & 37 deletions crates/core/src/delta_datafusion/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,51 +422,58 @@ impl<'a> DeltaScanBuilder<'a> {

// needed to enforce limit and deal with missing statistics
// rust port of https://github.com/delta-io/delta/pull/1495
let mut pruned_without_stats = Vec::new();
let mut rows_collected = 0;
let mut files = Vec::with_capacity(num_containers);

use rand::seq::SliceRandom;
// Phase 1: partition kept files into with-stats and without-stats
// NOTE: when skip_stats is enabled (the default for EagerSnapshot),
// num_records() is always None and all files land in without_stats,
// effectively disabling limit-based file pruning. This is by design —
// see Snapshot::files() for rationale.
let mut with_stats = Vec::new();
let mut without_stats = Vec::new();

let log_data_handler = self.snapshot.log_data();
for (file_view, keep) in log_data_handler.iter().zip(files_to_prune.iter().copied()) {
if !keep {
continue;
}
let action = file_view.add_action_no_stats();
if self.limit.is_some() && file_view.num_records().is_none() {
without_stats.push(action.to_owned());
} else {
with_stats.push((action.to_owned(), file_view.num_records()));
}
}

let mut indices = (0..num_containers).collect::<Vec<_>>();
// Phase 2: optionally shuffle, then apply limit
// TODO: DELTA_RS_SHUFFLE_FILES is a blunt env-var toggle (any value enables it,
// including "0"/"false"). Consider a proper config flag and optional seed for
// reproducibility.
if self.limit.is_some() && std::env::var("DELTA_RS_SHUFFLE_FILES").is_ok() {
let mut rng = rand::thread_rng();
indices.shuffle(&mut rng);
use rand::seq::SliceRandom;
with_stats.shuffle(&mut rand::thread_rng());
}

let log_data_handler = self.snapshot.log_data();
for i in indices {
let file_view = log_data_handler.get(i).unwrap();
let keep = files_to_prune[i];
let mut files = Vec::with_capacity(num_containers);
let mut rows_collected: i64 = 0;

// prune file based on predicate pushdown
let action = file_view.add_action_no_stats();
let num_records = file_view.num_records();
if keep {
// prune file based on limit pushdown
if let Some(limit) = self.limit {
if let Some(num_records) = num_records {
if rows_collected <= limit as i64 {
rows_collected += num_records as i64;
files.push(action.to_owned());
} else {
break;
}
} else {
// some files are missing stats; skipping but storing them
// in a list in case we can't reach the target limit
pruned_without_stats.push(action.to_owned());
}
} else {
files.push(action.to_owned());
if let Some(limit) = self.limit {
for (action, num_records) in with_stats {
if rows_collected > limit as i64 {
break;
}
// Safety: when limit is set, phase 1 routes all None-stats
// files to `without_stats`, so num_records is always Some here.
rows_collected += num_records
.expect("with_stats entries always have num_records when limit is set")
as i64;
files.push(action);
}
}

if let Some(limit) = self.limit
&& rows_collected < limit as i64
{
files.extend(pruned_without_stats);
// fallback: include files without stats if we didn't reach the limit
if rows_collected < limit as i64 {
files.extend(without_stats);
}
} else {
files.extend(with_stats.into_iter().map(|(action, _)| action));
}

let files_scanned = files.len();
Expand Down
29 changes: 21 additions & 8 deletions crates/core/src/kernel/snapshot/log_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,23 @@ impl<T: PartitionsExt> PartitionsExt for Arc<T> {
pub struct LogDataHandler<'a> {
data: &'a [RecordBatch],
config: &'a TableConfiguration,
/// cumulative_rows[i] = total rows in data[0..i]
cumulative_rows: Vec<usize>,
}

impl<'a> LogDataHandler<'a> {
pub(crate) fn new(data: &'a [RecordBatch], config: &'a TableConfiguration) -> Self {
Self { data, config }
let mut cumulative_rows = Vec::with_capacity(data.len());
let mut total = 0;
for batch in data {
cumulative_rows.push(total);
total += batch.num_rows();
}
Self {
data,
config,
cumulative_rows,
}
}

pub(crate) fn table_configuration(&self) -> &TableConfiguration {
Expand All @@ -87,14 +99,15 @@ impl<'a> LogDataHandler<'a> {
self.data.iter().map(|batch| batch.num_rows()).sum()
}

pub fn get(&self, mut index: usize) -> Option<LogicalFileView> {
for batch in self.data {
if index < batch.num_rows() {
return Some(LogicalFileView::new(batch.clone(), index));
}
index -= batch.num_rows();
pub fn get(&self, index: usize) -> Option<LogicalFileView> {
let batch_idx = self.cumulative_rows.partition_point(|&start| start <= index);
if batch_idx == 0 {
return None;
}
None
let batch_idx = batch_idx - 1;
let local_idx = index - self.cumulative_rows[batch_idx];
let batch = &self.data[batch_idx];
(local_idx < batch.num_rows()).then(|| LogicalFileView::new(batch.clone(), local_idx))
}

pub fn iter(&self) -> impl Iterator<Item = LogicalFileView> + '_ {
Expand Down
18 changes: 15 additions & 3 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arrow::array::RecordBatch;
use arrow::compute::{filter_record_batch, is_not_null};
use arrow::datatypes::SchemaRef;
use arrow_arith::aggregate::sum_array_checked;
use arrow_array::{Int64Array, StructArray};
use arrow_array::Int64Array;
use delta_kernel::actions::{Remove, Sidecar};
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::engine::arrow_data::ArrowEngineData;
Expand Down Expand Up @@ -270,7 +270,12 @@ impl Snapshot {
log_store: &dyn LogStore,
predicate: Option<PredicateRef>,
) -> SendableRBStream {
let scan = match self.scan_builder().with_predicate(predicate).build() {
// NOTE: skip_stats avoids deserializing per-column min/max/nullCount from checkpoints,
// which is a major speedup for wide tables. Trade-off: stats_parsed (including numRecords)
// will be absent, so downstream limit-based file pruning becomes a no-op — all files are
// included and the limit is enforced at scan time instead. If a future kernel version
// supports skipping per-column stats while preserving numRecords, this can be revisited.
let scan = match self.scan_builder().with_predicate(predicate).with_skip_stats(true).build() {
Ok(scan) => scan,
Err(err) => return Box::pin(once(ready(Err(err)))),
};
Expand All @@ -295,6 +300,7 @@ impl Snapshot {
existing_data: Box<T>,
existing_predicate: Option<PredicateRef>,
) -> SendableRBStream {
// TODO: consider adding .with_skip_stats(true) here for consistency with files()
let scan = match self.scan_builder().with_predicate(predicate).build() {
Ok(scan) => scan,
Err(err) => return Box::pin(once(ready(Err(err)))),
Expand Down Expand Up @@ -550,9 +556,15 @@ pub(crate) async fn resolve_snapshot(
}
}

// NOTE: reads "size" as a top-level column — this assumes `array` is a scan-output
// batch (where fields are flattened), not a raw log action batch (where it would be `add.size`).
fn read_adds_size(array: &dyn ProvidesColumnByName) -> usize {
if let Some(size) = ex::extract_and_cast_opt::<Int64Array>(array, "size") {
sum_array_checked::<arrow::array::types::Int64Type, _>(size).unwrap().unwrap_or_default() as usize
sum_array_checked::<arrow::array::types::Int64Type, _>(size)
.ok()
.flatten()
.unwrap_or_default()
.max(0) as usize
} else {
0
}
Expand Down
10 changes: 10 additions & 0 deletions crates/core/src/kernel/snapshot/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ impl ScanBuilder {
self
}

/// Disable reading per-file statistics from checkpoint parquet files.
///
/// When set to `true`, the stats column of checkpoint files is left unread and
/// data skipping is turned off. Useful when the caller performs data skipping
/// itself or has no need for file statistics.
pub fn with_skip_stats(mut self, skip_stats: bool) -> Self {
    // Delegate to the underlying kernel builder, rebinding the configured inner value.
    let inner = self.inner.with_skip_stats(skip_stats);
    self.inner = inner;
    self
}

pub fn build(self) -> DeltaResult<Scan> {
Ok(Scan::from(self.inner.build()?))
}
Expand Down
Loading