From 64a8ff5b622019f54b6bb02c462cba8b04f5e6c8 Mon Sep 17 00:00:00 2001 From: weixiuli Date: Sat, 22 Jan 2022 14:04:35 +0800 Subject: [PATCH 1/2] [SPARK-37984][SHUFFLE] Avoid calculating all outstanding requests to improve performance. --- .../spark/network/client/TransportResponseHandler.java | 10 ++++++++-- .../spark/network/server/TransportChannelHandler.java | 3 +-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java index 576c08858d6c3..23bc78b539851 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java @@ -140,7 +140,7 @@ public void channelActive() { @Override public void channelInactive() { - if (numOutstandingRequests() > 0) { + if (hasOutstandingRequests()) { String remoteAddress = getRemoteAddress(channel); logger.error("Still have {} requests outstanding when connection from {} is closed", numOutstandingRequests(), remoteAddress); @@ -150,7 +150,7 @@ public void channelInactive() { @Override public void exceptionCaught(Throwable cause) { - if (numOutstandingRequests() > 0) { + if (hasOutstandingRequests()) { String remoteAddress = getRemoteAddress(channel); logger.error("Still have {} requests outstanding when connection from {} is closed", numOutstandingRequests(), remoteAddress); @@ -275,6 +275,12 @@ public int numOutstandingRequests() { (streamActive ? 1 : 0); } + /** Check if there are any outstanding requests (fetch requests + rpcs) */ + public Boolean hasOutstandingRequests() { + return streamActive || outstandingFetches.size() > 0 || outstandingRpcs.size() > 0 + || streamCallbacks.size() > 0; + } + /** Returns the time in nanoseconds of when the last request was sent out. */ public long getTimeOfLastRequestNs() { return timeOfLastRequestNs.get(); diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index 275e64ee50f26..d197032003e6e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -161,8 +161,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc boolean isActuallyOverdue = System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs; if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) { - boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0; - if (hasInFlightRequests) { + if (responseHandler.hasOutstandingRequests()) { String address = getRemoteAddress(ctx.channel()); logger.error("Connection to {} has been quiet for {} ms while there are outstanding " + "requests. Assuming connection is dead; please adjust" + From cfba2db1ce747182601156d3ca43f55f7050e52f Mon Sep 17 00:00:00 2001 From: weixiuli Date: Sun, 23 Jan 2022 12:34:15 +0800 Subject: [PATCH 2/2] Address comments --- .../apache/spark/network/client/TransportResponseHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java index 23bc78b539851..261f20540a297 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java @@ -277,8 +277,8 @@ public int numOutstandingRequests() { /** Check if there are any outstanding requests (fetch requests + rpcs) */ public Boolean hasOutstandingRequests() { - return streamActive || outstandingFetches.size() > 0 || outstandingRpcs.size() > 0 - || streamCallbacks.size() > 0; + return streamActive || !outstandingFetches.isEmpty() || !outstandingRpcs.isEmpty() || + !streamCallbacks.isEmpty(); } /** Returns the time in nanoseconds of when the last request was sent out. */