Skip to content

[SPARK-27778][PYTHON] Fix toPandas conversion of empty DataFrame with Arrow enabled#24650

Closed
dvogelbacher wants to merge 5 commits into
apache:masterfrom
dvogelbacher:dv/fixNoPartitionArrowConversion
Closed

[SPARK-27778][PYTHON] Fix toPandas conversion of empty DataFrame with Arrow enabled#24650
dvogelbacher wants to merge 5 commits into
apache:masterfrom
dvogelbacher:dv/fixNoPartitionArrowConversion

Conversation

@dvogelbacher

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

#22275 introduced a performance improvement where we send partitions out of order to python and then, as a last step, send the partition order as well.
However, if there are no partitions we will never send the partition order and we will get an "EofError" on the python side.
This PR fixes this by also sending the partition order if there are no partitions present.

How was this patch tested?

New unit test added.

@dvogelbacher

Copy link
Copy Markdown
Contributor Author

@BryanCutler can you take a look at this one

@BryanCutler BryanCutler left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this @dvogelbacher !

Comment thread python/pyspark/sql/tests/test_arrow.py
@BryanCutler

Copy link
Copy Markdown
Member

ok to test

@SparkQA

SparkQA commented May 21, 2019

Copy link
Copy Markdown

Test build #105580 has finished for PR 24650 at commit d635a74.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

0 until numPartitions,
handlePartitionBatches)

if (numPartitions == 0) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is well-commented. Can you add another comment that we should end stream when partitions are empty?

Also, I would do:

partitions = 0 until numPartitions
sparkSession.sparkContext.runJob(
  arrowBatchRdd,
  (ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray,
  partitions,
  handlePartitionBatches)

if (partitions.isEmpty) {
  // Currently result handler is not called when given partitions are empty.
  // Therefore, we should end stream here.
  doAfterLastPartition()
}

@HyukjinKwon

Copy link
Copy Markdown
Member

Looks fine given skimming the codes.

@HyukjinKwon HyukjinKwon changed the title [SPARK-27778][PySpark] Fix toPandas conversion using arrow for DFs with no partitions [SPARK-27778][PYTHON] Fix toPandas conversion of empty DataFrame with Arrow enabled May 21, 2019
@SparkQA

SparkQA commented May 21, 2019

Copy link
Copy Markdown

Test build #105588 has finished for PR 24650 at commit 52a51bf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler

Copy link
Copy Markdown
Member

I had another thought about this, the stuff in doAfterLastPartition could be removed from handlePartitionBatches and called after runJob regardless if partitions are empty or not. This really wouldn't make any difference performance-wise because it's just moving it outside the callback function and Python is waiting on it anyway.

It also has the benefit that the number of partitions would not have to be kept track of, so the variables partitionCount and numPartitions could be removed. It would be a lot clearer then too.

What do you think @dvogelbacher and @HyukjinKwon ?

@HyukjinKwon

Copy link
Copy Markdown
Member

Yea, SGTM.

@dvogelbacher

Copy link
Copy Markdown
Contributor Author

yes, that's a good idea @BryanCutler, it is much clearer. I've made the change.

@SparkQA

SparkQA commented May 21, 2019

Copy link
Copy Markdown

Test build #105619 has finished for PR 24650 at commit 9f4bc3e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler BryanCutler left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks pretty good now, just a couple more minor things that could be done to clean it up a bit more if you wouldn't mind.

Comment thread sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala Outdated
Comment thread sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala Outdated
@dvogelbacher

Copy link
Copy Markdown
Contributor Author

of course, I addressed the comments @BryanCutler

@HyukjinKwon HyukjinKwon left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this way looks better. Looks good to me too

@SparkQA

SparkQA commented May 22, 2019

Copy link
Copy Markdown

Test build #105647 has finished for PR 24650 at commit db6f4b1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon

Copy link
Copy Markdown
Member

Merged to master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants