Skip to content

fix: remove partition aware union logic#6009

Closed
crepererum wants to merge 1 commit into
apache:mainfrom
crepererum:crepererum/issue5970
Closed

fix: remove partition aware union logic#6009
crepererum wants to merge 1 commit into
apache:mainfrom
crepererum:crepererum/issue5970

Conversation

@crepererum

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Fixes #5970.

Rationale for this change

UnionExec shall be a simple node and not perform some optimizations on its own. If repartition / partition concatenation is desired, then there should be an optimizer pass for that. Being a simple node avoids confusion and optimizer bugs like #5970.

What changes are included in this PR?

Remove the entire "partition aware" logic from UnionExec

Are these changes tested?

  1. Existing tests pass
  2. Test for UNION ALL with ORDER BY results are inconsistent #5970 passes.

Are there any user-facing changes?

Less bugs.

`UnionExec` shall be a simple node and not perform some optimizations on
its own. If repartition / partition concatenation is desired, then there
should be an optimizer pass for that.

Fixes apache#5970.
@github-actions github-actions Bot added the core Core DataFusion crate label Apr 14, 2023
@mingmwang

Copy link
Copy Markdown
Contributor

Could you please add a flag to turn on/off the partition aware logic ?
Originally I add the logic is because there is an open ticket for this.
#189

And in SparkSQL, it has similar optimization.

For UnionExec, keep the ordering or keep the partitioning are conflict requirements. In some case we would like to keep
the ordering info but in some other cases/systems(distributed systems), we want to keep the partitioning to avoid data shuffle.

@mingmwang

Copy link
Copy Markdown
Contributor

And I do not think it is bug.

@crepererum

Copy link
Copy Markdown
Contributor Author

@mingmwang do you think we could change the optimizer to add a RepartitionExec or some "combine exec" for that special case w/o overloading UnionExec?

@crepererum

Copy link
Copy Markdown
Contributor Author

And I do not think it is bug.

I mean #5970 clearly shows that there is SOME bug in the DF code base and as illustrated there different parts of the code could be hold responsible for it. So while the technical bug is probably in the optimizer, the fact that UnionExec has its own internal optimizer and that people clearly don't know about it (also because it's not really documented) made me conclude that we should probably have a cleaner design instead of fixing more edge cases that are a consequence of the current design.

@mingmwang

Copy link
Copy Markdown
Contributor

And I do not think it is bug.

I mean #5970 clearly shows that there is SOME bug in the DF code base and as illustrated there different parts of the code could be hold responsible for it. So while the technical bug is probably in the optimizer, the fact that UnionExec has its own internal optimizer and that people clearly don't know about it (also because it's not really documented) made me conclude that we should probably have a cleaner design instead of fixing more edge cases that are a consequence of the current design.

I get your points. Agree that this internal optimization logic inside the UnionExec is not visible to others, this is not good.

Basically, we need to consider the following requirements

  1. Sometimes we would like the UnionExec to keep ordering
  2. Sometimes we would like the UnionExec to keep partition
  3. The physical plan should show or display the UnionExec is partition-aware or ordering-aware clearly.
  4. The physical optimizers rules(Enforce sort/enforce distribution) need to handle all the different cases correctly.

@crepererum

Copy link
Copy Markdown
Contributor Author

Good points. I esp. like the "keep ordering VS keep partition" semantics. How about we split UnionExec into two node types then? I think this would avoid a lot of confusion.

@mingmwang

Copy link
Copy Markdown
Contributor

Good points. I esp. like the "keep ordering VS keep partition" semantics. How about we split UnionExec into two node types then? I think this would avoid a lot of confusion.

At the very begin, I thought we could have two types of Union, a normal UnionExec and PartitionAwareUnionExec. We could have a physical rule to replace UnionExec to PartitionAwareUnionExec if capable, and a configure option to turn on/off the rule. But because the partition and ordering info are not static, they are changed dynamically. I not sure how this rule will interact with the enforcer rules.

@alamb

alamb commented Apr 14, 2023

Copy link
Copy Markdown
Contributor

I spent a non trivial time thinking about what a "PartitionAware" Union even means

It is entirely undocumented in
https://docs.rs/datafusion/22.0.0/datafusion/physical_plan/union/struct.UnionExec.html

Thanks to @mingmwang and @crepererum 's comment on #5970 (comment)

Given the operation seems so different, if we are going to keep the partition aware union, I think we should use a different structure name. Maybe we could call what is currently named "UnionExec with preserve partitioning" as Interleave -- that would imply the data from the different partitions was kept segregated in their own partitions but interleaved in the output partition streams.

The physical plan should show or display the UnionExec is partition-aware or ordering-aware clearly.

I will try and make a PR to do this to make the current state of affairs easier to understand

@alamb

alamb commented Apr 14, 2023

Copy link
Copy Markdown
Contributor

Update: 🤔 it seems from my experiment on #6013 that we have no explain coverage (at all) for UnionExec with preserve partitioning.

@mingmwang do you know of tests / examples of it being used so that I can add coverage?

@crepererum

Copy link
Copy Markdown
Contributor Author

This week I plan to extend this PR by the new node type (partition aware Union / interleave) and an optimizer pass that inserts it.

@mingmwang

mingmwang commented Apr 18, 2023

Copy link
Copy Markdown
Contributor

@alamb
Originally, the UT partition_aware_union in dataframe.rs should cover the basic correctness and partition count test.
But there is no UT to cover the physical plan description.

@mingmwang

Copy link
Copy Markdown
Contributor

@crepererum
Regarding the bug fix, if we still want to keep the functionality of partition-aware Union, you can fix the rule SortPushDown and do not push down the sort requirements for partition-aware Union.

In the method pushdown_requirement_to_children()

else if is_union(plan) {
        // UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
        // propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
        Ok(Some(vec![
            parent_required.map(|elem| elem.to_vec());
            plan.children().len()
        ]))
}

change to

else if let Some(UnionExec {
        partition_aware: false, ..
    }) = plan.as_any().downcast_ref::<UnionExec>()
    {
        // UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
        // propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
        Ok(Some(vec![
            parent_required.map(|elem| elem.to_vec());
            plan.children().len()
        ]))
    }

@mingmwang

mingmwang commented Apr 18, 2023

Copy link
Copy Markdown
Contributor

Sorry that #5970 is my bug, I wrote the first version of the Top-Down sort enforcement and forgot to handle the partition-aware union cases.

@crepererum

Copy link
Copy Markdown
Contributor Author

Filed #6045 as a more advanced solution. I think this is the best of both worlds.

@alamb alamb closed this in #6045 Apr 21, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

UNION ALL with ORDER BY results are inconsistent

3 participants