Optimize Dask EWA persist and prune fornav tasks#707
Optimize Dask EWA persist and prune fornav tasks#707Zaczero wants to merge 1 commit intopytroll:mainfrom
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #707 +/- ##
==========================================
+ Coverage 93.67% 93.81% +0.14%
==========================================
Files 89 89
Lines 13621 13805 +184
==========================================
+ Hits 12759 12951 +192
+ Misses 862 854 -8
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
djhoese
left a comment
There was a problem hiding this comment.
Very interesting pull request. Thanks for putting this together. I've written a couple comments inline, but otherwise I have some more general comments and concerns:
- Just FYI that the persisted ll2cr functionality is something I've turned off in my own processing because I found that it only helped performance when a small amount of the input data overlapped the target area. It'll be interesting to try it again with a better implementation.
- One concerning thing in dask in the last year is that they've restructured their dependency graph handling and it is not very bad for performance to mix array tasks and delayed tasks. I think I had fixed the main non-persist usage in this EWA code to use
map_blocksinstead, but it looks like the persist functionality still depends on it. In the long term this should be replaced if at all possible and may actually simplify some of the logic. For example, carry around a dask array of ll2cr chunks with the row/col built in to the structure. - I asked about how you made these PRs in another thread, but looking at this I want more detail. What AI did you use to analyze your processing chain? How/Why do you have PRs for bilinear resampling, nearest neighbor resampling, and EWA resampling? Are you really using all 3 of these in your processing? Why do you understand this EWA code? I mean, why bother? I've been the only one "maintaining" this so I'm just a little surprised that someone analyzed it enough to contribute this large of a rewrite.
pyresample/ewa/dask_ewa.py
Outdated
| if len(arrays) == 1: | ||
| return arrays[0] |
There was a problem hiding this comment.
This condition isn't possible since you added the if len(valid_chunks) == 1 in the _combine_fornav function, right?
pyresample/ewa/dask_ewa.py
Outdated
| ll2cr_delayeds = ll2cr_result.to_delayed() | ||
| flat_delayeds = [ | ||
| (in_row_idx, in_col_idx, delayed_block) | ||
| for in_row_idx, delayed_row in enumerate(ll2cr_delayeds) | ||
| for in_col_idx, delayed_block in enumerate(delayed_row) | ||
| ] |
There was a problem hiding this comment.
Why is this work done outside of the if persist block? I see that flat_delayeds is used, but since you end up using ll2cr_result.name can't we use the chunks/chunk sizes of ll2cr_result to do the for-looping? It seems this code assumes that iterating over these chunks and the conversion to delayed objects is worth dropping the two num_row/col_blocks arguments.
There was a problem hiding this comment.
I cleaned this up. It was a matter of lines of code versus performance, and I think it's better to have slightly more code and get better performance. The two arguments are derived from num_row_blocks, num_col_blocks = ll2cr_result.numblocks[-2:] to avoid chance for error.
pyresample/ewa/dask_ewa.py
Outdated
| key = (ll2cr_result.name, in_row_idx, in_col_idx) | ||
| if extent is None: | ||
| continue | ||
| block_cache[key] = persisted_delayed.key |
There was a problem hiding this comment.
What's the difference between .name and .key on a dask delayed object?
There was a problem hiding this comment.
I am not quite understanding the question. ll2cr_result.name would be the collection name for the dask array, and persisted_delayed.key is the graph key. I simplified this logic to keep it easier to understand and to avoid dict allocations and lookups. A simple list works well there.
I also stumbled upon a cache invalidation bug and fixed it and added a test_ll2cr_cache_recomputes_when_precompute_mode_changes regression test for it. When rows_per_scan or persist changed, the DaskEWAResampler would crash, even before my performance changes.
| return ( | ||
| y_slice.stop > row_min and y_slice.start <= row_max and | ||
| x_slice.stop > col_min and x_slice.start <= col_max | ||
| ) |
There was a problem hiding this comment.
I'm sure I could be reading this wrong, but doesn't this only allow chunks to be processed that entirely encompass the output chunk? That's not what we want. We want to process the data if there is any overlap.
There was a problem hiding this comment.
This code is checking for any overlap. test_generate_fornav_overlap_padding verifies this behavior.
There was a problem hiding this comment.
Here's me printing out the bounds, y_slice, x_slice in this function while running all instances of that test:
(1.9, 1.9, 1.9, 1.9) slice(0, 2, None) slice(0, 2, None)
(1.9, 1.9, 1.9, 1.9) slice(0, 2, None) slice(2, 4, None)
(1.9, 1.9, 1.9, 1.9) slice(2, 4, None) slice(0, 2, None)
(1.9, 1.9, 1.9, 1.9) slice(2, 4, None) slice(2, 4, None)
.(0.8999999999999999, 2.9, 0.8999999999999999, 2.9) slice(0, 2, None) slice(0, 2, None)
(0.8999999999999999, 2.9, 0.8999999999999999, 2.9) slice(0, 2, None) slice(2, 4, None)
(0.8999999999999999, 2.9, 0.8999999999999999, 2.9) slice(2, 4, None) slice(0, 2, None)
(0.8999999999999999, 2.9, 0.8999999999999999, 2.9) slice(2, 4, None) slice(2, 4, None)
.(0.8999999999999999, 2.9, 0.8999999999999999, 2.9) slice(0, 2, None) slice(0, 2, None)
(0.8999999999999999, 2.9, 0.8999999999999999, 2.9) slice(0, 2, None) slice(2, 4, None)
(0.8999999999999999, 2.9, 0.8999999999999999, 2.9) slice(2, 4, None) slice(0, 2, None)
(0.8999999999999999, 2.9, 0.8999999999999999, 2.9) slice(2, 4, None) slice(2, 4, None)
The first test case is the only one that doesn't expect an overlap of all 4 chunks and it has a zero-size input unless I'm missing something (all bounds are 1.9). So that doesn't seem realistic.
That said, I see now that the logic was the inverse of what I expected and that it works as expected...although it took me way too long to wrap my head around it at the end of a long day. Overall, makes sense. Thanks.
8444f72 to
2de35c8
Compare
Speaking of the long term, I would rather envision an implementation without the dask abstraction overhead. For the bilinear resampler case, I was able to get 300% more performance by having a numpy native implementation with some Linux optimization tricks. The nice thing is that Dask buys remote dataset support, but I'm unsure how useful it is needed here. If I were to do something long term (and given my ignorance), I would just go full send with numpy. Torch and GPU acceleration sound lovely too. I haven't really digged down into that though so maybe I'm completely wrong.
I use GPT family models with instructions that optimize my workflow. I am using bilinear resampling because that matches my data needs best. The nearest neighbor and EWA are just for fun and to contribute something because satpy and pyresample seem like great projects. |
So the original EWA support in pyresample and the applications that I had it in before this (it originally comes from a toolkit called ms2gt via ll2cr and fornav binaries) were all plain numpy arrays only. I don't even use pyresample or satpy for distributed (remote) data processing. The main benefit to the EWA resampling via the dask implementation is processing more data than comfortably fits into memory. The other benefit is the parallel-ness of throwing it onto multiple threads. Satpy is never tested/benchmarked on a distributed cluster, at least not for the main use cases as parts of Satpy still don't play nicely with distributed worker nodes. Threaded dask scheduler is our usual way of doing things. So I'll agree that EWA via dask is not a mind blowing improvement but it is an improvement over straight numpy when the arrays get large. The bilinear resampler in pyresample is just not a good algorithm. It was what needed to happen for something to work but even @pnuu who originally wrote it doesn't use it any more. The dask implementation of that resampler is just too...naive(?). So I'm not surprised that any other bilinear implementation performs better. It is why Panu and others use the experimental(?) gradient search resampling instead as it is bilinear by default and performs way way way better than the existing implementation. |
Benchmarks