diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java index 04602a66a5e1..185c85e37c4a 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java @@ -207,7 +207,7 @@ private void startCoordinator() { config.taskId()); Coordinator coordinator = new Coordinator(catalog, config, membersWhenWorkerIsCoordinator, clientFactory, context); - coordinatorThread = new CoordinatorThread(coordinator); + coordinatorThread = new CoordinatorThread(coordinator, config.connectorName()); coordinatorThread.start(); } } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java index 068e1e1f6e9c..c3994cfbc901 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java @@ -107,7 +107,7 @@ class Coordinator extends Channel { new LinkedBlockingQueue<>(), new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("iceberg-committer" + "-%d") + .setNameFormat("iceberg-committer-" + config.connectorName() + "-%d") .build()); this.commitState = new CommitState(config); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java index b1a34d0474d9..3be126450585 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java @@ -23,13 +23,12 @@ class CoordinatorThread extends Thread { private static final Logger LOG = LoggerFactory.getLogger(CoordinatorThread.class); - private static final String THREAD_NAME = "iceberg-coord"; private final Coordinator coordinator; private volatile boolean terminated; - CoordinatorThread(Coordinator coordinator) { - super(THREAD_NAME); + CoordinatorThread(Coordinator coordinator, String connectorName) { + super("iceberg-coord-" + connectorName); this.coordinator = coordinator; } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinatorThread.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinatorThread.java index 79f0e8a62756..19be3324822f 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinatorThread.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinatorThread.java @@ -31,7 +31,9 @@ public class TestCoordinatorThread { @Test public void testRun() { Coordinator coordinator = mock(Coordinator.class); - CoordinatorThread coordinatorThread = new CoordinatorThread(coordinator); + CoordinatorThread coordinatorThread = new CoordinatorThread(coordinator, "test-connector"); + + assertThat(coordinatorThread.getName()).isEqualTo("iceberg-coord-test-connector"); coordinatorThread.start();