diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json index bbc4d005b490a..94ede56da5e1f 100644 --- a/python/pyspark/errors/error-conditions.json +++ b/python/pyspark/errors/error-conditions.json @@ -1253,6 +1253,11 @@ "Return type of the user-defined function should be , but is ." ] }, + "UDTF_ARROW_DATA_CONVERSION_ERROR": { + "message": [ + "Cannot convert UDTF output to Arrow. Data: . Schema: . Arrow Schema: ." + ] + }, "UDTF_ARROW_TYPE_CAST_ERROR": { "message": [ "Cannot convert the output value of the column '' with type '' to the specified return type of the column: ''. Please check if the data types match and try again." diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udtf.py b/python/pyspark/sql/tests/arrow/test_arrow_udtf.py index 0e366d23632b2..920dd6a3ab6b5 100644 --- a/python/pyspark/sql/tests/arrow/test_arrow_udtf.py +++ b/python/pyspark/sql/tests/arrow/test_arrow_udtf.py @@ -34,6 +34,23 @@ @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message) class ArrowUDTFTestsMixin: + def test_arrow_udtf_data_conversion_error(self): + from pyspark.sql.functions import udtf + + @udtf(returnType="x int, y int") + class DataConversionErrorUDTF: + def eval(self): + # Return a non-tuple value when multiple return values are expected. + # This will cause LocalDataToArrowConversion.convert to fail with TypeError (len() on int), + # which should be wrapped in UDTF_ARROW_DATA_CONVERSION_ERROR. + yield 1 + + # Enable Arrow optimization for regular UDTFs + with self.sql_conf({"spark.sql.execution.pythonUDTF.arrow.enabled": "true"}): + with self.assertRaisesRegex(PythonException, "UDTF_ARROW_DATA_CONVERSION_ERROR"): + result_df = DataConversionErrorUDTF() + result_df.collect() + def test_arrow_udtf_zero_args(self): @arrow_udtf(returnType="id int, value string") class TestUDTF: diff --git a/python/pyspark/sql/tests/test_udtf.py b/python/pyspark/sql/tests/test_udtf.py index de201b842c787..a18911815a9df 100644 --- a/python/pyspark/sql/tests/test_udtf.py +++ b/python/pyspark/sql/tests/test_udtf.py @@ -3745,7 +3745,7 @@ class TestUDTF: def eval(self): yield 1, - err = "UDTF_ARROW_TYPE_CONVERSION_ERROR" + err = "UDTF_ARROW_DATA_CONVERSION_ERROR" for ret_type, expected in [ ("x: boolean", err), @@ -3772,7 +3772,7 @@ class TestUDTF: def eval(self): yield "1", - err = "UDTF_ARROW_TYPE_CONVERSION_ERROR" + err = "UDTF_ARROW_DATA_CONVERSION_ERROR" for ret_type, expected in [ ("x: boolean", err), @@ -3801,7 +3801,7 @@ class TestUDTF: def eval(self): yield "hello", - err = "UDTF_ARROW_TYPE_CONVERSION_ERROR" + err = "UDTF_ARROW_DATA_CONVERSION_ERROR" for ret_type, expected in [ ("x: boolean", err), ("x: tinyint", err), @@ -3829,7 +3829,7 @@ class TestUDTF: def eval(self): yield [0, 1.1, 2], - err = "UDTF_ARROW_TYPE_CONVERSION_ERROR" + err = "UDTF_ARROW_DATA_CONVERSION_ERROR" for ret_type, expected in [ ("x: boolean", err), ("x: tinyint", err), @@ -3861,7 +3861,7 @@ class TestUDTF: def eval(self): yield {"a": 0, "b": 1.1, "c": 2}, - err = "UDTF_ARROW_TYPE_CONVERSION_ERROR" + err = "UDTF_ARROW_DATA_CONVERSION_ERROR" for ret_type, expected in [ ("x: boolean", err), ("x: tinyint", err), @@ -3892,7 +3892,7 @@ class TestUDTF: def eval(self): yield {"a": 0, "b": 1.1, "c": 2}, - err = "UDTF_ARROW_TYPE_CONVERSION_ERROR" + err = "UDTF_ARROW_DATA_CONVERSION_ERROR" for ret_type, expected in [ ("x: boolean", err), ("x: tinyint", err), @@ -3922,7 +3922,7 @@ class TestUDTF: def eval(self): yield Row(a=0, b=1.1, c=2), - err = "UDTF_ARROW_TYPE_CONVERSION_ERROR" + err = "UDTF_ARROW_DATA_CONVERSION_ERROR" for ret_type, expected in [ ("x: boolean", err), ("x: tinyint", err), @@ -3958,7 +3958,7 @@ def eval(self): "x: array", ]: with self.subTest(ret_type=ret_type): - with self.assertRaisesRegex(PythonException, "UDTF_ARROW_TYPE_CONVERSION_ERROR"): + with self.assertRaisesRegex(PythonException, "UDTF_ARROW_DATA_CONVERSION_ERROR"): udtf(TestUDTF, returnType=ret_type)().collect() def test_decimal_round(self): diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index a590b3ed47c4e..3717ba6d4d6af 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -2343,7 +2343,7 @@ def convert_to_arrow(data: Iterable): def raise_conversion_error(original_exception): raise PySparkRuntimeError( - errorClass="UDTF_ARROW_TYPE_CONVERSION_ERROR", + errorClass="UDTF_ARROW_DATA_CONVERSION_ERROR", messageParameters={ "data": str(data), "schema": return_type.simpleString(),