From 2d7d4f13886a6972a111649adaf24116ba2fb4da Mon Sep 17 00:00:00 2001 From: Ingo Schuster Date: Fri, 23 Jun 2017 15:50:26 +0200 Subject: [PATCH 1/4] SPARK-21176: Limit number of selector threads for ProxyServlet to 8 --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index edf328b5ae538..a269e0e30d960 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -26,6 +26,8 @@ import scala.language.implicitConversions import scala.xml.Node import org.eclipse.jetty.client.api.Response +import org.eclipse.jetty.client.HttpClient +import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP import org.eclipse.jetty.proxy.ProxyServlet import org.eclipse.jetty.server._ import org.eclipse.jetty.server.handler._ @@ -208,6 +210,13 @@ private[spark] object JettyUtils extends Logging { rewrittenURI.toString() } + override def newHttpClient(): HttpClient = { + // Use the Jetty logic to calculate the number of selector threads (#CPUs/2), + // but limit it to 8 max. + val numSelectors = math.max(1,math.min(8,Runtime.getRuntime().availableProcessors()/2)) + return new HttpClient(new HttpClientTransportOverHTTP(numSelectors), null) + } + override def filterServerResponseHeader( clientRequest: HttpServletRequest, serverResponse: Response, From 7cc808ba11dbff873065e8336fb7a0707aa6b11f Mon Sep 17 00:00:00 2001 From: IngoSchuster Date: Wed, 28 Jun 2017 16:06:58 +0200 Subject: [PATCH 2/4] Update JettyUtils.scala --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index a269e0e30d960..84728ab557202 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -213,7 +213,7 @@ private[spark] object JettyUtils extends Logging { override def newHttpClient(): HttpClient = { // Use the Jetty logic to calculate the number of selector threads (#CPUs/2), // but limit it to 8 max. - val numSelectors = math.max(1,math.min(8,Runtime.getRuntime().availableProcessors()/2)) + val numSelectors = math.max(1, math.min(8, Runtime.getRuntime().availableProcessors()/2)) return new HttpClient(new HttpClientTransportOverHTTP(numSelectors), null) } From 1e17a20396603ea9dc3ed55d9c7bbf243d47fd1c Mon Sep 17 00:00:00 2001 From: IngoSchuster Date: Thu, 29 Jun 2017 13:06:31 +0200 Subject: [PATCH 3/4] Update JettyUtils.scala --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 84728ab557202..a390807374904 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -213,8 +213,8 @@ private[spark] object JettyUtils extends Logging { override def newHttpClient(): HttpClient = { // Use the Jetty logic to calculate the number of selector threads (#CPUs/2), // but limit it to 8 max. - val numSelectors = math.max(1, math.min(8, Runtime.getRuntime().availableProcessors()/2)) - return new HttpClient(new HttpClientTransportOverHTTP(numSelectors), null) + val numSelectors = math.max(1, math.min(8, Runtime.getRuntime().availableProcessors() / 2)) + new HttpClient(new HttpClientTransportOverHTTP(numSelectors), null) } override def filterServerResponseHeader( From c6198d2296e85beeac941f8abb078dbac2f212e9 Mon Sep 17 00:00:00 2001 From: IngoSchuster Date: Thu, 29 Jun 2017 16:42:46 +0200 Subject: [PATCH 4/4] Update JettyUtils.scala --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index a390807374904..b9371c7ad7b45 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -211,8 +211,11 @@ private[spark] object JettyUtils extends Logging { } override def newHttpClient(): HttpClient = { - // Use the Jetty logic to calculate the number of selector threads (#CPUs/2), + // SPARK-21176: Use the Jetty logic to calculate the number of selector threads (#CPUs/2), // but limit it to 8 max. + // Otherwise, it might happen that we exhaust the threadpool since in reverse proxy mode + // a proxy is instantiated for each executor. If the head node has many processors, this + // can quickly add up to an unreasonably high number of threads. val numSelectors = math.max(1, math.min(8, Runtime.getRuntime().availableProcessors() / 2)) new HttpClient(new HttpClientTransportOverHTTP(numSelectors), null) }