diff --git a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java index 76ff8c0c0a..8567c50215 100644 --- a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java +++ b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java @@ -126,7 +126,7 @@ private Long currentOffset(TopicPartition topicPartition) { public Map currentOffsets() { Map snapshot = Maps.newHashMapWithExpectedSize(currentOffsets.size()); currentOffsets.forEach((topicPartition, offset) -> { - if (offset > 0) { + if (offset >= 0) { snapshot.put(topicPartition, new OffsetAndMetadata(offset, Optional.empty(), null)); } diff --git a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index 74af038b31..a7e45de775 100644 --- a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -1207,6 +1207,10 @@ public void offsetTest() throws Exception { // offset is 0 for the first written record assertEquals(sink.currentOffset(topicName, partition), 0); + // current offsets map returned by the PulsarKafkaSinkTaskContext should contain the record with offset 0 + assertEquals( + sink.taskContext.currentOffsets().get(new TopicPartition(topicName, partition)).offset(), 0 + ); entryId.set(1); sink.write(record);