From a7c08a68adf6150aaaffc1d13035fb699887e64c Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Wed, 12 Jun 2024 11:52:05 -0700 Subject: [PATCH 01/12] commit commit commit commit commit commit commit --- .../spark/sql/avro/AvroFunctionsSuite.scala | 83 +++++++++- .../catalyst/analysis/FunctionRegistry.scala | 6 +- .../expressions/toFromAvroSqlFunctions.scala | 144 ++++++++++++++++++ 3 files changed, 231 insertions(+), 2 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala index d16ddb4973205..727129ded377b 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala @@ -26,7 +26,7 @@ import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecord import org.apache.avro.io.EncoderFactory import org.apache.spark.SparkException -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.execution.LocalTableScanExec import org.apache.spark.sql.functions.{col, lit, struct} import org.apache.spark.sql.internal.SQLConf @@ -286,4 +286,85 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession { assert(msg.contains("Invalid default for field id: null not a \"long\"")) } } + + test("SPARK-48545: from_avro and to_avro SQL functions") { + withTable("t") { + sql( + """ + |create table t as + | select named_struct('u', named_struct('member0', member0, 'member1', member1)) as s + | from values (1, null), (null, 'a') tab(member0, member1) + |""".stripMargin) + val jsonFormatSchema = + """ + |{ + | "type": "record", + | "name": "struct", + | "fields": [{ + | "name": "u", + | "type": ["int","string"] + 
| }] + |} + |""".stripMargin + val toAvroSql = + s""" + |select to_avro(s, '$jsonFormatSchema') as result from t + |""".stripMargin + val avroResult = spark.sql(toAvroSql).collect() + assert(avroResult != null) + checkAnswer( + spark.sql(s"select from_avro(result, '$jsonFormatSchema', map()).u from ($toAvroSql)"), + Seq(Row(Row(1, null)), + Row(Row(null, "a")))) + + // Negative tests. + checkError( + exception = intercept[AnalysisException](sql( + s""" + |select to_avro(s, 42) as result from t + |""".stripMargin)), + errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", + parameters = Map("sqlExpr" -> "\"toavrosqlfunction(s, 42)\"", + "msg" -> ("The second argument of the TO_AVRO SQL function must be a constant string " + + "containing the JSON representation of the schema to use for converting the value to " + + "AVRO format"), + "hint" -> ""), + queryContext = Array(ExpectedContext( + fragment = "to_avro(s, 42)", + start = 8, + stop = 21))) + checkError( + exception = intercept[AnalysisException](sql( + s""" + |select from_avro(s, 42, '') as result from t + |""".stripMargin)), + errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", + parameters = Map("sqlExpr" -> "\"fromavrosqlfunction(s, 42, )\"", + "msg" -> ("The second argument of the FROM_AVRO SQL function must be a constant string " + + "containing the JSON representation of the schema to use for converting the value " + + "from AVRO format"), + "hint" -> ""), + queryContext = Array(ExpectedContext( + fragment = "from_avro(s, 42, '')", + start = 8, + stop = 27))) + checkError( + exception = intercept[AnalysisException](sql( + s""" + |select from_avro(s, '$jsonFormatSchema', 42) as result from t + |""".stripMargin)), + errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", + parameters = Map( + "sqlExpr" -> + s"\"fromavrosqlfunction(s, $jsonFormatSchema, 42)\"".stripMargin, + "msg" -> ("The third argument of the FROM_AVRO SQL function must be a constant map of " + + "strings to 
strings containing the options to use for converting the value " + + "from AVRO format"), + "hint" -> ""), + queryContext = Array(ExpectedContext( + fragment = s"from_avro(s, '$jsonFormatSchema', 42)", + start = 8, + stop = 138))) + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 588752f3fc176..81247cd40ae4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -854,7 +854,11 @@ object FunctionRegistry { // Xml expression[XmlToStructs]("from_xml"), expression[SchemaOfXml]("schema_of_xml"), - expression[StructsToXml]("to_xml") + expression[StructsToXml]("to_xml"), + + // Avro + expression[FromAvroSqlFunction]("from_avro"), + expression[ToAvroSqlFunction]("to_avro") ) val builtin: SimpleFunctionRegistry = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala new file mode 100644 index 0000000000000..5189dcfb33d33 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.util.ArrayBasedMapData +import org.apache.spark.sql.types.{MapType, NullType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils + +/** + * Converts a binary column of Avro format into its corresponding Catalyst value. + * This is a thin wrapper over the [[AvroDataToCatalyst]] class to create a SQL function. + * The specified schema must match actual schema of the read data, otherwise the behavior + * is undefined: it may fail or return arbitrary result. + * To deserialize the data with a compatible and evolved schema, the expected Avro schema can be + * set via the option avroSchema. + * + * @param child the Catalyst binary input column. + * @param jsonFormatSchema the Avro schema in JSON string format. 
+ * @param options the options to use when performing the conversion + * + * @since 4.0.0 + */ +case class FromAvroSqlFunction(child: Expression, jsonFormatSchema: Expression, options: Expression) + extends TernaryExpression with RuntimeReplaceable { + override def first: Expression = child + override def second: Expression = jsonFormatSchema + override def third: Expression = options + + override def withNewChildrenInternal( + newFirst: Expression, newSecond: Expression, newThird: Expression): Expression = { + copy(child = newFirst, jsonFormatSchema = newSecond, options = newThird) + } + + override def checkInputDataTypes(): TypeCheckResult = { + val schemaCheck = jsonFormatSchema.dataType match { + case _: StringType | + _: NullType + if jsonFormatSchema.foldable => + None + case _ => + Some(TypeCheckResult.TypeCheckFailure( + "The second argument of the FROM_AVRO SQL function must be a constant string " + + "containing the JSON representation of the schema to use for converting the value " + + "from AVRO format")) + } + val optionsCheck = options.dataType match { + case MapType(StringType, StringType, _) | + MapType(NullType, NullType, _) | + _: NullType + if options.foldable => + None + case _ => + Some(TypeCheckResult.TypeCheckFailure( + "The third argument of the FROM_AVRO SQL function must be a constant map of strings to " + + "strings containing the options to use for converting the value from AVRO format")) + } + schemaCheck.getOrElse( + optionsCheck.getOrElse( + TypeCheckResult.TypeCheckSuccess)) + } + + override def replacement: Expression = { + val schemaValue: String = jsonFormatSchema.eval() match { + case s: UTF8String => + s.toString + case null => + "" + } + val optionsValue: Map[String, String] = options.eval() match { + case a: ArrayBasedMapData if a.keyArray.array.nonEmpty => + val keys: Array[String] = a.keyArray.array.map(_.toString) + val values: Array[String] = a.valueArray.array.map(_.toString) + keys.zip(values).toMap + case _ => + 
Map.empty + } + val constructor = + Utils.classForName("org.apache.spark.sql.avro.AvroDataToCatalyst").getConstructors().head + val expr = constructor.newInstance(child, schemaValue, optionsValue) + expr.asInstanceOf[Expression] + } +} + +/** + * Converts a Catalyst input value into its corresponding Avro format result. + * This is a thin wrapper over the [[CatalystDataToAvro]] class to create a SQL function. + * + * @param child the Catalyst input column. + * @param jsonFormatSchema the Avro schema in JSON string format. + * + * @since 4.0.0 + */ +case class ToAvroSqlFunction(child: Expression, jsonFormatSchema: Expression) + extends BinaryExpression with RuntimeReplaceable { + override def left: Expression = child + + override def right: Expression = jsonFormatSchema + + override def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression = { + copy(child = newLeft, jsonFormatSchema = newRight) + } + + override def checkInputDataTypes(): TypeCheckResult = { + jsonFormatSchema.dataType match { + case _: StringType if jsonFormatSchema.foldable => + TypeCheckResult.TypeCheckSuccess + case _ => + TypeCheckResult.TypeCheckFailure( + "The second argument of the TO_AVRO SQL function must be a constant string " + + "containing the JSON representation of the schema to use for converting the value " + + "to AVRO format") + } + } + + override def replacement: Expression = { + val schemaValue: Option[String] = jsonFormatSchema.eval() match { + case null => + None + case s: UTF8String => + Some(s.toString) + } + val constructor = + Utils.classForName("org.apache.spark.sql.avro.CatalystDataToAvro").getConstructors().head + val expr = constructor.newInstance(child, schemaValue) + expr.asInstanceOf[Expression] + } +} From 4241540414aff7a0eee1cd21cc37174dce52c1e0 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Thu, 13 Jun 2024 13:32:06 -0700 Subject: [PATCH 02/12] add descriptions --- .../expressions/toFromAvroSqlFunctions.scala | 27 
+++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala index 5189dcfb33d33..334f0e4279f55 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala @@ -26,17 +26,26 @@ import org.apache.spark.util.Utils /** * Converts a binary column of Avro format into its corresponding Catalyst value. * This is a thin wrapper over the [[AvroDataToCatalyst]] class to create a SQL function. - * The specified schema must match actual schema of the read data, otherwise the behavior - * is undefined: it may fail or return arbitrary result. - * To deserialize the data with a compatible and evolved schema, the expected Avro schema can be - * set via the option avroSchema. * * @param child the Catalyst binary input column. * @param jsonFormatSchema the Avro schema in JSON string format. - * @param options the options to use when performing the conversion + * @param options the options to use when performing the conversion. * * @since 4.0.0 */ +@ExpressionDescription( + usage = """ + _FUNC_(child, jsonFormatSchema, options) - Converts a binary Avro value into a Catalyst value. + """, + note = """ + The specified schema must match actual schema of the read data, otherwise the behavior + is undefined: it may fail or return arbitrary result. + To deserialize the data with a compatible and evolved schema, the expected Avro schema can be + set via the corresponding option. 
+ """, + group = "misc_funcs", + since = "4.0.0" +) case class FromAvroSqlFunction(child: Expression, jsonFormatSchema: Expression, options: Expression) extends TernaryExpression with RuntimeReplaceable { override def first: Expression = child @@ -107,6 +116,14 @@ case class FromAvroSqlFunction(child: Expression, jsonFormatSchema: Expression, * * @since 4.0.0 */ +@ExpressionDescription( + usage = """ + _FUNC_(child, jsonFormatSchema) - Converts a Catalyst binary input value into its corresponding + Avro format result. + """, + group = "misc_funcs", + since = "4.0.0" +) case class ToAvroSqlFunction(child: Expression, jsonFormatSchema: Expression) extends BinaryExpression with RuntimeReplaceable { override def left: Expression = child From 9a36eacd037b864ff5f336c1b5985a968e51d215 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Thu, 13 Jun 2024 13:38:11 -0700 Subject: [PATCH 03/12] update expression schema suite --- .../src/test/resources/sql-functions/sql-expression-schema.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index 65b513264598b..acdab20f14349 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -147,6 +147,7 @@ | org.apache.spark.sql.catalyst.expressions.FormatNumber | format_number | SELECT format_number(12332.123456, 4) | struct | | org.apache.spark.sql.catalyst.expressions.FormatString | format_string | SELECT format_string("Hello World %d %s", 100, "days") | struct | | org.apache.spark.sql.catalyst.expressions.FormatString | printf | SELECT printf("Hello World %d %s", 100, "days") | struct | +| org.apache.spark.sql.catalyst.expressions.FromAvroSqlFunction | from_avro | N/A | N/A | | org.apache.spark.sql.catalyst.expressions.FromUTCTimestamp | from_utc_timestamp | SELECT from_utc_timestamp('2016-08-31', 
'Asia/Seoul') | struct | | org.apache.spark.sql.catalyst.expressions.FromUnixTime | from_unixtime | SELECT from_unixtime(0, 'yyyy-MM-dd HH:mm:ss') | struct | | org.apache.spark.sql.catalyst.expressions.Get | get | SELECT get(array(1, 2, 3), 0) | struct | @@ -334,6 +335,7 @@ | org.apache.spark.sql.catalyst.expressions.Tan | tan | SELECT tan(0) | struct | | org.apache.spark.sql.catalyst.expressions.Tanh | tanh | SELECT tanh(0) | struct | | org.apache.spark.sql.catalyst.expressions.TimeWindow | window | SELECT a, window.start, window.end, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, start | struct | +| org.apache.spark.sql.catalyst.expressions.ToAvroSqlFunction | to_avro | N/A | N/A | | org.apache.spark.sql.catalyst.expressions.ToBinary | to_binary | SELECT to_binary('abc', 'utf-8') | struct | | org.apache.spark.sql.catalyst.expressions.ToCharacterBuilder | to_char | SELECT to_char(454, '999') | struct | | org.apache.spark.sql.catalyst.expressions.ToCharacterBuilder | to_varchar | SELECT to_varchar(454, '999') | struct | From ac697415f94acc84a2880bc2cda69a1feab0f2ff Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Fri, 14 Jun 2024 10:54:19 -0700 Subject: [PATCH 04/12] add examples --- .../sql/catalyst/expressions/toFromAvroSqlFunctions.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala index 334f0e4279f55..6816bb32cb3d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala @@ -37,6 +37,10 @@ import org.apache.spark.util.Utils usage 
= """ _FUNC_(child, jsonFormatSchema, options) - Converts a binary Avro value into a Catalyst value. """, + examples = """ + > SELECT IS_NULL(_FUNC_(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}')) AS result FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1)); + [false] + """, note = """ The specified schema must match actual schema of the read data, otherwise the behavior is undefined: it may fail or return arbitrary result. @@ -121,6 +125,10 @@ case class FromAvroSqlFunction(child: Expression, jsonFormatSchema: Expression, _FUNC_(child, jsonFormatSchema) - Converts a Catalyst binary input value into its corresponding Avro format result. """, + examples = """ + > SELECT IS_NULL(_FUNC_(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}', MAP() ).u FROM (SELECT FROM_AVRO(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}' ) AS result FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1))); + [false] + """, group = "misc_funcs", since = "4.0.0" ) From 1e0980147b5f882a2caaeeeaea2145b1970e7694 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Fri, 14 Jun 2024 17:10:17 -0700 Subject: [PATCH 05/12] respond to code review comments --- .../catalyst/analysis/FunctionRegistry.scala | 4 ++-- .../expressions/toFromAvroSqlFunctions.scala | 22 ++++++++++++------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 81247cd40ae4f..4304e284e9910 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -857,8 +857,8 @@ object FunctionRegistry { expression[StructsToXml]("to_xml"), // Avro - expression[FromAvroSqlFunction]("from_avro"), - expression[ToAvroSqlFunction]("to_avro") + expression[FromAvro]("from_avro"), + expression[ToAvro]("to_avro") ) val builtin: SimpleFunctionRegistry = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala index 6816bb32cb3d5..343f08af42e7f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala @@ -33,13 +33,15 @@ import org.apache.spark.util.Utils * * @since 4.0.0 */ +// scalastyle:off line.size.limit @ExpressionDescription( usage = """ _FUNC_(child, jsonFormatSchema, options) - Converts a binary Avro value into a Catalyst value. 
""", examples = """ - > SELECT IS_NULL(_FUNC_(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}')) AS result FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1)); - [false] + Examples: + > SELECT IS_NULL(_FUNC_(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}')) AS result FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1)); + [false] """, note = """ The specified schema must match actual schema of the read data, otherwise the behavior @@ -47,10 +49,11 @@ import org.apache.spark.util.Utils To deserialize the data with a compatible and evolved schema, the expected Avro schema can be set via the corresponding option. """, - group = "misc_funcs", + group = "avro_funcs", since = "4.0.0" ) -case class FromAvroSqlFunction(child: Expression, jsonFormatSchema: Expression, options: Expression) +// scalastyle:on line.size.limit +case class FromAvro(child: Expression, jsonFormatSchema: Expression, options: Expression) extends TernaryExpression with RuntimeReplaceable { override def first: Expression = child override def second: Expression = jsonFormatSchema @@ -120,19 +123,22 @@ case class FromAvroSqlFunction(child: Expression, jsonFormatSchema: Expression, * * @since 4.0.0 */ +// scalastyle:off line.size.limit @ExpressionDescription( usage = """ _FUNC_(child, jsonFormatSchema) - Converts a Catalyst binary input value into its corresponding Avro format result. 
""", examples = """ - > SELECT IS_NULL(_FUNC_(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}', MAP() ).u FROM (SELECT FROM_AVRO(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}' ) AS result FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1))); - [false] + Examples: + > SELECT IS_NULL(_FUNC_(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}', MAP() ).u FROM (SELECT FROM_AVRO(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}' ) AS result FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1))); + [false] """, - group = "misc_funcs", + group = "avro_funcs", since = "4.0.0" ) -case class ToAvroSqlFunction(child: Expression, jsonFormatSchema: Expression) +// scalastyle:on line.size.limit +case class ToAvro(child: Expression, jsonFormatSchema: Expression) extends BinaryExpression with RuntimeReplaceable { override def left: Expression = child From ddbb57fdf42c4fa261eac76e2dabfb07ff9cf01a Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Tue, 18 Jun 2024 16:57:11 -0700 Subject: [PATCH 06/12] fix test --- .../sql/catalyst/expressions/toFromAvroSqlFunctions.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala index 343f08af42e7f..963afce07845c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala @@ 
-49,7 +49,7 @@ import org.apache.spark.util.Utils To deserialize the data with a compatible and evolved schema, the expected Avro schema can be set via the corresponding option. """, - group = "avro_funcs", + group = "misc_funcs", since = "4.0.0" ) // scalastyle:on line.size.limit @@ -134,7 +134,7 @@ case class FromAvro(child: Expression, jsonFormatSchema: Expression, options: Ex > SELECT IS_NULL(_FUNC_(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}', MAP() ).u FROM (SELECT FROM_AVRO(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}' ) AS result FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1))); [false] """, - group = "avro_funcs", + group = "misc_funcs", since = "4.0.0" ) // scalastyle:on line.size.limit From 16161e41a0fa67673b2edab49b54001490ef4936 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Thu, 20 Jun 2024 11:13:51 -0700 Subject: [PATCH 07/12] fix --- .../org/apache/spark/sql/avro/AvroFunctionsSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala index 727129ded377b..c807685db0f0c 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala @@ -324,7 +324,7 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession { |select to_avro(s, 42) as result from t |""".stripMargin)), errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", - parameters = Map("sqlExpr" -> "\"toavrosqlfunction(s, 42)\"", + parameters = Map("sqlExpr" -> "\"toavro(s, 42)\"", "msg" -> ("The second argument of the TO_AVRO SQL function must be a constant 
string " + "containing the JSON representation of the schema to use for converting the value to " + "AVRO format"), @@ -339,7 +339,7 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession { |select from_avro(s, 42, '') as result from t |""".stripMargin)), errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", - parameters = Map("sqlExpr" -> "\"fromavrosqlfunction(s, 42, )\"", + parameters = Map("sqlExpr" -> "\"fromavro(s, 42, )\"", "msg" -> ("The second argument of the FROM_AVRO SQL function must be a constant string " + "containing the JSON representation of the schema to use for converting the value " + "from AVRO format"), @@ -356,7 +356,7 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession { errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", parameters = Map( "sqlExpr" -> - s"\"fromavrosqlfunction(s, $jsonFormatSchema, 42)\"".stripMargin, + s"\"fromavro(s, $jsonFormatSchema, 42)\"".stripMargin, "msg" -> ("The third argument of the FROM_AVRO SQL function must be a constant map of " + "strings to strings containing the options to use for converting the value " + "from AVRO format"), From f5e5f64ea89e642c6805a21278f7b30ff3bf59d8 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Thu, 20 Jun 2024 12:23:11 -0700 Subject: [PATCH 08/12] fix test --- .../sql/catalyst/expressions/toFromAvroSqlFunctions.scala | 4 ++-- .../src/test/resources/sql-functions/sql-expression-schema.md | 2 -- .../scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala | 4 ++++ .../apache/spark/sql/expressions/ExpressionInfoSuite.scala | 3 +++ 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala index 963afce07845c..7e59f84fffb16 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala @@ -40,7 +40,7 @@ import org.apache.spark.util.Utils """, examples = """ Examples: - > SELECT IS_NULL(_FUNC_(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}')) AS result FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1)); + > SELECT _FUNC_(s, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}', map()) IS NULL AS result FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1)); [false] """, note = """ @@ -131,7 +131,7 @@ case class FromAvro(child: Expression, jsonFormatSchema: Expression, options: Ex """, examples = """ Examples: - > SELECT IS_NULL(_FUNC_(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}', MAP() ).u FROM (SELECT FROM_AVRO(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}' ) AS result FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1))); + > SELECT _FUNC_(s, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}', MAP()).u IS NULL FROM (SELECT FROM_AVRO(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}' ) AS s FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1)); [false] """, group = "misc_funcs", diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md 
b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index b314edc56d095..27e56b24625bb 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -147,7 +147,6 @@ | org.apache.spark.sql.catalyst.expressions.FormatNumber | format_number | SELECT format_number(12332.123456, 4) | struct | | org.apache.spark.sql.catalyst.expressions.FormatString | format_string | SELECT format_string("Hello World %d %s", 100, "days") | struct | | org.apache.spark.sql.catalyst.expressions.FormatString | printf | SELECT printf("Hello World %d %s", 100, "days") | struct | -| org.apache.spark.sql.catalyst.expressions.FromAvroSqlFunction | from_avro | N/A | N/A | | org.apache.spark.sql.catalyst.expressions.FromUTCTimestamp | from_utc_timestamp | SELECT from_utc_timestamp('2016-08-31', 'Asia/Seoul') | struct | | org.apache.spark.sql.catalyst.expressions.FromUnixTime | from_unixtime | SELECT from_unixtime(0, 'yyyy-MM-dd HH:mm:ss') | struct | | org.apache.spark.sql.catalyst.expressions.Get | get | SELECT get(array(1, 2, 3), 0) | struct | @@ -335,7 +334,6 @@ | org.apache.spark.sql.catalyst.expressions.Tan | tan | SELECT tan(0) | struct | | org.apache.spark.sql.catalyst.expressions.Tanh | tanh | SELECT tanh(0) | struct | | org.apache.spark.sql.catalyst.expressions.TimeWindow | window | SELECT a, window.start, window.end, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, start | struct | -| org.apache.spark.sql.catalyst.expressions.ToAvroSqlFunction | to_avro | N/A | N/A | | org.apache.spark.sql.catalyst.expressions.ToBinary | to_binary | SELECT to_binary('abc', 'utf-8') | struct | | org.apache.spark.sql.catalyst.expressions.ToCharacterBuilder | to_char | SELECT to_char(454, '999') | struct | | 
org.apache.spark.sql.catalyst.expressions.ToCharacterBuilder | to_varchar | SELECT to_varchar(454, '999') | struct | diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala index 73b2eba7060d0..443597f10056b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala @@ -117,6 +117,10 @@ class ExpressionsSchemaSuite extends QueryTest with SharedSparkSession { // Note: We need to filter out the commands that set the parameters, such as: // SET spark.sql.parser.escapedStringLiterals=true example.split(" > ").tail.filterNot(_.trim.startsWith("SET")).take(1).foreach { + case _ if funcName == "from_avro" || funcName == "to_avro" => + // Skip running the example queries for the from_avro and to_avro functions because + // these functions dynamically load the AvroDataToCatalyst or CatalystDataToAvro classes + // which are not available in this test. case exampleRe(sql, _) => val df = spark.sql(sql) val escapedSql = sql.replaceAll("\\|", "|") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala index e80f4af1dc462..bf5d1b24af219 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala @@ -225,6 +225,9 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession { // Throws an error "org.apache.spark.sql.catalyst.expressions.RaiseErrorExpressionBuilder", "org.apache.spark.sql.catalyst.expressions.AssertTrue", + // Requires dynamic class loading not available in this test suite. 
+ "org.apache.spark.sql.catalyst.expressions.FromAvro", + "org.apache.spark.sql.catalyst.expressions.ToAvro", classOf[CurrentUser].getName, // The encrypt expression includes a random initialization vector to its encrypted result classOf[AesEncrypt].getName) From d1f1844cef80ca789f013a91e0945f2acb97e6ca Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Thu, 20 Jun 2024 12:27:05 -0700 Subject: [PATCH 09/12] fix test --- .../sql/catalyst/expressions/toFromAvroSqlFunctions.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala index 7e59f84fffb16..507511a360071 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala @@ -131,8 +131,8 @@ case class FromAvro(child: Expression, jsonFormatSchema: Expression, options: Ex """, examples = """ Examples: - > SELECT _FUNC_(s, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}', MAP()).u IS NULL FROM (SELECT FROM_AVRO(result, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}' ) AS s FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1)); - [false] + > SELECT _FUNC_(s, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}', MAP()) IS NULL FROM (SELECT NULL AS s); + [true] """, group = "misc_funcs", since = "4.0.0" From 50f728f9837d1708c51a7f24f3ead3f64cc08537 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Thu, 20 Jun 2024 14:36:06 -0700 Subject: [PATCH 10/12] fix test --- sql/gen-sql-functions-docs.py | 3 ++- 1 file changed, 2 
insertions(+), 1 deletion(-) diff --git a/sql/gen-sql-functions-docs.py b/sql/gen-sql-functions-docs.py index 053e11d10295b..8919fde4e8365 100644 --- a/sql/gen-sql-functions-docs.py +++ b/sql/gen-sql-functions-docs.py @@ -163,7 +163,8 @@ def _make_pretty_examples(jspark, infos): pretty_output = "" for info in infos: - if info.examples.startswith("\n Examples:"): + if (info.examples.startswith("\n Examples:") + and info.name.lower() not in ("from_json", "to_json")): output = [] output.append("-- %s" % info.name) query_examples = filter(lambda x: x.startswith(" > "), info.examples.split("\n")) From fa7ca52ad3ea82b2bcc706c908460794b01715f5 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Thu, 20 Jun 2024 16:36:51 -0700 Subject: [PATCH 11/12] fix compile --- sql/gen-sql-functions-docs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/gen-sql-functions-docs.py b/sql/gen-sql-functions-docs.py index 8919fde4e8365..6f0e3f21e5074 100644 --- a/sql/gen-sql-functions-docs.py +++ b/sql/gen-sql-functions-docs.py @@ -170,7 +170,7 @@ def _make_pretty_examples(jspark, infos): query_examples = filter(lambda x: x.startswith(" > "), info.examples.split("\n")) for query_example in query_examples: query = query_example.lstrip(" > ") - print(" %s" % query) + print(" %s: %s" % (info.name, query)) query_output = jspark.sql(query).showString(20, 20, False) output.append(query) output.append(query_output) From aa26df3dad9b12ad2190d387b8ac822634c4c715 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Thu, 20 Jun 2024 17:14:26 -0700 Subject: [PATCH 12/12] fix compile --- sql/gen-sql-functions-docs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/gen-sql-functions-docs.py b/sql/gen-sql-functions-docs.py index 6f0e3f21e5074..dc48a5a6155ed 100644 --- a/sql/gen-sql-functions-docs.py +++ b/sql/gen-sql-functions-docs.py @@ -164,13 +164,13 @@ def _make_pretty_examples(jspark, infos): pretty_output = "" for info in infos: if 
(info.examples.startswith("\n Examples:") - and info.name.lower() not in ("from_json", "to_json")): + and info.name.lower() not in ("from_avro", "to_avro")): output = [] output.append("-- %s" % info.name) query_examples = filter(lambda x: x.startswith(" > "), info.examples.split("\n")) for query_example in query_examples: query = query_example.lstrip(" > ") - print(" %s: %s" % (info.name, query)) + print(" %s" % query) query_output = jspark.sql(query).showString(20, 20, False) output.append(query) output.append(query_output)