Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public void send(MessageBusAddress toAddress, MessageDTO message) {
this.eventTracingHandler.writeHeaderOnKafkaProduce(producerRecord);
Map<String, String> mdc = MDC.getCopyOfContextMap();
kafkaProducer.write(producerRecord, handler -> {
MDC.setContextMap(mdc);
setMDCContextMap(mdc);
if(handler.failed())
logger.error("Failed kafkaProducer.write {} {} ", handler.result(), handler.cause());
else
Expand Down Expand Up @@ -317,6 +317,37 @@ Future<Void> setupAutoCommitProcessing(
return promise.future();
}

private void completeProcessedRecord(KafkaConsumerRecord<String, String> record, boolean commitRecord,
Promise<Void> promise) {
if(commitRecord)
commitOffset(record.topic(), record.partition(),
record.offset(), promise);
else
promise.complete();
}

private void failPromise(Promise<Void> promise, Throwable cause, String failureMessage) {
if(cause != null)
promise.fail(cause);
else
promise.fail(failureMessage);
}

private void resumeRecordPartition(KafkaConsumerRecord<String, String> record) {
Promise<Void> resumePromise = Promise.promise();
resumePartition(new TopicPartition(record.topic(), record.partition()), resumePromise);
resumePromise.future().onFailure(cause -> logger.error(
"Partition resume failed for topic: {} partition: {}",
record.topic(), record.partition(), cause));
}

private void setMDCContextMap(Map<String, String> mdc) {
if(mdc != null)
MDC.setContextMap(mdc);
else
MDC.clear();
}

Future<Void> processRecord(MessageBusAddress toAddress,
EventHandler<EventDTO, Handler<AsyncResult<MessageDTO>>> eventHandler,
KafkaConsumerRecord<String, String> record, boolean commitRecord) {
Expand All @@ -330,14 +361,12 @@ Future<Void> processRecord(MessageBusAddress toAddress,
eventHandler.handle(eventDTO, res -> {
if (!res.succeeded() && res.cause() instanceof MessageExpiredException) {
logger.warn("Event handling failed {}", res.cause().getMessage());
if(commitRecord)
commitOffset(record.topic(), record.partition(),
record.offset(), promise);
else
promise.complete();
completeProcessedRecord(record, commitRecord, promise);
this.eventTracingHandler.closeSpan(span);
} else if(!res.succeeded()) {
logger.error("Event handling failed {}", res.cause());
promise.fail(res.cause());
failPromise(promise, res.cause(), "Event handling failed");
this.eventTracingHandler.closeSpan(span);
} else {
if(toAddress != null) {
MessageDTO messageDTO = res.result();
Expand All @@ -349,21 +378,23 @@ Future<Void> processRecord(MessageBusAddress toAddress,
getKafkaKey(messageDTO, messageBusToAddress), jsonObject.toString());
this.eventTracingHandler.writeHeaderOnKafkaProduce(producerRecord, span);
kafkaProducer.write(producerRecord, handler -> {
MDC.setContextMap(mdc);
if(handler.failed())
setMDCContextMap(mdc);
if(handler.failed()) {
logger.error("Failed kafkaProducer.write {} ", handler.result(), handler.cause());
else
resumeRecordPartition(record);
failPromise(promise, handler.cause(), "Failed kafkaProducer.write");
} else {
logger.info("Success kafkaProducer.write {} ", handler.result());
completeProcessedRecord(record, commitRecord, promise);
}
MDC.clear();
this.eventTracingHandler.closeSpan(span);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
});
} else {
completeProcessedRecord(record, commitRecord, promise);
this.eventTracingHandler.closeSpan(span);
}
if(commitRecord)
commitOffset(record.topic(), record.partition(),
record.offset(), promise);
else
promise.complete();
}
this.eventTracingHandler.closeSpan(span);
});
return promise.future();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.mosip.registration.processor.core.spi.eventbus.EventHandler;
import io.mosip.registration.processor.core.tracing.EventTracingHandler;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
Expand All @@ -55,6 +56,7 @@
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.kafka.client.common.PartitionInfo;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordsImpl;
Expand Down Expand Up @@ -97,6 +99,65 @@ public void setup(TestContext testContext) throws Exception {
.thenReturn(kafkaProducer);

eventTracingHandler = new EventTracingHandler(tracing, "kafka");
mockKafkaProducerWriteResult(false, null);
}


private void mockKafkaProducerWriteResult(boolean failed, Throwable cause) {
AsyncResult producerAsyncResult = Mockito.mock(AsyncResult.class);
Mockito.when(producerAsyncResult.failed()).thenReturn(failed);
Mockito.when(producerAsyncResult.succeeded()).thenReturn(!failed);
Mockito.when(producerAsyncResult.cause()).thenReturn(cause);
doAnswer((Answer<KafkaProducer<String, String>>) arguments -> {
((Handler<AsyncResult>) arguments.getArgument(1)).handle(producerAsyncResult);
return kafkaProducer;
}).when(kafkaProducer).write(any(KafkaProducerRecord.class), any(Handler.class));
}

private void mockCommitResult(boolean succeeded) {
AsyncResult<Void> voidAsyncResult = Mockito.mock(AsyncResult.class);
Mockito.when(voidAsyncResult.succeeded()).thenReturn(succeeded);
doAnswer((Answer<AsyncResult<Void>>) arguments -> {
((Handler<AsyncResult<Void>>) arguments.getArgument(1)).handle(voidAsyncResult);
return null;
}).when(kafkaConsumer).commit(anyMap(), any());
}

private void mockResumeResult(boolean succeeded) {
AsyncResult<Void> voidAsyncResult = Mockito.mock(AsyncResult.class);
Mockito.when(voidAsyncResult.succeeded()).thenReturn(succeeded);
doAnswer((Answer<AsyncResult<Void>>) arguments -> {
((Handler<AsyncResult<Void>>) arguments.getArgument(1)).handle(voidAsyncResult);
return null;
}).when(kafkaConsumer).resume(any(io.vertx.kafka.client.common.TopicPartition.class), any());
}

private EventHandler<EventDTO, Handler<AsyncResult<MessageDTO>>> prepareSuccessfulEventHandler() {
EventHandler<EventDTO, Handler<AsyncResult<MessageDTO>>> eventHandler =
Mockito.mock(EventHandler.class);
doAnswer((Answer<AsyncResult<MessageDTO>>) arguments -> {
AsyncResult<MessageDTO> asyncResultForMessageDTO = Mockito.mock(AsyncResult.class);
Mockito.when(asyncResultForMessageDTO.succeeded()).thenReturn(true);
MessageDTO messageDTO = new MessageDTO();
messageDTO.setRid("1001");
messageDTO.setReg_type(RegistrationType.NEW.name());
Mockito.when(asyncResultForMessageDTO.result()).thenReturn(messageDTO);
((Handler<AsyncResult<MessageDTO>>) arguments.getArgument(1))
.handle(asyncResultForMessageDTO);
return null;
}).when(eventHandler).handle(any(), any());
return eventHandler;
}

private KafkaConsumerRecord<String, String> prepareKafkaConsumerRecord() {
KafkaConsumerRecord<String, String> record = Mockito.mock(KafkaConsumerRecord.class);
Mockito.when(record.headers()).thenReturn(new ArrayList<>());
Mockito.when(record.key()).thenReturn("1001");
Mockito.when(record.value()).thenReturn("{\"rid\":\"1001\", \"reg_type\": \"NEW\" }");
Mockito.when(record.topic()).thenReturn(MessageBusAddress.PACKET_VALIDATOR_BUS_IN.getAddress());
Mockito.when(record.partition()).thenReturn(0);
Mockito.when(record.offset()).thenReturn(10L);
return record;
}

@After
Expand Down Expand Up @@ -306,6 +367,75 @@ public void testConsumeAndSendWithSingleCommitType(TestContext testContext) {
verify(kafkaProducer, times(testDataCount)).write(any(), any());
}


@Test
public void testProcessRecordCommitsAfterProducerWriteSuccess(TestContext testContext) {
Async async = testContext.async();
kafkaMosipEventBus = new KafkaMosipEventBus(vertx, "localhost:9091", "group_1",
"single", "100", "30000", 60000, eventTracingHandler, caffeineCacheManager);
mockCommitResult(true);

Future<Void> result = kafkaMosipEventBus.processRecord(MessageBusAddress.PACKET_UPLOADER_OUT,
prepareSuccessfulEventHandler(), prepareKafkaConsumerRecord(), true);

result.onComplete(handler -> {
assertTrue(handler.succeeded());
InOrder inOrder = Mockito.inOrder(kafkaProducer, kafkaConsumer);
inOrder.verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class));
inOrder.verify(kafkaConsumer, times(1)).commit(anyMap(), any());
async.complete();
});
async.await();
}

@Test
public void testProcessRecordFailsAndDoesNotCommitWhenProducerWriteFails(TestContext testContext) {
Async async = testContext.async();
RuntimeException producerException = new RuntimeException("producer write failed");
mockKafkaProducerWriteResult(true, producerException);
mockResumeResult(true);
kafkaMosipEventBus = new KafkaMosipEventBus(vertx, "localhost:9091", "group_1",
"single", "100", "30000", 60000, eventTracingHandler, caffeineCacheManager);

Future<Void> result = kafkaMosipEventBus.processRecord(MessageBusAddress.PACKET_UPLOADER_OUT,
prepareSuccessfulEventHandler(), prepareKafkaConsumerRecord(), true);

result.onComplete(handler -> {
assertTrue(handler.failed());
assertEquals(producerException, handler.cause());
verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class));
verify(kafkaConsumer, times(0)).commit(anyMap(), any());
verify(kafkaConsumer, times(1)).resume(
any(io.vertx.kafka.client.common.TopicPartition.class), any());
async.complete();
});
async.await();
}

@Test
public void testProcessRecordFailsBatchFutureWhenProducerWriteFails(TestContext testContext) {
Async async = testContext.async();
RuntimeException producerException = new RuntimeException("producer write failed");
mockKafkaProducerWriteResult(true, producerException);
mockResumeResult(true);
kafkaMosipEventBus = new KafkaMosipEventBus(vertx, "localhost:9091", "group_1",
"batch", "100", "30000", 60000, eventTracingHandler, caffeineCacheManager);

Future<Void> result = kafkaMosipEventBus.processRecord(MessageBusAddress.PACKET_UPLOADER_OUT,
prepareSuccessfulEventHandler(), prepareKafkaConsumerRecord(), false);

result.onComplete(handler -> {
assertTrue(handler.failed());
assertEquals(producerException, handler.cause());
verify(kafkaProducer, times(1)).write(any(KafkaProducerRecord.class), any(Handler.class));
verify(kafkaConsumer, times(0)).commit(anyMap(), any());
verify(kafkaConsumer, times(1)).resume(
any(io.vertx.kafka.client.common.TopicPartition.class), any());
async.complete();
});
async.await();
}

@Test
public void testConsumeWithAutoCommitType(TestContext testContext) {
int testDataCount = 20;
Expand Down