[SPARK-52463][SDP] Add support for cluster_by in Python Pipelines APIs#52831
[SPARK-52463][SDP] Add support for cluster_by in Python Pipelines APIs#52831sryza wants to merge 10 commits into
Conversation
| | | ||
| |spark = SparkSession.active() | ||
| | | ||
| |@dp.materialized_view(cluster_by = ["cluster_col1"]) |
There was a problem hiding this comment.
let's also test non-existent columns
There was a problem hiding this comment.
I also wonder what will happen if the array is empty
There was a problem hiding this comment.
👍 adding a non-existent columns test to MaterializeTablesSuite and a test for empty array to PythonPipelineSuite (the table gets no clustering columns)
| @@ -885,4 +886,228 @@ abstract class MaterializeTablesSuite extends BaseCoreExecutionTest { | |||
| storageRoot = storageRoot | |||
| ) | |||
| } | |||
|
|
|||
| test("cluster columns with user schema") { | |||
There was a problem hiding this comment.
let's also test non-existent columns
| @@ -266,22 +266,35 @@ object DatasetManager extends Logging { | |||
| ) | |||
| val mergedProperties = resolveTableProperties(table, identifier) | |||
| val partitioning = table.partitionCols.toSeq.flatten.map(Expressions.identity) | |||
| val clustering = table.clusterCols.map(cols => | |||
| ClusterByTransform(cols.map(col => FieldReference(col)).toSeq) | |||
There was a problem hiding this comment.
nit: although identical, let's be consistent with partition col handling, and use Expressions.column
| @@ -104,6 +104,9 @@ message PipelineCommand { | |||
| spark.connect.DataType schema_data_type = 4; | |||
| string schema_string = 5; | |||
| } | |||
|
|
|||
| // Optional cluster columns for the table. | |||
| repeated string cluster_cols = 6; | |||
There was a problem hiding this comment.
Can we use the existing name, clustering_columns, consistently instead of adding a new variant, cluster_cols, @sryza , @cloud-fan , @gengliangwang ?
spark/sql/connect/common/src/main/protobuf/spark/connect/commands.proto
Lines 144 to 145 in ada1908
There was a problem hiding this comment.
In the proto file layer, I hope we can be consistent although this can be different in the API layer (Scala/Java/Python/...)
There was a problem hiding this comment.
Makes sense – I just updated the PR to reflect this.
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Thank you. Could you fix the failures?
[info] *** 50 TESTS FAILED ***
[error] Failed tests:
[error] org.apache.spark.sql.connect.pipelines.PythonPipelineSuite
[error] org.apache.spark.sql.connect.pipelines.EndToEndAPISuite
[error] org.apache.spark.sql.connect.service.SparkConnectSessionHolderSuite
|
@dongjoon-hyun 👍 failures fixed with a rebase – it looks like there was a race condition with a Python protobuf library version bump. |
dongjoon-hyun
left a comment
There was a problem hiding this comment.
+1, LGTM. Thank you, @sryza .
### What changes were proposed in this pull request? In the `table` and `materialized_view` decorators, accept a `cluster_by` argument that determines the clustering columns. ### Why are the changes needed? Parity with the `clusterBy` argument accepted by `DataStreamReader` and `DataFrameWriter`. ### Does this PR introduce _any_ user-facing change? Adds a new parameter to public APIs. ### How was this patch tested? Unit tests and integration tests. ### Was this patch authored or co-authored using generative AI tooling? Closes #52831 from sryza/cluster-by. Authored-by: Sandy Ryza <sandy.ryza@databricks.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit a927a14) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
|
Merged to master/4.1. |
### What changes were proposed in this pull request? In the `table` and `materialized_view` decorators, accept a `cluster_by` argument that determines the clustering columns. ### Why are the changes needed? Parity with the `clusterBy` argument accepted by `DataStreamReader` and `DataFrameWriter`. ### Does this PR introduce _any_ user-facing change? Adds a new parameter to public APIs. ### How was this patch tested? Unit tests and integration tests. ### Was this patch authored or co-authored using generative AI tooling? Closes apache#52831 from sryza/cluster-by. Authored-by: Sandy Ryza <sandy.ryza@databricks.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request? In the `table` and `materialized_view` decorators, accept a `cluster_by` argument that determines the clustering columns. ### Why are the changes needed? Parity with the `clusterBy` argument accepted by `DataStreamReader` and `DataFrameWriter`. ### Does this PR introduce _any_ user-facing change? Adds a new parameter to public APIs. ### How was this patch tested? Unit tests and integration tests. ### Was this patch authored or co-authored using generative AI tooling? Closes apache#52831 from sryza/cluster-by. Authored-by: Sandy Ryza <sandy.ryza@databricks.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…ift` source code ### What changes were proposed in this pull request? This PR aims to update Spark Connect-generated Swift source code with Apache Spark `4.1.0`. ### Why are the changes needed? To use the latest bug fixes and new messages to develop for new features of `4.1.0`. - apache/spark#53024 - apache/spark#52894 - apache/spark#52890 - apache/spark#52872 - apache/spark#52746 - apache/spark#52831 ``` $ git clone -b v4.1.0 https://github.com/apache/spark.git $ cd spark/sql/connect/common/src/main/protobuf/ $ protoc --swift_out=. spark/connect/*.proto $ protoc --grpc-swift_out=. spark/connect/*.proto // Remove empty GRPC files $ cd spark/connect $ grep 'This file contained no services' * | awk -F: '{print $1}' | xargs rm ``` ### Does this PR introduce _any_ user-facing change? Pass the CIs. ### How was this patch tested? Pass the CIs. I manually tested with `Apache Spark 4.1.0`. ``` $ swift test --no-parallel ... Test run with 203 tests in 21 suites passed after 33.163 seconds. ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #271 from dongjoon-hyun/SPARK-54811. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?
In the
@tableand@materialized_viewdecorators, accept acluster_byargument that determines the clustering columns.Why are the changes needed?
Parity with the
clusterByargument accepted byDataStreamReaderandDataFrameWriter.Does this PR introduce any user-facing change?
Adds a new parameter to public APIs.
How was this patch tested?
Unit tests and integration tests.
Was this patch authored or co-authored using generative AI tooling?