[SPARK-57247][SQL][CONNECT] Support DataFrame.zip in Spark Connect#56300
Closed
zhengruifeng wants to merge 2 commits into
976d13c to c83b782
HyukjinKwon approved these changes on Jun 3, 2026
a7a6066 to 104b48d
Implements the `DataFrame.zip` operation on the Spark Connect path, following up on apache#54976 which deferred Connect support.

**Protocol (relations.proto)**
Adds a `Zip` message with `left` and `right` `Relation` fields (field number 48 in the `Relation` oneof). Python stubs regenerated via the connect-gen-protos Docker image (buf 1.66.1 + mypy 1.19.1 + mypy-protobuf 3.3.0 + ruff 0.14.8).

**Server (SparkConnectPlanner)**
Adds `transformZip` that directly constructs the unresolved `logical.Zip(left, right)` plan, dispatched via `RelTypeCase.ZIP`. `ResolveZip` then runs during analysis, same as the classic path.

**Scala Connect Dataset**
Replaces the `UnsupportedOperationException` stub with `sparkSession.newDataFrame { builder => builder.getZipBuilder... }`, following the `crossJoin`/`buildJoin` pattern.

**Python Connect**
- `plan.py`: adds `class Zip(LogicalPlan)` following the `NearestByJoin` pattern.
- `dataframe.py`: replaces the `PySparkNotImplementedError` stub with a `plan.Zip` call; removes the doctest suppression.
- `test_parity_zip.py`: runs the full `DataFrameZipTestsMixin` against Connect instead of asserting `NOT_IMPLEMENTED`.

Generated-by: Claude Code
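The client-side shape described in this commit (a binary plan node that serializes into a `zip` entry holding `left` and `right` relations) can be sketched in plain Python. This is only an illustration under stated assumptions: `LogicalPlan`, `Read`, and the dict-based `to_proto` are hypothetical stand-ins, not the PySpark Connect classes or the generated protobuf API.

```python
from dataclasses import dataclass
from typing import Any, Dict


class LogicalPlan:
    """Hypothetical stand-in for the Connect client's plan base class."""

    def to_proto(self) -> Dict[str, Any]:
        raise NotImplementedError


@dataclass
class Read(LogicalPlan):
    """Hypothetical leaf node that reads a named table."""

    table: str

    def to_proto(self) -> Dict[str, Any]:
        return {"read": {"named_table": self.table}}


@dataclass
class Zip(LogicalPlan):
    """Binary node: carries a left and a right child and nothing else."""

    left: LogicalPlan
    right: LogicalPlan

    def to_proto(self) -> Dict[str, Any]:
        # Mirrors the described message shape: one `zip` entry in the
        # relation oneof, with `left` and `right` child relations.
        return {
            "zip": {
                "left": self.left.to_proto(),
                "right": self.right.to_proto(),
            }
        }


plan = Zip(Read("t1"), Read("t2"))
proto = plan.to_proto()
```

A dict is used here in place of a real protobuf message so the sketch runs without any Spark dependency; the nesting is the only part meant to reflect the description.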
- DataFrameSuite: end-to-end test that zips two projections of the same DataFrame and asserts the column names and collected values.
- PlanGenerationTestSuite: "zip" test serializes the plan to proto and compares against the new golden file (zip.proto.bin / zip.json).
- ProtoToParsedPlanTestSuite: "zip" test deserializes the proto.bin golden file, runs it through SparkConnectPlanner + Analyzer, and compares the explained plan against the new zip.explain golden file.
- test_connect_plan.py: test_zip asserts that the proto plan for `left.zip(right)` has the `zip` field set with the expected left/right read sources.

Generated-by: Claude Code
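The golden-file tests listed above follow a common pattern: serialize the plan deterministically, then compare it byte for byte with a checked-in file. A minimal sketch of that pattern, assuming a JSON encoding and a hypothetical `check_golden` helper (the real suites use Spark's own test harness and proto serialization):

```python
import json
import tempfile
from pathlib import Path


def check_golden(plan: dict, golden: Path, regenerate: bool = False) -> bool:
    """Compare a plan's canonical JSON with a golden file.

    With regenerate=True the golden file is (re)written instead of compared.
    """
    # sort_keys makes the output independent of dict insertion order.
    actual = json.dumps(plan, indent=2, sort_keys=True)
    if regenerate:
        golden.write_text(actual)
        return True
    return golden.read_text() == actual


zip_plan = {
    "zip": {
        "left": {"read": {"named_table": "t1"}},
        "right": {"read": {"named_table": "t2"}},
    }
}
# Same node with the children swapped: should no longer match the golden file.
swapped_plan = {
    "zip": {
        "left": {"read": {"named_table": "t2"}},
        "right": {"read": {"named_table": "t1"}},
    }
}

with tempfile.TemporaryDirectory() as tmp:
    golden_path = Path(tmp) / "zip.json"
    check_golden(zip_plan, golden_path, regenerate=True)
    same = check_golden(zip_plan, golden_path)
    changed = check_golden(swapped_plan, golden_path)
```

Keeping regeneration behind an explicit flag means a missing or stale golden file fails the comparison rather than being silently rewritten.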
104b48d to aaddf3a
cloud-fan approved these changes on Jun 5, 2026
zhengruifeng added a commit that referenced this pull request on Jun 6, 2026
### What changes were proposed in this pull request?

This is the follow-up to #54976 ([SPARK-55886]) which implemented `DataFrame.zip` for the classic path and deferred Spark Connect support. This PR wires up the Connect path end-to-end.

- **Protocol (`relations.proto`)**: adds a `Zip` message with `left` and `right` `Relation` fields (field 48 in the `Relation` oneof). Python stubs regenerated via the `connect-gen-protos` Docker image (buf 1.66.1 + mypy 1.19.1 + mypy-protobuf 3.3.0 + ruff 0.14.8).
- **Server (`SparkConnectPlanner`)**: adds `transformZip` that directly constructs the unresolved `logical.Zip(left, right)` plan, dispatched via `RelTypeCase.ZIP`. `ResolveZip` then runs during analysis, same as the classic path.
- **Scala Connect `Dataset`**: replaces the `UnsupportedOperationException` stub with `sparkSession.newDataFrame { builder => builder.getZipBuilder.setLeft(...).setRight(...) }`, following the `crossJoin`/`buildJoin` pattern.
- **Python Connect `plan.py`**: adds `class Zip(LogicalPlan)` following the `NearestByJoin` pattern.
- **Python Connect `dataframe.py`**: replaces the `PySparkNotImplementedError` stub with a `plan.Zip` call; removes the doctest suppression (`del DataFrame.zip.__doc__`) that was added when Connect was unsupported.

### Why are the changes needed?

`DataFrame.zip` was merged (#54976) with Connect deferred. This PR completes the implementation so Connect users can use `zip` on equal footing with the classic path.

### Does this PR introduce _any_ user-facing change?

Yes. `DataFrame.zip` now works on the Spark Connect path. Previously it raised `PySparkNotImplementedError: [NOT_IMPLEMENTED] zip is not implemented.`

### How was this patch tested?

- `test_parity_zip.py`: runs the full `DataFrameZipTestsMixin` (basic projections, expressions, one-sided base, `withColumn`, chained `withColumn`, longer chains, parent-with-chained-child, `withColumnRenamed`, scalar Python UDF, pandas UDF, and two error cases) against a Connect session.
- `test_connect_plan.py`: asserts that the proto plan for `left.zip(right)` has the `zip` field set with the expected left/right sources.
- `PlanGenerationTestSuite`: serializes a `zip` plan to proto and compares against a new golden file (`zip.proto.bin`).
- `ProtoToParsedPlanTestSuite`: deserializes the proto golden file, runs it through `SparkConnectPlanner` + `Analyzer`, and compares the explained plan against `zip.explain`.
- `DataFrameSuite` (Connect): end-to-end test that zips two projections over a Connect session and asserts the resulting columns and values.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

Closes #56300 from zhengruifeng/spark-dev-2-df-zip-connect-dev2.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
(cherry picked from commit f3f5677)
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
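The server-side step in this commit message (dispatch on the relation's oneof case, then build an unresolved binary node from the recursively transformed children) can be illustrated with a small Python model. Everything here is an assumption made for illustration: the real planner is Scala, and `transform_relation`, `transform_zip`, `UnresolvedRelation`, and the dict encoding are hypothetical names, not Spark APIs.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class UnresolvedRelation:
    """Hypothetical leaf of the logical plan."""

    name: str


@dataclass(frozen=True)
class Zip:
    """Hypothetical unresolved binary node; resolution happens later."""

    left: Any
    right: Any


def transform_relation(rel: Dict[str, Any]) -> Any:
    # A oneof has exactly one case set; model that as a single-key dict.
    if len(rel) != 1:
        raise ValueError("relation must set exactly one case")
    ((case, body),) = rel.items()
    handler = _HANDLERS.get(case)
    if handler is None:
        raise ValueError(f"unsupported relation type: {case}")
    return handler(body)


def transform_read(body: Dict[str, Any]) -> UnresolvedRelation:
    return UnresolvedRelation(body["named_table"])


def transform_zip(body: Dict[str, Any]) -> Zip:
    # Transform both children first, then wrap them without resolving anything.
    return Zip(transform_relation(body["left"]), transform_relation(body["right"]))


# Dispatch table keyed by the oneof case name.
_HANDLERS: Dict[str, Callable[[Dict[str, Any]], Any]] = {
    "read": transform_read,
    "zip": transform_zip,
}

logical = transform_relation(
    {
        "zip": {
            "left": {"read": {"named_table": "t1"}},
            "right": {"read": {"named_table": "t2"}},
        }
    }
)
```

The handler does no validation of its own, which matches the description that resolution is left to a later analysis rule.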
Author (Contributor): thanks, merged to master/4.x
### What changes were proposed in this pull request?

This is the follow-up to #54976 ([SPARK-55886]) which implemented `DataFrame.zip` for the classic path and deferred Spark Connect support. This PR wires up the Connect path end-to-end.

- **Protocol (`relations.proto`)**: adds a `Zip` message with `left` and `right` `Relation` fields (field 48 in the `Relation` oneof). Python stubs regenerated via the `connect-gen-protos` Docker image (buf 1.66.1 + mypy 1.19.1 + mypy-protobuf 3.3.0 + ruff 0.14.8).
- **Server (`SparkConnectPlanner`)**: adds `transformZip` that directly constructs the unresolved `logical.Zip(left, right)` plan, dispatched via `RelTypeCase.ZIP`. `ResolveZip` then runs during analysis, same as the classic path.
- **Scala Connect `Dataset`**: replaces the `UnsupportedOperationException` stub with `sparkSession.newDataFrame { builder => builder.getZipBuilder.setLeft(...).setRight(...) }`, following the `crossJoin`/`buildJoin` pattern.
- **Python Connect `plan.py`**: adds `class Zip(LogicalPlan)` following the `NearestByJoin` pattern.
- **Python Connect `dataframe.py`**: replaces the `PySparkNotImplementedError` stub with a `plan.Zip` call; removes the doctest suppression (`del DataFrame.zip.__doc__`) that was added when Connect was unsupported.

### Why are the changes needed?

`DataFrame.zip` was merged (#54976) with Connect deferred. This PR completes the implementation so Connect users can use `zip` on equal footing with the classic path.

### Does this PR introduce _any_ user-facing change?

Yes. `DataFrame.zip` now works on the Spark Connect path. Previously it raised `PySparkNotImplementedError: [NOT_IMPLEMENTED] zip is not implemented.`

### How was this patch tested?

- `test_parity_zip.py`: runs the full `DataFrameZipTestsMixin` (basic projections, expressions, one-sided base, `withColumn`, chained `withColumn`, longer chains, parent-with-chained-child, `withColumnRenamed`, scalar Python UDF, pandas UDF, and two error cases) against a Connect session.
- `test_connect_plan.py`: asserts that the proto plan for `left.zip(right)` has the `zip` field set with the expected left/right sources.
- `PlanGenerationTestSuite`: serializes a `zip` plan to proto and compares against a new golden file (`zip.proto.bin`).
- `ProtoToParsedPlanTestSuite`: deserializes the proto golden file, runs it through `SparkConnectPlanner` + `Analyzer`, and compares the explained plan against `zip.explain`.
- `DataFrameSuite` (Connect): end-to-end test that zips two projections over a Connect session and asserts the resulting columns and values.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code
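The user-facing change described in this PR (a method that used to raise `NOT_IMPLEMENTED` now returns a new DataFrame wrapping a zip plan) can be shown with a toy model. All class names below are hypothetical and exist only for this sketch; `NotImplementedYet` stands in for `PySparkNotImplementedError`, and no real PySpark code is involved.

```python
from dataclasses import dataclass


class NotImplementedYet(Exception):
    """Hypothetical stand-in for PySparkNotImplementedError."""


@dataclass(frozen=True)
class Plan:
    """Hypothetical plan node: an operation label plus child plans."""

    op: str
    children: tuple = ()


class StubDataFrame:
    """Behaviour before the change: the method only raises."""

    def __init__(self, plan: Plan) -> None:
        self._plan = plan

    def zip(self, other: "StubDataFrame") -> "StubDataFrame":
        raise NotImplementedYet("[NOT_IMPLEMENTED] zip is not implemented.")


class ConnectDataFrame(StubDataFrame):
    """Behaviour after the change: the method builds a zip plan node."""

    def zip(self, other: "StubDataFrame") -> "ConnectDataFrame":
        # Only records the operation; in this toy model nothing is executed.
        return ConnectDataFrame(Plan("zip", (self._plan, other._plan)))


left = ConnectDataFrame(Plan("project_a"))
right = ConnectDataFrame(Plan("project_b"))
zipped = left.zip(right)

try:
    StubDataFrame(Plan("project_a")).zip(StubDataFrame(Plan("project_b")))
    stub_error = None
except NotImplementedYet as exc:
    stub_error = str(exc)
```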