Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import java.util.Map;
import java.util.Set;


import io.vertx.core.*;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -26,6 +28,7 @@
import io.mosip.registration.processor.core.code.ModuleName;
import io.mosip.registration.processor.core.code.RegistrationTransactionStatusCode;
import io.mosip.registration.processor.core.code.RegistrationTransactionTypeCode;
import io.mosip.registration.processor.core.constant.AuditLogConstant;
import io.mosip.registration.processor.core.constant.LoggerFileConstant;
import io.mosip.registration.processor.core.exception.util.PlatformErrorMessages;
import io.mosip.registration.processor.core.exception.util.PlatformSuccessMessages;
Expand All @@ -40,8 +43,6 @@
import io.mosip.registration.processor.status.dto.RegistrationStatusDto;
import io.mosip.registration.processor.status.exception.TablenotAccessibleException;
import io.mosip.registration.processor.status.service.RegistrationStatusService;
import io.vertx.core.AsyncResult;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonObject;

Expand Down Expand Up @@ -96,9 +97,6 @@ public class ReprocessorVerticle extends MosipVerticleAPIManager {
@Value("#{'${registration.processor.reprocess.restart-trigger-filter}'.split(',')}")
private List<String> reprocessRestartTriggerFilter;

/** The is transaction successful. */
boolean isTransactionSuccessful;

/** The registration status service. */
@Autowired
RegistrationStatusService<String, InternalRegistrationStatusDto, RegistrationStatusDto> registrationStatusService;
Expand Down Expand Up @@ -218,6 +216,9 @@ public void start() {
*/
@Override
public MessageDTO process(MessageDTO object) {

boolean isReprocessorSuccessful = false;
StringBuffer ridSb=new StringBuffer();
List<InternalRegistrationStatusDto> reprocessorDtoList = null;
LogDescription description = new LogDescription();
List<String> statusList = new ArrayList<>();
Expand All @@ -226,9 +227,8 @@ public MessageDTO process(MessageDTO object) {
statusList.add(RegistrationTransactionStatusCode.IN_PROGRESS.toString());
regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "",
"ReprocessorVerticle::process()::entry");
StringBuffer ridSb=new StringBuffer();
Comment thread
MuralitharanK marked this conversation as resolved.
try {
Map<String, Set<String>> reprocessRestartTriggerMap = intializeReprocessRestartTriggerMapping();
Map<String, Set<String>> reprocessRestartTriggerMap = initializeReprocessRestartTriggerMapping();
reprocessorDtoList = registrationStatusService.getResumablePackets(fetchSize);
if (!CollectionUtils.isEmpty(reprocessorDtoList)) {
if (reprocessorDtoList.size() < fetchSize) {
Expand All @@ -243,91 +243,62 @@ public MessageDTO process(MessageDTO object) {
reprocessCount, statusList, reprocessExcludeStageNames);
}


if (!CollectionUtils.isEmpty(reprocessorDtoList)) {
regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
null, "Reprocess count - {}", reprocessorDtoList.size());
List<Future> futures = new ArrayList<>();
reprocessorDtoList.forEach(dto -> {
String registrationId = dto.getRegistrationId();
ridSb.append(registrationId);
ridSb.append(",");
MessageDTO messageDTO = new MessageDTO();
messageDTO.setRid(registrationId);
messageDTO.setReg_type(dto.getRegistrationType());
messageDTO.setSource(dto.getSource());
messageDTO.setIteration(dto.getIteration());
messageDTO.setWorkflowInstanceId(dto.getWorkflowInstanceId());
if (reprocessCount.equals(dto.getReProcessRetryCount())) {
dto.setLatestTransactionStatusCode(
RegistrationTransactionStatusCode.REPROCESS_FAILED.toString());
dto.setLatestTransactionTypeCode(
RegistrationTransactionTypeCode.PACKET_REPROCESS.toString());
dto.setStatusComment(StatusUtil.RE_PROCESS_FAILED.getMessage());
dto.setStatusCode(RegistrationStatusCode.REPROCESS_FAILED.toString());
dto.setSubStatusCode(StatusUtil.RE_PROCESS_FAILED.getCode());
messageDTO.setIsValid(false);
description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getMessage());
description.setCode(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode());

} else {
messageDTO.setIsValid(true);
isTransactionSuccessful = true;
String stageName;
if (isRestartFromStageRequired(dto, reprocessRestartTriggerMap)) {
stageName = MessageBusUtil.getMessageBusAdress(reprocessRestartFromStage);
stageName = stageName.concat(ReprocessorConstants.BUS_IN);
sendAndSetStatus(dto, messageDTO, stageName);
dto.setStatusComment(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getMessage());
dto.setSubStatusCode(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getCode());
description
.setMessage(
PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS
.getMessage());
description.setCode(
PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS
.getCode());

} else {
stageName = MessageBusUtil.getMessageBusAdress(dto.getRegistrationStageName());
if (RegistrationTransactionStatusCode.SUCCESS.name()
.equalsIgnoreCase(dto.getLatestTransactionStatusCode())) {
stageName = stageName.concat(ReprocessorConstants.BUS_OUT);
ridSb.append(dto.getRegistrationId()).append(",");
Promise<Void> promise = Promise.promise();
vertx.executeBlocking(p -> {
processDTO(reprocessRestartTriggerMap, dto);
p.complete();
}, false, res -> {
if (res.succeeded()) {
promise.complete();
} else {
stageName = stageName.concat(ReprocessorConstants.BUS_IN);
promise.fail(res.cause());
}
sendAndSetStatus(dto, messageDTO, stageName);
dto.setStatusComment(StatusUtil.RE_PROCESS_COMPLETED.getMessage());
dto.setSubStatusCode(StatusUtil.RE_PROCESS_COMPLETED.getCode());
description.setMessage(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getMessage());
description.setCode(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode());
});
futures.add(promise.future());
});
CompositeFuture.all(futures).onComplete(ar -> {
boolean isBatchSuccessful = ar.succeeded();
try {
if (!isBatchSuccessful) {
regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
null, PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getMessage()
+ ExceptionUtils.getStackTrace(ar.cause()));
}
} finally {
String message = isBatchSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getMessage() :
PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getMessage();
regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
null, message);
String eventId = isBatchSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString();
String eventName = isBatchSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString();
String eventType = isBatchSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString();
/** Module-Id can be Both Success/Error code */
String moduleId = isBatchSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode() :
PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode();;
String moduleName = ModuleName.RE_PROCESSOR.toString();
auditLogRequestBuilder.createAuditRequestBuilder(message, eventId, eventName, eventType,
moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):""));
Comment on lines +282 to +286

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Double semicolon on line 283 and unnecessary toString() call on line 286.

Line 283 has a stray extra semicolon (getCode();;). Line 286 calls ridSb.toString().length(), which allocates a temporary String object just to read the length; ridSb.length() is equivalent and allocation-free.

♻️ Proposed fixes
 					String moduleId = isBatchSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode() :
-							PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode();;
+							PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode();
 					String moduleName = ModuleName.RE_PROCESSOR.toString();
 					auditLogRequestBuilder.createAuditRequestBuilder(message, eventId, eventName, eventType,
-							moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):""));
+							moduleId, moduleName, (ridSb.length() > 1 ? ridSb.substring(0, ridSb.length()-1) : ""));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`
around lines 282 - 286, Remove the stray extra semicolon after getCode() when
assigning moduleId, avoid the unnecessary enum toString() call by using
ModuleName.RE_PROCESSOR.name() (or otherwise assign the enum's String name) for
moduleName, and replace ridSb.toString().length() with ridSb.length() to avoid
creating a temporary String; these changes should be applied around the
moduleId/moduleName assignment and the call to
auditLogRequestBuilder.createAuditRequestBuilder(...) in ReprocessorVerticle.

}
regProcLogger.info(LoggerFileConstant.SESSIONID.toString(),
LoggerFileConstant.REGISTRATIONID.toString(), registrationId, description.getMessage());

/** Module-Id can be Both Success/Error code */
String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode();
String moduleName = ModuleName.RE_PROCESSOR.toString();
registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName);
String eventId = EventId.RPR_402.toString();
String eventName = EventName.UPDATE.toString();
String eventType = EventType.BUSINESS.toString();

if (!isTransactionSuccessful)
auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName,
eventType, moduleId, moduleName, registrationId);
});
Comment on lines +265 to 288

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

CompositeFuture.all() fails fast — batch audit fires before all DTOs complete on partial failure.

CompositeFuture.all() fails immediately when any future fails, while CompositeFuture.join() continues to run all remaining futures even after one fails. With the current all() semantics, if the first DTO's executeBlocking fails, onComplete fires immediately with isBatchSuccessful = false, logging the batch audit while the remaining N-1 worker threads are still executing processDTO. Those threads will still call registrationStatusService.updateRegistrationStatusForWorkflowEngine() concurrently after the batch audit has already been emitted.

For batch-processing semantics (process all, then audit), Future.join() (or CompositeFuture.join()) is more appropriate:

♻️ Proposed fix
-			Future.all(futures).onComplete(ar -> {
+			Future.join(futures).onComplete(ar -> {
 				boolean isBatchSuccessful = ar.succeeded();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`
around lines 265 - 288, The batch currently uses CompositeFuture.all(futures)
which fails fast and triggers the audit before all per-DTO executeBlocking tasks
finish; change CompositeFuture.all(futures) to CompositeFuture.join(futures) so
the handler runs only after every future completes (success or failure), keep
the isBatchSuccessful = ar.succeeded() check, and leave the existing
logging/audit call sequence (regProcLogger.error/ regProcLogger.info and
auditLogRequestBuilder.createAuditRequestBuilder) intact so the batch audit is
emitted only after all worker futures have finished.


}
isReprocessorSuccessful = true;
} catch (TablenotAccessibleException e) {
isTransactionSuccessful = false;
isReprocessorSuccessful = false;
object.setInternalError(Boolean.TRUE);
description.setMessage(PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage());
description.setCode(PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getCode());
regProcLogger.error(LoggerFileConstant.SESSIONID.toString(),
description.getCode() + " -- ",
PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage(), e.toString());

}catch (Exception ex) {
isTransactionSuccessful = false;
} catch (Exception ex) {
isReprocessorSuccessful = false;
description.setMessage(PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getMessage());
description.setCode(PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getCode());
regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
Expand All @@ -339,25 +310,98 @@ public MessageDTO process(MessageDTO object) {
} finally {
regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
null, description.getMessage());
if (isTransactionSuccessful)
if (isReprocessorSuccessful)
description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getMessage());

String eventId = isTransactionSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString();
String eventName = isTransactionSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString();
String eventType = isTransactionSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString();
String eventId = isReprocessorSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString();
String eventName = isReprocessorSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString();
String eventType = isReprocessorSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString();

/** Module-Id can be Both Success/Error code */
String moduleId = isTransactionSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode()
String moduleId = isReprocessorSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode()
: description.getCode();
String moduleName = ModuleName.RE_PROCESSOR.toString();
auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, eventType,
moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):""));
Comment thread
MuralitharanK marked this conversation as resolved.
moduleId, moduleName, AuditLogConstant.MULTIPLE_ID.name());
}

return object;
}

private Map<String, Set<String>> intializeReprocessRestartTriggerMapping() {
private void processDTO(Map<String, Set<String>> reprocessRestartTriggerMap, InternalRegistrationStatusDto dto) {

boolean isTransactionSuccessful = false;
LogDescription description = new LogDescription();

String registrationId = dto.getRegistrationId();
MessageDTO messageDTO = new MessageDTO();
messageDTO.setRid(registrationId);
messageDTO.setReg_type(dto.getRegistrationType());
messageDTO.setSource(dto.getSource());
messageDTO.setIteration(dto.getIteration());
messageDTO.setWorkflowInstanceId(dto.getWorkflowInstanceId());
if (reprocessCount.equals(dto.getReProcessRetryCount())) {
dto.setLatestTransactionStatusCode(
RegistrationTransactionStatusCode.REPROCESS_FAILED.toString());
dto.setLatestTransactionTypeCode(
RegistrationTransactionTypeCode.PACKET_REPROCESS.toString());
dto.setStatusComment(StatusUtil.RE_PROCESS_FAILED.getMessage());
dto.setStatusCode(RegistrationStatusCode.REPROCESS_FAILED.toString());
dto.setSubStatusCode(StatusUtil.RE_PROCESS_FAILED.getCode());
messageDTO.setIsValid(false);
description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getMessage());
description.setCode(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode());

} else {
messageDTO.setIsValid(true);
isTransactionSuccessful = true;
String stageName;
if (isRestartFromStageRequired(dto, reprocessRestartTriggerMap)) {
stageName = MessageBusUtil.getMessageBusAdress(reprocessRestartFromStage);
stageName = stageName.concat(ReprocessorConstants.BUS_IN);
sendAndSetStatus(dto, messageDTO, stageName);
dto.setStatusComment(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getMessage());
dto.setSubStatusCode(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getCode());
description
.setMessage(
PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS
.getMessage());
description.setCode(
PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS
.getCode());

} else {
stageName = MessageBusUtil.getMessageBusAdress(dto.getRegistrationStageName());
if (RegistrationTransactionStatusCode.SUCCESS.name()
.equalsIgnoreCase(dto.getLatestTransactionStatusCode())) {
stageName = stageName.concat(ReprocessorConstants.BUS_OUT);
} else {
stageName = stageName.concat(ReprocessorConstants.BUS_IN);
}
sendAndSetStatus(dto, messageDTO, stageName);
dto.setStatusComment(StatusUtil.RE_PROCESS_COMPLETED.getMessage());
dto.setSubStatusCode(StatusUtil.RE_PROCESS_COMPLETED.getCode());
description.setMessage(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getMessage());
description.setCode(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode());
}
}
regProcLogger.info(LoggerFileConstant.SESSIONID.toString(),
LoggerFileConstant.REGISTRATIONID.toString(), registrationId, description.getMessage());

/** Module-Id can be Both Success/Error code */
String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode();
String moduleName = ModuleName.RE_PROCESSOR.toString();
registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName);
String eventId = EventId.RPR_402.toString();
String eventName = EventName.UPDATE.toString();
String eventType = EventType.BUSINESS.toString();

if (!isTransactionSuccessful) {
auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName,
eventType, moduleId, moduleName, registrationId);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

private Map<String, Set<String>> initializeReprocessRestartTriggerMapping() {
Map<String, Set<String>> reprocessRestartTriggerMap = new HashMap<String, Set<String>>();
for (String filter : reprocessRestartTriggerFilter) {
String[] stageAndStatus = filter.split(":");
Expand Down