Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 42 additions & 10 deletions .github/workflows/flink_cdc_base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,24 @@ on:
description: "Flink versions to test against."
required: false
type: string
default: "['generic']"
default: "['1.20.3']"
modules:
description: "Flink CDC modules to test against."
required: true
type: string
parallelism:
description: "Flink parallelism."
e2e-stages:
description: "Pipeline E2E stage filters (JSON array). Defaults to a single empty entry meaning no filter (run everything). Set to e.g. ['stage1','stage2','stage3'] to fan out the pipeline E2E module across multiple jobs by subpackage."
required: false
type: number
default: 4
type: string
default: '[""]'
custom-maven-parameter:
description: "Custom maven parameter."
required: false
type: string

jobs:
test:
name: jdk${{ matrix.java-version }}-flink-${{ matrix.flink-version }}, ${{ matrix.module }}${{ matrix.stage != '' && format(', {0}', matrix.stage) || '' }}
runs-on: ubuntu-22.04
timeout-minutes: 120
strategy:
Expand All @@ -53,14 +54,15 @@ jobs:
java-version: ${{ fromJSON(inputs.java-versions) }}
flink-version: ${{ fromJSON(inputs.flink-versions) }}
module: ${{ fromJSON(inputs.modules) }}
stage: ${{ fromJSON(inputs.e2e-stages) }}
steps:
- run: echo "Running CI pipeline for JDK version ${{ matrix.java-version }}"
- name: Clean up disk space
run: |
set -euo pipefail

echo "Disk space before cleanup"
df -h
df -h

echo "Cleaning up disk space"
sudo rm -rf /usr/share/dotnet
Expand All @@ -72,6 +74,25 @@ jobs:
echo "Disk space after cleanup"
df -h

- name: Relocate Docker data-root to /mnt
run: |
set -euo pipefail

# GitHub-hosted runners ship a large secondary volume mounted at /mnt
# (typically /dev/sdb1, ~75G). The default Docker data-root lives on
# /dev/root and gets crowded by Maven, build artifacts and other test
# containers, leaving e2e containers (notably OceanBase, which OBD
# pre-checks for ~10G free) failing to boot. Point Docker at /mnt to
# give containers the headroom they need.
sudo systemctl stop docker docker.socket
sudo mkdir -p /mnt/docker
echo '{"data-root":"/mnt/docker"}' | sudo tee /etc/docker/daemon.json
sudo systemctl start docker
docker info | grep "Docker Root Dir"

echo "Disk space after Docker relocation"
df -h

- name: Check out repository code
uses: actions/checkout@v6
with:
Expand Down Expand Up @@ -99,16 +120,27 @@ jobs:

modules=$(./.github/workflows/modules.py test "${{ matrix.module }}")
compile_modules=$(./.github/workflows/modules.py compile "${{ matrix.module }}")

build_maven_parameter="-DspecifiedMongoVersion=8.0.14"

if [ ! -z "${{ matrix.flink-version }}" ]; then
build_maven_parameter="${build_maven_parameter:+$build_maven_parameter }-DspecifiedFlinkVersion=${{ matrix.flink-version }}"
fi

build_maven_parameter="${build_maven_parameter:+$build_maven_parameter }${{ inputs.custom-maven-parameter }}"

mvn --no-snapshot-updates -B -DskipTests ${{ inputs.custom-maven-parameter }} -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules -DspecifiedParallelism=${{ inputs.parallelism }} -Duser.timezone=$jvm_timezone verify

# Pipeline E2E stage filter: when matrix.stage is set, restrict surefire to the
# matching subpackage. Stage1 hosts the YARN test; stage2/stage3 skip it via
# -DskipYarnTest=true. Empty stage (default) means no filter — run everything.
e2e_stage="${{ matrix.stage }}"
if [ -n "$e2e_stage" ]; then
build_maven_parameter="$build_maven_parameter -De2eIncludePattern=**/${e2e_stage}/**/*ITCase.java"
if [ "$e2e_stage" != "stage1" ]; then
build_maven_parameter="$build_maven_parameter -DskipYarnTest=true"
fi
fi

mvn --no-snapshot-updates -B -DskipTests ${{ inputs.custom-maven-parameter }} -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules -Duser.timezone=$jvm_timezone verify

- name: Print JVM thread dumps when cancelled
if: ${{ failure() }}
Expand Down
18 changes: 4 additions & 14 deletions .github/workflows/flink_cdc_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,32 +100,22 @@ jobs:
custom-maven-parameter: "-Pflink2"
modules: "['mysql-source', 'postgres-source, oceanbase-source, tidb, vitess', 'oracle, sqlserver', 'db2, mongodb']"
pipeline_e2e:
strategy:
max-parallel: 2
fail-fast: false
matrix:
parallelism: [ 1, 4 ]
name: Pipeline E2E Tests (${{ matrix.parallelism }}-Parallelism)
name: Pipeline E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[11]"
flink-versions: "['1.20.3']"
modules: "['pipeline_e2e']"
parallelism: ${{ matrix.parallelism }}
e2e-stages: "['stage1', 'stage2', 'stage3']"
pipeline_e2e_2_x:
strategy:
max-parallel: 2
fail-fast: false
matrix:
parallelism: [ 1, 4 ]
name: Pipeline E2E Tests 2.x (${{ matrix.parallelism }}-Parallelism)
name: Pipeline E2E Tests 2.x
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[11]"
flink-versions: "['2.2.0']"
custom-maven-parameter: "-Pflink2"
modules: "['pipeline_e2e']"
parallelism: ${{ matrix.parallelism }}
e2e-stages: "['stage1', 'stage2', 'stage3']"
source_e2e:
name: Source E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml
Expand Down
18 changes: 4 additions & 14 deletions .github/workflows/flink_cdc_ci_nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,33 +97,23 @@ jobs:
modules: "['mysql-source', 'postgres-source, oceanbase-source, tidb, vitess', 'oracle, sqlserver', 'db2, mongodb']"
pipeline_e2e:
if: github.repository == 'apache/flink-cdc'
strategy:
max-parallel: 2
fail-fast: false
matrix:
parallelism: [ 1, 4 ]
name: Pipeline E2E Tests (${{ matrix.parallelism }}-Parallelism)
name: Pipeline E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[17]"
flink-versions: "['1.20.3']"
modules: "['pipeline_e2e']"
parallelism: ${{ matrix.parallelism }}
e2e-stages: "['stage1', 'stage2', 'stage3']"
pipeline_e2e_2_x:
if: github.repository == 'apache/flink-cdc'
strategy:
max-parallel: 2
fail-fast: false
matrix:
parallelism: [ 1, 4 ]
name: Pipeline E2E Tests 2.x (${{ matrix.parallelism }}-Parallelism)
name: Pipeline E2E Tests 2.x
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[17]"
flink-versions: "['2.2.0']"
custom-maven-parameter: "-Pflink2"
modules: "['pipeline_e2e']"
parallelism: ${{ matrix.parallelism }}
e2e-stages: "['stage1', 'stage2', 'stage3']"
source_e2e:
if: github.repository == 'apache/flink-cdc'
name: Source E2E Tests
Expand Down
11 changes: 9 additions & 2 deletions flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ limitations under the License.
<fluss.version>0.9.0-incubating</fluss.version>
<jmh.version>1.37</jmh.version>
<hudi.version>1.1.0</hudi.version>
<!-- E2E stage filtering: CI overrides via -De2eIncludePattern=**/stageN/**/*ITCase.java
and -DskipYarnTest=true for stages where the YARN test should not run. The defaults
here preserve the original `mvn verify` behavior for local developers. -->
<e2eIncludePattern>**/*ITCase.java</e2eIncludePattern>
<e2eExcludePattern>**/MysqlE2eWithYarnApplicationITCase.java</e2eExcludePattern>
<skipYarnTest>false</skipYarnTest>
</properties>

<dependencies>
Expand Down Expand Up @@ -436,10 +442,10 @@ limitations under the License.
</goals>
<configuration>
<includes>
<include>**/*.*</include>
<include>${e2eIncludePattern}</include>
</includes>
<excludes>
<exclude>**/MysqlE2eWithYarnApplicationITCase.java</exclude>
<exclude>${e2eExcludePattern}</exclude>
</excludes>
<forkCount>1</forkCount>
<systemPropertyVariables>
Expand All @@ -454,6 +460,7 @@ limitations under the License.
<goal>test</goal>
</goals>
<configuration>
<skip>${skipYarnTest}</skip>
<includes>
<include>**/MysqlE2eWithYarnApplicationITCase.java</include>
</includes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.cdc.pipeline.tests;
package org.apache.flink.cdc.pipeline.tests.stage1;

import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
Expand All @@ -24,6 +24,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +38,8 @@
import java.util.function.Function;

/** End-to-end tests for mysql cdc pipeline job. */
@ParameterizedClass
@ValueSource(ints = {1, 4})
class MysqlE2eITCase extends PipelineTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(MysqlE2eITCase.class);

Expand All @@ -45,6 +49,10 @@ class MysqlE2eITCase extends PipelineTestEnvironment {
private final Function<String, String> dbNameFormatter =
(s) -> String.format(s, mysqlInventoryDatabase.getDatabaseName());

MysqlE2eITCase(int parallelism) {
super(parallelism);
}

@BeforeEach
public void before() throws Exception {
super.before();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.cdc.pipeline.tests;
package org.apache.flink.cdc.pipeline.tests.stage1;

import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@
* limitations under the License.
*/

package org.apache.flink.cdc.pipeline.tests;
package org.apache.flink.cdc.pipeline.tests.stage1;

import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,12 +41,18 @@
import java.util.stream.Stream;

/** E2e tests for User-defined functions. */
@ParameterizedClass
@ValueSource(ints = {1, 4})
class UdfE2eITCase extends PipelineTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(TransformE2eITCase.class);
private static final Logger LOG = LoggerFactory.getLogger(UdfE2eITCase.class);

protected final UniqueDatabase udfTestDatabase =
new UniqueDatabase(MYSQL, "transform_test", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);

UdfE2eITCase(int parallelism) {
super(parallelism);
}

private final Function<String, String> dbNameFormatter =
(s) -> String.format(s, udfTestDatabase.getDatabaseName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,28 @@
* limitations under the License.
*/

package org.apache.flink.cdc.pipeline.tests;
package org.apache.flink.cdc.pipeline.tests.stage1;

import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;

/** End-to-end tests for values cdc pipeline job. */
@ParameterizedClass
@ValueSource(ints = {1, 4})
class ValuesE2eITCase extends PipelineTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(ValuesE2eITCase.class);

ValuesE2eITCase(int parallelism) {
super(parallelism);
}

@Test
void testValuesSingleSplitSingleTable() throws Exception {
String pipelineJob =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.cdc.pipeline.tests.migration;
package org.apache.flink.cdc.pipeline.tests.stage1.migration;

import org.apache.flink.api.common.JobID;
import org.apache.flink.cdc.common.test.utils.TestUtils;
Expand All @@ -29,8 +29,10 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
Expand All @@ -54,13 +56,20 @@
* E2e cases for stopping & restarting jobs of `MySQL source to Paimon sink` from previous state.
*/
@EnabledIfSystemProperty(named = "specifiedFlinkVersion", matches = "^1.*")
@ParameterizedClass
@ValueSource(ints = {1, 4})
class MySqlToPaimonMigrationITCase extends PipelineTestEnvironment {

private static final Logger LOG = LoggerFactory.getLogger(MySqlToPaimonMigrationITCase.class);

private static final Duration PAIMON_TESTCASE_TIMEOUT = Duration.ofMinutes(3);

protected UniqueDatabase mysqlInventoryDatabase;

MySqlToPaimonMigrationITCase(int parallelism) {
super(parallelism);
}

private final Function<String, String> dbNameFormatter =
(s) -> String.format(s, mysqlInventoryDatabase.getDatabaseName());

Expand Down
Loading
Loading