From 295d163f6e33222d16b05ddcc31f17ee30b11ef1 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 23 Jan 2019 16:07:31 +0800 Subject: [PATCH 1/3] enable fetch-big-block-to-memory by default --- .../org/apache/spark/internal/config/package.scala | 12 ++++++------ docs/configuration.md | 11 +++++++++++ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 1e7280005514d..948cba4a40d3e 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -631,17 +631,17 @@ package object config { private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM = ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem") .doc("Remote block will be fetched to disk when size of the block is above this threshold " + - "in bytes. This is to avoid a giant request takes too much memory. We can enable this " + - "config by setting a specific value(e.g. 200m). Note this configuration will affect " + - "both shuffle fetch and block manager remote block fetch. For users who enabled " + - "external shuffle service, this feature can only be worked when external shuffle" + - "service is newer than Spark 2.2.") + "in bytes. This is to avoid a giant request takes too much memory. Note this " + + "configuration will affect both shuffle fetch and block manager remote block fetch. " + + "For users who enabled external shuffle service, this feature can only work when " + + "external shuffle service is newer than Spark 2.2.") .bytesConf(ByteUnit.BYTE) // fetch-to-mem is guaranteed to fail if the message is bigger than 2 GB, so we might // as well use fetch-to-disk in that case. The message includes some metadata in addition // to the block data itself (in particular UploadBlock has a lot of metadata), so we leave // extra room. - .createWithDefault(Int.MaxValue - 512) + .checkValue(_ <= Int.MaxValue - 512, "maxRemoteBlockSizeFetchToMem must be less than 2GB.") + .createWithDefaultString("200m") private[spark] val TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES = ConfigBuilder("spark.taskMetrics.trackUpdatedBlockStatuses") diff --git a/docs/configuration.md b/docs/configuration.md index 7d3bbf93ae969..095b157a401d9 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1519,6 +1519,17 @@ Apart from these, the following properties are also available, and may be useful you can set larger value. + + spark.maxRemoteBlockSizeFetchToMem + 200m + + Remote block will be fetched to disk when size of the block is above this threshold + in bytes. This is to avoid a giant request takes too much memory. Note this + configuration will affect both shuffle fetch and block manager remote block fetch. + For users who enabled external shuffle service, this feature can only work when + external shuffle service is newer than Spark 2.2. + + ### Scheduling From 161c4e6816ecfb2d3e7b09bce34058abdf625f7a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 24 Jan 2019 13:16:54 +0800 Subject: [PATCH 2/3] address comments --- .../apache/spark/internal/config/package.scala | 6 ++++-- docs/configuration.md | 15 +-------------- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 948cba4a40d3e..ff2d23fa93d3b 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -634,13 +634,15 @@ package object config { "in bytes. This is to avoid a giant request takes too much memory. Note this " + "configuration will affect both shuffle fetch and block manager remote block fetch. " + "For users who enabled external shuffle service, this feature can only work when " + - "external shuffle service is newer than Spark 2.2.") + "external shuffle service is at least 2.3.0.") .bytesConf(ByteUnit.BYTE) // fetch-to-mem is guaranteed to fail if the message is bigger than 2 GB, so we might // as well use fetch-to-disk in that case. The message includes some metadata in addition // to the block data itself (in particular UploadBlock has a lot of metadata), so we leave // extra room. - .checkValue(_ <= Int.MaxValue - 512, "maxRemoteBlockSizeFetchToMem must be less than 2GB.") + .checkValue( + _ <= Int.MaxValue - 512, + "maxRemoteBlockSizeFetchToMem must be less than (Int.MaxValue - 512) bytes.") .createWithDefaultString("200m") private[spark] val TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES = diff --git a/docs/configuration.md b/docs/configuration.md index 095b157a401d9..806e16af3640e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -626,19 +626,6 @@ Apart from these, the following properties are also available, and may be useful You can mitigate this issue by setting it to a lower value. - - spark.maxRemoteBlockSizeFetchToMem - Int.MaxValue - 512 - - The remote block will be fetched to disk when size of the block is above this threshold in bytes. - This is to avoid a giant request that takes too much memory. By default, this is only enabled - for blocks > 2GB, as those cannot be fetched directly into memory, no matter what resources are - available. But it can be turned down to a much lower value (eg. 200m) to avoid using too much - memory on smaller blocks as well. Note this configuration will affect both shuffle fetch - and block manager remote block fetch. For users who enabled external shuffle service, - this feature can only be used when external shuffle service is newer than Spark 2.2. - - spark.shuffle.compress true @@ -1527,7 +1514,7 @@ Apart from these, the following properties are also available, and may be useful in bytes. This is to avoid a giant request takes too much memory. Note this configuration will affect both shuffle fetch and block manager remote block fetch. For users who enabled external shuffle service, this feature can only work when - external shuffle service is newer than Spark 2.2. + external shuffle service is at least 2.3.0. From 54c216edeb1fbeef9b825e49715b1f1eae2494db Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 28 Jan 2019 11:51:53 +0800 Subject: [PATCH 3/3] address comment --- .../main/scala/org/apache/spark/internal/config/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index ff2d23fa93d3b..b07c0d60fc2b0 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -642,7 +642,7 @@ package object config { // extra room. .checkValue( _ <= Int.MaxValue - 512, - "maxRemoteBlockSizeFetchToMem must be less than (Int.MaxValue - 512) bytes.") + "maxRemoteBlockSizeFetchToMem cannot be larger than (Int.MaxValue - 512) bytes.") .createWithDefaultString("200m") private[spark] val TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES =