From 289f01721a0e091996af989389e5009976bb684f Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Wed, 20 May 2026 19:04:21 +0800 Subject: [PATCH 1/2] [FLINK-39732] Introduce TableDiscoverer SPI for flexible table subscription (with default JdbcTableDiscoverer) --- flink-cdc-common/pom.xml | 22 ++ .../source/discover/JdbcTableDiscoverer.java | 179 ++++++++++++ .../discover/JdbcTableDiscovererFactory.java | 37 +++ .../source/discover/TableDiscoverer.java | 82 ++++++ .../discover/TableDiscovererFactory.java | 134 +++++++++ ...mon.source.discover.TableDiscovererFactory | 16 ++ .../source/JdbcTableDiscovererITCase.java | 257 ++++++++++++++++++ 7 files changed, 727 insertions(+) create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscovererFactory.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscoverer.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscovererFactory.java create mode 100644 flink-cdc-common/src/main/resources/META-INF/services/org.apache.flink.cdc.common.source.discover.TableDiscovererFactory create mode 100644 flink-cdc-common/src/test/java/org/apache/flink/cdc/common/source/JdbcTableDiscovererITCase.java diff --git a/flink-cdc-common/pom.xml b/flink-cdc-common/pom.xml index 80237d26def..99ae43f2e53 100644 --- a/flink-cdc-common/pom.xml +++ b/flink-cdc-common/pom.xml @@ -27,6 +27,28 @@ limitations under the License. flink-cdc-common + + + + mysql + mysql-connector-java + 8.0.27 + test + + + org.testcontainers + mysql + ${testcontainers.version} + test + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java new file mode 100644 index 00000000000..bc610a1d9a1 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.source.discover; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.ConfigOptions; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.TableId; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.LinkedHashSet; +import java.util.Set; + +/** + * A {@link TableDiscoverer} that reads the list of subscribed tables from a JDBC database table. + * + *

This implementation connects to any JDBC-compatible database (e.g., MySQL, PostgreSQL) and + * reads table names from a specified column. The table names are parsed as {@link TableId} objects. + * + *

Configuration keys (read from the full connector configuration): + * + *

    + *
  • {@code table.discoverer.jdbc.url} — JDBC connection URL (required). + *
  • {@code table.discoverer.jdbc.table-name} — The database table storing subscription entries + * (required). + *
  • {@code table.discoverer.jdbc.username} — JDBC username (required). + *
  • {@code table.discoverer.jdbc.password} — JDBC password (required). + *
  • {@code table.discoverer.jdbc.column-name} — The column containing fully-qualified table + * names. Defaults to {@code "subscribe_table_name"}. + *
+ * + *

Expected schema: The target column must contain fully-qualified table names formatted + * as {@code "schemaName.tableName"} (two-part) or {@code "namespace.schemaName.tableName"} + * (three-part). For example: + * + *

{@code
+ * CREATE TABLE cdc_subscriptions (
+ *     subscribe_table_name VARCHAR(255) PRIMARY KEY
+ * );
+ * INSERT INTO cdc_subscriptions VALUES ('source_db.orders'), ('source_db.products');
+ * }
+ * + *

Null values and rows that cannot be parsed into a valid {@link TableId} are silently skipped. + */ +public class JdbcTableDiscoverer implements TableDiscoverer { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(JdbcTableDiscoverer.class); + + public static final ConfigOption JDBC_URL = + ConfigOptions.key("table.discoverer.jdbc.url") + .stringType() + .noDefaultValue() + .withDescription("The JDBC connection URL for the table discovery database."); + + public static final ConfigOption TABLE_NAME = + ConfigOptions.key("table.discoverer.jdbc.table-name") + .stringType() + .noDefaultValue() + .withDescription( + "The name of the database table storing the subscription entries."); + + public static final ConfigOption USERNAME = + ConfigOptions.key("table.discoverer.jdbc.username") + .stringType() + .noDefaultValue() + .withDescription("The JDBC username for the table discovery database."); + + public static final ConfigOption PASSWORD = + ConfigOptions.key("table.discoverer.jdbc.password") + .stringType() + .noDefaultValue() + .withDescription("The JDBC password for the table discovery database."); + + public static final ConfigOption COLUMN_NAME = + ConfigOptions.key("table.discoverer.jdbc.column-name") + .stringType() + .defaultValue("subscribe_table_name") + .withDescription( + "The column name in the subscription table that contains the " + + "fully-qualified table names to subscribe to."); + + private transient Connection connection; + private transient String tableName; + private transient String columnName; + + @Override + public void open(Context context) throws Exception { + Configuration config = context.getConfiguration(); + + String jdbcUrl = config.get(JDBC_URL); + if (jdbcUrl == null || jdbcUrl.isEmpty()) { + throw new IllegalArgumentException( + "'" + JDBC_URL.key() + "' is required for JdbcTableDiscoverer."); + } + tableName = config.get(TABLE_NAME); + if (tableName == null || tableName.isEmpty()) { + throw new IllegalArgumentException( + "'" + TABLE_NAME.key() + "' is required for JdbcTableDiscoverer."); + } + String username = config.get(USERNAME); + if (username == null || username.isEmpty()) { + throw new IllegalArgumentException( + "'" + USERNAME.key() + "' is required for JdbcTableDiscoverer."); + } + String password = config.get(PASSWORD); + if (password == null || password.isEmpty()) { + throw new IllegalArgumentException( + "'" + PASSWORD.key() + "' is required for JdbcTableDiscoverer."); + } + columnName = config.get(COLUMN_NAME); + + connection = DriverManager.getConnection(jdbcUrl, username, password); + LOG.info( + "JdbcTableDiscoverer opened connection to '{}', table='{}', column='{}'.", + jdbcUrl, + tableName, + columnName); + } + + @Override + public Set discover() throws Exception { + Set result = new LinkedHashSet<>(); + String sql = "SELECT " + columnName + " FROM " + tableName; + try (Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + while (rs.next()) { + String value = rs.getString(1); + if (value == null || value.isEmpty()) { + continue; + } + try { + result.add(TableId.parse(value)); + } catch (IllegalArgumentException e) { + LOG.warn( + "Skipping invalid table name '{}' from subscription table '{}'.", + value, + tableName); + } + } + } + LOG.info( + "JdbcTableDiscoverer discovered {} tables from '{}.{}'.", + result.size(), + tableName, + columnName); + return result; + } + + @Override + public void close() throws Exception { + if (connection != null && !connection.isClosed()) { + connection.close(); + LOG.info("JdbcTableDiscoverer closed JDBC connection."); + } + } +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscovererFactory.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscovererFactory.java new file mode 100644 index 00000000000..9450ab82190 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscovererFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.source.discover; + +/** + * {@link TableDiscovererFactory} for {@link JdbcTableDiscoverer}. Activated when {@code + * table.discoverer.type = 'jdbc'}. + */ +public class JdbcTableDiscovererFactory implements TableDiscovererFactory { + + public static final String IDENTIFIER = "jdbc"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public TableDiscoverer createDiscoverer() { + return new JdbcTableDiscoverer(); + } +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscoverer.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscoverer.java new file mode 100644 index 00000000000..6997692ce17 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscoverer.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.source.discover; + +import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.TableId; + +import java.io.Serializable; +import java.util.Set; + +/** + * Pluggable abstraction for discovering a set of tables that a source connector should read. + * Implementations are loaded via SPI through {@link TableDiscovererFactory} and determine which + * tables the source should subscribe to. + * + *

Lifecycle: {@link #open(Context)} is called once before the first call to {@link #discover()}. + * {@link #close()} is called when the discoverer is no longer needed. Implementations manage their + * own resources (e.g., connections) within this lifecycle. + * + *

Built-in implementations include: + * + *

    + *
  • {@link JdbcTableDiscoverer} - reads the subscription list from a JDBC database table. + *
+ */ +@PublicEvolving +public interface TableDiscoverer extends Serializable, AutoCloseable { + + /** + * Opens this discoverer and initializes any resources needed for table discovery. + * + * @param context The context providing configuration and class loader. + * @throws Exception if initialization fails. + */ + void open(Context context) throws Exception; + + /** + * Discovers and returns the set of tables to subscribe to. + * + * @return A set of {@link TableId} representing the tables to read. + * @throws Exception if the discovery fails. + */ + Set discover() throws Exception; + + /** + * Closes this discoverer and releases any resources. + * + * @throws Exception if closing fails. + */ + @Override + void close() throws Exception; + + /** Context providing runtime information for the discoverer. */ + interface Context { + + /** + * Returns the full connector configuration. Discoverer implementations read their own + * configuration keys (e.g., {@code table.discoverer.jdbc.url}) directly from this + * configuration. + */ + Configuration getConfiguration(); + + /** Returns the user code class loader of the current session. */ + ClassLoader getUserCodeClassLoader(); + } +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscovererFactory.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscovererFactory.java new file mode 100644 index 00000000000..5e7af51f1f2 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscovererFactory.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.source.discover; + +import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.common.configuration.Configuration; + +import java.util.HashSet; +import java.util.ServiceLoader; +import java.util.Set; + +/** + * SPI factory that creates a {@link TableDiscoverer}. Implementations are discovered at runtime via + * Java's {@link ServiceLoader} mechanism. To register a new discoverer type, add a line with the + * fully-qualified factory class name to: + * + *
{@code
+ * META-INF/services/org.apache.flink.cdc.common.source.discover.TableDiscovererFactory
+ * }
+ * + *

The discoverer type is selected by the user via the {@code table.discoverer.type} option, + * which is matched (case-insensitively) against {@link #identifier()}. + */ +@PublicEvolving +public interface TableDiscovererFactory { + + /** + * The unique identifier of this discoverer factory, used to match against the {@code + * table.discoverer.type} option. Must be lowercase, e.g. {@code "jdbc"} or {@code + * "fluss-default"}. + */ + String identifier(); + + /** + * Creates a new {@link TableDiscoverer} instance. The returned discoverer is not yet + * initialized; the caller must invoke {@link TableDiscoverer#open(TableDiscoverer.Context)} + * before calling {@link TableDiscoverer#discover()}. + */ + TableDiscoverer createDiscoverer(); + + /** + * Creates a {@link TableDiscoverer.Context} with the given configuration and class loader. This + * is a convenience factory method so that callers do not need to implement the {@link + * TableDiscoverer.Context} interface themselves. + * + * @param configuration The full connector configuration. + * @param classLoader The user code class loader. + * @return A new {@link TableDiscoverer.Context} instance. + */ + static TableDiscoverer.Context createContext( + Configuration configuration, ClassLoader classLoader) { + return new DefaultDiscovererContext(configuration, classLoader); + } + + /** + * Utility method that discovers a {@link TableDiscovererFactory} via SPI whose {@link + * #identifier()} matches the given {@code type}, and delegates discoverer creation to it. + * + * @param type The discoverer type identifier (e.g. "jdbc", "fluss-default"). + * @param classLoader The class loader used for SPI discovery. + * @return A new, uninitialized {@link TableDiscoverer}. + * @throws IllegalArgumentException if no factory matches the given type. + * @throws IllegalStateException if multiple factories share the same identifier. + */ + static TableDiscoverer createDiscoverer(String type, ClassLoader classLoader) { + ClassLoader loader = + classLoader != null ? classLoader : Thread.currentThread().getContextClassLoader(); + ServiceLoader serviceLoader = + ServiceLoader.load(TableDiscovererFactory.class, loader); + + TableDiscovererFactory matched = null; + Set known = new HashSet<>(); + for (TableDiscovererFactory factory : serviceLoader) { + known.add(factory.identifier()); + if (factory.identifier().equalsIgnoreCase(type)) { + if (matched != null) { + throw new IllegalStateException( + "Multiple TableDiscovererFactory implementations found for identifier '" + + type + + "': " + + matched.getClass().getName() + + " and " + + factory.getClass().getName()); + } + matched = factory; + } + } + if (matched == null) { + throw new IllegalArgumentException( + "Unsupported 'table.discoverer.type' value: '" + + type + + "'. Available discoverer types: " + + known + + "."); + } + return matched.createDiscoverer(); + } + + /** Default implementation of {@link TableDiscoverer.Context}. */ + class DefaultDiscovererContext implements TableDiscoverer.Context { + private final Configuration configuration; + private final ClassLoader classLoader; + + DefaultDiscovererContext(Configuration configuration, ClassLoader classLoader) { + this.configuration = configuration; + this.classLoader = classLoader; + } + + @Override + public Configuration getConfiguration() { + return configuration; + } + + @Override + public ClassLoader getUserCodeClassLoader() { + return classLoader; + } + } +} diff --git a/flink-cdc-common/src/main/resources/META-INF/services/org.apache.flink.cdc.common.source.discover.TableDiscovererFactory b/flink-cdc-common/src/main/resources/META-INF/services/org.apache.flink.cdc.common.source.discover.TableDiscovererFactory new file mode 100644 index 00000000000..a16bb5ae27d --- /dev/null +++ b/flink-cdc-common/src/main/resources/META-INF/services/org.apache.flink.cdc.common.source.discover.TableDiscovererFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.cdc.common.source.discover.JdbcTableDiscovererFactory diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/source/JdbcTableDiscovererITCase.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/source/JdbcTableDiscovererITCase.java new file mode 100644 index 00000000000..6191f50cdef --- /dev/null +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/source/JdbcTableDiscovererITCase.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.source; + +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.source.discover.JdbcTableDiscoverer; +import org.apache.flink.cdc.common.source.discover.TableDiscoverer; +import org.apache.flink.cdc.common.source.discover.TableDiscovererFactory; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Integration tests for {@link JdbcTableDiscoverer} and {@link TableDiscovererFactory} SPI loading. + * Uses a real MySQL container via Testcontainers to verify end-to-end table discovery. + */ +@Testcontainers +class JdbcTableDiscovererITCase { + + @Container + private static final MySQLContainer MYSQL = + new MySQLContainer<>("mysql:8.0") + .withDatabaseName("meta_db") + .withUsername("test_user") + .withPassword("test_password"); + + @BeforeAll + static void setupDatabase() throws Exception { + try (Connection conn = + DriverManager.getConnection( + MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword()); + Statement stmt = conn.createStatement()) { + // Create the subscription table with the default column name + stmt.execute( + "CREATE TABLE cdc_subscriptions (" + + " subscribe_table_name VARCHAR(255) PRIMARY KEY" + + ")"); + // Seed some subscription entries + stmt.execute( + "INSERT INTO cdc_subscriptions VALUES " + + "('source_db.orders'), " + + "('source_db.products'), " + + "('analytics_db.user_events')"); + + // Create another subscription table with a custom column name + stmt.execute( + "CREATE TABLE custom_subscriptions (" + + " table_fqn VARCHAR(255) PRIMARY KEY" + + ")"); + stmt.execute( + "INSERT INTO custom_subscriptions VALUES " + + "('warehouse.inventory'), " + + "('warehouse.shipments')"); + } + } + + @AfterAll + static void tearDown() { + // Container is auto-stopped by Testcontainers + } + + // ========================================================================= + // SPI Loading Tests + // ========================================================================= + + @Test + void testSpiLoadsJdbcFactory() { + // Verify that the SPI mechanism correctly discovers JdbcTableDiscovererFactory + TableDiscoverer discoverer = + TableDiscovererFactory.createDiscoverer( + "jdbc", Thread.currentThread().getContextClassLoader()); + assertThat(discoverer).isInstanceOf(JdbcTableDiscoverer.class); + } + + @Test + void testSpiFailsForUnknownType() { + assertThatThrownBy( + () -> + TableDiscovererFactory.createDiscoverer( + "unknown-type", + Thread.currentThread().getContextClassLoader())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported 'table.discoverer.type' value: 'unknown-type'"); + } + + @Test + void testSpiCaseInsensitive() { + // Verify case-insensitive matching + TableDiscoverer discoverer = + TableDiscovererFactory.createDiscoverer( + "JDBC", Thread.currentThread().getContextClassLoader()); + assertThat(discoverer).isInstanceOf(JdbcTableDiscoverer.class); + } + + // ========================================================================= + // End-to-End Discovery Tests (SPI + MySQL read) + // ========================================================================= + + @Test + void testDiscoverTablesWithDefaultColumn() throws Exception { + // Load discoverer via SPI + TableDiscoverer discoverer = + TableDiscovererFactory.createDiscoverer( + "jdbc", Thread.currentThread().getContextClassLoader()); + + // Build configuration + Configuration config = buildConfig("cdc_subscriptions", null); + + // Open and discover + discoverer.open( + TableDiscovererFactory.createContext( + config, Thread.currentThread().getContextClassLoader())); + try { + Set discovered = discoverer.discover(); + assertThat(discovered) + .containsExactlyInAnyOrder( + TableId.tableId("source_db", "orders"), + TableId.tableId("source_db", "products"), + TableId.tableId("analytics_db", "user_events")); + } finally { + discoverer.close(); + } + } + + @Test + void testDiscoverTablesWithCustomColumn() throws Exception { + TableDiscoverer discoverer = + TableDiscovererFactory.createDiscoverer( + "jdbc", Thread.currentThread().getContextClassLoader()); + + Configuration config = buildConfig("custom_subscriptions", "table_fqn"); + + discoverer.open( + TableDiscovererFactory.createContext( + config, Thread.currentThread().getContextClassLoader())); + try { + Set discovered = discoverer.discover(); + assertThat(discovered) + .containsExactlyInAnyOrder( + TableId.tableId("warehouse", "inventory"), + TableId.tableId("warehouse", "shipments")); + } finally { + discoverer.close(); + } + } + + @Test + void testDiscoverReflectsDynamicChanges() throws Exception { + TableDiscoverer discoverer = + TableDiscovererFactory.createDiscoverer( + "jdbc", Thread.currentThread().getContextClassLoader()); + + Configuration config = buildConfig("cdc_subscriptions", null); + discoverer.open( + TableDiscovererFactory.createContext( + config, Thread.currentThread().getContextClassLoader())); + try { + // Initial discovery + Set initial = discoverer.discover(); + assertThat(initial).hasSize(3); + + // Dynamically add a new subscription row + try (Connection conn = + DriverManager.getConnection( + MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword()); + Statement stmt = conn.createStatement()) { + stmt.execute("INSERT INTO cdc_subscriptions VALUES ('new_db.new_table')"); + } + + // Re-discover — should include the new entry + Set updated = discoverer.discover(); + assertThat(updated).hasSize(4); + assertThat(updated).contains(TableId.tableId("new_db", "new_table")); + + // Cleanup: remove the added row to not affect other tests + try (Connection conn = + DriverManager.getConnection( + MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword()); + Statement stmt = conn.createStatement()) { + stmt.execute( + "DELETE FROM cdc_subscriptions WHERE subscribe_table_name = 'new_db.new_table'"); + } + } finally { + discoverer.close(); + } + } + + @Test + void testMissingRequiredConfigThrows() { + TableDiscoverer discoverer = + TableDiscovererFactory.createDiscoverer( + "jdbc", Thread.currentThread().getContextClassLoader()); + + // Missing jdbc-url + Configuration config = + Configuration.fromMap( + Map.of( + "table.discoverer.jdbc.table-name", "some_table", + "table.discoverer.jdbc.username", "user", + "table.discoverer.jdbc.password", "pass")); + + assertThatThrownBy( + () -> + discoverer.open( + TableDiscovererFactory.createContext( + config, + Thread.currentThread().getContextClassLoader()))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("table.discoverer.jdbc.url"); + } + + // ========================================================================= + // Helpers + // ========================================================================= + + private Configuration buildConfig(String tableName, String columnName) { + Map map = new HashMap<>(); + map.put("table.discoverer.jdbc.url", MYSQL.getJdbcUrl()); + map.put("table.discoverer.jdbc.table-name", tableName); + map.put("table.discoverer.jdbc.username", MYSQL.getUsername()); + map.put("table.discoverer.jdbc.password", MYSQL.getPassword()); + if (columnName != null) { + map.put("table.discoverer.jdbc.column-name", columnName); + } + return Configuration.fromMap(map); + } +} From 8903047e5e173486b4cdb3191ec4c7ff5e30ee30 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Tue, 26 May 2026 17:34:55 +0800 Subject: [PATCH 2/2] [FLINK-39732] Introduce TableDiscoverer SPI for flexible table subscription (with default JdbcTableDiscoverer) --- docs/themes/book | 1 - .../source/discover/JdbcTableDiscoverer.java | 220 ++++++++---- .../source/JdbcTableDiscovererITCase.java | 338 ++++++++++++------ 3 files changed, 371 insertions(+), 188 deletions(-) delete mode 160000 docs/themes/book diff --git a/docs/themes/book b/docs/themes/book deleted file mode 160000 index a486adf8462..00000000000 --- a/docs/themes/book +++ /dev/null @@ -1 +0,0 @@ -Subproject commit a486adf8462c0abfc9034436ddd72927d6656809 diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java index bc610a1d9a1..2816210a403 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java @@ -27,40 +27,62 @@ import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.Statement; import java.util.LinkedHashSet; import java.util.Set; /** - * A {@link TableDiscoverer} that reads the list of subscribed tables from a JDBC database table. + * A {@link TableDiscoverer} that reads the list of subscribed tables from a JDBC database. * *

This implementation connects to any JDBC-compatible database (e.g., MySQL, PostgreSQL) and - * reads table names from a specified column. The table names are parsed as {@link TableId} objects. + * returns fully-qualified table names parsed as {@link TableId} objects. * - *

Configuration keys (read from the full connector configuration): + *

Default mode — shared subscription table (recommended)

* - *
    - *
  • {@code table.discoverer.jdbc.url} — JDBC connection URL (required). - *
  • {@code table.discoverer.jdbc.table-name} — The database table storing subscription entries - * (required). - *
  • {@code table.discoverer.jdbc.username} — JDBC username (required). - *
  • {@code table.discoverer.jdbc.password} — JDBC password (required). - *
  • {@code table.discoverer.jdbc.column-name} — The column containing fully-qualified table - * names. Defaults to {@code "subscribe_table_name"}. - *
+ *

By default, the discoverer assumes that subscriptions for many CDC jobs live in a single + * shared database table, and that each subscription set is identified by a {@code subscribe-id}. + * The discoverer issues a parameterized * - *

Expected schema: The target column must contain fully-qualified table names formatted - * as {@code "schemaName.tableName"} (two-part) or {@code "namespace.schemaName.tableName"} - * (three-part). For example: + *

{@code
+ * SELECT column-name FROM table-name WHERE subscribe-id-column = ?
+ * }
+ * + *

using a {@link PreparedStatement} (injection-safe), and only the rows whose subscribe-id + * matches the configured value are returned. + * + *

Required keys: {@code table.discoverer.jdbc.url}, {@code + * table.discoverer.jdbc.username}, {@code table.discoverer.jdbc.password}, {@code + * table.discoverer.jdbc.table-name}, {@code table.discoverer.jdbc.subscribe-id}. + * + *

Optional keys: {@code table.discoverer.jdbc.column-name} (defaults to {@code + * "subscribe_table_name"}), {@code table.discoverer.jdbc.subscribe-id-column} (defaults to {@code + * "subscribe_id"}). + * + *

Recommended schema: * *

{@code
  * CREATE TABLE cdc_subscriptions (
- *     subscribe_table_name VARCHAR(255) PRIMARY KEY
+ *     subscribe_id         VARCHAR(64)  NOT NULL,
+ *     subscribe_table_name VARCHAR(255) NOT NULL,
+ *     PRIMARY KEY (subscribe_id, subscribe_table_name)
  * );
- * INSERT INTO cdc_subscriptions VALUES ('source_db.orders'), ('source_db.products');
+ * INSERT INTO cdc_subscriptions VALUES
+ *   ('orders-subscription',    'source_db.orders'),
+ *   ('orders-subscription',    'source_db.order_items'),
+ *   ('analytics-subscription', 'analytics_db.user_events');
  * }
* + *

Advanced escape hatch — custom query (overrides the default mode)

+ * + *

For uncommon layouts (e.g., needing JOINs or extra filters), users may set {@code + * table.discoverer.jdbc.subscribe-query} to any {@code SELECT} statement; column #1 of each row is + * treated as a fully-qualified table name. When this option is set it takes priority over the + * default mode and all of {@code table-name}, {@code column-name}, {@code subscribe-id-column} and + * {@code subscribe-id} are ignored. Use this only when the default schema cannot model your + * subscriptions. + * *

Null values and rows that cannot be parsed into a valid {@link TableId} are silently skipped. */ public class JdbcTableDiscoverer implements TableDiscoverer { @@ -75,13 +97,6 @@ public class JdbcTableDiscoverer implements TableDiscoverer { .noDefaultValue() .withDescription("The JDBC connection URL for the table discovery database."); - public static final ConfigOption TABLE_NAME = - ConfigOptions.key("table.discoverer.jdbc.table-name") - .stringType() - .noDefaultValue() - .withDescription( - "The name of the database table storing the subscription entries."); - public static final ConfigOption USERNAME = ConfigOptions.key("table.discoverer.jdbc.username") .stringType() @@ -94,6 +109,23 @@ public class JdbcTableDiscoverer implements TableDiscoverer { .noDefaultValue() .withDescription("The JDBC password for the table discovery database."); + public static final ConfigOption SUBSCRIBE_QUERY = + ConfigOptions.key("table.discoverer.jdbc.subscribe-query") + .stringType() + .noDefaultValue() + .withDescription( + "Custom SELECT statement used to discover subscribed tables. When set, " + + "this takes priority over the shared-table options. Column #1 of " + + "every row must be a fully-qualified table name."); + + public static final ConfigOption TABLE_NAME = + ConfigOptions.key("table.discoverer.jdbc.table-name") + .stringType() + .noDefaultValue() + .withDescription( + "The shared subscription table that stores subscription entries for " + + "one or more CDC jobs. Required in shared-table mode."); + public static final ConfigOption COLUMN_NAME = ConfigOptions.key("table.discoverer.jdbc.column-name") .stringType() @@ -102,73 +134,119 @@ public class JdbcTableDiscoverer implements TableDiscoverer { "The column name in the subscription table that contains the " + "fully-qualified table names to subscribe to."); + public static final ConfigOption SUBSCRIBE_ID_COLUMN = + ConfigOptions.key("table.discoverer.jdbc.subscribe-id-column") + .stringType() + .defaultValue("subscribe_id") + .withDescription( + "The column name in the subscription table that holds the " + + "subscription-set identifier used for filtering."); + + public static final ConfigOption SUBSCRIBE_ID = + ConfigOptions.key("table.discoverer.jdbc.subscribe-id") + .stringType() + .noDefaultValue() + .withDescription( + "The current subscription-set identifier. Required in shared-table " + + "mode; rows whose subscribe-id column matches this value are " + + "discovered as subscribed tables."); + + /** Compiled SQL to execute on every {@link #discover()} call. */ + private transient String sql; + + /** When non-null, the discoverer runs in shared-table mode and binds this as parameter #1. */ + private transient String subscribeId; + private transient Connection connection; - private transient String tableName; - private transient String columnName; @Override public void open(Context context) throws Exception { Configuration config = context.getConfiguration(); - String jdbcUrl = config.get(JDBC_URL); - if (jdbcUrl == null || jdbcUrl.isEmpty()) { - throw new IllegalArgumentException( - "'" + JDBC_URL.key() + "' is required for JdbcTableDiscoverer."); - } - tableName = config.get(TABLE_NAME); - if (tableName == null || tableName.isEmpty()) { - throw new IllegalArgumentException( - "'" + TABLE_NAME.key() + "' is required for JdbcTableDiscoverer."); - } - String username = config.get(USERNAME); - if (username == null || username.isEmpty()) { - throw new IllegalArgumentException( - "'" + USERNAME.key() + "' is required for JdbcTableDiscoverer."); - } - String password = config.get(PASSWORD); - if (password == null || password.isEmpty()) { - throw new IllegalArgumentException( - "'" + PASSWORD.key() + "' is required for JdbcTableDiscoverer."); + String jdbcUrl = requireNonEmpty(config, JDBC_URL); + String username = requireNonEmpty(config, USERNAME); + String password = requireNonEmpty(config, PASSWORD); + + String subscribeQuery = config.get(SUBSCRIBE_QUERY); + if (subscribeQuery != null && !subscribeQuery.isEmpty()) { + // Mode A — custom query takes priority. Filter options are intentionally ignored. + this.sql = subscribeQuery; + this.subscribeId = null; + LOG.info( + "JdbcTableDiscoverer running in custom-query mode. URL='{}', query='{}'.", + jdbcUrl, + subscribeQuery); + } else { + // Mode B — shared-table filter; subscribe-id is mandatory. + String tableName = requireNonEmpty(config, TABLE_NAME); + String columnName = config.get(COLUMN_NAME); + String subscribeIdColumn = config.get(SUBSCRIBE_ID_COLUMN); + this.subscribeId = requireNonEmpty(config, SUBSCRIBE_ID); + this.sql = + "SELECT " + + columnName + + " FROM " + + tableName + + " WHERE " + + subscribeIdColumn + + " = ?"; + LOG.info( + "JdbcTableDiscoverer running in shared-table mode. URL='{}', table='{}', " + + "column='{}', subscribeIdColumn='{}', subscribeId='{}'.", + jdbcUrl, + tableName, + columnName, + subscribeIdColumn, + subscribeId); } - columnName = config.get(COLUMN_NAME); connection = DriverManager.getConnection(jdbcUrl, username, password); - LOG.info( - "JdbcTableDiscoverer opened connection to '{}', table='{}', column='{}'.", - jdbcUrl, - tableName, - columnName); } @Override public Set discover() throws Exception { Set result = new LinkedHashSet<>(); - String sql = "SELECT " + columnName + " FROM " + tableName; - try (Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { - while (rs.next()) { - String value = rs.getString(1); - if (value == null || value.isEmpty()) { - continue; - } - try { - result.add(TableId.parse(value)); - } catch (IllegalArgumentException e) { - LOG.warn( - "Skipping invalid table name '{}' from subscription table '{}'.", - value, - tableName); + if (subscribeId != null) { + try (PreparedStatement ps = connection.prepareStatement(sql)) { + ps.setString(1, subscribeId); + try (ResultSet rs = ps.executeQuery()) { + collect(rs, result); } } + } else { + try (Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + collect(rs, result); + } } - LOG.info( - "JdbcTableDiscoverer discovered {} tables from '{}.{}'.", - result.size(), - tableName, - columnName); + LOG.info("JdbcTableDiscoverer discovered {} tables.", result.size()); return result; } + private void collect(ResultSet rs, Set result) throws Exception { + while (rs.next()) { + String value = rs.getString(1); + if (value == null || value.isEmpty()) { + continue; + } + try { + result.add(TableId.parse(value)); + } catch (IllegalArgumentException e) { + LOG.warn( + "Skipping invalid table name '{}' returned by JdbcTableDiscoverer.", value); + } + } + } + + private static String requireNonEmpty(Configuration config, ConfigOption option) { + String value = config.get(option); + if (value == null || value.isEmpty()) { + throw new IllegalArgumentException( + "'" + option.key() + "' is required for JdbcTableDiscoverer."); + } + return value; + } + @Override public void close() throws Exception { if (connection != null && !connection.isClosed()) { diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/source/JdbcTableDiscovererITCase.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/source/JdbcTableDiscovererITCase.java index 6191f50cdef..84967f3647e 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/source/JdbcTableDiscovererITCase.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/source/JdbcTableDiscovererITCase.java @@ -23,7 +23,6 @@ import org.apache.flink.cdc.common.source.discover.TableDiscoverer; import org.apache.flink.cdc.common.source.discover.TableDiscovererFactory; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.testcontainers.containers.MySQLContainer; @@ -42,11 +41,15 @@ /** * Integration tests for {@link JdbcTableDiscoverer} and {@link TableDiscovererFactory} SPI loading. - * Uses a real MySQL container via Testcontainers to verify end-to-end table discovery. + * Uses a real MySQL container via Testcontainers to verify both the default shared-table mode and + * the advanced custom-query escape hatch. */ @Testcontainers class JdbcTableDiscovererITCase { + private static final String SUBSCRIBE_ID_ORDERS = "orders-subscription"; + private static final String SUBSCRIBE_ID_ANALYTICS = "analytics-subscription"; + @Container private static final MySQLContainer MYSQL = new MySQLContainer<>("mysql:8.0") @@ -60,48 +63,55 @@ static void setupDatabase() throws Exception { DriverManager.getConnection( MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword()); Statement stmt = conn.createStatement()) { - // Create the subscription table with the default column name + // Shared subscription table with default column names. stmt.execute( "CREATE TABLE cdc_subscriptions (" - + " subscribe_table_name VARCHAR(255) PRIMARY KEY" + + " subscribe_id VARCHAR(64) NOT NULL," + + " subscribe_table_name VARCHAR(255) NOT NULL," + + " PRIMARY KEY (subscribe_id, subscribe_table_name)" + ")"); - // Seed some subscription entries stmt.execute( "INSERT INTO cdc_subscriptions VALUES " - + "('source_db.orders'), " - + "('source_db.products'), " - + "('analytics_db.user_events')"); + + "('orders-subscription', 'source_db.orders')," + + "('orders-subscription', 'source_db.order_items')," + + "('orders-subscription', 'source_db.products')," + + "('analytics-subscription', 'analytics_db.user_events')"); - // Create another subscription table with a custom column name + // Shared subscription table using non-default column names. stmt.execute( "CREATE TABLE custom_subscriptions (" - + " table_fqn VARCHAR(255) PRIMARY KEY" + + " sub_id VARCHAR(64) NOT NULL," + + " table_fqn VARCHAR(255) NOT NULL," + + " PRIMARY KEY (sub_id, table_fqn)" + ")"); stmt.execute( "INSERT INTO custom_subscriptions VALUES " - + "('warehouse.inventory'), " - + "('warehouse.shipments')"); + + "('warehouse-sub', 'warehouse.inventory')," + + "('warehouse-sub', 'warehouse.shipments')," + + "('hr-sub', 'hr.employees')"); } } - @AfterAll - static void tearDown() { - // Container is auto-stopped by Testcontainers - } - // ========================================================================= - // SPI Loading Tests + // SPI loading // ========================================================================= @Test void testSpiLoadsJdbcFactory() { - // Verify that the SPI mechanism correctly discovers JdbcTableDiscovererFactory TableDiscoverer discoverer = TableDiscovererFactory.createDiscoverer( "jdbc", Thread.currentThread().getContextClassLoader()); assertThat(discoverer).isInstanceOf(JdbcTableDiscoverer.class); } + @Test + void testSpiCaseInsensitive() { + TableDiscoverer discoverer = + TableDiscovererFactory.createDiscoverer( + "JDBC", Thread.currentThread().getContextClassLoader()); + assertThat(discoverer).isInstanceOf(JdbcTableDiscoverer.class); + } + @Test void testSpiFailsForUnknownType() { assertThatThrownBy( @@ -113,145 +123,241 @@ void testSpiFailsForUnknownType() { .hasMessageContaining("Unsupported 'table.discoverer.type' value: 'unknown-type'"); } - @Test - void testSpiCaseInsensitive() { - // Verify case-insensitive matching - TableDiscoverer discoverer = - TableDiscovererFactory.createDiscoverer( - "JDBC", Thread.currentThread().getContextClassLoader()); - assertThat(discoverer).isInstanceOf(JdbcTableDiscoverer.class); - } - // ========================================================================= - // End-to-End Discovery Tests (SPI + MySQL read) + // Default mode — shared subscription table // ========================================================================= @Test - void testDiscoverTablesWithDefaultColumn() throws Exception { - // Load discoverer via SPI - TableDiscoverer discoverer = - TableDiscovererFactory.createDiscoverer( - "jdbc", Thread.currentThread().getContextClassLoader()); - - // Build configuration - Configuration config = buildConfig("cdc_subscriptions", null); + void testDefaultModeWithDefaultColumns() throws Exception { + Configuration config = + sharedTableConfigBuilder() + .table("cdc_subscriptions") + .subscribeId(SUBSCRIBE_ID_ORDERS) + .build(); - // Open and discover - discoverer.open( - TableDiscovererFactory.createContext( - config, Thread.currentThread().getContextClassLoader())); - try { - Set discovered = discoverer.discover(); - assertThat(discovered) - .containsExactlyInAnyOrder( - TableId.tableId("source_db", "orders"), - TableId.tableId("source_db", "products"), - TableId.tableId("analytics_db", "user_events")); - } finally { - discoverer.close(); - } + assertThat(runDiscovery(config)) + .containsExactlyInAnyOrder( + TableId.tableId("source_db", "orders"), + TableId.tableId("source_db", "order_items"), + TableId.tableId("source_db", "products")); } @Test - void testDiscoverTablesWithCustomColumn() throws Exception { - TableDiscoverer discoverer = - TableDiscovererFactory.createDiscoverer( - "jdbc", Thread.currentThread().getContextClassLoader()); + void testDefaultModeFiltersBySubscribeId() throws Exception { + Configuration config = + sharedTableConfigBuilder() + .table("cdc_subscriptions") + .subscribeId(SUBSCRIBE_ID_ANALYTICS) + .build(); - Configuration config = buildConfig("custom_subscriptions", "table_fqn"); + assertThat(runDiscovery(config)) + .containsExactlyInAnyOrder(TableId.tableId("analytics_db", "user_events")); + } - discoverer.open( - TableDiscovererFactory.createContext( - config, Thread.currentThread().getContextClassLoader())); - try { - Set discovered = discoverer.discover(); - assertThat(discovered) - .containsExactlyInAnyOrder( - TableId.tableId("warehouse", "inventory"), - TableId.tableId("warehouse", "shipments")); - } finally { - discoverer.close(); - } + @Test + void testDefaultModeWithCustomColumns() throws Exception { + Configuration config = + sharedTableConfigBuilder() + .table("custom_subscriptions") + .columnName("table_fqn") + .subscribeIdColumn("sub_id") + .subscribeId("warehouse-sub") + .build(); + + assertThat(runDiscovery(config)) + .containsExactlyInAnyOrder( + TableId.tableId("warehouse", "inventory"), + TableId.tableId("warehouse", "shipments")); } @Test - void testDiscoverReflectsDynamicChanges() throws Exception { + void testDefaultModeReflectsDynamicChanges() throws Exception { + Configuration config = + sharedTableConfigBuilder() + .table("cdc_subscriptions") + .subscribeId(SUBSCRIBE_ID_ANALYTICS) + .build(); + TableDiscoverer discoverer = TableDiscovererFactory.createDiscoverer( "jdbc", Thread.currentThread().getContextClassLoader()); - - Configuration config = buildConfig("cdc_subscriptions", null); discoverer.open( TableDiscovererFactory.createContext( config, Thread.currentThread().getContextClassLoader())); try { - // Initial discovery - Set initial = discoverer.discover(); - assertThat(initial).hasSize(3); - - // Dynamically add a new subscription row - try (Connection conn = - DriverManager.getConnection( - MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword()); - Statement stmt = conn.createStatement()) { - stmt.execute("INSERT INTO cdc_subscriptions VALUES ('new_db.new_table')"); - } + assertThat(discoverer.discover()).hasSize(1); - // Re-discover — should include the new entry - Set updated = discoverer.discover(); - assertThat(updated).hasSize(4); - assertThat(updated).contains(TableId.tableId("new_db", "new_table")); - - // Cleanup: remove the added row to not affect other tests - try (Connection conn = - DriverManager.getConnection( - MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword()); - Statement stmt = conn.createStatement()) { - stmt.execute( - "DELETE FROM cdc_subscriptions WHERE subscribe_table_name = 'new_db.new_table'"); + executeSql( + "INSERT INTO cdc_subscriptions VALUES " + + "('analytics-subscription', 'analytics_db.sessions')"); + try { + Set updated = discoverer.discover(); + assertThat(updated) + .containsExactlyInAnyOrder( + TableId.tableId("analytics_db", "user_events"), + TableId.tableId("analytics_db", "sessions")); + } finally { + executeSql( + "DELETE FROM cdc_subscriptions WHERE subscribe_table_name " + + "= 'analytics_db.sessions'"); } } finally { discoverer.close(); } } + // ========================================================================= + // Advanced escape hatch — custom query + // ========================================================================= + @Test - void testMissingRequiredConfigThrows() { - TableDiscoverer discoverer = - TableDiscovererFactory.createDiscoverer( - "jdbc", Thread.currentThread().getContextClassLoader()); + void testCustomQueryReadsFirstColumn() throws Exception { + Map map = baseConnectionConfig(); + map.put( + "table.discoverer.jdbc.subscribe-query", + "SELECT subscribe_table_name FROM cdc_subscriptions " + + "WHERE subscribe_id = 'orders-subscription' " + + "ORDER BY subscribe_table_name"); - // Missing jdbc-url - Configuration config = - Configuration.fromMap( - Map.of( - "table.discoverer.jdbc.table-name", "some_table", - "table.discoverer.jdbc.username", "user", - "table.discoverer.jdbc.password", "pass")); + assertThat(runDiscovery(Configuration.fromMap(map))) + .containsExactlyInAnyOrder( + TableId.tableId("source_db", "order_items"), + TableId.tableId("source_db", "orders"), + TableId.tableId("source_db", "products")); + } - assertThatThrownBy( - () -> - discoverer.open( - TableDiscovererFactory.createContext( - config, - Thread.currentThread().getContextClassLoader()))) + @Test + void testCustomQueryOverridesDefaultModeOptions() throws Exception { + // Mix both modes: the custom-query option must win and silently ignore the default-mode + // options below (table-name / subscribe-id). + Map map = baseConnectionConfig(); + map.put("table.discoverer.jdbc.table-name", "cdc_subscriptions"); + map.put("table.discoverer.jdbc.subscribe-id", SUBSCRIBE_ID_ANALYTICS); + map.put( + "table.discoverer.jdbc.subscribe-query", + "SELECT subscribe_table_name FROM cdc_subscriptions " + + "WHERE subscribe_id = 'orders-subscription'"); + + assertThat(runDiscovery(Configuration.fromMap(map))) + .containsExactlyInAnyOrder( + TableId.tableId("source_db", "orders"), + TableId.tableId("source_db", "order_items"), + TableId.tableId("source_db", "products")); + } + + // ========================================================================= + // Validation + // ========================================================================= + + @Test + void testMissingJdbcUrlThrows() { + Map map = new HashMap<>(); + map.put("table.discoverer.jdbc.username", "user"); + map.put("table.discoverer.jdbc.password", "pass"); + map.put("table.discoverer.jdbc.table-name", "cdc_subscriptions"); + map.put("table.discoverer.jdbc.subscribe-id", "x"); + + assertThatThrownBy(() -> runDiscovery(Configuration.fromMap(map))) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("table.discoverer.jdbc.url"); } + @Test + void testDefaultModeMissingTableNameThrows() { + Map map = baseConnectionConfig(); + map.put("table.discoverer.jdbc.subscribe-id", SUBSCRIBE_ID_ORDERS); + + assertThatThrownBy(() -> runDiscovery(Configuration.fromMap(map))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("table.discoverer.jdbc.table-name"); + } + + @Test + void testDefaultModeMissingSubscribeIdThrows() { + Map map = baseConnectionConfig(); + map.put("table.discoverer.jdbc.table-name", "cdc_subscriptions"); + + assertThatThrownBy(() -> runDiscovery(Configuration.fromMap(map))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("table.discoverer.jdbc.subscribe-id"); + } + // ========================================================================= // Helpers // ========================================================================= - private Configuration buildConfig(String tableName, String columnName) { + private Set runDiscovery(Configuration config) throws Exception { + TableDiscoverer discoverer = + TableDiscovererFactory.createDiscoverer( + "jdbc", Thread.currentThread().getContextClassLoader()); + discoverer.open( + TableDiscovererFactory.createContext( + config, Thread.currentThread().getContextClassLoader())); + try { + return discoverer.discover(); + } finally { + discoverer.close(); + } + } + + private static Map baseConnectionConfig() { Map map = new HashMap<>(); map.put("table.discoverer.jdbc.url", MYSQL.getJdbcUrl()); - map.put("table.discoverer.jdbc.table-name", tableName); map.put("table.discoverer.jdbc.username", MYSQL.getUsername()); map.put("table.discoverer.jdbc.password", MYSQL.getPassword()); - if (columnName != null) { - map.put("table.discoverer.jdbc.column-name", columnName); + return map; + } + + private static void executeSql(String sql) throws Exception { + try (Connection conn = + DriverManager.getConnection( + MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword()); + Statement stmt = conn.createStatement()) { + stmt.execute(sql); + } + } + + private SharedTableConfigBuilder sharedTableConfigBuilder() { + return new SharedTableConfigBuilder(); + } + + private static final class SharedTableConfigBuilder { + private String table; + private String columnName; + private String subscribeIdColumn; + private String subscribeId; + + SharedTableConfigBuilder table(String table) { + this.table = table; + return this; + } + + SharedTableConfigBuilder columnName(String columnName) { + this.columnName = columnName; + return this; + } + + SharedTableConfigBuilder subscribeIdColumn(String subscribeIdColumn) { + this.subscribeIdColumn = subscribeIdColumn; + return this; + } + + SharedTableConfigBuilder subscribeId(String subscribeId) { + this.subscribeId = subscribeId; + return this; + } + + Configuration build() { + Map map = baseConnectionConfig(); + map.put("table.discoverer.jdbc.table-name", table); + map.put("table.discoverer.jdbc.subscribe-id", subscribeId); + if (columnName != null) { + map.put("table.discoverer.jdbc.column-name", columnName); + } + if (subscribeIdColumn != null) { + map.put("table.discoverer.jdbc.subscribe-id-column", subscribeIdColumn); + } + return Configuration.fromMap(map); } - return Configuration.fromMap(map); } }