Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;

import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand All @@ -47,6 +48,7 @@ public class GrpcTask extends AbstractTask {

private GrpcParameters grpcParameters;
private TaskExecutionContext taskExecutionContext;
private volatile Context.CancellableContext cancellableContext;

/**
* constructor
Expand All @@ -71,36 +73,87 @@ public void init() {

@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
ManagedChannel channel = null;
try {
ManagedChannel channel;
if (grpcParameters.getChannelCredentialType() == GrpcCredentialType.TLS_DEFAULT) {
TlsChannelCredentials creds = (TlsChannelCredentials) TlsChannelCredentials.create();
channel = GrpcDynamicService.ChannelFactory.createChannel(grpcParameters.getUrl(), creds);
} else {
channel = GrpcDynamicService.ChannelFactory.createChannel(grpcParameters.getUrl());
}

Descriptors.FileDescriptor fileDesc =
JSONDescriptorHelper.fileDescFromJSON(grpcParameters.getGrpcServiceDefinitionJSON());
GrpcDynamicService stubService = new GrpcDynamicService(channel, fileDesc);
DynamicMessage message = stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(),
grpcParameters.getConnectTimeoutMs());
Printer printer = JsonFormat.printer().omittingInsignificantWhitespace();
addDefaultOutput(printer.print(message));
} catch (StatusRuntimeException statusre) {
validateResponse(statusre.getStatus());
return;
} catch (Exception e) {
throw new GrpcTaskException("gRPC handle exception:", e);

// → Attach a cancellable gRPC Context to support external cancellation.
// This context propagates cancellation signals to the underlying RPC call.
this.cancellableContext = (Context.CancellableContext) Context.current().withCancellation().attach();
Context previous = this.cancellableContext;
Copy link

Copilot AI Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect context management pattern. The attach() method returns the previous context that was attached, not the newly created cancellable context. This code incorrectly tries to cast the return value to CancellableContext and then assigns it again to previous.

The correct pattern should be:

this.cancellableContext = Context.current().withCancellation();
Context previous = this.cancellableContext.attach();

This separates the creation of the cancellable context from the attachment, and correctly captures the previous context returned by attach().

Suggested change
this.cancellableContext = (Context.CancellableContext) Context.current().withCancellation().attach();
Context previous = this.cancellableContext;
this.cancellableContext = Context.current().withCancellation();
Context previous = this.cancellableContext.attach();

Copilot uses AI. Check for mistakes.

try {
GrpcDynamicService stubService = new GrpcDynamicService(channel, fileDesc);

// → Perform the actual blocking gRPC call.
// If cancel() is invoked concurrently, this call will throw StatusRuntimeException(CANCELLED).
DynamicMessage message = stubService.call(
grpcParameters.getMethodName(),
grpcParameters.getMessage(),
grpcParameters.getConnectTimeoutMs());

// → Format and store the response as task output
Printer printer = JsonFormat.printer().omittingInsignificantWhitespace();
addDefaultOutput(printer.print(message));
validateResponse(Status.OK);
} finally {
// → Detach the cancellable context to restore the previous context and avoid leaks
this.cancellableContext.detach(previous);
this.cancellableContext = null; // Clear reference for GC and idempotent cancel
}
} catch (StatusRuntimeException ex) {
validateResponse(ex.getStatus());
} catch (Exception ex) {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
throw new GrpcTaskException("gRPC handle exception", ex);
} finally {
// → Gracefully shut down the gRPC channel to release network resources
if (channel != null) {
channel.shutdown();
}
}
validateResponse(Status.OK);
}

@Override
public void cancel() throws TaskException {
// Do nothing when task to be canceled
public void cancel() {
Copy link

Copilot AI Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cancel() method signature should declare throws TaskException to match the abstract method in AbstractTask. All other task implementations (ShellTask, SqlTask, etc.) follow this pattern and declare throws TaskException.

Suggested change
public void cancel() {
public void cancel() throws TaskException {

Copilot uses AI. Check for mistakes.
// → Read volatile reference once for thread safety (avoid repeated reads under race conditions)
Context.CancellableContext ctx = this.cancellableContext;

if (ctx != null && !ctx.isCancelled()) {
try {
log.debug("Canceling gRPC task: method={}", grpcParameters.getMethodName());

// → Trigger gRPC cancellation by canceling the context.
// This interrupts the ongoing RPC and causes stubService.call() to throw CANCELLED.
ctx.cancel(new TaskException("gRPC task was canceled by user"));

// → Record user intent: task was explicitly killed, not failed
setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
log.debug("gRPC task was successfully canceled");
} catch (Exception ex) {
log.warn("Failed to cancel gRPC context", ex);
throw new TaskException("Cancel gRPC task failed", ex);
Copy link

Copilot AI Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The cancel() method wraps the caught exception in a TaskException and re-throws it, but this may cause issues since cancel() is typically called from a different thread (the cancellation thread). Consider logging the error instead of throwing an exception, as throwing from cancel() may not properly propagate to the caller. Most other task implementations (e.g., ShellTask) throw from cancel, so this follows the pattern, but the additional wrapping might not be necessary.

Suggested change
throw new TaskException("Cancel gRPC task failed", ex);
// Do not throw from cancel(); just log the error.

Copilot uses AI. Check for mistakes.
}
} else {
// → No active context: task may not have started, already finished, or already canceled
log.debug("gRPC task cancel requested, but no active cancellable context.");
}
}

private void validateResponse(Status statusCode) {
if (exitStatusCode == TaskConstants.EXIT_CODE_KILL) {
log.info("this gRPC task has been killed");
return;
}

switch (grpcParameters.getGrpcCheckCondition()) {
case STATUS_CODE_DEFAULT:
if (!statusCode.isOk()) {
Expand Down
Loading