[fix][io] Prevent OOM in FileSource by bounding internal queues#28
Open
Praveenkumar76 wants to merge 2 commits into
Open
[fix][io] Prevent OOM in FileSource by bounding internal queues#28Praveenkumar76 wants to merge 2 commits into
Praveenkumar76 wants to merge 2 commits into
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes apache#25135
Motivation
The
FileSourceconnector currently initializes its internal queues (workQueue,inProcess, andrecentlyProcessed) using unboundedLinkedBlockingQueueinstances.Under heavy workloads, the
FileListingThreadcan scan and enqueue files significantly faster than the worker threads can process them. Since the queues have no capacity limits, pending file tasks accumulate indefinitely in memory, eventually resulting in a fataljava.lang.OutOfMemoryError.This creates a classic unbounded producer-consumer backpressure issue where the producer rate is unconstrained by downstream processing throughput.
The issue was reproduced locally by:
This consistently triggered heap exhaustion during file listing.
Modifications
Queue Backpressure
Added
maxQueueSizeconfiguration toFileSourceConfigIntroduced a bounded queue capacity with a default value of
1000Updated
FileSourceto initialize:workQueueinProcessrecentlyProcessedusing bounded
LinkedBlockingQueueinstancesThis ensures the file listing thread naturally blocks when downstream processing cannot keep up, preventing unbounded memory growth.
Configuration Validation
<= 0)Stability Improvements
Introduced proper backpressure behavior between:
FileListingThreadPrevents uncontrolled queue accumulation during:
Verifying this change
Verified locally using:
-Xmx64m)500,000files)Manual Verification:
-Xmx64m) and a large-scale directory generation (500,000files).Before this change:
java.lang.OutOfMemoryErrorAfter this change:
Tests added
Added
FileSourceConfigTestcoverage for:maxQueueSize<= 0)Confirmed existing FileSource behavior remains unchanged under normal workloads
Does this pull request potentially affect one of the following parts: