diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java
new file mode 100644
index 00000000000..2816210a403
--- /dev/null
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source.discover;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * A {@link TableDiscoverer} that reads the list of subscribed tables from a JDBC database.
+ *
+ * This implementation connects to any JDBC-compatible database (e.g., MySQL, PostgreSQL) and
+ * returns fully-qualified table names parsed as {@link TableId} objects.
+ *
+ *
Default mode — shared subscription table (recommended)
+ *
+ * By default, the discoverer assumes that subscriptions for many CDC jobs live in a single
+ * shared database table, and that each subscription set is identified by a {@code subscribe-id}.
+ * The discoverer issues a parameterized
+ *
+ *
{@code
+ * SELECT column-name FROM table-name WHERE subscribe-id-column = ?
+ * }
+ *
+ * using a {@link PreparedStatement} (injection-safe), and only the rows whose subscribe-id
+ * matches the configured value are returned.
+ *
+ *
Required keys: {@code table.discoverer.jdbc.url}, {@code
+ * table.discoverer.jdbc.username}, {@code table.discoverer.jdbc.password}, {@code
+ * table.discoverer.jdbc.table-name}, {@code table.discoverer.jdbc.subscribe-id}.
+ *
+ *
Optional keys: {@code table.discoverer.jdbc.column-name} (defaults to {@code
+ * "subscribe_table_name"}), {@code table.discoverer.jdbc.subscribe-id-column} (defaults to {@code
+ * "subscribe_id"}).
+ *
+ *
Recommended schema:
+ *
+ *
{@code
+ * CREATE TABLE cdc_subscriptions (
+ * subscribe_id VARCHAR(64) NOT NULL,
+ * subscribe_table_name VARCHAR(255) NOT NULL,
+ * PRIMARY KEY (subscribe_id, subscribe_table_name)
+ * );
+ * INSERT INTO cdc_subscriptions VALUES
+ * ('orders-subscription', 'source_db.orders'),
+ * ('orders-subscription', 'source_db.order_items'),
+ * ('analytics-subscription', 'analytics_db.user_events');
+ * }
+ *
+ * Advanced escape hatch — custom query (overrides the default mode)
+ *
+ * For uncommon layouts (e.g., needing JOINs or extra filters), users may set {@code
+ * table.discoverer.jdbc.subscribe-query} to any {@code SELECT} statement; column #1 of each row is
+ * treated as a fully-qualified table name. When this option is set it takes priority over the
+ * default mode and all of {@code table-name}, {@code column-name}, {@code subscribe-id-column} and
+ * {@code subscribe-id} are ignored. Use this only when the default schema cannot model your
+ * subscriptions.
+ *
+ *
Null values and rows that cannot be parsed into a valid {@link TableId} are silently skipped.
+ */
+public class JdbcTableDiscoverer implements TableDiscoverer {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcTableDiscoverer.class);
+
+ public static final ConfigOption JDBC_URL =
+ ConfigOptions.key("table.discoverer.jdbc.url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The JDBC connection URL for the table discovery database.");
+
+ public static final ConfigOption USERNAME =
+ ConfigOptions.key("table.discoverer.jdbc.username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The JDBC username for the table discovery database.");
+
+ public static final ConfigOption PASSWORD =
+ ConfigOptions.key("table.discoverer.jdbc.password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The JDBC password for the table discovery database.");
+
+ public static final ConfigOption SUBSCRIBE_QUERY =
+ ConfigOptions.key("table.discoverer.jdbc.subscribe-query")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Custom SELECT statement used to discover subscribed tables. When set, "
+ + "this takes priority over the shared-table options. Column #1 of "
+ + "every row must be a fully-qualified table name.");
+
+ public static final ConfigOption TABLE_NAME =
+ ConfigOptions.key("table.discoverer.jdbc.table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The shared subscription table that stores subscription entries for "
+ + "one or more CDC jobs. Required in shared-table mode.");
+
+ public static final ConfigOption COLUMN_NAME =
+ ConfigOptions.key("table.discoverer.jdbc.column-name")
+ .stringType()
+ .defaultValue("subscribe_table_name")
+ .withDescription(
+ "The column name in the subscription table that contains the "
+ + "fully-qualified table names to subscribe to.");
+
+ public static final ConfigOption SUBSCRIBE_ID_COLUMN =
+ ConfigOptions.key("table.discoverer.jdbc.subscribe-id-column")
+ .stringType()
+ .defaultValue("subscribe_id")
+ .withDescription(
+ "The column name in the subscription table that holds the "
+ + "subscription-set identifier used for filtering.");
+
+ public static final ConfigOption SUBSCRIBE_ID =
+ ConfigOptions.key("table.discoverer.jdbc.subscribe-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The current subscription-set identifier. Required in shared-table "
+ + "mode; rows whose subscribe-id column matches this value are "
+ + "discovered as subscribed tables.");
+
+ /** Compiled SQL to execute on every {@link #discover()} call. */
+ private transient String sql;
+
+ /** When non-null, the discoverer runs in shared-table mode and binds this as parameter #1. */
+ private transient String subscribeId;
+
+ private transient Connection connection;
+
+ @Override
+ public void open(Context context) throws Exception {
+ Configuration config = context.getConfiguration();
+
+ String jdbcUrl = requireNonEmpty(config, JDBC_URL);
+ String username = requireNonEmpty(config, USERNAME);
+ String password = requireNonEmpty(config, PASSWORD);
+
+ String subscribeQuery = config.get(SUBSCRIBE_QUERY);
+ if (subscribeQuery != null && !subscribeQuery.isEmpty()) {
+ // Mode A — custom query takes priority. Filter options are intentionally ignored.
+ this.sql = subscribeQuery;
+ this.subscribeId = null;
+ LOG.info(
+ "JdbcTableDiscoverer running in custom-query mode. URL='{}', query='{}'.",
+ jdbcUrl,
+ subscribeQuery);
+ } else {
+ // Mode B — shared-table filter; subscribe-id is mandatory.
+ String tableName = requireNonEmpty(config, TABLE_NAME);
+ String columnName = config.get(COLUMN_NAME);
+ String subscribeIdColumn = config.get(SUBSCRIBE_ID_COLUMN);
+ this.subscribeId = requireNonEmpty(config, SUBSCRIBE_ID);
+ this.sql =
+ "SELECT "
+ + columnName
+ + " FROM "
+ + tableName
+ + " WHERE "
+ + subscribeIdColumn
+ + " = ?";
+ LOG.info(
+ "JdbcTableDiscoverer running in shared-table mode. URL='{}', table='{}', "
+ + "column='{}', subscribeIdColumn='{}', subscribeId='{}'.",
+ jdbcUrl,
+ tableName,
+ columnName,
+ subscribeIdColumn,
+ subscribeId);
+ }
+
+ connection = DriverManager.getConnection(jdbcUrl, username, password);
+ }
+
+ @Override
+ public Set discover() throws Exception {
+ Set result = new LinkedHashSet<>();
+ if (subscribeId != null) {
+ try (PreparedStatement ps = connection.prepareStatement(sql)) {
+ ps.setString(1, subscribeId);
+ try (ResultSet rs = ps.executeQuery()) {
+ collect(rs, result);
+ }
+ }
+ } else {
+ try (Statement stmt = connection.createStatement();
+ ResultSet rs = stmt.executeQuery(sql)) {
+ collect(rs, result);
+ }
+ }
+ LOG.info("JdbcTableDiscoverer discovered {} tables.", result.size());
+ return result;
+ }
+
+ private void collect(ResultSet rs, Set result) throws Exception {
+ while (rs.next()) {
+ String value = rs.getString(1);
+ if (value == null || value.isEmpty()) {
+ continue;
+ }
+ try {
+ result.add(TableId.parse(value));
+ } catch (IllegalArgumentException e) {
+ LOG.warn(
+ "Skipping invalid table name '{}' returned by JdbcTableDiscoverer.", value);
+ }
+ }
+ }
+
+ private static String requireNonEmpty(Configuration config, ConfigOption option) {
+ String value = config.get(option);
+ if (value == null || value.isEmpty()) {
+ throw new IllegalArgumentException(
+ "'" + option.key() + "' is required for JdbcTableDiscoverer.");
+ }
+ return value;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (connection != null && !connection.isClosed()) {
+ connection.close();
+ LOG.info("JdbcTableDiscoverer closed JDBC connection.");
+ }
+ }
+}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscovererFactory.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscovererFactory.java
new file mode 100644
index 00000000000..9450ab82190
--- /dev/null
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscovererFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source.discover;
+
+/**
+ * {@link TableDiscovererFactory} for {@link JdbcTableDiscoverer}. Activated when {@code
+ * table.discoverer.type = 'jdbc'}.
+ */
+public class JdbcTableDiscovererFactory implements TableDiscovererFactory {
+
+ public static final String IDENTIFIER = "jdbc";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public TableDiscoverer createDiscoverer() {
+ return new JdbcTableDiscoverer();
+ }
+}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscoverer.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscoverer.java
new file mode 100644
index 00000000000..6997692ce17
--- /dev/null
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscoverer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source.discover;
+
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Pluggable abstraction for discovering a set of tables that a source connector should read.
+ * Implementations are loaded via SPI through {@link TableDiscovererFactory} and determine which
+ * tables the source should subscribe to.
+ *
+ * Lifecycle: {@link #open(Context)} is called once before the first call to {@link #discover()}.
+ * {@link #close()} is called when the discoverer is no longer needed. Implementations manage their
+ * own resources (e.g., connections) within this lifecycle.
+ *
+ *
Built-in implementations include:
+ *
+ *
+ * - {@link JdbcTableDiscoverer} - reads the subscription list from a JDBC database table.
+ *
+ */
+@PublicEvolving
+public interface TableDiscoverer extends Serializable, AutoCloseable {
+
+ /**
+ * Opens this discoverer and initializes any resources needed for table discovery.
+ *
+ * @param context The context providing configuration and class loader.
+ * @throws Exception if initialization fails.
+ */
+ void open(Context context) throws Exception;
+
+ /**
+ * Discovers and returns the set of tables to subscribe to.
+ *
+ * @return A set of {@link TableId} representing the tables to read.
+ * @throws Exception if the discovery fails.
+ */
+ Set discover() throws Exception;
+
+ /**
+ * Closes this discoverer and releases any resources.
+ *
+ * @throws Exception if closing fails.
+ */
+ @Override
+ void close() throws Exception;
+
+ /** Context providing runtime information for the discoverer. */
+ interface Context {
+
+ /**
+ * Returns the full connector configuration. Discoverer implementations read their own
+ * configuration keys (e.g., {@code table.discoverer.jdbc.url}) directly from this
+ * configuration.
+ */
+ Configuration getConfiguration();
+
+ /** Returns the user code class loader of the current session. */
+ ClassLoader getUserCodeClassLoader();
+ }
+}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscovererFactory.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscovererFactory.java
new file mode 100644
index 00000000000..5e7af51f1f2
--- /dev/null
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscovererFactory.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source.discover;
+
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.util.HashSet;
+import java.util.ServiceLoader;
+import java.util.Set;
+
+/**
+ * SPI factory that creates a {@link TableDiscoverer}. Implementations are discovered at runtime via
+ * Java's {@link ServiceLoader} mechanism. To register a new discoverer type, add a line with the
+ * fully-qualified factory class name to:
+ *
+ * {@code
+ * META-INF/services/org.apache.flink.cdc.common.source.discover.TableDiscovererFactory
+ * }
+ *
+ * The discoverer type is selected by the user via the {@code table.discoverer.type} option,
+ * which is matched (case-insensitively) against {@link #identifier()}.
+ */
+@PublicEvolving
+public interface TableDiscovererFactory {
+
+ /**
+ * The unique identifier of this discoverer factory, used to match against the {@code
+ * table.discoverer.type} option. Must be lowercase, e.g. {@code "jdbc"} or {@code
+ * "fluss-default"}.
+ */
+ String identifier();
+
+ /**
+ * Creates a new {@link TableDiscoverer} instance. The returned discoverer is not yet
+ * initialized; the caller must invoke {@link TableDiscoverer#open(TableDiscoverer.Context)}
+ * before calling {@link TableDiscoverer#discover()}.
+ */
+ TableDiscoverer createDiscoverer();
+
+ /**
+ * Creates a {@link TableDiscoverer.Context} with the given configuration and class loader. This
+ * is a convenience factory method so that callers do not need to implement the {@link
+ * TableDiscoverer.Context} interface themselves.
+ *
+ * @param configuration The full connector configuration.
+ * @param classLoader The user code class loader.
+ * @return A new {@link TableDiscoverer.Context} instance.
+ */
+ static TableDiscoverer.Context createContext(
+ Configuration configuration, ClassLoader classLoader) {
+ return new DefaultDiscovererContext(configuration, classLoader);
+ }
+
+ /**
+ * Utility method that discovers a {@link TableDiscovererFactory} via SPI whose {@link
+ * #identifier()} matches the given {@code type}, and delegates discoverer creation to it.
+ *
+ * @param type The discoverer type identifier (e.g. "jdbc", "fluss-default").
+ * @param classLoader The class loader used for SPI discovery.
+ * @return A new, uninitialized {@link TableDiscoverer}.
+ * @throws IllegalArgumentException if no factory matches the given type.
+ * @throws IllegalStateException if multiple factories share the same identifier.
+ */
+ static TableDiscoverer createDiscoverer(String type, ClassLoader classLoader) {
+ ClassLoader loader =
+ classLoader != null ? classLoader : Thread.currentThread().getContextClassLoader();
+ ServiceLoader serviceLoader =
+ ServiceLoader.load(TableDiscovererFactory.class, loader);
+
+ TableDiscovererFactory matched = null;
+ Set known = new HashSet<>();
+ for (TableDiscovererFactory factory : serviceLoader) {
+ known.add(factory.identifier());
+ if (factory.identifier().equalsIgnoreCase(type)) {
+ if (matched != null) {
+ throw new IllegalStateException(
+ "Multiple TableDiscovererFactory implementations found for identifier '"
+ + type
+ + "': "
+ + matched.getClass().getName()
+ + " and "
+ + factory.getClass().getName());
+ }
+ matched = factory;
+ }
+ }
+ if (matched == null) {
+ throw new IllegalArgumentException(
+ "Unsupported 'table.discoverer.type' value: '"
+ + type
+ + "'. Available discoverer types: "
+ + known
+ + ".");
+ }
+ return matched.createDiscoverer();
+ }
+
+ /** Default implementation of {@link TableDiscoverer.Context}. */
+ class DefaultDiscovererContext implements TableDiscoverer.Context {
+ private final Configuration configuration;
+ private final ClassLoader classLoader;
+
+ DefaultDiscovererContext(Configuration configuration, ClassLoader classLoader) {
+ this.configuration = configuration;
+ this.classLoader = classLoader;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public ClassLoader getUserCodeClassLoader() {
+ return classLoader;
+ }
+ }
+}
diff --git a/flink-cdc-common/src/main/resources/META-INF/services/org.apache.flink.cdc.common.source.discover.TableDiscovererFactory b/flink-cdc-common/src/main/resources/META-INF/services/org.apache.flink.cdc.common.source.discover.TableDiscovererFactory
new file mode 100644
index 00000000000..a16bb5ae27d
--- /dev/null
+++ b/flink-cdc-common/src/main/resources/META-INF/services/org.apache.flink.cdc.common.source.discover.TableDiscovererFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.cdc.common.source.discover.JdbcTableDiscovererFactory
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/source/JdbcTableDiscovererITCase.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/source/JdbcTableDiscovererITCase.java
new file mode 100644
index 00000000000..84967f3647e
--- /dev/null
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/source/JdbcTableDiscovererITCase.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.source.discover.JdbcTableDiscoverer;
+import org.apache.flink.cdc.common.source.discover.TableDiscoverer;
+import org.apache.flink.cdc.common.source.discover.TableDiscovererFactory;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Integration tests for {@link JdbcTableDiscoverer} and {@link TableDiscovererFactory} SPI loading.
+ * Uses a real MySQL container via Testcontainers to verify both the default shared-table mode and
+ * the advanced custom-query escape hatch.
+ */
+@Testcontainers
+class JdbcTableDiscovererITCase {
+
+ private static final String SUBSCRIBE_ID_ORDERS = "orders-subscription";
+ private static final String SUBSCRIBE_ID_ANALYTICS = "analytics-subscription";
+
+ @Container
+ private static final MySQLContainer> MYSQL =
+ new MySQLContainer<>("mysql:8.0")
+ .withDatabaseName("meta_db")
+ .withUsername("test_user")
+ .withPassword("test_password");
+
+ @BeforeAll
+ static void setupDatabase() throws Exception {
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword());
+ Statement stmt = conn.createStatement()) {
+ // Shared subscription table with default column names.
+ stmt.execute(
+ "CREATE TABLE cdc_subscriptions ("
+ + " subscribe_id VARCHAR(64) NOT NULL,"
+ + " subscribe_table_name VARCHAR(255) NOT NULL,"
+ + " PRIMARY KEY (subscribe_id, subscribe_table_name)"
+ + ")");
+ stmt.execute(
+ "INSERT INTO cdc_subscriptions VALUES "
+ + "('orders-subscription', 'source_db.orders'),"
+ + "('orders-subscription', 'source_db.order_items'),"
+ + "('orders-subscription', 'source_db.products'),"
+ + "('analytics-subscription', 'analytics_db.user_events')");
+
+ // Shared subscription table using non-default column names.
+ stmt.execute(
+ "CREATE TABLE custom_subscriptions ("
+ + " sub_id VARCHAR(64) NOT NULL,"
+ + " table_fqn VARCHAR(255) NOT NULL,"
+ + " PRIMARY KEY (sub_id, table_fqn)"
+ + ")");
+ stmt.execute(
+ "INSERT INTO custom_subscriptions VALUES "
+ + "('warehouse-sub', 'warehouse.inventory'),"
+ + "('warehouse-sub', 'warehouse.shipments'),"
+ + "('hr-sub', 'hr.employees')");
+ }
+ }
+
+ // =========================================================================
+ // SPI loading
+ // =========================================================================
+
+ @Test
+ void testSpiLoadsJdbcFactory() {
+ TableDiscoverer discoverer =
+ TableDiscovererFactory.createDiscoverer(
+ "jdbc", Thread.currentThread().getContextClassLoader());
+ assertThat(discoverer).isInstanceOf(JdbcTableDiscoverer.class);
+ }
+
+ @Test
+ void testSpiCaseInsensitive() {
+ TableDiscoverer discoverer =
+ TableDiscovererFactory.createDiscoverer(
+ "JDBC", Thread.currentThread().getContextClassLoader());
+ assertThat(discoverer).isInstanceOf(JdbcTableDiscoverer.class);
+ }
+
+ @Test
+ void testSpiFailsForUnknownType() {
+ assertThatThrownBy(
+ () ->
+ TableDiscovererFactory.createDiscoverer(
+ "unknown-type",
+ Thread.currentThread().getContextClassLoader()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unsupported 'table.discoverer.type' value: 'unknown-type'");
+ }
+
+ // =========================================================================
+ // Default mode — shared subscription table
+ // =========================================================================
+
+ @Test
+ void testDefaultModeWithDefaultColumns() throws Exception {
+ Configuration config =
+ sharedTableConfigBuilder()
+ .table("cdc_subscriptions")
+ .subscribeId(SUBSCRIBE_ID_ORDERS)
+ .build();
+
+ assertThat(runDiscovery(config))
+ .containsExactlyInAnyOrder(
+ TableId.tableId("source_db", "orders"),
+ TableId.tableId("source_db", "order_items"),
+ TableId.tableId("source_db", "products"));
+ }
+
+ @Test
+ void testDefaultModeFiltersBySubscribeId() throws Exception {
+ Configuration config =
+ sharedTableConfigBuilder()
+ .table("cdc_subscriptions")
+ .subscribeId(SUBSCRIBE_ID_ANALYTICS)
+ .build();
+
+ assertThat(runDiscovery(config))
+ .containsExactlyInAnyOrder(TableId.tableId("analytics_db", "user_events"));
+ }
+
+ @Test
+ void testDefaultModeWithCustomColumns() throws Exception {
+ Configuration config =
+ sharedTableConfigBuilder()
+ .table("custom_subscriptions")
+ .columnName("table_fqn")
+ .subscribeIdColumn("sub_id")
+ .subscribeId("warehouse-sub")
+ .build();
+
+ assertThat(runDiscovery(config))
+ .containsExactlyInAnyOrder(
+ TableId.tableId("warehouse", "inventory"),
+ TableId.tableId("warehouse", "shipments"));
+ }
+
+ @Test
+ void testDefaultModeReflectsDynamicChanges() throws Exception {
+ Configuration config =
+ sharedTableConfigBuilder()
+ .table("cdc_subscriptions")
+ .subscribeId(SUBSCRIBE_ID_ANALYTICS)
+ .build();
+
+ TableDiscoverer discoverer =
+ TableDiscovererFactory.createDiscoverer(
+ "jdbc", Thread.currentThread().getContextClassLoader());
+ discoverer.open(
+ TableDiscovererFactory.createContext(
+ config, Thread.currentThread().getContextClassLoader()));
+ try {
+ assertThat(discoverer.discover()).hasSize(1);
+
+ executeSql(
+ "INSERT INTO cdc_subscriptions VALUES "
+ + "('analytics-subscription', 'analytics_db.sessions')");
+ try {
+ Set updated = discoverer.discover();
+ assertThat(updated)
+ .containsExactlyInAnyOrder(
+ TableId.tableId("analytics_db", "user_events"),
+ TableId.tableId("analytics_db", "sessions"));
+ } finally {
+ executeSql(
+ "DELETE FROM cdc_subscriptions WHERE subscribe_table_name "
+ + "= 'analytics_db.sessions'");
+ }
+ } finally {
+ discoverer.close();
+ }
+ }
+
+ // =========================================================================
+ // Advanced escape hatch — custom query
+ // =========================================================================
+
+ @Test
+ void testCustomQueryReadsFirstColumn() throws Exception {
+ Map map = baseConnectionConfig();
+ map.put(
+ "table.discoverer.jdbc.subscribe-query",
+ "SELECT subscribe_table_name FROM cdc_subscriptions "
+ + "WHERE subscribe_id = 'orders-subscription' "
+ + "ORDER BY subscribe_table_name");
+
+ assertThat(runDiscovery(Configuration.fromMap(map)))
+ .containsExactlyInAnyOrder(
+ TableId.tableId("source_db", "order_items"),
+ TableId.tableId("source_db", "orders"),
+ TableId.tableId("source_db", "products"));
+ }
+
+ @Test
+ void testCustomQueryOverridesDefaultModeOptions() throws Exception {
+ // Mix both modes: the custom-query option must win and silently ignore the default-mode
+ // options below (table-name / subscribe-id).
+ Map map = baseConnectionConfig();
+ map.put("table.discoverer.jdbc.table-name", "cdc_subscriptions");
+ map.put("table.discoverer.jdbc.subscribe-id", SUBSCRIBE_ID_ANALYTICS);
+ map.put(
+ "table.discoverer.jdbc.subscribe-query",
+ "SELECT subscribe_table_name FROM cdc_subscriptions "
+ + "WHERE subscribe_id = 'orders-subscription'");
+
+ assertThat(runDiscovery(Configuration.fromMap(map)))
+ .containsExactlyInAnyOrder(
+ TableId.tableId("source_db", "orders"),
+ TableId.tableId("source_db", "order_items"),
+ TableId.tableId("source_db", "products"));
+ }
+
+ // =========================================================================
+ // Validation
+ // =========================================================================
+
+ @Test
+ void testMissingJdbcUrlThrows() {
+ Map map = new HashMap<>();
+ map.put("table.discoverer.jdbc.username", "user");
+ map.put("table.discoverer.jdbc.password", "pass");
+ map.put("table.discoverer.jdbc.table-name", "cdc_subscriptions");
+ map.put("table.discoverer.jdbc.subscribe-id", "x");
+
+ assertThatThrownBy(() -> runDiscovery(Configuration.fromMap(map)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("table.discoverer.jdbc.url");
+ }
+
+ @Test
+ void testDefaultModeMissingTableNameThrows() {
+ Map map = baseConnectionConfig();
+ map.put("table.discoverer.jdbc.subscribe-id", SUBSCRIBE_ID_ORDERS);
+
+ assertThatThrownBy(() -> runDiscovery(Configuration.fromMap(map)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("table.discoverer.jdbc.table-name");
+ }
+
+ @Test
+ void testDefaultModeMissingSubscribeIdThrows() {
+ Map map = baseConnectionConfig();
+ map.put("table.discoverer.jdbc.table-name", "cdc_subscriptions");
+
+ assertThatThrownBy(() -> runDiscovery(Configuration.fromMap(map)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("table.discoverer.jdbc.subscribe-id");
+ }
+
+ // =========================================================================
+ // Helpers
+ // =========================================================================
+
+ private Set runDiscovery(Configuration config) throws Exception {
+ TableDiscoverer discoverer =
+ TableDiscovererFactory.createDiscoverer(
+ "jdbc", Thread.currentThread().getContextClassLoader());
+ discoverer.open(
+ TableDiscovererFactory.createContext(
+ config, Thread.currentThread().getContextClassLoader()));
+ try {
+ return discoverer.discover();
+ } finally {
+ discoverer.close();
+ }
+ }
+
+ private static Map baseConnectionConfig() {
+ Map map = new HashMap<>();
+ map.put("table.discoverer.jdbc.url", MYSQL.getJdbcUrl());
+ map.put("table.discoverer.jdbc.username", MYSQL.getUsername());
+ map.put("table.discoverer.jdbc.password", MYSQL.getPassword());
+ return map;
+ }
+
+ private static void executeSql(String sql) throws Exception {
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword());
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(sql);
+ }
+ }
+
+ private SharedTableConfigBuilder sharedTableConfigBuilder() {
+ return new SharedTableConfigBuilder();
+ }
+
+ private static final class SharedTableConfigBuilder {
+ private String table;
+ private String columnName;
+ private String subscribeIdColumn;
+ private String subscribeId;
+
+ SharedTableConfigBuilder table(String table) {
+ this.table = table;
+ return this;
+ }
+
+ SharedTableConfigBuilder columnName(String columnName) {
+ this.columnName = columnName;
+ return this;
+ }
+
+ SharedTableConfigBuilder subscribeIdColumn(String subscribeIdColumn) {
+ this.subscribeIdColumn = subscribeIdColumn;
+ return this;
+ }
+
+ SharedTableConfigBuilder subscribeId(String subscribeId) {
+ this.subscribeId = subscribeId;
+ return this;
+ }
+
+ Configuration build() {
+ Map map = baseConnectionConfig();
+ map.put("table.discoverer.jdbc.table-name", table);
+ map.put("table.discoverer.jdbc.subscribe-id", subscribeId);
+ if (columnName != null) {
+ map.put("table.discoverer.jdbc.column-name", columnName);
+ }
+ if (subscribeIdColumn != null) {
+ map.put("table.discoverer.jdbc.subscribe-id-column", subscribeIdColumn);
+ }
+ return Configuration.fromMap(map);
+ }
+ }
+}