CollapseRepartition physical optimizer #3991

Closed
Jefffrey wants to merge 1 commit into apache:master from Jefffrey:41_collapse_repartition

@Jefffrey

Contributor

Which issue does this PR close?

Closes #41

Rationale for this change

Add repartition elimination as suggested by #41

What changes are included in this PR?

New physical optimizer to eliminate adjacent RepartitionExec nodes
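
As a rough illustration of what collapsing adjacent repartition nodes means, here is a minimal sketch on a toy plan tree. It is not the code in this PR: `Plan`, `Partitioning`, and `collapse_adjacent` are hypothetical stand-ins, not DataFusion's `ExecutionPlan` or `RepartitionExec` types. The sketch assumes that when one repartition sits directly on top of another, the inner one can be dropped because the outer one redistributes every row anyway.

```rust
/// Toy partitioning scheme (hypothetical; not DataFusion's `Partitioning`).
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    RoundRobin(usize),
    Hash(Vec<String>, usize),
}

/// Toy physical plan node (hypothetical; not DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Filter(Box<Plan>),
    Repartition(Partitioning, Box<Plan>),
}

/// Rewrites the plan bottom-up. When a Repartition sits directly on top of
/// another Repartition, the inner node is dropped and the outer node keeps
/// its own partitioning scheme.
fn collapse_adjacent(plan: Plan) -> Plan {
    match plan {
        Plan::Scan(name) => Plan::Scan(name),
        Plan::Filter(input) => Plan::Filter(Box::new(collapse_adjacent(*input))),
        Plan::Repartition(outer, input) => match collapse_adjacent(*input) {
            // The child is itself a repartition: skip it and attach its input.
            Plan::Repartition(_inner, grandchild) => Plan::Repartition(outer, grandchild),
            other => Plan::Repartition(outer, Box::new(other)),
        },
    }
}
```

Because the child is rewritten before it is inspected, a stack of three or more repartitions also reduces to the outermost one. Repartitions separated by another node, such as a filter, are left untouched by this sketch.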

Are there any user-facing changes?

@github-actions github-actions Bot added the core Core DataFusion crate label Oct 27, 2022
@Jefffrey

Contributor Author

Unsure if this is what is meant by #41, but I gave it a shot

There's probably more to the optimization that could be done, such as dealing with cases like having a Projection sandwiched between two repartition nodes, or cases like this:

HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "int_col", index: 0 }, Column { name: "int_col3", index: 0 })]                                                                                                    
 *RepartitionExec: partitioning=Hash([Column { name: "int_col", index: 0 }], 12)                                                                                                                                                       
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "int_col", index: 0 }, Column { name: "int_col2", index: 0 })]                                                                                                
     *RepartitionExec: partitioning=Hash([Column { name: "int_col", index: 0 }], 12)                                                                                                                                                   
        ProjectionExec: expr=[int_col@2 as int_col, double_col@3 as double_col, CAST(date_string_col@4 AS Utf8) as alltypes_plain.date_string_col]                                                                                     
          FilterExec: id@0 > 1 AND CAST(tinyint_col@1 AS Float64) < double_col@3                                                                                                                                                       
            ParquetExec: limit=None, partitions=[home/jeffrey/Code/arrow-datafusion/parquet-testing/data/alltypes_plain.parquet], predicate=id_max@0 > 1 AND true, projection=[id, tinyint_col, int_col, double_col, date_string_col]  
      RepartitionExec: partitioning=Hash([Column { name: "int_col2", index: 0 }], 12)                                                                                                                                                  
        ProjectionExec: expr=[int_col@0 as int_col2]                                                                                                                                                                                   
          ProjectionExec: expr=[int_col@2 as int_col, double_col@3 as double_col, CAST(date_string_col@4 AS Utf8) as alltypes_plain.date_string_col]                                                                                   
            FilterExec: id@0 > 1 AND CAST(tinyint_col@1 AS Float64) < double_col@3                                                                                                                                                     
              ParquetExec: limit=None, partitions=[home/jeffrey/Code/arrow-datafusion/parquet-testing/data/alltypes_plain.parquet], predicate=id_max@0 > 1 AND true, projection=[id, tinyint_col, int_col, double_col, date_string_col]
  RepartitionExec: partitioning=Hash([Column { name: "int_col3", index: 0 }], 12)                                                                                                                                                      
    ProjectionExec: expr=[int_col@0 as int_col3]                                                                                                                                                                                       
      ProjectionExec: expr=[int_col@2 as int_col, double_col@3 as double_col, CAST(date_string_col@4 AS Utf8) as alltypes_plain.date_string_col]                                                                                       
        FilterExec: id@0 > 1 AND CAST(tinyint_col@1 AS Float64) < double_col@3                                                                                                                                                         
          ParquetExec: limit=None, partitions=[home/jeffrey/Code/arrow-datafusion/parquet-testing/data/alltypes_plain.parquet], predicate=id_max@0 > 1 AND true, projection=[id, tinyint_col, int_col, double_col, date_string_col]

Here the two RepartitionExec nodes marked with * could potentially be collapsed, though I'm unsure whether that is correct.

Appreciate any feedback on this

@mingmwang

Contributor

There is a general implementation in the PR below, please take a look.
#3855
I am working on splitting that PR into smaller ones now.

@Dandandan

Contributor

@Jefffrey the physical nodes carry information (output_partitioning) that lets you retrieve the child's partitioning and see that an extra repartitioning would not change it. That allows doing this in a more generic way, whether there is a filter, projection, aggregate, or join in between.
See PR from @mingmwang
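
A minimal sketch of that idea, assuming a toy plan model rather than DataFusion's real API: each node reports an output partitioning, and a hash repartition is dropped when its input already reports the same hash partitioning. `Plan`, `Partitioning`, and `remove_redundant` are hypothetical names; only the `output_partitioning` name is borrowed from the comment above, and its signature here is an assumption. The only pass-through node modeled is a filter.

```rust
/// Toy partitioning scheme (hypothetical; not DataFusion's `Partitioning`).
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    Unknown(usize),
    Hash(Vec<String>, usize),
}

/// Toy physical plan node (hypothetical; not DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { name: String, partitions: usize },
    Filter(Box<Plan>),
    Repartition(Partitioning, Box<Plan>),
}

impl Plan {
    /// Reports how this node's output is partitioned. A filter only drops
    /// rows, so it passes its input's partitioning through unchanged.
    fn output_partitioning(&self) -> Partitioning {
        match self {
            Plan::Scan { partitions, .. } => Partitioning::Unknown(*partitions),
            Plan::Filter(input) => input.output_partitioning(),
            Plan::Repartition(scheme, _) => scheme.clone(),
        }
    }
}

/// Removes a hash Repartition whose input is already hash-partitioned in
/// exactly the requested way, even if other nodes sit in between.
fn remove_redundant(plan: Plan) -> Plan {
    match plan {
        Plan::Scan { name, partitions } => Plan::Scan { name, partitions },
        Plan::Filter(input) => Plan::Filter(Box::new(remove_redundant(*input))),
        Plan::Repartition(wanted, input) => {
            let input = remove_redundant(*input);
            let redundant = matches!(wanted, Partitioning::Hash(..))
                && input.output_partitioning() == wanted;
            if redundant {
                input
            } else {
                Plan::Repartition(wanted, Box::new(input))
            }
        }
    }
}
```

Extending the sketch to projections, aggregates, or joins would mean defining what each of those nodes reports as its output partitioning, which this toy model does not attempt.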

@Jefffrey

Contributor Author

I see, so should #41 be linked to #3855 as well then, if I'm understanding correctly?

@Jefffrey Jefffrey closed this Oct 30, 2022
@Jefffrey Jefffrey deleted the 41_collapse_repartition branch November 12, 2022 00:11
Labels

core Core DataFusion crate

Development

Successfully merging this pull request may close these issues.

Support hash repartion elimination

3 participants