Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -138,9 +139,12 @@ public void channelInactive(ChannelHandlerContext ctx) {
logger.trace("Channel {} inactive", ctx.channel());
nettyMetrics.channelDestructionRate.mark();
if (request != null && request.isOpen()) {
logger.error("Request {} was aborted because the channel {} became inactive", request.getUri(), ctx.channel());
nettyMetrics.channelInactiveWithActiveRequestCount.inc();
logger.error("Request {} was aborted because the channel {} became inactive. Method: {}, bytesReceived: {}",
request.getUri(), ctx.channel(), request.getRestMethod(), request.getBlobBytesReceived());
onRequestAborted(Utils.convertToClientTerminationException(new ClosedChannelException()));
} else {
nettyMetrics.channelInactiveWithoutActiveRequestCount.inc();
if (request != null) {
logger.error("Request {} on channel {} was already closed", request.getUri(), ctx.channel());
}
Expand All @@ -164,7 +168,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
try {
if (request != null && request.isOpen() && cause instanceof Exception) {
nettyMetrics.processorExceptionCaughtCount.inc();
logger.error("Swallowing request {} error on channel {}", request.getUri(), ctx.channel(), cause);
classifyAndCountException(cause);
logger.error("Swallowing request {} error on channel {}. ChannelActive: {}, exception: {} - {}",
request.getUri(), ctx.channel(), ctx.channel().isActive(), cause.getClass().getSimpleName(),
cause.getMessage(), cause);
onRequestAborted((Exception) cause);
} else if (isOpen()) {
if (cause instanceof RestServiceException) {
Expand Down Expand Up @@ -201,6 +208,27 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}
}

/**
* Classifies the exception and increments counters for common network-termination failure categories.
* @param cause the exception to classify.
*/
private void classifyAndCountException(Throwable cause) {
if (cause instanceof ClosedChannelException) {
nettyMetrics.exceptionCaughtClosedChannelCount.inc();
} else if (cause instanceof SSLException) {
nettyMetrics.exceptionCaughtSSLExceptionCount.inc();
} else if (cause instanceof IOException) {
String message = cause.getMessage();
if (message != null && message.contains("Connection reset")) {
nettyMetrics.exceptionCaughtConnectionResetCount.inc();
} else if (message != null && message.contains("Broken pipe")) {
nettyMetrics.exceptionCaughtBrokenPipeCount.inc();
} else {
nettyMetrics.exceptionCaughtOtherIOExceptionCount.inc();
}
}
}

/**
* Netty calls this function when events that we have registered for, occur (in this case we are specifically waiting
* for {@link IdleStateEvent} so that we close connections that have been idle too long - maybe due to client failure)
Expand Down
33 changes: 33 additions & 0 deletions ambry-rest/src/main/java/com/github/ambry/rest/NettyMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,26 @@ public class NettyMetrics {
// NettyMessageProcessor
public final Histogram channelReadIntervalInMs;
public final Counter idleConnectionCloseCount;
public final Counter channelInactiveWithActiveRequestCount;
public final Counter channelInactiveWithoutActiveRequestCount;
public final Counter processorErrorAfterCloseCount;
public final Counter processorExceptionCaughtCount;
public final Counter exceptionCaughtConnectionResetCount;
public final Counter exceptionCaughtBrokenPipeCount;
public final Counter exceptionCaughtClosedChannelCount;
public final Counter exceptionCaughtSSLExceptionCount;
public final Counter exceptionCaughtOtherIOExceptionCount;
public final Counter processorIOExceptionCount;
public final Counter processorRestServiceExceptionCount;
public final Counter processorThrowableCount;
public final Counter processorUnknownExceptionCount;

// NettyResponseChannel
public final Counter clientEarlyTerminationCount;
public final Counter clientTerminationOnActiveChannelCount;
public final Counter clientTerminationOnInactiveChannelCount;
public final Counter errorResponseSentCount;
public final Counter errorResponseNotSentCount;
public final Counter acceptedCount;
public final Counter createdCount;
public final Counter okCount;
Expand Down Expand Up @@ -281,10 +292,24 @@ public NettyMetrics(MetricRegistry metricRegistry) {
metricRegistry.histogram(MetricRegistry.name(NettyMessageProcessor.class, "ChannelReadIntervalInMs"));
idleConnectionCloseCount =
metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "IdleConnectionCloseCount"));
channelInactiveWithActiveRequestCount =
metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ChannelInactiveWithActiveRequestCount"));
channelInactiveWithoutActiveRequestCount = metricRegistry.counter(
MetricRegistry.name(NettyMessageProcessor.class, "ChannelInactiveWithoutActiveRequestCount"));
processorErrorAfterCloseCount =
metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ErrorAfterCloseCount"));
processorExceptionCaughtCount =
metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ExceptionCaughtCount"));
exceptionCaughtConnectionResetCount =
metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ExceptionCaughtConnectionResetCount"));
exceptionCaughtBrokenPipeCount =
metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ExceptionCaughtBrokenPipeCount"));
exceptionCaughtClosedChannelCount =
metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ExceptionCaughtClosedChannelCount"));
exceptionCaughtSSLExceptionCount =
metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ExceptionCaughtSSLExceptionCount"));
exceptionCaughtOtherIOExceptionCount = metricRegistry.counter(
MetricRegistry.name(NettyMessageProcessor.class, "ExceptionCaughtOtherIOExceptionCount"));
processorIOExceptionCount =
metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "IOExceptionCount"));
processorRestServiceExceptionCount =
Expand All @@ -296,6 +321,14 @@ public NettyMetrics(MetricRegistry metricRegistry) {
// NettyResponseChannel
clientEarlyTerminationCount =
metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "ClientEarlyTerminationCount"));
clientTerminationOnActiveChannelCount =
metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "ClientTerminationOnActiveChannelCount"));
clientTerminationOnInactiveChannelCount = metricRegistry.counter(
MetricRegistry.name(NettyResponseChannel.class, "ClientTerminationOnInactiveChannelCount"));
errorResponseSentCount =
metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "ErrorResponseSentCount"));
errorResponseNotSentCount =
metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "ErrorResponseNotSentCount"));
acceptedCount = metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "AcceptedCount"));
createdCount = metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "CreatedCount"));
okCount = metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "OkCount"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,15 +548,28 @@ private boolean maybeSendErrorResponse(Exception exception) {
long processingStartTime = System.currentTimeMillis();
boolean responseSent = false;
logger.trace("Sending error response to client on channel {}", ctx.channel());
// If this is a likely client disconnect and the channel is already inactive, classify it as 4xx
// for accounting purposes but skip any response write attempt.
if (Utils.isPossibleClientTermination(exception) && !ctx.channel().isActive()) {
nettyMetrics.clientEarlyTerminationCount.inc();
nettyMetrics.clientTerminationOnInactiveChannelCount.inc();
errorResponseStatus = ResponseStatus.BadRequest;
responseStatus = errorResponseStatus;
logger.debug("Skipping error response write for client termination on inactive channel {}", ctx.channel());
nettyMetrics.errorResponseNotSentCount.inc();
return false;
}
FullHttpResponse errorResponse = getErrorResponse(exception);
if (maybeWriteResponseMetadata(errorResponse, new ErrorResponseWriteListener())) {
logger.trace("Scheduled error response sending on channel {}", ctx.channel());
responseStatus = errorResponseStatus;
responseSent = true;
nettyMetrics.errorResponseSentCount.inc();
long processingTime = System.currentTimeMillis() - processingStartTime;
nettyMetrics.errorResponseProcessingTimeInMs.update(processingTime);
} else {
logger.error("Could not send error response on channel {}", ctx.channel());
nettyMetrics.errorResponseNotSentCount.inc();
}
return responseSent;
}
Expand All @@ -577,20 +590,28 @@ private FullHttpResponse getErrorResponse(Throwable cause) {
errorResponseStatus = ResponseStatus.getResponseStatus(restServiceErrorCode);
status = getHttpResponseStatus(errorResponseStatus);
if (shouldSendFailureReason(status, restServiceException)) {
Throwable rootCause = Utils.getRootCause(cause);
String rootMessage = rootCause.getMessage();
if (rootMessage == null) {
rootMessage = rootCause.getClass().getSimpleName();
}
errReason = new String(
Utils.getRootCause(cause).getMessage().replaceAll("[\n\t\r]", " ").getBytes(StandardCharsets.US_ASCII),
rootMessage.replaceAll("[\n\t\r]", " ").getBytes(StandardCharsets.US_ASCII),
StandardCharsets.US_ASCII);
}
if (restServiceException.shouldIncludeExceptionMetadataInResponse()) {
errHeaders = restServiceException.getExceptionHeadersMap();
}
} else if (Utils.isPossibleClientTermination(cause)) {
// Client closed the connection, it's likely that error response won't be able to reach client.
// If that's the case, then set the status to client error. This would then be recorded as client error
// in ContainerMetrics
nettyMetrics.clientEarlyTerminationCount.inc();
status = HttpResponseStatus.BAD_REQUEST;
errorResponseStatus = ResponseStatus.BadRequest;
// Inactive-channel accounting-only case is handled in maybeSendErrorResponse().
// Any path reaching response construction should use 500 to avoid emitting 4xx to a still-connected client.
nettyMetrics.clientTerminationOnActiveChannelCount.inc();
logger.warn("Client termination detected on ACTIVE channel {} for request {}. Exception: {}", ctx.channel(),
request != null ? request.getUri() : "unknown", cause.getMessage());
nettyMetrics.internalServerErrorCount.inc();
status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
errorResponseStatus = ResponseStatus.InternalServerError;
} else {
nettyMetrics.internalServerErrorCount.inc();
status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
Expand Down Expand Up @@ -846,8 +867,8 @@ private void log(Exception exception) {
logger.trace("Error handling request {} with method {}", uri, restMethod, exception);
}
} else if (Utils.isPossibleClientTermination(exception)) {
logger.trace("Client likely terminated connection while handling request {} with method {}", uri, restMethod,
exception);
logger.debug("Client likely terminated connection while handling request {} with method {}. ChannelActive: {}",
uri, restMethod, ctx.channel().isActive(), exception);
} else {
logger.error("Unexpected error handling request {} with method {}", uri, restMethod, exception);
}
Expand Down
Loading
Loading