From 7f87d2537488ca03c926f4d9c6318451c688ebe5 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 27 Feb 2018 00:02:55 +0900 Subject: [PATCH 1/8] Adds a conf for Arrow fallback in toPandas/createDataFrame with Pandas DataFrame --- docs/sql-programming-guide.md | 1 + python/pyspark/sql/dataframe.py | 118 +++++++++++------- python/pyspark/sql/session.py | 24 +++- python/pyspark/sql/tests.py | 78 +++++++++--- .../apache/spark/sql/internal/SQLConf.scala | 10 ++ 5 files changed, 173 insertions(+), 58 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index c37c338a134f3..e7d415a2bb40a 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1800,6 +1800,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see ## Upgrading From Spark SQL 2.3 to 2.4 - Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively. + - In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unable to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched by `spark.sql.execution.arrow.fallback.enabled`. 
## Upgrading From Spark SQL 2.2 to 2.3 diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index f37777e13ee12..7c84ac18f247d 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1986,55 +1986,89 @@ def toPandas(self): timezone = None if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true": + should_fallback = False try: - from pyspark.sql.types import _check_dataframe_convert_date, \ - _check_dataframe_localize_timestamps, to_arrow_schema + from pyspark.sql.types import to_arrow_schema from pyspark.sql.utils import require_minimum_pyarrow_version + require_minimum_pyarrow_version() - import pyarrow to_arrow_schema(self.schema) - tables = self._collectAsArrow() - if tables: - table = pyarrow.concat_tables(tables) - pdf = table.to_pandas() - pdf = _check_dataframe_convert_date(pdf, self.schema) - return _check_dataframe_localize_timestamps(pdf, timezone) - else: - return pd.DataFrame.from_records([], columns=self.columns) except Exception as e: - msg = ( - "Note: toPandas attempted Arrow optimization because " - "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false " - "to disable this.") - raise RuntimeError("%s\n%s" % (_exception_message(e), msg)) - else: - pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) - dtype = {} + if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "false") \ + .lower() == "true": + msg = ( + "toPandas attempted Arrow optimization because " + "'spark.sql.execution.arrow.enabled' is set to true; however, " + "failed by the reason below:\n" + " %s\n" + "Attempts non-optimization as " + "'spark.sql.execution.arrow.fallback.enabled' is set to " + "true." 
% _exception_message(e)) + warnings.warn(msg) + should_fallback = True + else: + msg = ( + "toPandas attempted Arrow optimization because " + "'spark.sql.execution.arrow.enabled' is set to true; however, " + "failed by the reason below:\n" + " %s\n" + "For fallback to non-optimization automatically, please set true to " + "'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e)) + raise RuntimeError(msg) + + if not should_fallback: + try: + from pyspark.sql.types import _check_dataframe_convert_date, \ + _check_dataframe_localize_timestamps + import pyarrow + + tables = self._collectAsArrow() + if tables: + table = pyarrow.concat_tables(tables) + pdf = table.to_pandas() + pdf = _check_dataframe_convert_date(pdf, self.schema) + return _check_dataframe_localize_timestamps(pdf, timezone) + else: + return pd.DataFrame.from_records([], columns=self.columns) + except Exception as e: + # We might have to allow fallback here as well but multiple Spark jobs can + # be executed. So, simply fail in this case for now. + msg = ( + "toPandas attempted Arrow optimization because " + "'spark.sql.execution.arrow.enabled' is set to true; however, " + "failed unexpectedly:\n" + " %s" % _exception_message(e)) + raise RuntimeError(msg) + + # Below is toPandas without Arrow optimization. + pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) + + dtype = {} + for field in self.schema: + pandas_type = _to_corrected_pandas_type(field.dataType) + # SPARK-21766: if an integer field is nullable and has null values, it can be + # inferred by pandas as float column. Once we convert the column with NaN back + # to integer type e.g., np.int16, we will hit exception. So we use the inferred + # float type, not the corrected type from the schema in this case. 
+ if pandas_type is not None and \ + not(isinstance(field.dataType, IntegralType) and field.nullable and + pdf[field.name].isnull().any()): + dtype[field.name] = pandas_type + + for f, t in dtype.items(): + pdf[f] = pdf[f].astype(t, copy=False) + + if timezone is None: + return pdf + else: + from pyspark.sql.types import _check_series_convert_timestamps_local_tz for field in self.schema: - pandas_type = _to_corrected_pandas_type(field.dataType) - # SPARK-21766: if an integer field is nullable and has null values, it can be - # inferred by pandas as float column. Once we convert the column with NaN back - # to integer type e.g., np.int16, we will hit exception. So we use the inferred - # float type, not the corrected type from the schema in this case. - if pandas_type is not None and \ - not(isinstance(field.dataType, IntegralType) and field.nullable and - pdf[field.name].isnull().any()): - dtype[field.name] = pandas_type - - for f, t in dtype.items(): - pdf[f] = pdf[f].astype(t, copy=False) - - if timezone is None: - return pdf - else: - from pyspark.sql.types import _check_series_convert_timestamps_local_tz - for field in self.schema: - # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if isinstance(field.dataType, TimestampType): - pdf[field.name] = \ - _check_series_convert_timestamps_local_tz(pdf[field.name], timezone) - return pdf + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? 
+ if isinstance(field.dataType, TimestampType): + pdf[field.name] = \ + _check_series_convert_timestamps_local_tz(pdf[field.name], timezone) + return pdf def _collectAsArrow(self): """ diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index b3af9b82953f3..d57aa031122c4 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -666,8 +666,28 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr try: return self._create_from_pandas_with_arrow(data, schema, timezone) except Exception as e: - warnings.warn("Arrow will not be used in createDataFrame: %s" % str(e)) - # Fallback to create DataFrame without arrow if raise some exception + from pyspark.util import _exception_message + + if self.conf.get("spark.sql.execution.arrow.fallback.enabled", "false") \ + .lower() == "true": + msg = ( + "createDataFrame attempted Arrow optimization because " + "'spark.sql.execution.arrow.enabled' is set to true; however, " + "failed by the reason below:\n" + " %s\n" + "Attempts non-optimization as " + "'spark.sql.execution.arrow.fallback.enabled' is set to " + "true." % _exception_message(e)) + warnings.warn(msg) + else: + msg = ( + "createDataFrame attempted Arrow optimization because " + "'spark.sql.execution.arrow.enabled' is set to true; however, " + "failed by the reason below:\n" + " %s\n" + "For fallback to non-optimization automatically, please set true to " + "'spark.sql.execution.arrow.fallback.enabled'." 
% _exception_message(e)) + raise RuntimeError(msg) data = self._convert_from_pandas(data, schema, timezone) if isinstance(schema, StructType): diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 19653072ea316..0f52d50a50a23 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -32,7 +32,9 @@ import datetime import array import ctypes +import warnings import py4j +from contextlib import contextmanager try: import xmlrunner @@ -48,12 +50,13 @@ else: import unittest +from pyspark.util import _exception_message + _pandas_requirement_message = None try: from pyspark.sql.utils import require_minimum_pandas_version require_minimum_pandas_version() except ImportError as e: - from pyspark.util import _exception_message # If Pandas version requirement is not satisfied, skip related tests. _pandas_requirement_message = _exception_message(e) @@ -62,7 +65,6 @@ from pyspark.sql.utils import require_minimum_pyarrow_version require_minimum_pyarrow_version() except ImportError as e: - from pyspark.util import _exception_message # If Arrow version requirement is not satisfied, skip related tests. 
_pyarrow_requirement_message = _exception_message(e) @@ -3493,19 +3495,42 @@ def create_pandas_data_frame(self): data_dict["4_float_t"] = np.float32(data_dict["4_float_t"]) return pd.DataFrame(data=data_dict) - def test_unsupported_datatype(self): - schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) - df = self.spark.createDataFrame([(None,)], schema=schema) - with QuietTest(self.sc): - with self.assertRaisesRegexp(Exception, 'Unsupported type'): - df.toPandas() + @contextmanager + def arrow_fallback(self, enabled): + orig_value = self.spark.conf.get("spark.sql.execution.arrow.fallback.enabled", None) + self.spark.conf.set("spark.sql.execution.arrow.fallback.enabled", enabled) + try: + yield + finally: + if orig_value is None: + self.spark.conf.unset("spark.sql.execution.arrow.fallback.enabled") + else: + self.spark.conf.set("spark.sql.execution.arrow.fallback.enabled", orig_value) - df = self.spark.createDataFrame([(None,)], schema="a binary") - with QuietTest(self.sc): - with self.assertRaisesRegexp( - Exception, - 'Unsupported type.*\nNote: toPandas attempted Arrow optimization because'): - df.toPandas() + def test_toPandas_fallback_enabled(self): + import pandas as pd + + with self.arrow_fallback(True): + schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) + df = self.spark.createDataFrame([({u'a': 1},)], schema=schema) + with QuietTest(self.sc): + with warnings.catch_warnings(record=True) as warns: + pdf = df.toPandas() + # Catch and check the last UserWarning. 
+ user_warns = [ + warn.message for warn in warns if isinstance(warn.message, UserWarning)] + self.assertTrue(len(user_warns) > 0) + self.assertTrue( + "Attempts non-optimization" in _exception_message(user_warns[-1])) + self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]})) + + def test_toPandas_fallback_disabled(self): + with self.arrow_fallback(False): + schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) + df = self.spark.createDataFrame([(None,)], schema=schema) + with QuietTest(self.sc): + with self.assertRaisesRegexp(Exception, 'Unsupported type'): + df.toPandas() def test_null_conversion(self): df_null = self.spark.createDataFrame([tuple([None for _ in range(len(self.data[0]))])] + @@ -3705,6 +3730,31 @@ def test_createDataFrame_with_int_col_names(self): self.assertEqual(pdf_col_names, df.columns) self.assertEqual(pdf_col_names, df_arrow.columns) + def test_createDataFrame_fallback_enabled(self): + import pandas as pd + + with QuietTest(self.sc): + with self.arrow_fallback(True): + with warnings.catch_warnings(record=True) as warns: + df = self.spark.createDataFrame( + pd.DataFrame([[{u'a': 1}]]), "a: map") + # Catch and check the last UserWarning. 
+ user_warns = [ + warn.message for warn in warns if isinstance(warn.message, UserWarning)] + self.assertTrue(len(user_warns) > 0) + self.assertTrue( + "Attempts non-optimization" in _exception_message(user_warns[-1])) + self.assertEqual(df.collect(), [Row(a={u'a': 1})]) + + def test_createDataFrame_fallback_disabled(self): + import pandas as pd + + with QuietTest(self.sc): + with self.arrow_fallback(False): + with self.assertRaisesRegexp(Exception, 'Unsupported type'): + self.spark.createDataFrame( + pd.DataFrame([[{u'a': 1}]]), "a: map") + # Regression test for SPARK-23314 def test_timestamp_dst(self): import pandas as pd diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ce3f94618edeb..cde4c9cc62ad7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1068,6 +1068,14 @@ object SQLConf { .booleanConf .createWithDefault(false) + val ARROW_FALLBACK_ENABLE = + buildConf("spark.sql.execution.arrow.fallback.enabled") + .doc("When true, the optimization by 'spark.sql.execution.arrow.enabled' " + + "could be disabled when it is unable to be used, and fallback to " + + "non-optimization.") + .booleanConf + .createWithDefault(true) + val ARROW_EXECUTION_MAX_RECORDS_PER_BATCH = buildConf("spark.sql.execution.arrow.maxRecordsPerBatch") .doc("When using Apache Arrow, limit the maximum number of records that can be written " + @@ -1520,6 +1528,8 @@ class SQLConf extends Serializable with Logging { def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLE) + def arrowFallbackEnable: Boolean = getConf(ARROW_FALLBACK_ENABLE) + def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH) def pandasRespectSessionTimeZone: Boolean = getConf(PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE) From 7641fd090eabb160282a045047c5469f64ad2158 Mon Sep 17 
00:00:00 2001 From: hyukjinkwon Date: Tue, 27 Feb 2018 12:42:48 +0900 Subject: [PATCH 2/8] Address comments --- python/pyspark/sql/dataframe.py | 8 ++-- python/pyspark/sql/session.py | 8 ++-- python/pyspark/sql/tests.py | 43 +++++++++++-------- .../apache/spark/sql/internal/SQLConf.scala | 5 +-- 4 files changed, 33 insertions(+), 31 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 7c84ac18f247d..c0f27d7db74ea 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1995,13 +1995,12 @@ def toPandas(self): to_arrow_schema(self.schema) except Exception as e: - if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "false") \ + if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "true") \ .lower() == "true": msg = ( "toPandas attempted Arrow optimization because " "'spark.sql.execution.arrow.enabled' is set to true; however, " - "failed by the reason below:\n" - " %s\n" + "failed by the reason below:\n %s\n" "Attempts non-optimization as " "'spark.sql.execution.arrow.fallback.enabled' is set to " "true." % _exception_message(e)) @@ -2011,8 +2010,7 @@ def toPandas(self): msg = ( "toPandas attempted Arrow optimization because " "'spark.sql.execution.arrow.enabled' is set to true; however, " - "failed by the reason below:\n" - " %s\n" + "failed by the reason below:\n %s\n" "For fallback to non-optimization automatically, please set true to " "'spark.sql.execution.arrow.fallback.enabled'." 
% _exception_message(e)) raise RuntimeError(msg) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index d57aa031122c4..215bb3e5c5173 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -668,13 +668,12 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr except Exception as e: from pyspark.util import _exception_message - if self.conf.get("spark.sql.execution.arrow.fallback.enabled", "false") \ + if self.conf.get("spark.sql.execution.arrow.fallback.enabled", "true") \ .lower() == "true": msg = ( "createDataFrame attempted Arrow optimization because " "'spark.sql.execution.arrow.enabled' is set to true; however, " - "failed by the reason below:\n" - " %s\n" + "failed by the reason below:\n %s\n" "Attempts non-optimization as " "'spark.sql.execution.arrow.fallback.enabled' is set to " "true." % _exception_message(e)) @@ -683,8 +682,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr msg = ( "createDataFrame attempted Arrow optimization because " "'spark.sql.execution.arrow.enabled' is set to true; however, " - "failed by the reason below:\n" - " %s\n" + "failed by the reason below:\n %s\n" "For fallback to non-optimization automatically, please set true to " "'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e)) raise RuntimeError(msg) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 0f52d50a50a23..83961144a6c55 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -197,6 +197,23 @@ def tearDownClass(cls): ReusedPySparkTestCase.tearDownClass() cls.spark.stop() + @contextmanager + def sql_conf(self, key, value): + """ + A convenient context manager to test some configuration specific logic. This sets the + configurations then restores it back. 
+ """ + + orig_value = self.spark.conf.get(key, None) + self.spark.conf.set(key, value) + try: + yield + finally: + if orig_value is None: + self.spark.conf.unset(key) + else: + self.spark.conf.set(key, orig_value) + def assertPandasEqual(self, expected, result): msg = ("DataFrames are not equal: " + "\n\nExpected:\n%s\n%s" % (expected, expected.dtypes) + @@ -3460,6 +3477,8 @@ def setUpClass(cls): cls.spark.conf.set("spark.sql.session.timeZone", tz) cls.spark.conf.set("spark.sql.execution.arrow.enabled", "true") + # Disable fallback by default to easily detect the failures. + cls.spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false") cls.schema = StructType([ StructField("1_str_t", StringType(), True), StructField("2_int_t", IntegerType(), True), @@ -3495,22 +3514,10 @@ def create_pandas_data_frame(self): data_dict["4_float_t"] = np.float32(data_dict["4_float_t"]) return pd.DataFrame(data=data_dict) - @contextmanager - def arrow_fallback(self, enabled): - orig_value = self.spark.conf.get("spark.sql.execution.arrow.fallback.enabled", None) - self.spark.conf.set("spark.sql.execution.arrow.fallback.enabled", enabled) - try: - yield - finally: - if orig_value is None: - self.spark.conf.unset("spark.sql.execution.arrow.fallback.enabled") - else: - self.spark.conf.set("spark.sql.execution.arrow.fallback.enabled", orig_value) - def test_toPandas_fallback_enabled(self): import pandas as pd - with self.arrow_fallback(True): + with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", True): schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) df = self.spark.createDataFrame([({u'a': 1},)], schema=schema) with QuietTest(self.sc): @@ -3525,7 +3532,7 @@ def test_toPandas_fallback_enabled(self): self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]})) def test_toPandas_fallback_disabled(self): - with self.arrow_fallback(False): + with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", False): schema = 
StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) df = self.spark.createDataFrame([(None,)], schema=schema) with QuietTest(self.sc): @@ -3650,7 +3657,7 @@ def test_createDataFrame_with_incorrect_schema(self): pdf = self.create_pandas_data_frame() wrong_schema = StructType(list(reversed(self.schema))) with QuietTest(self.sc): - with self.assertRaisesRegexp(TypeError, ".*field.*can.not.accept.*type"): + with self.assertRaisesRegexp(RuntimeError, ".*No cast.*string.*timestamp.*"): self.spark.createDataFrame(pdf, schema=wrong_schema) def test_createDataFrame_with_names(self): @@ -3675,7 +3682,7 @@ def test_createDataFrame_column_name_encoding(self): def test_createDataFrame_with_single_data_type(self): import pandas as pd with QuietTest(self.sc): - with self.assertRaisesRegexp(TypeError, ".*IntegerType.*tuple"): + with self.assertRaisesRegexp(RuntimeError, ".*IntegerType.*not supported.*"): self.spark.createDataFrame(pd.DataFrame({"a": [1]}), schema="int") def test_createDataFrame_does_not_modify_input(self): @@ -3734,7 +3741,7 @@ def test_createDataFrame_fallback_enabled(self): import pandas as pd with QuietTest(self.sc): - with self.arrow_fallback(True): + with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", True): with warnings.catch_warnings(record=True) as warns: df = self.spark.createDataFrame( pd.DataFrame([[{u'a': 1}]]), "a: map") @@ -3750,7 +3757,7 @@ def test_createDataFrame_fallback_disabled(self): import pandas as pd with QuietTest(self.sc): - with self.arrow_fallback(False): + with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", False): with self.assertRaisesRegexp(Exception, 'Unsupported type'): self.spark.createDataFrame( pd.DataFrame([[{u'a': 1}]]), "a: map") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index cde4c9cc62ad7..4e002d078cb6c 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1070,9 +1070,8 @@ object SQLConf { val ARROW_FALLBACK_ENABLE = buildConf("spark.sql.execution.arrow.fallback.enabled") - .doc("When true, the optimization by 'spark.sql.execution.arrow.enabled' " + - "could be disabled when it is unable to be used, and fallback to " + - "non-optimization.") + .doc("When true, optimizations enabled by 'spark.sql.execution.arrow.enabled' will " + + "fallback automatically to non-optimized implementations if an error occurs.") .booleanConf .createWithDefault(true) From cfb08a1d9b4fdea5a06605f53db90e1a7408be85 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 27 Feb 2018 12:47:46 +0900 Subject: [PATCH 3/8] Fix some comments --- python/pyspark/sql/tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 83961144a6c55..85683c7b33bec 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -200,8 +200,8 @@ def tearDownClass(cls): @contextmanager def sql_conf(self, key, value): """ - A convenient context manager to test some configuration specific logic. This sets the - configurations then restores it back. + A convenient context manager to test some configuration specific logic. This sets + `value` to the configuration `key` then restores it back. 
""" orig_value = self.spark.conf.get(key, None) From 229a5f786028a8b50af7429da9f02ec70b7d4e49 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 27 Feb 2018 21:48:17 +0900 Subject: [PATCH 4/8] Address comments --- docs/sql-programming-guide.md | 4 ++++ python/pyspark/sql/dataframe.py | 6 ++++-- python/pyspark/sql/tests.py | 2 +- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 8 ++++---- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index e7d415a2bb40a..b6311da0665cf 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1689,6 +1689,10 @@ using the call `toPandas()` and when creating a Spark DataFrame from a Pandas Da `createDataFrame(pandas_df)`. To use Arrow when executing these calls, users need to first set the Spark configuration 'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default. +In addition, optimizations enabled by 'spark.sql.execution.arrow.enabled' will fallback automatically +to non-optimized implementations if an error occurs. This can be controlled by +'spark.sql.execution.arrow.fallback.enabled'. +
{% include_example dataframe_with_arrow python/sql/arrow.py %} diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index c0f27d7db74ea..1f6c5d9a3ec03 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -2035,8 +2035,10 @@ def toPandas(self): msg = ( "toPandas attempted Arrow optimization because " "'spark.sql.execution.arrow.enabled' is set to true; however, " - "failed unexpectedly:\n" - " %s" % _exception_message(e)) + "failed unexpectedly:\n %s\n" + "Note that 'spark.sql.execution.arrow.fallback.enabled' does " + "not have an effect in such failure in the middle of " + "computation." % _exception_message(e)) raise RuntimeError(msg) # Below is toPandas without Arrow optimization. diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 85683c7b33bec..44d62504555d9 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -201,7 +201,7 @@ def tearDownClass(cls): def sql_conf(self, key, value): """ A convenient context manager to test some configuration specific logic. This sets - `value` to the configuration `key` then restores it back. + `value` to the configuration `key` and then restores it back when it exits. """ orig_value = self.spark.conf.get(key, None) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4e002d078cb6c..34f5c6537c521 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1058,7 +1058,7 @@ object SQLConf { .intConf .createWithDefault(100) - val ARROW_EXECUTION_ENABLE = + val ARROW_EXECUTION_ENABLED = buildConf("spark.sql.execution.arrow.enabled") .doc("When true, make use of Apache Arrow for columnar data transfers. 
Currently available " + "for use with pyspark.sql.DataFrame.toPandas, and " + @@ -1068,7 +1068,7 @@ object SQLConf { .booleanConf .createWithDefault(false) - val ARROW_FALLBACK_ENABLE = + val ARROW_FALLBACK_ENABLED = buildConf("spark.sql.execution.arrow.fallback.enabled") .doc("When true, optimizations enabled by 'spark.sql.execution.arrow.enabled' will " + "fallback automatically to non-optimized implementations if an error occurs.") @@ -1525,9 +1525,9 @@ class SQLConf extends Serializable with Logging { def rangeExchangeSampleSizePerPartition: Int = getConf(RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION) - def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLE) + def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLED) - def arrowFallbackEnable: Boolean = getConf(ARROW_FALLBACK_ENABLE) + def arrowFallbackEnable: Boolean = getConf(ARROW_FALLBACK_ENABLED) def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH) From ed30c205d95a6555475a06376f0d88e53e2f3da3 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 1 Mar 2018 00:28:46 +0900 Subject: [PATCH 5/8] Address comments --- docs/sql-programming-guide.md | 6 ++--- python/pyspark/sql/dataframe.py | 8 +++--- python/pyspark/sql/tests.py | 27 +++++++++++-------- .../apache/spark/sql/internal/SQLConf.scala | 4 +-- 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index b6311da0665cf..f9c7967fe3064 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1689,9 +1689,9 @@ using the call `toPandas()` and when creating a Spark DataFrame from a Pandas Da `createDataFrame(pandas_df)`. To use Arrow when executing these calls, users need to first set the Spark configuration 'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default. 
-In addition, optimizations enabled by 'spark.sql.execution.arrow.enabled' will fallback automatically -to non-optimized implementations if an error occurs. This can be controlled by -'spark.sql.execution.arrow.fallback.enabled'. +In addition, optimizations enabled by 'spark.sql.execution.arrow.enabled' could fallback automatically +to non-optimized implementations if an error occurs before the actual computation within Spark. +This can be controlled by 'spark.sql.execution.arrow.fallback.enabled'.
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 1f6c5d9a3ec03..57036c3fa8773 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1986,7 +1986,7 @@ def toPandas(self): timezone = None if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true": - should_fallback = False + use_arrow = True try: from pyspark.sql.types import to_arrow_schema from pyspark.sql.utils import require_minimum_pyarrow_version @@ -2005,7 +2005,7 @@ def toPandas(self): "'spark.sql.execution.arrow.fallback.enabled' is set to " "true." % _exception_message(e)) warnings.warn(msg) - should_fallback = True + use_arrow = False else: msg = ( "toPandas attempted Arrow optimization because " @@ -2015,7 +2015,9 @@ def toPandas(self): "'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e)) raise RuntimeError(msg) - if not should_fallback: + # Try to use Arrow optimization when the schema is supported and the required version + # of PyArrow is found, if 'spark.sql.execution.arrow.fallback.enabled' is enabled. + if use_arrow: try: from pyspark.sql.types import _check_dataframe_convert_date, \ _check_dataframe_localize_timestamps diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 44d62504555d9..ca1b7ee7de270 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -198,21 +198,26 @@ def tearDownClass(cls): cls.spark.stop() @contextmanager - def sql_conf(self, key, value): + def sql_conf(self, pairs): """ A convenient context manager to test some configuration specific logic. This sets `value` to the configuration `key` and then restores it back when it exits. """ + assert isinstance(pairs, dict), "pairs should be a dictionary." 
- orig_value = self.spark.conf.get(key, None) - self.spark.conf.set(key, value) + keys = pairs.keys() + new_values = pairs.values() + old_values = [self.spark.conf.get(key, None) for key in keys] + for key, new_value in zip(keys, new_values): + self.spark.conf.set(key, new_value) try: yield finally: - if orig_value is None: - self.spark.conf.unset(key) - else: - self.spark.conf.set(key, orig_value) + for key, old_value in zip(keys, old_values): + if old_value is None: + self.spark.conf.unset(key) + else: + self.spark.conf.set(key, old_value) def assertPandasEqual(self, expected, result): msg = ("DataFrames are not equal: " + @@ -3517,7 +3522,7 @@ def create_pandas_data_frame(self): def test_toPandas_fallback_enabled(self): import pandas as pd - with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", True): + with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": True}): schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) df = self.spark.createDataFrame([({u'a': 1},)], schema=schema) with QuietTest(self.sc): @@ -3532,7 +3537,7 @@ def test_toPandas_fallback_enabled(self): self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]})) def test_toPandas_fallback_disabled(self): - with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", False): + with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": False}): schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) df = self.spark.createDataFrame([(None,)], schema=schema) with QuietTest(self.sc): @@ -3741,7 +3746,7 @@ def test_createDataFrame_fallback_enabled(self): import pandas as pd with QuietTest(self.sc): - with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", True): + with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": True}): with warnings.catch_warnings(record=True) as warns: df = self.spark.createDataFrame( pd.DataFrame([[{u'a': 1}]]), "a: map") @@ -3757,7 +3762,7 @@ def 
test_createDataFrame_fallback_disabled(self): import pandas as pd with QuietTest(self.sc): - with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", False): + with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": False}): with self.assertRaisesRegexp(Exception, 'Unsupported type'): self.spark.createDataFrame( pd.DataFrame([[{u'a': 1}]]), "a: map") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 34f5c6537c521..3f96112659c11 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1525,9 +1525,9 @@ class SQLConf extends Serializable with Logging { def rangeExchangeSampleSizePerPartition: Int = getConf(RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION) - def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLED) + def arrowEnabled: Boolean = getConf(ARROW_EXECUTION_ENABLED) - def arrowFallbackEnable: Boolean = getConf(ARROW_FALLBACK_ENABLED) + def arrowFallbackEnabled: Boolean = getConf(ARROW_FALLBACK_ENABLED) def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH) From af60cb75b52479fab636a20fd4face25aa9791e3 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 5 Mar 2018 23:13:09 +0900 Subject: [PATCH 6/8] Fix a nit --- python/pyspark/sql/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 57036c3fa8773..a24b9e1baf596 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -2016,7 +2016,7 @@ def toPandas(self): raise RuntimeError(msg) # Try to use Arrow optimization when the schema is supported and the required version - # of PyArrow is found, if 'spark.sql.execution.arrow.fallback.enabled' is enabled. + # of PyArrow is found, if 'spark.sql.execution.arrow.enabled' is enabled. 
if use_arrow: try: from pyspark.sql.types import _check_dataframe_convert_date, \ From b5bea82a9b84a3478b634052aa2662a44311a512 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 6 Mar 2018 23:56:12 +0900 Subject: [PATCH 7/8] Fix a nit --- python/pyspark/sql/tests.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index ca1b7ee7de270..5bfc91176508a 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3537,12 +3537,11 @@ def test_toPandas_fallback_enabled(self): self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]})) def test_toPandas_fallback_disabled(self): - with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": False}): - schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) - df = self.spark.createDataFrame([(None,)], schema=schema) - with QuietTest(self.sc): - with self.assertRaisesRegexp(Exception, 'Unsupported type'): - df.toPandas() + schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) + df = self.spark.createDataFrame([(None,)], schema=schema) + with QuietTest(self.sc): + with self.assertRaisesRegexp(Exception, 'Unsupported type'): + df.toPandas() def test_null_conversion(self): df_null = self.spark.createDataFrame([tuple([None for _ in range(len(self.data[0]))])] + @@ -3762,10 +3761,9 @@ def test_createDataFrame_fallback_disabled(self): import pandas as pd with QuietTest(self.sc): - with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": False}): - with self.assertRaisesRegexp(Exception, 'Unsupported type'): - self.spark.createDataFrame( - pd.DataFrame([[{u'a': 1}]]), "a: map") + with self.assertRaisesRegexp(Exception, 'Unsupported type'): + self.spark.createDataFrame( + pd.DataFrame([[{u'a': 1}]]), "a: map") # Regression test for SPARK-23314 def test_timestamp_dst(self): From 4ccaa81038e9e87b9772394f5b39866b65c798d8 Mon Sep 17 00:00:00 
2001 From: hyukjinkwon Date: Thu, 8 Mar 2018 09:13:32 +0900 Subject: [PATCH 8/8] Fix nits --- docs/sql-programming-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index f9c7967fe3064..280a443523563 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1690,7 +1690,7 @@ using the call `toPandas()` and when creating a Spark DataFrame from a Pandas Da the Spark configuration 'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default. In addition, optimizations enabled by 'spark.sql.execution.arrow.enabled' could fallback automatically -to non-optimized implementations if an error occurs before the actual computation within Spark. +to a non-Arrow-optimized implementation if an error occurs before the actual computation within Spark. This can be controlled by 'spark.sql.execution.arrow.fallback.enabled'.
@@ -1804,7 +1804,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see ## Upgrading From Spark SQL 2.3 to 2.4 - Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively. - - In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unabled to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched by `spark.sql.execution.arrow.fallback.enabled`. + - In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unable to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`. ## Upgrading From Spark SQL 2.2 to 2.3