From 5476cba4f902fbeb85ed395b399b05fd5ab6d50f Mon Sep 17 00:00:00 2001 From: Asish Kumar Date: Sat, 11 Apr 2026 20:12:42 +0000 Subject: [PATCH 1/2] Fix Kafka commit ordering after producer writes Delay manual offset completion until the downstream Kafka producer callback succeeds. Propagate producer failures through the processing future so single and batch commit paths do not acknowledge records that were not written to the next topic. Add KafkaMosipEventBus tests for producer-success commit ordering and producer-failure behavior in both single and batch processing paths. Signed-off-by: Asish Kumar --- .../core/eventbus/KafkaMosipEventBus.java | 54 +++++++--- .../core/eventbus/KafkaMosipEventBusTest.java | 100 ++++++++++++++++++ 2 files changed, 138 insertions(+), 16 deletions(-) diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.java index f040c8c6d22..1dc02a3c81b 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.java @@ -178,7 +178,7 @@ public void send(MessageBusAddress toAddress, MessageDTO message) { this.eventTracingHandler.writeHeaderOnKafkaProduce(producerRecord); Map mdc = MDC.getCopyOfContextMap(); kafkaProducer.write(producerRecord, handler -> { - MDC.setContextMap(mdc); + setMDCContextMap(mdc); if(handler.failed()) logger.error("Failed kafkaProducer.write {} {} ", handler.result(), handler.cause()); else @@ -317,6 +317,29 @@ Future setupAutoCommitProcessing( return promise.future(); } + private void completeProcessedRecord(KafkaConsumerRecord record, boolean commitRecord, + Promise promise) { + if(commitRecord) + commitOffset(record.topic(), record.partition(), + record.offset(), promise); + else + promise.complete(); + } + + private void failPromise(Promise promise, Throwable cause, String failureMessage) { + if(cause != null) + promise.fail(cause); + else + promise.fail(failureMessage); + } + + private void setMDCContextMap(Map mdc) { + if(mdc != null) + MDC.setContextMap(mdc); + else + MDC.clear(); + } + Future processRecord(MessageBusAddress toAddress, EventHandler>> eventHandler, KafkaConsumerRecord record, boolean commitRecord) { @@ -330,14 +353,12 @@ Future processRecord(MessageBusAddress toAddress, eventHandler.handle(eventDTO, res -> { if (!res.succeeded() && res.cause() instanceof MessageExpiredException) { logger.warn("Event handling failed {}", res.cause().getMessage()); - if(commitRecord) - commitOffset(record.topic(), record.partition(), - record.offset(), promise); - else - promise.complete(); + completeProcessedRecord(record, commitRecord, promise); + this.eventTracingHandler.closeSpan(span); } else if(!res.succeeded()) { logger.error("Event handling failed {}", res.cause()); - promise.fail(res.cause()); + failPromise(promise, res.cause(), "Event handling failed"); + this.eventTracingHandler.closeSpan(span); } else { if(toAddress != null) { MessageDTO messageDTO = res.result(); @@ -349,21 +370,22 @@ Future processRecord(MessageBusAddress toAddress, getKafkaKey(messageDTO, messageBusToAddress), jsonObject.toString()); this.eventTracingHandler.writeHeaderOnKafkaProduce(producerRecord, span); kafkaProducer.write(producerRecord, handler -> { - MDC.setContextMap(mdc); - if(handler.failed()) + setMDCContextMap(mdc); + if(handler.failed()) { logger.error("Failed kafkaProducer.write {} ", handler.result(), handler.cause()); - else + failPromise(promise, handler.cause(), "Failed kafkaProducer.write"); + } else { logger.info("Success kafkaProducer.write {} ", handler.result()); + completeProcessedRecord(record, commitRecord, promise); + } MDC.clear(); + this.eventTracingHandler.closeSpan(span); }); + } else { + completeProcessedRecord(record, commitRecord, promise); + this.eventTracingHandler.closeSpan(span); } - if(commitRecord) - commitOffset(record.topic(), record.partition(), - record.offset(), promise); - else - promise.complete(); } - this.eventTracingHandler.closeSpan(span); }); return promise.future(); } diff --git a/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java b/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java index 8c35485c68c..39deed9215c 100644 --- a/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java +++ b/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java @@ -47,6 +47,7 @@ import io.mosip.registration.processor.core.spi.eventbus.EventHandler; import io.mosip.registration.processor.core.tracing.EventTracingHandler; import io.vertx.core.AsyncResult; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; @@ -55,6 +56,7 @@ import io.vertx.ext.unit.junit.VertxUnitRunner; import io.vertx.kafka.client.common.PartitionInfo; import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.consumer.KafkaConsumerRecords; import io.vertx.kafka.client.consumer.OffsetAndMetadata; import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordsImpl; @@ -97,6 +99,56 @@ public void setup(TestContext testContext) throws Exception { .thenReturn(kafkaProducer); eventTracingHandler = new EventTracingHandler(tracing, "kafka"); + mockKafkaProducerWriteResult(false, null); + } + + + private void mockKafkaProducerWriteResult(boolean failed, Throwable cause) { + AsyncResult producerAsyncResult = Mockito.mock(AsyncResult.class); + Mockito.when(producerAsyncResult.failed()).thenReturn(failed); + Mockito.when(producerAsyncResult.succeeded()).thenReturn(!failed); + Mockito.when(producerAsyncResult.cause()).thenReturn(cause); + doAnswer((Answer>) arguments -> { + ((Handler) arguments.getArgument(1)).handle(producerAsyncResult); + return kafkaProducer; + }).when(kafkaProducer).write(any(KafkaProducerRecord.class), any(Handler.class)); + } + + private void mockCommitResult(boolean succeeded) { + AsyncResult voidAsyncResult = Mockito.mock(AsyncResult.class); + Mockito.when(voidAsyncResult.succeeded()).thenReturn(succeeded); + doAnswer((Answer>) arguments -> { + ((Handler>) arguments.getArgument(1)).handle(voidAsyncResult); + return null; + }).when(kafkaConsumer).commit(anyMap(), any()); + } + + private EventHandler>> prepareSuccessfulEventHandler() { + EventHandler>> eventHandler = + Mockito.mock(EventHandler.class); + doAnswer((Answer>) arguments -> { + AsyncResult asyncResultForMessageDTO = Mockito.mock(AsyncResult.class); + Mockito.when(asyncResultForMessageDTO.succeeded()).thenReturn(true); + MessageDTO messageDTO = new MessageDTO(); + messageDTO.setRid("1001"); + messageDTO.setReg_type(RegistrationType.NEW.name()); + Mockito.when(asyncResultForMessageDTO.result()).thenReturn(messageDTO); + ((Handler>) arguments.getArgument(1)) + .handle(asyncResultForMessageDTO); + return null; + }).when(eventHandler).handle(any(), any()); + return eventHandler; + } + + private KafkaConsumerRecord prepareKafkaConsumerRecord() { + KafkaConsumerRecord record = Mockito.mock(KafkaConsumerRecord.class); + Mockito.when(record.headers()).thenReturn(new ArrayList<>()); + Mockito.when(record.key()).thenReturn("1001"); + Mockito.when(record.value()).thenReturn("{\"rid\":\"1001\", \"reg_type\": \"NEW\" }"); + Mockito.when(record.topic()).thenReturn(MessageBusAddress.PACKET_VALIDATOR_BUS_IN.getAddress()); + Mockito.when(record.partition()).thenReturn(0); + Mockito.when(record.offset()).thenReturn(10L); + return record; } @After @@ -306,6 +358,54 @@ public void testConsumeAndSendWithSingleCommitType(TestContext testContext) { verify(kafkaProducer, times(testDataCount)).write(any(), any()); } + + @Test + public void testProcessRecordCommitsAfterProducerWriteSuccess(TestContext testContext) { + kafkaMosipEventBus = new KafkaMosipEventBus(vertx, "localhost:9091", "group_1", + "single", "100", "30000", 60000, eventTracingHandler, caffeineCacheManager); + mockCommitResult(true); + + Future result = kafkaMosipEventBus.processRecord(MessageBusAddress.PACKET_UPLOADER_OUT, + prepareSuccessfulEventHandler(), prepareKafkaConsumerRecord(), true); + + assertTrue(result.succeeded()); + InOrder inOrder = Mockito.inOrder(kafkaProducer, kafkaConsumer); + inOrder.verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class)); + inOrder.verify(kafkaConsumer, times(1)).commit(anyMap(), any()); + } + + @Test + public void testProcessRecordFailsAndDoesNotCommitWhenProducerWriteFails(TestContext testContext) { + RuntimeException producerException = new RuntimeException("producer write failed"); + mockKafkaProducerWriteResult(true, producerException); + kafkaMosipEventBus = new KafkaMosipEventBus(vertx, "localhost:9091", "group_1", + "single", "100", "30000", 60000, eventTracingHandler, caffeineCacheManager); + + Future result = kafkaMosipEventBus.processRecord(MessageBusAddress.PACKET_UPLOADER_OUT, + prepareSuccessfulEventHandler(), prepareKafkaConsumerRecord(), true); + + assertTrue(result.failed()); + assertEquals(producerException, result.cause()); + verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class)); + verify(kafkaConsumer, times(0)).commit(anyMap(), any()); + } + + @Test + public void testProcessRecordFailsBatchFutureWhenProducerWriteFails(TestContext testContext) { + RuntimeException producerException = new RuntimeException("producer write failed"); + mockKafkaProducerWriteResult(true, producerException); + kafkaMosipEventBus = new KafkaMosipEventBus(vertx, "localhost:9091", "group_1", + "batch", "100", "30000", 60000, eventTracingHandler, caffeineCacheManager); + + Future result = kafkaMosipEventBus.processRecord(MessageBusAddress.PACKET_UPLOADER_OUT, + prepareSuccessfulEventHandler(), prepareKafkaConsumerRecord(), false); + + assertTrue(result.failed()); + assertEquals(producerException, result.cause()); + verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class)); + verify(kafkaConsumer, times(0)).commit(anyMap(), any()); + } + @Test public void testConsumeWithAutoCommitType(TestContext testContext) { int testDataCount = 20; From 9268c83ec326d7dbb5580d84fe4c375e9132b5a3 Mon Sep 17 00:00:00 2001 From: Asish Kumar Date: Sun, 12 Apr 2026 05:50:29 +0000 Subject: [PATCH 2/2] Fix Kafka partition resume on producer failure --- .../core/eventbus/KafkaMosipEventBus.java | 9 ++++ .../core/eventbus/KafkaMosipEventBusTest.java | 54 ++++++++++++++----- 2 files changed, 51 insertions(+), 12 deletions(-) diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.java index 1dc02a3c81b..7a880228eb6 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.java @@ -333,6 +333,14 @@ private void failPromise(Promise promise, Throwable cause, String failureM promise.fail(failureMessage); } + private void resumeRecordPartition(KafkaConsumerRecord record) { + Promise resumePromise = Promise.promise(); + resumePartition(new TopicPartition(record.topic(), record.partition()), resumePromise); + resumePromise.future().onFailure(cause -> logger.error( + "Partition resume failed for topic: {} partition: {}", + record.topic(), record.partition(), cause)); + } + private void setMDCContextMap(Map mdc) { if(mdc != null) MDC.setContextMap(mdc); @@ -373,6 +381,7 @@ Future processRecord(MessageBusAddress toAddress, setMDCContextMap(mdc); if(handler.failed()) { logger.error("Failed kafkaProducer.write {} ", handler.result(), handler.cause()); + resumeRecordPartition(record); failPromise(promise, handler.cause(), "Failed kafkaProducer.write"); } else { logger.info("Success kafkaProducer.write {} ", handler.result()); diff --git a/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java b/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java index 39deed9215c..21f81cee10e 100644 --- a/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java +++ b/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java @@ -123,6 +123,15 @@ private void mockCommitResult(boolean succeeded) { }).when(kafkaConsumer).commit(anyMap(), any()); } + private void mockResumeResult(boolean succeeded) { + AsyncResult voidAsyncResult = Mockito.mock(AsyncResult.class); + Mockito.when(voidAsyncResult.succeeded()).thenReturn(succeeded); + doAnswer((Answer>) arguments -> { + ((Handler>) arguments.getArgument(1)).handle(voidAsyncResult); + return null; + }).when(kafkaConsumer).resume(any(io.vertx.kafka.client.common.TopicPartition.class), any()); + } + private EventHandler>> prepareSuccessfulEventHandler() { EventHandler>> eventHandler = Mockito.mock(EventHandler.class); @@ -361,6 +370,7 @@ public void testConsumeAndSendWithSingleCommitType(TestContext testContext) { @Test public void testProcessRecordCommitsAfterProducerWriteSuccess(TestContext testContext) { + Async async = testContext.async(); kafkaMosipEventBus = new KafkaMosipEventBus(vertx, "localhost:9091", "group_1", "single", "100", "30000", 60000, eventTracingHandler, caffeineCacheManager); mockCommitResult(true); @@ -368,42 +378,62 @@ public void testProcessRecordCommitsAfterProducerWriteSuccess(TestContext testCo Future result = kafkaMosipEventBus.processRecord(MessageBusAddress.PACKET_UPLOADER_OUT, prepareSuccessfulEventHandler(), prepareKafkaConsumerRecord(), true); - assertTrue(result.succeeded()); - InOrder inOrder = Mockito.inOrder(kafkaProducer, kafkaConsumer); - inOrder.verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class)); - inOrder.verify(kafkaConsumer, times(1)).commit(anyMap(), any()); + result.onComplete(handler -> { + assertTrue(handler.succeeded()); + InOrder inOrder = Mockito.inOrder(kafkaProducer, kafkaConsumer); + inOrder.verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class)); + inOrder.verify(kafkaConsumer, times(1)).commit(anyMap(), any()); + async.complete(); + }); + async.await(); } @Test public void testProcessRecordFailsAndDoesNotCommitWhenProducerWriteFails(TestContext testContext) { + Async async = testContext.async(); RuntimeException producerException = new RuntimeException("producer write failed"); mockKafkaProducerWriteResult(true, producerException); + mockResumeResult(true); kafkaMosipEventBus = new KafkaMosipEventBus(vertx, "localhost:9091", "group_1", "single", "100", "30000", 60000, eventTracingHandler, caffeineCacheManager); Future result = kafkaMosipEventBus.processRecord(MessageBusAddress.PACKET_UPLOADER_OUT, prepareSuccessfulEventHandler(), prepareKafkaConsumerRecord(), true); - assertTrue(result.failed()); - assertEquals(producerException, result.cause()); - verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class)); - verify(kafkaConsumer, times(0)).commit(anyMap(), any()); + result.onComplete(handler -> { + assertTrue(handler.failed()); + assertEquals(producerException, handler.cause()); + verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class)); + verify(kafkaConsumer, times(0)).commit(anyMap(), any()); + verify(kafkaConsumer, times(1)).resume( + any(io.vertx.kafka.client.common.TopicPartition.class), any()); + async.complete(); + }); + async.await(); } @Test public void testProcessRecordFailsBatchFutureWhenProducerWriteFails(TestContext testContext) { + Async async = testContext.async(); RuntimeException producerException = new RuntimeException("producer write failed"); mockKafkaProducerWriteResult(true, producerException); + mockResumeResult(true); kafkaMosipEventBus = new KafkaMosipEventBus(vertx, "localhost:9091", "group_1", "batch", "100", "30000", 60000, eventTracingHandler, caffeineCacheManager); Future result = kafkaMosipEventBus.processRecord(MessageBusAddress.PACKET_UPLOADER_OUT, prepareSuccessfulEventHandler(), prepareKafkaConsumerRecord(), false); - assertTrue(result.failed()); - assertEquals(producerException, result.cause()); - verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class)); - verify(kafkaConsumer, times(0)).commit(anyMap(), any()); + result.onComplete(handler -> { + assertTrue(handler.failed()); + assertEquals(producerException, handler.cause()); + verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class)); + verify(kafkaConsumer, times(0)).commit(anyMap(), any()); + verify(kafkaConsumer, times(1)).resume( + any(io.vertx.kafka.client.common.TopicPartition.class), any()); + async.complete(); + }); + async.await(); } @Test