From 60fbcf868b59efc051f8a2d5748576c13d2f6e7c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 25 Mar 2020 12:51:09 -0700 Subject: [PATCH 1/5] Deal with duplicate column names. --- python/pyspark/sql/pandas/conversion.py | 31 +++++++++++++++------- python/pyspark/sql/tests/test_dataframe.py | 19 +++++++++++++ 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 8548cd222bf10..ec21f2aa691b8 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -132,8 +132,11 @@ def toPandas(self): # Below is toPandas without Arrow optimization. pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) - dtype = {} - for field in self.schema: + dtype = [None] * len(self.schema) + for fieldIdx in range(len(self.schema)): + field = self.schema[fieldIdx] + pandas_col = pdf.iloc[:,fieldIdx] + pandas_type = PandasConversionMixin._to_corrected_pandas_type(field.dataType) # SPARK-21766: if an integer field is nullable and has null values, it can be # inferred by pandas as float column. Once we convert the column with NaN back @@ -141,16 +144,24 @@ def toPandas(self): # float type, not the corrected type from the schema in this case. if pandas_type is not None and \ not(isinstance(field.dataType, IntegralType) and field.nullable and - pdf[field.name].isnull().any()): - dtype[field.name] = pandas_type + pandas_col.isnull().any()): + dtype[fieldIdx] = pandas_type # Ensure we fall back to nullable numpy types, even when whole column is null: - if isinstance(field.dataType, IntegralType) and pdf[field.name].isnull().any(): - dtype[field.name] = np.float64 - if isinstance(field.dataType, BooleanType) and pdf[field.name].isnull().any(): - dtype[field.name] = np.object + if isinstance(field.dataType, IntegralType) and pandas_col.isnull().any(): + dtype[fieldIdx] = np.float64 + if isinstance(field.dataType, BooleanType) and pandas_col.isnull().any(): + dtype[fieldIdx] = np.object + + df = pd.DataFrame() + for index in range(len(dtype)): + t = dtype[index] + if t is not None: + series = pdf.iloc[:,index].astype(t, copy=False) + else: + series = pdf.iloc[:,index] + df.insert(index, self.schema[index].name, series, allow_duplicates = True) - for f, t in dtype.items(): - pdf[f] = pdf[f].astype(t, copy=False) + pdf = df if timezone is None: return pdf diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index 942cd4b4b0ea3..fff01bf3e6ba9 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -529,6 +529,25 @@ def test_to_pandas(self): self.assertEquals(types[4], np.object) # datetime.date self.assertEquals(types[5], 'datetime64[ns]') + @unittest.skipIf(not have_pandas, pandas_requirement_message) + def test_to_pandas_on_cross_join(self): + import numpy as np + + sql = """ + select t1.*, t2.* from ( + select explode(sequence(1, 3)) v + ) t1 left join ( + select explode(sequence(1, 3)) v + ) t2 + """ + with self.sql_conf({"spark.sql.crossJoin.enabled": True}): + df = self.spark.sql(sql) + pdf = df.toPandas() + types = pdf.dtypes + self.assertEquals(types.iloc[0], np.int32) + self.assertEquals(types.iloc[1], np.int32) + + @unittest.skipIf(have_pandas, "Required Pandas was found.") def test_to_pandas_required_pandas_not_found(self): with QuietTest(self.sc): From 6b9d6d69485f44e2292288a60eed216a882dc2fe Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 25 Mar 2020 14:33:57 -0700 Subject: [PATCH 2/5] Fix style. --- python/pyspark/sql/pandas/conversion.py | 8 ++++---- python/pyspark/sql/tests/test_dataframe.py | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index ec21f2aa691b8..21ead36e48c64 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -135,7 +135,7 @@ def toPandas(self): dtype = [None] * len(self.schema) for fieldIdx in range(len(self.schema)): field = self.schema[fieldIdx] - pandas_col = pdf.iloc[:,fieldIdx] + pandas_col = pdf.iloc[:, fieldIdx] pandas_type = PandasConversionMixin._to_corrected_pandas_type(field.dataType) # SPARK-21766: if an integer field is nullable and has null values, it can be @@ -156,10 +156,10 @@ def toPandas(self): for index in range(len(dtype)): t = dtype[index] if t is not None: - series = pdf.iloc[:,index].astype(t, copy=False) + series = pdf.iloc[:, index].astype(t, copy=False) else: - series = pdf.iloc[:,index] - df.insert(index, self.schema[index].name, series, allow_duplicates = True) + series = pdf.iloc[:, index] + df.insert(index, self.schema[index].name, series, allow_duplicates=True) pdf = df diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index fff01bf3e6ba9..b3f4d5cd6b944 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -547,7 +547,6 @@ def test_to_pandas_on_cross_join(self): self.assertEquals(types.iloc[0], np.int32) self.assertEquals(types.iloc[1], np.int32) - @unittest.skipIf(have_pandas, "Required Pandas was found.") def test_to_pandas_required_pandas_not_found(self): with QuietTest(self.sc): From 536107e98fa9b1f2d838b704064d20439f60ac61 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 25 Mar 2020 21:58:47 -0700 Subject: [PATCH 3/5] For comment. --- python/pyspark/sql/pandas/conversion.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 21ead36e48c64..553bd0651cdee 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -153,8 +153,7 @@ def toPandas(self): dtype[fieldIdx] = np.object df = pd.DataFrame() - for index in range(len(dtype)): - t = dtype[index] + for index, t in enumerate(dtype): if t is not None: series = pdf.iloc[:, index].astype(t, copy=False) else: From b8e69e0eba86fdf308d708da91fb82107f4f084c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 26 Mar 2020 18:27:52 -0700 Subject: [PATCH 4/5] Avoid using insert for non-duplicate column names. --- python/pyspark/sql/pandas/conversion.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 553bd0651cdee..812bc1265f811 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -21,6 +21,7 @@ xrange = range else: from itertools import izip as zip +from collections import Counter from pyspark import since from pyspark.rdd import _load_from_socket @@ -131,11 +132,14 @@ def toPandas(self): # Below is toPandas without Arrow optimization. pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) + column_counter = Counter(self.columns) dtype = [None] * len(self.schema) - for fieldIdx in range(len(self.schema)): - field = self.schema[fieldIdx] - pandas_col = pdf.iloc[:, fieldIdx] + for fieldIdx, field in enumerate(self.schema): + if column_counter[field.name] > 1: + pandas_col = pdf.iloc[:, fieldIdx] + else: + pandas_col = pdf[field.name] pandas_type = PandasConversionMixin._to_corrected_pandas_type(field.dataType) # SPARK-21766: if an integer field is nullable and has null values, it can be @@ -154,11 +158,20 @@ def toPandas(self): df = pd.DataFrame() for index, t in enumerate(dtype): + column_name = self.schema[index].name + + if column_counter[column_name] > 1: + series = pdf.iloc[:, index] + else: + series = pdf[column_name] + if t is not None: - series = pdf.iloc[:, index].astype(t, copy=False) + series = series.astype(t, copy=False) + + if column_counter[column_name] > 1: + df.insert(index, column_name, series, allow_duplicates=True) else: - series = pdf.iloc[:, index] - df.insert(index, self.schema[index].name, series, allow_duplicates=True) + df[column_name] = series pdf = df From 1cf1f121f346c93f944feff97e56db0a1a9f7cea Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 26 Mar 2020 19:02:32 -0700 Subject: [PATCH 5/5] Add some comments to explain it. --- python/pyspark/sql/pandas/conversion.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 812bc1265f811..47cf8bbc5b688 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -136,6 +136,7 @@ def toPandas(self): dtype = [None] * len(self.schema) for fieldIdx, field in enumerate(self.schema): + # For duplicate column name, we use `iloc` to access it. if column_counter[field.name] > 1: pandas_col = pdf.iloc[:, fieldIdx] else: @@ -160,6 +161,7 @@ def toPandas(self): for index, t in enumerate(dtype): column_name = self.schema[index].name + # For duplicate column name, we use `iloc` to access it. if column_counter[column_name] > 1: series = pdf.iloc[:, index] else: @@ -168,6 +170,9 @@ def toPandas(self): if t is not None: series = series.astype(t, copy=False) + # `insert` API makes copy of data, we only do it for Series of duplicate column names. + # `pdf.iloc[:, index] = pdf.iloc[:, index]...` doesn't always work because `iloc` could + # return a view or a copy depending by context. if column_counter[column_name] > 1: df.insert(index, column_name, series, allow_duplicates=True) else: