diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.java index f040c8c6d22..7a880228eb6 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.java @@ -178,7 +178,7 @@ public void send(MessageBusAddress toAddress, MessageDTO message) { this.eventTracingHandler.writeHeaderOnKafkaProduce(producerRecord); Map mdc = MDC.getCopyOfContextMap(); kafkaProducer.write(producerRecord, handler -> { - MDC.setContextMap(mdc); + setMDCContextMap(mdc); if(handler.failed()) logger.error("Failed kafkaProducer.write {} {} ", handler.result(), handler.cause()); else @@ -317,6 +317,37 @@ Future setupAutoCommitProcessing( return promise.future(); } + private void completeProcessedRecord(KafkaConsumerRecord record, boolean commitRecord, + Promise promise) { + if(commitRecord) + commitOffset(record.topic(), record.partition(), + record.offset(), promise); + else + promise.complete(); + } + + private void failPromise(Promise promise, Throwable cause, String failureMessage) { + if(cause != null) + promise.fail(cause); + else + promise.fail(failureMessage); + } + + private void resumeRecordPartition(KafkaConsumerRecord record) { + Promise resumePromise = Promise.promise(); + resumePartition(new TopicPartition(record.topic(), record.partition()), resumePromise); + resumePromise.future().onFailure(cause -> logger.error( + "Partition resume failed for topic: {} partition: {}", + record.topic(), record.partition(), cause)); + } + + private void setMDCContextMap(Map mdc) { + if(mdc != null) + MDC.setContextMap(mdc); + else + MDC.clear(); + } + Future processRecord(MessageBusAddress toAddress, EventHandler>> eventHandler, KafkaConsumerRecord record, boolean commitRecord) { @@ -330,14 +361,12 @@ Future processRecord(MessageBusAddress toAddress, eventHandler.handle(eventDTO, res -> { if (!res.succeeded() && res.cause() instanceof MessageExpiredException) { logger.warn("Event handling failed {}", res.cause().getMessage()); - if(commitRecord) - commitOffset(record.topic(), record.partition(), - record.offset(), promise); - else - promise.complete(); + completeProcessedRecord(record, commitRecord, promise); + this.eventTracingHandler.closeSpan(span); } else if(!res.succeeded()) { logger.error("Event handling failed {}", res.cause()); - promise.fail(res.cause()); + failPromise(promise, res.cause(), "Event handling failed"); + this.eventTracingHandler.closeSpan(span); } else { if(toAddress != null) { MessageDTO messageDTO = res.result(); @@ -349,21 +378,23 @@ Future processRecord(MessageBusAddress toAddress, getKafkaKey(messageDTO, messageBusToAddress), jsonObject.toString()); this.eventTracingHandler.writeHeaderOnKafkaProduce(producerRecord, span); kafkaProducer.write(producerRecord, handler -> { - MDC.setContextMap(mdc); - if(handler.failed()) + setMDCContextMap(mdc); + if(handler.failed()) { logger.error("Failed kafkaProducer.write {} ", handler.result(), handler.cause()); - else + resumeRecordPartition(record); + failPromise(promise, handler.cause(), "Failed kafkaProducer.write"); + } else { logger.info("Success kafkaProducer.write {} ", handler.result()); + completeProcessedRecord(record, commitRecord, promise); + } MDC.clear(); + this.eventTracingHandler.closeSpan(span); }); + } else { + completeProcessedRecord(record, commitRecord, promise); + this.eventTracingHandler.closeSpan(span); } - if(commitRecord) - commitOffset(record.topic(), record.partition(), - record.offset(), promise); - else - promise.complete(); } - this.eventTracingHandler.closeSpan(span); }); return promise.future(); } diff --git a/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java b/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java index 8c35485c68c..21f81cee10e 100644 --- a/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java +++ b/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java @@ -47,6 +47,7 @@ import io.mosip.registration.processor.core.spi.eventbus.EventHandler; import io.mosip.registration.processor.core.tracing.EventTracingHandler; import io.vertx.core.AsyncResult; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; @@ -55,6 +56,7 @@ import io.vertx.ext.unit.junit.VertxUnitRunner; import io.vertx.kafka.client.common.PartitionInfo; import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.consumer.KafkaConsumerRecords; import io.vertx.kafka.client.consumer.OffsetAndMetadata; import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordsImpl; @@ -97,6 +99,65 @@ public void setup(TestContext testContext) throws Exception { .thenReturn(kafkaProducer); eventTracingHandler = new EventTracingHandler(tracing, "kafka"); + mockKafkaProducerWriteResult(false, null); + } + + + private void mockKafkaProducerWriteResult(boolean failed, Throwable cause) { + AsyncResult producerAsyncResult = Mockito.mock(AsyncResult.class); + Mockito.when(producerAsyncResult.failed()).thenReturn(failed); + Mockito.when(producerAsyncResult.succeeded()).thenReturn(!failed); + Mockito.when(producerAsyncResult.cause()).thenReturn(cause); + doAnswer((Answer>) arguments -> { + ((Handler) arguments.getArgument(1)).handle(producerAsyncResult); + return kafkaProducer; + }).when(kafkaProducer).write(any(KafkaProducerRecord.class), any(Handler.class)); + } + + private void mockCommitResult(boolean succeeded) { + AsyncResult voidAsyncResult = Mockito.mock(AsyncResult.class); + Mockito.when(voidAsyncResult.succeeded()).thenReturn(succeeded); + doAnswer((Answer>) arguments -> { + ((Handler>) arguments.getArgument(1)).handle(voidAsyncResult); + return null; + }).when(kafkaConsumer).commit(anyMap(), any()); + } + + private void mockResumeResult(boolean succeeded) { + AsyncResult voidAsyncResult = Mockito.mock(AsyncResult.class); + Mockito.when(voidAsyncResult.succeeded()).thenReturn(succeeded); + doAnswer((Answer>) arguments -> { + ((Handler>) arguments.getArgument(1)).handle(voidAsyncResult); + return null; + }).when(kafkaConsumer).resume(any(io.vertx.kafka.client.common.TopicPartition.class), any()); + } + + private EventHandler>> prepareSuccessfulEventHandler() { + EventHandler>> eventHandler = + Mockito.mock(EventHandler.class); + doAnswer((Answer>) arguments -> { + AsyncResult asyncResultForMessageDTO = Mockito.mock(AsyncResult.class); + Mockito.when(asyncResultForMessageDTO.succeeded()).thenReturn(true); + MessageDTO messageDTO = new MessageDTO(); + messageDTO.setRid("1001"); + messageDTO.setReg_type(RegistrationType.NEW.name()); + Mockito.when(asyncResultForMessageDTO.result()).thenReturn(messageDTO); + ((Handler>) arguments.getArgument(1)) + .handle(asyncResultForMessageDTO); + return null; + }).when(eventHandler).handle(any(), any()); + return eventHandler; + } + + private KafkaConsumerRecord prepareKafkaConsumerRecord() { + KafkaConsumerRecord record = Mockito.mock(KafkaConsumerRecord.class); + Mockito.when(record.headers()).thenReturn(new ArrayList<>()); + Mockito.when(record.key()).thenReturn("1001"); + Mockito.when(record.value()).thenReturn("{\"rid\":\"1001\", \"reg_type\": \"NEW\" }"); + Mockito.when(record.topic()).thenReturn(MessageBusAddress.PACKET_VALIDATOR_BUS_IN.getAddress()); + Mockito.when(record.partition()).thenReturn(0); + Mockito.when(record.offset()).thenReturn(10L); + return record; } @After @@ -306,6 +367,75 @@ public void testConsumeAndSendWithSingleCommitType(TestContext testContext) { verify(kafkaProducer, times(testDataCount)).write(any(), any()); } + + @Test + public void testProcessRecordCommitsAfterProducerWriteSuccess(TestContext testContext) { + Async async = testContext.async(); + kafkaMosipEventBus = new KafkaMosipEventBus(vertx, "localhost:9091", "group_1", + "single", "100", "30000", 60000, eventTracingHandler, caffeineCacheManager); + mockCommitResult(true); + + Future result = kafkaMosipEventBus.processRecord(MessageBusAddress.PACKET_UPLOADER_OUT, + prepareSuccessfulEventHandler(), prepareKafkaConsumerRecord(), true); + + result.onComplete(handler -> { + assertTrue(handler.succeeded()); + InOrder inOrder = Mockito.inOrder(kafkaProducer, kafkaConsumer); + inOrder.verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class)); + inOrder.verify(kafkaConsumer, times(1)).commit(anyMap(), any()); + async.complete(); + }); + async.await(); + } + + @Test + public void testProcessRecordFailsAndDoesNotCommitWhenProducerWriteFails(TestContext testContext) { + Async async = testContext.async(); + RuntimeException producerException = new RuntimeException("producer write failed"); + mockKafkaProducerWriteResult(true, producerException); + mockResumeResult(true); + kafkaMosipEventBus = new KafkaMosipEventBus(vertx, "localhost:9091", "group_1", + "single", "100", "30000", 60000, eventTracingHandler, caffeineCacheManager); + + Future result = kafkaMosipEventBus.processRecord(MessageBusAddress.PACKET_UPLOADER_OUT, + prepareSuccessfulEventHandler(), prepareKafkaConsumerRecord(), true); + + result.onComplete(handler -> { + assertTrue(handler.failed()); + assertEquals(producerException, handler.cause()); + verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class)); + verify(kafkaConsumer, times(0)).commit(anyMap(), any()); + verify(kafkaConsumer, times(1)).resume( + any(io.vertx.kafka.client.common.TopicPartition.class), any()); + async.complete(); + }); + async.await(); + } + + @Test + public void testProcessRecordFailsBatchFutureWhenProducerWriteFails(TestContext testContext) { + Async async = testContext.async(); + RuntimeException producerException = new RuntimeException("producer write failed"); + mockKafkaProducerWriteResult(true, producerException); + mockResumeResult(true); + kafkaMosipEventBus = new KafkaMosipEventBus(vertx, "localhost:9091", "group_1", + "batch", "100", "30000", 60000, eventTracingHandler, caffeineCacheManager); + + Future result = kafkaMosipEventBus.processRecord(MessageBusAddress.PACKET_UPLOADER_OUT, + prepareSuccessfulEventHandler(), prepareKafkaConsumerRecord(), false); + + result.onComplete(handler -> { + assertTrue(handler.failed()); + assertEquals(producerException, handler.cause()); + verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class)); + verify(kafkaConsumer, times(0)).commit(anyMap(), any()); + verify(kafkaConsumer, times(1)).resume( + any(io.vertx.kafka.client.common.TopicPartition.class), any()); + async.complete(); + }); + async.await(); + } + @Test public void testConsumeWithAutoCommitType(TestContext testContext) { int testDataCount = 20;