Kafka Connect: Tolerate CommitFailedException and InvalidProducerEpochException during rebalance#16366
Open
kumarpritam863 wants to merge 28 commits into
Conversation
Contributor
Author
|
@danielcweeks can you please review this. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
When a Kafka consumer group re-balance happens between the time the iceberg-kafka-connect
sink task prepares a transactional offset commit and the time the broker processes it, the
connector currently dies with an unrecoverable
ConnectException:org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
...
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit
failed due to consumer group metadata mismatch: Specified group generation id is not valid.
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(...)
This is a transient, expected failure mode under re-balance — the consumer-group
generation id captured in
producer.sendOffsetsToTransaction(...)becomes stale by the timethe broker validates it. The same applies to
InvalidProducerEpochExceptionwhen theproducer epoch is bumped mid-flight. Both should be recoverable, not fatal.
Root cause
In
Channel.send()the flow is:The previous catch block aborted the transaction and rethrew the exception unchanged, so it
propagated as a non-retriable failure into WorkerSinkTask.deliverMessages and killed the
task.
Fix
Channel.send() now distinguishes recoverable re-balance failures from fatal ones:
wrapped in another KafkaException): abort the transaction and translate to
org.apache.kafka.connect.errors.RetriableException. Connect's framework then pauses the
consumer, retains the message batch, and re-delivers it after the re-balance settles. The
aborted transaction never advanced source offsets, so once the partitions are reassigned
the new owner resumes from the last broker-committed offset — no data loss. Any data files
flushed before the abort become orphans, recoverable by Iceberg's orphan-file expiration.
CommitterImpl.processControlEvents adds a RetriableException log line at info level so
the re-balance recovery is visible in task logs but doesn't surface as an error.
KafkaUtils.seekToLastCommittedOffsets(SinkTaskContext) is added and called from
IcebergSinkTask.close(partitions) in addition to open(partitions). This is required
under incremental cooperative re-balance, where Connect can invoke close() on a
revoked partition without a paired open() — meaning the framework's own rewind in
onPartitionsAssigned never runs for it. Seeking the main consumer here guarantees that
records read past the broker-committed offset (and never committed transactionally) are
re-fetched on the next poll.
Guarantees preserved
via the producer transaction — unchanged.
re-fetched records flow through the next successful commit.
(kafka.connect.offsets..) still gate replays on Coordinator restart.
Test plan
New ChannelTest covers:
Existing WorkerTest, CoordinatorTest, CommitterImplTest, IcebergSinkTaskTest and the
rest of the kafka-connect suite pass unchanged.