From 88f485e0668855403cf9e2a2911784e4b2a80e80 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 8 Jun 2020 12:06:17 -0700 Subject: [PATCH 1/2] [SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing Ignore internal accumulators that use unrecognized types rather than crashing so that an event log containing such accumulators can still be converted to JSON and logged. A user may use internal accumulators by adding the `internal.metrics.` prefix to the accumulator name to hide sensitive information from UI (Accumulators except internal ones will be shown in Spark UI). However, `org.apache.spark.util.JsonProtocol.accumValueToJson` assumes an internal accumulator has only 3 possible types: `int`, `long`, and `java.util.List[(BlockId, BlockStatus)]`. When an internal accumulator uses an unexpected type, it will crash. An event log that contains such accumulator will be dropped because it cannot be converted to JSON, and it will cause weird UI issue when rendering in Spark History Server. For example, if `SparkListenerTaskEnd` is dropped because of this issue, the user will see the task is still running even if it was finished. It's better to make `accumValueToJson` more robust because it's up to the user to pick up the accumulator name. No The new unit tests. Closes #28744 from zsxwing/fix-internal-accum. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu --- .../org/apache/spark/util/JsonProtocol.scala | 20 ++++-- .../apache/spark/util/JsonProtocolSuite.scala | 72 +++++++++++++++++++ 2 files changed, 87 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 50c6461373dee..0e613ce958f4d 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -326,12 +326,22 @@ private[spark] object JsonProtocol { case v: Long => JInt(v) // We only have 3 kind of internal accumulator types, so if it's not int or long, it must be // the blocks accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]` - case v => - JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map { - case (id, status) => - ("Block ID" -> id.toString) ~ - ("Status" -> blockStatusToJson(status)) + case v: java.util.List[_] => + JArray(v.asScala.toList.flatMap { + case (id: BlockId, status: BlockStatus) => + Some( + ("Block ID" -> id.toString) ~ + ("Status" -> blockStatusToJson(status)) + ) + case _ => + // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should + // not crash. + None }) + case _ => + // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should not + // crash. + JNothing } } else { // For all external accumulators, just use strings diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 74b72d940eeef..a958734ed4155 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -436,6 +436,78 @@ class JsonProtocolSuite extends SparkFunSuite { testAccumValue(Some("anything"), 123, JString("123")) } +<<<<<<< HEAD +======= + /** Create an AccumulableInfo and verify we can serialize and deserialize it. */ + private def testAccumulableInfo( + name: String, + value: Option[Any], + expectedValue: Option[Any]): Unit = { + val isInternal = name.startsWith(InternalAccumulator.METRICS_PREFIX) + val accum = AccumulableInfo( + 123L, + Some(name), + update = value, + value = value, + internal = isInternal, + countFailedValues = false) + val json = JsonProtocol.accumulableInfoToJson(accum) + val newAccum = JsonProtocol.accumulableInfoFromJson(json) + assert(newAccum == accum.copy(update = expectedValue, value = expectedValue)) + } + + test("SPARK-31923: unexpected value type of internal accumulator") { + // Because a user may use `METRICS_PREFIX` in an accumulator name, we should test unexpected + // types to make sure we don't crash. + import InternalAccumulator.METRICS_PREFIX + testAccumulableInfo( + METRICS_PREFIX + "fooString", + value = Some("foo"), + expectedValue = None) + testAccumulableInfo( + METRICS_PREFIX + "fooList", + value = Some(java.util.Arrays.asList("string")), + expectedValue = Some(java.util.Collections.emptyList()) + ) + val blocks = Seq( + (TestBlockId("block1"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)), + (TestBlockId("block2"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L))) + testAccumulableInfo( + METRICS_PREFIX + "fooList", + value = Some(java.util.Arrays.asList( + "string", + blocks(0), + blocks(1))), + expectedValue = Some(blocks.asJava) + ) + testAccumulableInfo( + METRICS_PREFIX + "fooSet", + value = Some(Set("foo")), + expectedValue = None) + } + + test("SPARK-30936: forwards compatibility - ignore unknown fields") { + val expected = TestListenerEvent("foo", 123) + val unknownFieldsJson = + """{ + | "Event" : "org.apache.spark.util.TestListenerEvent", + | "foo" : "foo", + | "bar" : 123, + | "unknown" : "unknown" + |}""".stripMargin + assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected) + } + + test("SPARK-30936: backwards compatibility - set default values for missing fields") { + val expected = TestListenerEvent("foo", 0) + val unknownFieldsJson = + """{ + | "Event" : "org.apache.spark.util.TestListenerEvent", + | "foo" : "foo" + |}""".stripMargin + assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected) + } +>>>>>>> b333ed0c4a... [SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing } From 2e648495a40c2a77b7c7e89f7a60adaa2d58e55a Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 8 Jun 2020 13:20:59 -0700 Subject: [PATCH 2/2] resolve conflicts --- .../apache/spark/util/JsonProtocolSuite.scala | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index a958734ed4155..40fb2e3497033 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -436,8 +436,6 @@ class JsonProtocolSuite extends SparkFunSuite { testAccumValue(Some("anything"), 123, JString("123")) } -<<<<<<< HEAD -======= /** Create an AccumulableInfo and verify we can serialize and deserialize it. */ private def testAccumulableInfo( name: String, @@ -485,29 +483,6 @@ class JsonProtocolSuite extends SparkFunSuite { value = Some(Set("foo")), expectedValue = None) } - - test("SPARK-30936: forwards compatibility - ignore unknown fields") { - val expected = TestListenerEvent("foo", 123) - val unknownFieldsJson = - """{ - | "Event" : "org.apache.spark.util.TestListenerEvent", - | "foo" : "foo", - | "bar" : 123, - | "unknown" : "unknown" - |}""".stripMargin - assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected) - } - - test("SPARK-30936: backwards compatibility - set default values for missing fields") { - val expected = TestListenerEvent("foo", 0) - val unknownFieldsJson = - """{ - | "Event" : "org.apache.spark.util.TestListenerEvent", - | "foo" : "foo" - |}""".stripMargin - assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected) - } ->>>>>>> b333ed0c4a... [SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing }