From bfa5a9f8ffc6108584616625661fb34ee576f6f6 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 17 Oct 2020 22:14:33 -0700 Subject: [PATCH 1/3] [SPARK-33175][K8S] Detect duplicated mountPaths and fail at Spark side --- .../features/MountVolumesFeatureStep.scala | 5 ++++ .../k8s/ExecutorPodsSnapshotsStoreImpl.scala | 3 +++ .../MountVolumesFeatureStepSuite.scala | 25 +++++++++++++++++++ 3 files changed, 33 insertions(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index e297656520200..7b87d0e55a61c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -49,6 +49,11 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) private def constructVolumes( volumeSpecs: Iterable[KubernetesVolumeSpec] ): Iterable[(VolumeMount, Volume)] = { + val duplicateMountPaths = volumeSpecs.map(_.mountPath).toSeq.groupBy(identity).collect { + case (x, ys) if ys.length > 1 => s"`$x`" + } + require(duplicateMountPaths.isEmpty, + s"Found duplicated mountPath: ${duplicateMountPaths.mkString(",")}") volumeSpecs.zipWithIndex.map { case (spec, i) => val volumeMount = new VolumeMountBuilder() .withMountPath(spec.mountPath) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala index 5c192c690eba5..3f2cb485bbb31 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala @@ -133,6 +133,9 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul snapshotsBuffer.drainTo(snapshots) onNewSnapshots(snapshots.asScala.toSeq) } catch { + case e: IllegalArgumentException => + logError("Going to stop due to IllegalArgumentException", e) + System.exit(1) case NonFatal(e) => logWarning("Exception when notifying snapshot subscriber.", e) } finally { lock.unlock() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index e95af264d09ec..004c1626e5661 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -236,6 +236,31 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { assert(configuredPod.container.getVolumeMounts.size() === 2) } + test("mountPath should be unique") { + val hpVolumeConf = KubernetesVolumeSpec( + "hpVolume", + "/data", + "", + false, + KubernetesHostPathVolumeConf("/hostPath/tmp") + ) + val pvcVolumeConf = KubernetesVolumeSpec( + "checkpointVolume", + "/data", + "", + true, + KubernetesPVCVolumeConf("pvcClaim") + ) + val kubernetesConf = KubernetesTestConf.createDriverConf( + volumes = Seq(hpVolumeConf, pvcVolumeConf)) + + val step = new MountVolumesFeatureStep(kubernetesConf) + val m = intercept[IllegalArgumentException] { + step.configurePod(SparkPod.initialPod()) + }.getMessage + assert(m.contains("Found duplicated mountPath: `/data`")) + } + test("Mounts subpath on emptyDir") { val volumeConf = KubernetesVolumeSpec( "testVolume", From 02854e5db9025fd634a3339f0cd70b2cfb8ce49b Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 18 Oct 2020 09:27:57 -0700 Subject: [PATCH 2/3] Address comments --- .../spark/deploy/k8s/features/MountVolumesFeatureStep.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index 7b87d0e55a61c..c66756fd69116 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -50,10 +50,10 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) volumeSpecs: Iterable[KubernetesVolumeSpec] ): Iterable[(VolumeMount, Volume)] = { val duplicateMountPaths = volumeSpecs.map(_.mountPath).toSeq.groupBy(identity).collect { - case (x, ys) if ys.length > 1 => s"`$x`" + case (x, ys) if ys.length > 1 => s"'$x'" } require(duplicateMountPaths.isEmpty, - s"Found duplicated mountPath: ${duplicateMountPaths.mkString(",")}") + s"Found duplicated mountPath: ${duplicateMountPaths.mkString(", ")}") volumeSpecs.zipWithIndex.map { case (spec, i) => val volumeMount = new VolumeMountBuilder() .withMountPath(spec.mountPath) From 8ab6b8854dcc669e1b21e7354bdd781a0d4911bd Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 18 Oct 2020 09:30:52 -0700 Subject: [PATCH 3/3] fix --- .../deploy/k8s/features/MountVolumesFeatureStepSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index 004c1626e5661..bbb89fd0a1c24 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -258,7 +258,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { val m = intercept[IllegalArgumentException] { step.configurePod(SparkPod.initialPod()) }.getMessage - assert(m.contains("Found duplicated mountPath: `/data`")) + assert(m.contains("Found duplicated mountPath: '/data'")) } test("Mounts subpath on emptyDir") {