From f83eec7a4c8fce223c5fd28b411fe2d1ae1da8dd Mon Sep 17 00:00:00 2001 From: Nathan Howell Date: Wed, 7 Dec 2016 23:32:14 +0000 Subject: [PATCH 1/3] [SPARK-18772][SQL] NaN/Infinite float parsing in JSON is inconsistent --- .../sql/catalyst/json/JacksonParser.scala | 11 +++++++ .../datasources/json/JsonSuite.scala | 31 +++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index ff6c93ae9815c..e993722c6408e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -88,6 +88,17 @@ class JacksonParser( } } + private object SpecialDouble { + def unapply(value: String): Option[Double] = { + value.toLowerCase match { + case "nan" => Some(Double.NaN) + case "infinity" | "+infinity" | "inf" | "+inf" => Some(Double.PositiveInfinity) + case "-infinity" | "-inf" => Some(Double.NegativeInfinity) + case _ => None + } + } + } + /** * Create a converter which converts the JSON documents held by the `JsonParser` * to a value according to a desired schema. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 5e7f7944bd845..93e948b04050d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1986,6 +1986,37 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .collect }.getMessage assert(errMsg.startsWith("The field for corrupt records must be string type and nullable")) + + test("SPARK-18772: Special floats") { + val records = sparkContext + .parallelize( + """{"a": "NaN"}""" :: + """{"a": "nAn"}""" :: + """{"a": "-iNf"}""" :: + """{"a": "inF"}""" :: + """{"a": "+Inf"}""" :: + """{"a": "-iNfInity"}""" :: + """{"a": "InFiNiTy"}""" :: + """{"a": "+InfiNitY"}""" :: + """{"a": "+Infi"}""" :: + Nil) + + for (dt <- Seq(FloatType, DoubleType)) { + val res = spark.read + .schema(StructType(Seq(StructField("a", dt)))) + .json(records) + .select($"a".cast(DoubleType).as[java.lang.Double]) + .collect() + assert(res.length === 9) + assert(res(0).isNaN) + assert(res(1).isNaN) + assert(res(2).toDouble.isNegInfinity) + assert(res(3).toDouble.isPosInfinity) + assert(res(4).toDouble.isPosInfinity) + assert(res(5).toDouble.isNegInfinity) + assert(res(6).toDouble.isPosInfinity) + assert(res(7).toDouble.isPosInfinity) + assert(res(8) eq null) } } } From fa63ff4e63bfcb3d780183528663223ab63891a3 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 12 May 2017 11:22:27 +0900 Subject: [PATCH 2/3] Avoid unnecessary cast try for special floats in JSON and add related tests --- .../sql/catalyst/json/JacksonParser.scala | 46 ++++------- .../datasources/json/JsonSuite.scala | 81 ++++++++++++------- 2 files changed, 65 insertions(+), 62 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index e993722c6408e..ba2761fa0f64f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.catalyst.json import java.io.ByteArrayOutputStream -import java.util.Locale import scala.collection.mutable.ArrayBuffer import scala.util.Try @@ -88,17 +87,6 @@ class JacksonParser( } } - private object SpecialDouble { - def unapply(value: String): Option[Double] = { - value.toLowerCase match { - case "nan" => Some(Double.NaN) - case "infinity" | "+infinity" | "inf" | "+inf" => Some(Double.PositiveInfinity) - case "-infinity" | "-inf" => Some(Double.NegativeInfinity) - case _ => None - } - } - } - /** * Create a converter which converts the JSON documents held by the `JsonParser` * to a value according to a desired schema. @@ -137,16 +125,13 @@ class JacksonParser( case VALUE_STRING => // Special case handling for NaN and Infinity. - val value = parser.getText - val lowerCaseValue = value.toLowerCase(Locale.ROOT) - if (lowerCaseValue.equals("nan") || - lowerCaseValue.equals("infinity") || - lowerCaseValue.equals("-infinity") || - lowerCaseValue.equals("inf") || - lowerCaseValue.equals("-inf")) { - value.toFloat - } else { - throw new RuntimeException(s"Cannot parse $value as FloatType.") + parser.getText match { + case "NaN" | "+NaN" | "-NaN" => Float.NaN + case "+INF" | "INF" | "+Infinity" | "Infinity" => Float.PositiveInfinity + case "-INF" | "-Infinity" => Float.NegativeInfinity + case other => Try(other.toFloat).getOrElse { + throw new RuntimeException(s"Cannot parse $other as FloatType.") + } } } @@ -157,16 +142,13 @@ class JacksonParser( case VALUE_STRING => // Special case handling for NaN and Infinity. - val value = parser.getText - val lowerCaseValue = value.toLowerCase(Locale.ROOT) - if (lowerCaseValue.equals("nan") || - lowerCaseValue.equals("infinity") || - lowerCaseValue.equals("-infinity") || - lowerCaseValue.equals("inf") || - lowerCaseValue.equals("-inf")) { - value.toDouble - } else { - throw new RuntimeException(s"Cannot parse $value as DoubleType.") + parser.getText match { + case "NaN" | "+NaN" | "-NaN" => Double.NaN + case "+INF" | "INF" | "+Infinity" | "Infinity" => Double.PositiveInfinity + case "-INF" | "-Infinity" => Double.NegativeInfinity + case other => Try(other.toDouble).getOrElse { + throw new RuntimeException(s"Cannot parse $other as DoubleType.") + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 93e948b04050d..edc55d2b61078 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.json import java.io.{File, StringWriter} import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} +import java.util.Locale import com.fasterxml.jackson.core.JsonFactory import org.apache.hadoop.fs.{Path, PathFilter} @@ -1986,37 +1987,57 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .collect }.getMessage assert(errMsg.startsWith("The field for corrupt records must be string type and nullable")) + } + } - test("SPARK-18772: Special floats") { - val records = sparkContext - .parallelize( - """{"a": "NaN"}""" :: - """{"a": "nAn"}""" :: - """{"a": "-iNf"}""" :: - """{"a": "inF"}""" :: - """{"a": "+Inf"}""" :: - """{"a": "-iNfInity"}""" :: - """{"a": "InFiNiTy"}""" :: - """{"a": "+InfiNitY"}""" :: - """{"a": "+Infi"}""" :: - Nil) - - for (dt <- Seq(FloatType, DoubleType)) { - val res = spark.read - .schema(StructType(Seq(StructField("a", dt)))) - .json(records) - .select($"a".cast(DoubleType).as[java.lang.Double]) - .collect() - assert(res.length === 9) - assert(res(0).isNaN) - assert(res(1).isNaN) - assert(res(2).toDouble.isNegInfinity) - assert(res(3).toDouble.isPosInfinity) - assert(res(4).toDouble.isPosInfinity) - assert(res(5).toDouble.isNegInfinity) - assert(res(6).toDouble.isPosInfinity) - assert(res(7).toDouble.isPosInfinity) - assert(res(8) eq null) + test("SPARK-18772: Parse special floats correctly") { + val jsons = Seq( + """{"a": "+INF"}""", + """{"a": "INF"}""", + """{"a": "-INF"}""", + """{"a": "NaN"}""", + """{"a": "+NaN"}""", + """{"a": "-NaN"}""", + """{"a": "Infinity"}""", + """{"a": "+Infinity"}""", + """{"a": "-Infinity"}""") + + // positive cases + val checks: Seq[Double => Boolean] = Seq( + _.isPosInfinity, + _.isPosInfinity, + _.isNegInfinity, + _.isNaN, + _.isNaN, + _.isNaN, + _.isPosInfinity, + _.isPosInfinity, + _.isNegInfinity) + + Seq(FloatType, DoubleType).foreach { dt => + jsons.zip(checks).foreach { case (json, check) => + val ds = spark.read + .schema(StructType(Seq(StructField("a", dt)))) + .json(Seq(json).toDS()) + .select($"a".cast(DoubleType)).as[Double] + assert(check(ds.first())) + } + } + + // negative cases + Seq(FloatType, DoubleType).foreach { dt => + val lowerCasedJsons = jsons.map(_.toLowerCase(Locale.ROOT)) + // The special floats are case-sensitive so these cases below throw exceptions. + lowerCasedJsons.foreach { lowerCasedJson => + val e = intercept[SparkException] { + spark.read + .option("mode", "FAILFAST") + .schema(StructType(Seq(StructField("a", dt)))) + .json(Seq(lowerCasedJson).toDS()) + .collect() + } + assert(e.getMessage.contains("Cannot parse")) + } } } } From 90330bc7916c906fec10081bd012fc4efdb450f2 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sat, 13 May 2017 16:25:06 +0900 Subject: [PATCH 3/3] Address comments --- .../sql/catalyst/json/JacksonParser.scala | 20 ++++++++----------- .../datasources/json/JsonSuite.scala | 12 ----------- 2 files changed, 8 insertions(+), 24 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index ba2761fa0f64f..4ed6728994193 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -126,12 +126,10 @@ class JacksonParser( case VALUE_STRING => // Special case handling for NaN and Infinity. parser.getText match { - case "NaN" | "+NaN" | "-NaN" => Float.NaN - case "+INF" | "INF" | "+Infinity" | "Infinity" => Float.PositiveInfinity - case "-INF" | "-Infinity" => Float.NegativeInfinity - case other => Try(other.toFloat).getOrElse { - throw new RuntimeException(s"Cannot parse $other as FloatType.") - } + case "NaN" => Float.NaN + case "Infinity" => Float.PositiveInfinity + case "-Infinity" => Float.NegativeInfinity + case other => throw new RuntimeException(s"Cannot parse $other as FloatType.") } } @@ -143,12 +141,10 @@ class JacksonParser( case VALUE_STRING => // Special case handling for NaN and Infinity. parser.getText match { - case "NaN" | "+NaN" | "-NaN" => Double.NaN - case "+INF" | "INF" | "+Infinity" | "Infinity" => Double.PositiveInfinity - case "-INF" | "-Infinity" => Double.NegativeInfinity - case other => Try(other.toDouble).getOrElse { - throw new RuntimeException(s"Cannot parse $other as DoubleType.") - } + case "NaN" => Double.NaN + case "Infinity" => Double.PositiveInfinity + case "-Infinity" => Double.NegativeInfinity + case other => throw new RuntimeException(s"Cannot parse $other as DoubleType.") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index edc55d2b61078..e66a60d7503f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1992,25 +1992,13 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-18772: Parse special floats correctly") { val jsons = Seq( - """{"a": "+INF"}""", - """{"a": "INF"}""", - """{"a": "-INF"}""", """{"a": "NaN"}""", - """{"a": "+NaN"}""", - """{"a": "-NaN"}""", """{"a": "Infinity"}""", - """{"a": "+Infinity"}""", """{"a": "-Infinity"}""") // positive cases val checks: Seq[Double => Boolean] = Seq( - _.isPosInfinity, - _.isPosInfinity, - _.isNegInfinity, - _.isNaN, _.isNaN, - _.isNaN, - _.isPosInfinity, _.isPosInfinity, _.isNegInfinity)