
Release 1.19 lyft#83

Draft
maheepm-lyft wants to merge 2355 commits into release-1.17-lyft from release-1.19-lyft

Conversation

@maheepm-lyft

What is the purpose of the change

(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)

Brief change log

(for example:)

  • The TaskInfo is stored in the blob store on job creation time as a persistent artifact
  • Deployments RPC transmits only the blob storage reference
  • TaskManagers retrieve the TaskInfo from the blob cache

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4-node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

XComp and others added 30 commits February 19, 2024 16:43
…MemoryError in JDK21

This test started to fail quite regularly on JDK 21. The problem was that the low heap size could cause an OutOfMemoryError while compiling the dummy classes. An OOM in the compilation phase results in a different error message being printed to stdout, which wasn't captured by the test.

The solution is to pre-compile the classes upfront (with the normal heap size). The test's main method then only loads the classes; no compilation is necessary.
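A minimal sketch of that approach, with purely illustrative names (LowHeapTestMain and DummyClass are not from the commit): the low-heap test JVM only loads classes that were compiled beforehand with the normal heap size.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;

// Sketch only: LowHeapTestMain and DummyClass are illustrative names.
public class LowHeapTestMain {

    public static void main(String[] args) throws Exception {
        // The classes were compiled beforehand with a normal heap size and placed
        // into this directory; the low-heap JVM only loads them.
        Path preCompiledClassesDir = Path.of(args[0]);

        try (URLClassLoader loader =
                new URLClassLoader(new URL[] {preCompiledClassesDir.toUri().toURL()})) {
            // Loading a class is cheap compared to running the compiler, so the
            // small heap no longer triggers an OutOfMemoryError here.
            Class<?> dummy = loader.loadClass("DummyClass");
            System.out.println("Loaded " + dummy.getName());
        }
    }
}
```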
…ated tests of DefaultSlotStatusSyncerTest

Also deduplicate the code of these tests.
suspend and cancel reset the ExecutionGraph in a similar way. I move the common logic into its own method to make this more prominent in the code.
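A minimal sketch, with assumed names, of the refactoring described above: suspend() and cancel() delegate the shared ExecutionGraph reset to a single private method (this is not the actual Flink scheduler code).

```java
// Sketch only: illustrates the refactoring, not the actual Flink scheduler.
class SchedulerSketch {

    enum JobStatus { SUSPENDED, CANCELED }

    void suspend(Throwable cause) {
        resetExecutionGraph(JobStatus.SUSPENDED, cause);
    }

    void cancel() {
        resetExecutionGraph(JobStatus.CANCELED, null);
    }

    // The logic shared by suspend and cancel lives in one prominent place
    // instead of being duplicated in both callers.
    private void resetExecutionGraph(JobStatus targetStatus, Throwable cause) {
        // transition the job to targetStatus, release resources, notify listeners, ...
    }
}
```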
…iptorGroup out of the RPC main thread]"

This reverts commit d18a4bf.

(cherry picked from commit 7a709bf)
…used by multiple writes to the same sink table and shared staging directory

This closes apache#24492

* Fix unstable TableSourceITCase#testTableHintWithLogicalTableScanReuse
* Moves the staging dir configuration into builder for easier testing

---------

Co-authored-by: Matthias Pohl <matthias.pohl@aiven.io>
(cherry picked from commit 7d0111d)
Jiabao-Sun and others added 30 commits April 18, 2025 15:21
So far, we used a special value for the final checkpoint on endInput. However, as shown in the description of this ticket, final doesn't mean final. Hence, multiple committables with EOI could be created at different times.

With this commit, we stop using a special value for such committables and instead try to guess the checkpoint id of the next checkpoint. There are various factors that influence the checkpoint id but we can mostly ignore them all because we just need to pick a checkpoint id that is
- higher than all checkpoint ids of the previous, successful checkpoints of this attempt
- higher than the checkpoint id of the restored checkpoint
- lower than any future checkpoint id.

Hence, we just remember the last observed checkpoint id (initialized with max(0, restored id)) and use the last id + 1 for endInput. Naturally, multiple endInput calls happening across restarts will result in unique checkpoint ids. Note that aborted checkpoints before endInput may result in diverged checkpoint ids across subtasks. However, each of these ids satisfies the above requirements, and any id of endInput1 will be smaller than any id of endInput2. Thus, diverged checkpoint ids will not impact correctness at all.

(cherry picked from commit 9302545)
Co-authored-by: Ferenc Csaky <fcsaky@apache.org>
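A minimal sketch of the id-guessing scheme described above, using assumed class and method names rather than the actual Flink ones: track the last observed checkpoint id, initialized with max(0, restored id), and hand out last id + 1 for endInput.

```java
// Sketch only: assumed names, not the actual Flink sink/committer classes.
class EndInputCheckpointIdTracker {

    private long lastObservedCheckpointId;

    EndInputCheckpointIdTracker(long restoredCheckpointId) {
        // Initialized with max(0, restored id).
        this.lastObservedCheckpointId = Math.max(0L, restoredCheckpointId);
    }

    /** Called for every successful checkpoint of this attempt. */
    void onCheckpoint(long checkpointId) {
        lastObservedCheckpointId = Math.max(lastObservedCheckpointId, checkpointId);
    }

    /**
     * Id used for committables emitted on endInput: higher than every previous
     * checkpoint of this attempt and than the restored checkpoint, and lower
     * than any future checkpoint id.
     */
    long endInputCheckpointId() {
        return lastObservedCheckpointId + 1;
    }
}
```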
…in adaptive scheduler

Also enable this strategy by default via the introduced config option
Co-authored-by: Matthias Pohl <github@mapohl.com>
…Y type by evaluating Unsafe.arrayBaseOffset(byte[].class) in TM rather than in JM (apache#26592)

Fix HashPartitioner codegen for BINARY/VARBINARY type by evaluating BYTE_ARRAY_BASE_OFFSET in TM instead of JM.

The issue is that if JM memory is set above 32 GB while TM memory is set below 32 GB, the JVM treats the Java process with the larger heap as a large-heap JVM. This can change Unsafe behavior: for example, UNSAFE.arrayBaseOffset(byte[].class) returns 24 on a large-heap JVM but 16 on others.

Because of this, tasks running on a TM whose heap configuration differs from the JM's (TM < 32 GB while JM > 32 GB, or vice versa) read the wrong memory locations when reading the byte[] for MurmurHash.

Signed-off-by: Jiangjie (Becket) Qin <becket.qin@gmail.com>
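An illustrative sketch (not the generated code from this change) of why the offset must be computed in the JVM that reads the bytes: UNSAFE.arrayBaseOffset(byte[].class) depends on that JVM's heap configuration, so a value captured on the JM can be wrong on the TM.

```java
import sun.misc.Unsafe;

import java.lang.reflect.Field;

// Sketch only: demonstrates evaluating the byte[] base offset locally.
final class ByteArrayOffset {

    private static final Unsafe UNSAFE = getUnsafe();

    // Evaluated in the JVM that actually reads the byte[] (the TaskManager):
    // 16 with compressed oops (heap <= ~32 GB), 24 on a large-heap JVM.
    static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

    static long readLong(byte[] bytes, int index) {
        // Using a locally computed offset keeps the read aligned with this
        // JVM's actual object layout.
        return UNSAFE.getLong(bytes, BYTE_ARRAY_BASE_OFFSET + index);
    }

    private static Unsafe getUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```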
…es in batch mode (apache#27016)

In apache#26433, we removed the EOI marker in the form of Long.MAX_VALUE as the checkpoint id. Since
streaming pipelines can continue to checkpoint even after their respective operators have been shut
down, it is not safe to use a constant as this can lead to duplicate commits.

However, in batch pipelines we only have one commit on job shutdown. Using any checkpoint id should
suffice in this scenario. Any pending committables should be processed by the CommitterOperator when
the operator shuts down. No further checkpoints will take place.

There are various connectors which rely on this behavior. I don't see any drawbacks from keeping
this behavior for batch pipelines.
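A minimal sketch, with assumed names, of the batch behavior described above: all pending committables are committed exactly once at shutdown under a single, arbitrary checkpoint id, because no further checkpoints follow in batch execution.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: assumed names, not the actual Flink committer classes.
class BatchCommitterSketch {

    interface Committable {
        void commit(long checkpointId);
    }

    private final List<Committable> pending = new ArrayList<>();

    void addCommittable(Committable committable) {
        pending.add(committable);
    }

    /** Called exactly once when the batch job shuts down. */
    void endOfInput(long anyCheckpointId) {
        // In batch execution there is exactly one commit and no later
        // checkpoints, so any checkpoint id is safe: duplicates cannot occur.
        for (Committable committable : pending) {
            committable.commit(anyCheckpointId);
        }
        pending.clear();
    }
}
```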
If a resource is lazily created in open(), we can only close it after checking for null. Otherwise, a failure during initialization will trigger secondary failures.
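A minimal sketch of the guarded close described above (the field and resource type are illustrative): if open() fails before the resource is assigned, close() must not fail on the null field.

```java
import java.io.FileInputStream;
import java.io.IOException;

// Sketch only: the FileInputStream stands in for any lazily created resource.
class LazyResourceHolder implements AutoCloseable {

    private FileInputStream in; // stays null if open() fails before the assignment

    void open(String path) throws IOException {
        in = new FileInputStream(path); // may throw, leaving the field null
    }

    @Override
    public void close() throws IOException {
        // Without the null check, a failure during initialization would trigger a
        // secondary NullPointerException when close() runs during cleanup.
        if (in != null) {
            in.close();
        }
    }
}
```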