Skip to content

[SPARK-10542] [PYSPARK] fix serialize namedtuple#8707

Closed
davies wants to merge 4 commits into
apache:masterfrom
davies:fix_namedtuple
Closed

[SPARK-10542] [PYSPARK] fix serialize namedtuple#8707
davies wants to merge 4 commits into
apache:masterfrom
davies:fix_namedtuple

Conversation

@davies

@davies davies commented Sep 10, 2015

Copy link
Copy Markdown
Contributor

No description provided.

@SparkQA

SparkQA commented Sep 10, 2015

Copy link
Copy Markdown

Test build #42289 has finished for PR 8707 at commit d7ef6ce.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 10, 2015

Copy link
Copy Markdown

Test build #42293 has finished for PR 8707 at commit 6b9095b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 10, 2015

Copy link
Copy Markdown

Test build #42295 has finished for PR 8707 at commit 1d766aa.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 10, 2015

Copy link
Copy Markdown

Test build #42298 has finished for PR 8707 at commit a2f9f36.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 10, 2015

Copy link
Copy Markdown

Test build #1739 has finished for PR 8707 at commit a2f9f36.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkHadoopWriter(jobConf: JobConf)
    • class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
    • class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)
    • case class Instance(w: Double, a: Vector, b: Double)
    • class DefaultSource extends RelationProvider with DataSourceRegister
    • class WeibullGenerator(
    • class IndexToString(JavaTransformer, HasInputCol, HasOutputCol):
    • class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
    • abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
    • abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)

@SparkQA

SparkQA commented Sep 11, 2015

Copy link
Copy Markdown

Test build #42304 has finished for PR 8707 at commit 9326697.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen

Copy link
Copy Markdown
Contributor

Over at https://issues.apache.org/jira/browse/SPARK-10544, someone commented to mention that other types of built-in types do not seem to be pickleable in 1.5. For instance, here's the example that they gave:

sc.parallelize(["the red", "Fox Runs", "FAST"]).map(str.lower).count()

However, this specific example also seems to fail in 1.3.1, so I don't think that this is a regression. Just wanted to mention this discussion here to make sure you were aware of it.

@JoshRosen

Copy link
Copy Markdown
Contributor

Do you have any intuition for why this worked prior to 1.5 without the changes implemented here?

Comment thread python/pyspark/tests.py

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, interesting: presumably P and P2 are different classes but instances created from them are still comparable for equality. Do we also need to check that those instances claim to belong to the same class? It seems way less likely that users could rely on the class comparison behavior, so probably not a huge priority to look at.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These instances should become to difference classes.

@JoshRosen

Copy link
Copy Markdown
Contributor

Actually, one point of confusion: it looks like python/pyspark/serializers.py had some methods for serializing namedtuple classes with its _restore and _hack_namedtuple(cls) methods, but this patch also adds logic to cloudpickle.py to handle pickling of these instances. Do we need both of these methods? Are some only needed by certain versions of Python? It would be good to add some comments to explain some of this stuff, since it's not obvious from reading the code.

@davies

davies commented Sep 11, 2015

Copy link
Copy Markdown
Contributor Author

The HACK in serializers.py is used for cPickler, not cloudpickle.

@davies

davies commented Sep 11, 2015

Copy link
Copy Markdown
Contributor Author

Before 1.5, the old way work in CPython, but not PyPy (we don't have a unit test for it).

@davies

davies commented Sep 11, 2015

Copy link
Copy Markdown
Contributor Author

@JoshRosen BTW, this patch introduce a special case for namedtuple, it should be safe to merge into branch-1.5.

@JoshRosen

Copy link
Copy Markdown
Contributor

Empirically, this seems to work, so unless you think that we should investigate the root cause any further I'm fine with giving this an LGTM and merging to 1.5. Feel free to merge yourself, or I can do it.

@davies

davies commented Sep 15, 2015

Copy link
Copy Markdown
Contributor Author

I tried to find the root cause, but it seems hard to work in all Python versions (you can see them in the older commit), finally switch to current approach. merging this into master and 1.5 branch, thanks!

asfgit pushed a commit that referenced this pull request Sep 15, 2015
Author: Davies Liu <davies@databricks.com>

Closes #8707 from davies/fix_namedtuple.
@asfgit asfgit closed this in 5520418 Sep 15, 2015
@coderfi

coderfi commented Sep 17, 2015

Copy link
Copy Markdown
Contributor

FYI, this works for us @ NinthDecimal. Thanks for the fix, it was a stumper!

Python 2.7.6
Spark-1.5 Branch @ 4c4a9ba28d9052fad45caca9a1eba9ef9db309d5

@ghost

ghost commented Jun 16, 2016

Copy link
Copy Markdown

Hi! I am not sure if this is related but is I look for this issue everything points me here basically. I'm getting

  ...
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 315, in save_builtin_function
    return self.save_function(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 191, in save_function
    if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
AttributeError: 'builtin_function_or_method' object has no attribute '__code__'

When trying to create a data frame from an RDD:

rdd = self.sc.textFile(self.input_file_path).map(lambda line: self.process_line(line))

schema = StructType([StructField(u'Variable', StringType(), nullable=False),
                     StructField(u'Time', TimestampType(), nullable=False),
                     StructField(u'Value', FloatType(), nullable=False)])

return sql_context.createDataFrame(rdd, schema)

I am on PySpark 1.6.0 - any ideas what I'm doing wrong here?

@philastrophist

Copy link
Copy Markdown

I also get this error when using namedtuples

n = namedtuple('test', ['a', 'b'])
cloudpickle.dumps(n(1,2))

ashangit pushed a commit to ashangit/spark that referenced this pull request Oct 19, 2016
Author: Davies Liu <davies@databricks.com>

Closes apache#8707 from davies/fix_namedtuple.

(cherry picked from commit d5c0361)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants