perf: optimize NthValue when ignore_nulls is true
#19496
base: main
Instead of collecting all valid indices per batch in PartitionEvaluator for NthValue, use the iterator as appropriate for each case. Even in the worst case, a negative index with magnitude larger than 1, only a sliding window of the last N valid indices is needed.
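To make the per-case iteration concrete, here is a minimal std-only sketch. It is not the PR's code: it assumes the validity mask is modeled as `&[bool]` (`true` = valid, matching Arrow's null-buffer convention) in place of Arrow's `NullBuffer`, and the helpers `valid_indices` and `nth_valid_index` are hypothetical. In this model FIRST_VALUE is `n == 1` and LAST_VALUE is `n == -1`. Positive `n` stops at the n-th valid index without collecting anything; negative `n` keeps only a window of the last `|n|` valid indices.

```rust
use std::collections::VecDeque;

/// Stand-in for Arrow's `NullBuffer::valid_indices()` in this sketch:
/// yields the positions whose validity flag is `true`.
fn valid_indices(validity: &[bool]) -> impl Iterator<Item = usize> + '_ {
    validity
        .iter()
        .enumerate()
        .filter_map(|(i, &is_valid)| is_valid.then_some(i))
}

/// Hypothetical helper: position of the n-th valid value (1-based),
/// counting from the end when `n` is negative. Returns `None` when there
/// are not enough valid values, or when `n == 0`.
fn nth_valid_index(validity: &[bool], n: i64) -> Option<usize> {
    if n > 0 {
        // FIRST_VALUE and positive NTH_VALUE: stop as soon as the n-th
        // valid index is reached; nothing is collected.
        valid_indices(validity).nth((n - 1) as usize)
    } else if n < 0 {
        // Counting from the end: keep only the last |n| valid indices.
        let k = n.unsigned_abs() as usize;
        let mut window: VecDeque<usize> = VecDeque::with_capacity(k);
        for idx in valid_indices(validity) {
            if window.len() == k {
                window.pop_front(); // drop the oldest index
            }
            window.push_back(idx);
        }
        // When the window is full, its oldest entry is the |n|-th valid
        // index from the end.
        if window.len() == k {
            window.front().copied()
        } else {
            None
        }
    } else {
        None // n == 0 is treated as "no result" in this sketch
    }
}
```

For a mask `[false, true, false, true, true]` the valid positions are 1, 3 and 4, so `n = 2` selects 3 and `n = -3` selects 1.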
When FirstValue is called with ignore_nulls set to true, the partition can be pruned at the first non-null value. Also return early for the other function cases under the same condition, rather than running some logic only to discard the results.
Jefffrey left a comment
Would be good if there were some benchmarks to check this is indeed a performance boost
```rust
let slice = array.slice(range.start, n_range);
if let Some(nulls) = slice.nulls() {
```
We can also be stricter by checking the null_count instead of simply checking for the presence of the null buffer.
That will be another branch, and the iterator returned by nulls.valid_indices() being empty will do the right thing anyway?
We can chain it like:
```rust
if let Some(nulls) = slice.nulls()
    && nulls.null_count() > 0
{
    // ... existing handling of the valid indices ...
}
```
> and the iterator returned by nulls.valid_indices() being empty will do the right thing anyway?

But I think it would still iterate through the whole null buffer anyway? So if we're looking through a performance lens, perhaps this approach is worth considering.
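A small std-only sketch of why a `null_count()` guard can pay off, especially for slices. It assumes a `&[bool]` validity mask (`true` = valid) instead of Arrow's `NullBuffer`, and `null_count` / `nth_index_without_nulls` are hypothetical helpers, not DataFusion or arrow-rs APIs. When a slice happens to contain no nulls, the n-th valid row is a plain positional offset, so no pass over the valid indices is needed at all.

```rust
use std::cmp::Ordering;

/// Number of null (invalid) slots in a validity mask; `true` means valid.
fn null_count(validity: &[bool]) -> usize {
    validity.iter().filter(|&&is_valid| !is_valid).count()
}

/// Hypothetical fast path: if the slice holds no nulls, the n-th valid row
/// (1-based, negative = from the end) is a direct offset.
/// The outer `None` means "nulls present, fall back to scanning".
fn nth_index_without_nulls(len: usize, nulls: usize, n: i64) -> Option<Option<usize>> {
    if nulls > 0 {
        return None;
    }
    let k = n.unsigned_abs() as usize;
    let idx = match n.cmp(&0) {
        Ordering::Greater => (k <= len).then(|| k - 1),
        Ordering::Less => len.checked_sub(k),
        Ordering::Equal => None,
    };
    Some(idx)
}
```

For example, the mask `[false, true, true, true, false]` has two nulls, but its sub-slice `1..4` has none, so the fast path answers directly for that slice while the full array still needs a scan.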
Ah yes, I somehow confused the null count check with valid_indices retrieving no values, while it's the opposite. Also, I'm a bit new to Arrow/DataFusion, so it's not clear to me whether there are non-pathological scenarios that could produce an array with the null buffer present but containing no nulls.
It would indeed be a weird/pedantic case; however here it might be appropriate as we are slicing the original array so it could be likely we have an array with nulls but slice into a region with no nulls 🤔
```rust
    // for the sliding window that will be discarded in the end.
    return None;
}
let mut window = VecDeque::with_capacity(reverse_index);
```
Can we reverse the iterator of valid_indices() to avoid need of this queue?
Unfortunately, BitIndexIterator is not bidirectional.
The queue is a bad solution, indeed. I think a simple ring buffer will do much better here.
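A minimal sketch of the ring-buffer idea, assuming the valid indices arrive as a plain iterator of `usize` (standing in for `BitIndexIterator`); the function name is hypothetical and no performance claim is implied. A fixed `Vec` of length `k` is overwritten in place, so after the scan the slot at `seen % k` holds the k-th valid index from the end.

```rust
/// Hypothetical ring-buffer variant of the "last k valid indices" window:
/// a fixed-size Vec is overwritten in place instead of pushing/popping a queue.
/// Returns the k-th valid index counted from the end, if there are enough.
fn nth_valid_from_end_ring<I>(valid_indices: I, k: usize) -> Option<usize>
where
    I: IntoIterator<Item = usize>,
{
    if k == 0 {
        return None;
    }
    let mut ring = vec![0usize; k];
    let mut seen = 0usize;
    for idx in valid_indices {
        // Once the ring is full, this overwrites the oldest slot.
        ring[seen % k] = idx;
        seen += 1;
    }
    if seen < k {
        return None; // fewer than k valid values in total
    }
    // The next slot to be overwritten holds the oldest retained index,
    // i.e. the k-th valid index from the end.
    Some(ring[seen % k])
}
```

With valid indices `1, 3, 4`, asking for `k = 2` returns 3 and `k = 4` returns `None`.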
> Unfortunately, BitIndexIterator is not bidirectional.
Ah you're right, I was mistakenly thinking of BitIterator
I think if we do something like this it can work without needing a separate container?
```rust
Ordering::Less => {
    let reverse_index = (-self.n) as usize;
    let total_len = nulls.len();
    let null_count = nulls.null_count();
    let valid_indices_len = total_len - null_count;
    if reverse_index > valid_indices_len {
        return None;
    }
    nulls
        .valid_indices()
        .nth(valid_indices_len - reverse_index)
        .map(|idx| idx + offset)
}
```
Thanks for the suggestion! With it, the implementation is faster across the board according to the added benchmark.
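The counting approach above can be modeled without Arrow as follows. This sketch assumes a `&[bool]` validity mask and takes the null count as a parameter (in arrow-rs it would come from `NullBuffer::null_count()`); the function name is hypothetical, and the slice `offset` adjustment from the suggestion is omitted. Knowing the number of valid entries up front turns "k-th from the end" into "`(valid_len - k)`-th from the start", so a single forward `nth` is enough and no window is kept.

```rust
/// Sketch of the counting approach: with the null count known up front,
/// "k-th valid index from the end" becomes "(valid_len - k)-th from the start".
fn nth_valid_from_end_by_count(validity: &[bool], null_count: usize, k: usize) -> Option<usize> {
    let valid_len = validity.len() - null_count;
    if k == 0 || k > valid_len {
        return None; // not enough valid values (k == 0 is not a meaningful request)
    }
    validity
        .iter()
        .enumerate()
        .filter_map(|(i, &is_valid)| is_valid.then_some(i))
        .nth(valid_len - k) // single forward pass, nothing buffered
}
```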
```rust
if self.ignore_nulls {
    match self.state.kind {
        // Prune on first non-null output in case of FIRST_VALUE
        NthValueKind::First => {
            if let Some(nulls) = out.nulls() {
                if self.state.finalized_result.is_none() {
                    if let Some(valid_index) = nulls.valid_indices().next() {
                        let result =
                            ScalarValue::try_from_array(out, valid_index)?;
                        self.state.finalized_result = Some(result);
                    } else {
                        // The output is empty or all nulls, ignore
                    }
                }
                if state.window_frame_range.start < state.window_frame_range.end {
                    state.window_frame_range.start =
                        state.window_frame_range.end - 1;
                }
                return Ok(());
            } else {
                // Fall through to the main case because there are no nulls
            }
        }
        // Do not memoize for other kinds when nulls are ignored
```
The logic here is really hard to follow with all the nesting present
Between early returns and falling through to the main case, I don't know if I can make it more readable.
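One possible way to flatten the FIRST_VALUE branch is to move it into a helper that reports whether the caller should return early. The sketch below is a simplified, hypothetical model, not DataFusion code: `out` is modeled as `&[Option<i64>]`, the evaluator's memoized result and the window frame are merged into one struct with a `Range<usize>` frame, and it uses the stricter "contains any null" test discussed in the thread rather than the mere presence of a null buffer.

```rust
use std::ops::Range;

/// Hypothetical, simplified state: the memoized FIRST_VALUE result plus the
/// current window frame (merged here for the sake of the sketch).
struct FirstValueState {
    finalized_result: Option<i64>,
    frame: Range<usize>,
}

/// Returns `true` when the step was fully handled here (caller returns early),
/// `false` when the caller should fall through to the regular memoize path.
fn memoize_first_ignore_nulls(state: &mut FirstValueState, out: &[Option<i64>]) -> bool {
    // No nulls in the output: the regular path already does the right thing.
    if out.iter().all(Option::is_some) {
        return false;
    }
    // Memoize the first non-null output once; later calls keep it.
    if state.finalized_result.is_none() {
        state.finalized_result = out.iter().flatten().next().copied();
    }
    // Prune the frame down to its last row.
    if state.frame.start < state.frame.end {
        state.frame.start = state.frame.end - 1;
    }
    true
}
```

Each condition then reads top to bottom as a guard, instead of being nested inside `if let` / `else` arms.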
I have added a benchmark. It shows significant improvement in many cases, but it also shows that the
Would you be able to post the benchmark results for us to see?
Benchmark results against the branch base
Jefffrey left a comment
The evaluate looks good to me; I have some questions around memoize but they're mainly because I am not as familiar with this part of the codebase 😅
```rust
NthValueKind::First => {
    if let Some(nulls) = out.nulls() {
        if self.state.finalized_result.is_none() {
```
There's a small pedantic case here: if we have no null buffer (no nulls), we fall back to the existing behaviour below. If we have a null buffer, we try to memoize if we haven't already, but we still return from this point onwards (we don't fall back). But if the null buffer has no nulls, we don't fall through like before; we just handle the return here.
I guess it is the same result and I don't think it's too much of a concern at runtime, but it's just a potential path I find inconsistent when reading this code flow 🤔
```rust
let result =
    ScalarValue::try_from_array(out, valid_index)?;
self.state.finalized_result = Some(result);
```
I notice that below we memoize the last value in the out column, whereas here we memoize the first; does it make a difference?
```rust
if state.window_frame_range.start < state.window_frame_range.end {
    state.window_frame_range.start =
        state.window_frame_range.end - 1;
}
```
Is this equivalent to the is_prunable check (for NthValueKind::First) where it ensures n_range > 0 and then if pruning it does state.window_frame_range.start = state.window_frame_range.end.saturating_sub(buffer_size)?
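One way to probe the equivalence question is to compare the two pruning rules on plain integers. This sketch assumes `buffer_size` is 1 for FIRST_VALUE, which is an assumption here rather than something stated in the thread, and both function names are hypothetical. Under that assumption the rules agree whenever the frame is non-empty (`n_range > 0`); they differ only for an empty frame, where `saturating_sub` would move `start` backwards while the guarded version leaves it alone, which is exactly the case the `n_range > 0` check excludes.

```rust
/// PR-style pruning: move `start` to the last row only if the frame is non-empty.
fn prune_guarded(start: usize, end: usize) -> usize {
    if start < end { end - 1 } else { start }
}

/// `is_prunable`-style pruning as described in the question.
/// Assumption: `buffer_size` is 1 for FIRST_VALUE.
fn prune_saturating(end: usize, buffer_size: usize) -> usize {
    end.saturating_sub(buffer_size)
}
```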
Rationale for this change

The PartitionEvaluator implementation for NthValue in DataFusion has a few shortcomings:
- … First/Last/Nth case.
- The memoize implementation gives up in the same condition, even after performing part of the logic!

What changes are included in this PR?

Use only as much iteration over the valid indices as needed for the function case, without collecting all indices.
The memoize implementation does the right thing for FirstValue with ignore_nulls set to true, or returns early for other function cases.

Are these changes tested?

All existing tests pass for FirstValue/LastValue/NthValue.

Are there any user-facing changes?
No.