Add Physical Partitioning::Range enum variant#22207
Conversation
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
366c4ac to
cad5e05
Compare
stuhood
left a comment
There was a problem hiding this comment.
Thanks for iterating on this!
| /// ```text | ||
| /// exprs = [date, city] | ||
| /// | ||
| /// partition 0: | ||
| /// date in [2021-01-01, 2022-01-01) | ||
| /// city in [Allston, Boston) | ||
| /// | ||
| /// partition 1: | ||
| /// date in [2021-01-01, 2022-01-01) | ||
| /// city in [Boston, NYC) | ||
| /// ``` |
There was a problem hiding this comment.
Are these supposed to be representing compound keys or multi-dimensional partitioning?
If they are compound keys, then I think that it would be clearer to express them as:
[
(2021-01-01, Allston),
(2022-01-01, Boston),
...
]
If this is supposed to be multi-dimensional partitioning, then I think that that might be unnecessary, as mentioned on the discussion thread: any particular join only needs to consider 1 dimension (possibly with compound keys).
There was a problem hiding this comment.
This is supposed to be multi-dimensional. For example we are partitoned on independent id, time keys thus this would accurately represent our layout.
I do see what you are saying about the join needing a single key, which will work for our case as well. But maybe this can start as single dimension with compound keys and extend if the use case arises to avoid complexity?
There was a problem hiding this comment.
Ok, interesting. Yea, if there are multiple consumers who are interested in multi-dimensional partitioning, and it can still reduce down to a base-case of single-dimension partitioning for consumers who don't need that complexity, then perhaps it could make sense to bake it in here.
I'll be honest though: my largest concern is just that I have no experience with multi: only single. So I have less useful feedback to give.
One thing that could likely be a good exercise in terms of the representation would be figuring out what datastructure you would/could use to efficiently partition in multiple dimensions, and then bias towards a representation which allows you to construct that datastructure. In one dimensional partitioning, that's essentially just a binary-tree/b-tree/sorted structure: hence the desire for non-overlapping contiguous ranges (to avoid needing something more complex like an interval tree). For multi-dimensional partitioning, what structure would you use, and what would the inputs to construct one be? I expect that fully covering the space (contiguous, no-overlap) makes the multi-dimensional datastructure cheaper/simpler as well.
There was a problem hiding this comment.
Yeah, maybe look to see how other DBs support this. I know ClickHouse and InfluxDB do
There was a problem hiding this comment.
I looked at ClickHouse and InfluxDB, I foudn that they store physical partitioning metadata, but did not find anything like a “multi-dimensional repartition this row.”
I looked into systems that try to do a true multi-dimensional partitioning and there aren't many that really do it. I think fo good reason. It would treat the columns like time and city as independent axes, which in simple cases is great and easy but when things start to overlap or more nuanced it seem we would need a routing structure like a grid/sparse map/KDB-tree (these were very complicated).
The closest thing I found was in Sedona where they do spatial partitioning using quadtree and kdbtree:
- https://sedona.apache.org/1.7.1/api/rdocs/reference/sedona_apply_spatial_partitioner.html
- Quadtree: https://www.geeksforgeeks.org/dsa/quad-tree/
- KDBTree: https://en.wikipedia.org/wiki/K-D-B-tree
With compound-key range partitioning it is more clear and still efficient on repartition routing:
1. evaluate `(time, city)` as one ordered key
2. binary-search split points
3. route to a partition.
Compound-key range partitioning should cover most join/planner cases like @stuhood mentioned. We are typicaly asking "are the two sides of this join compatible" for things like dynamic filters. The thing it lacks compare to true multi-dimensional partitioning is independent routing. So, for example, it cannot directly represent “time bucket X and city bucket Y map to partition P” which is useful when we want to do optimizations on each axis independently like pruning on the individual columns:
WHERE time >= '2022' AND time < '2023'
AND city >= 'Boston' AND city < 'NYC'So I think compound-key range partitioning is the right move. If there is a use for this I would say that this should be its own separate implementation.
There was a problem hiding this comment.
I foudn that they store physical partitioning metadata, but did not find anything like a “multi-dimensional repartition this row.”
I agree -- Influx's model is best modeled as "compound key" (it is not multi-dimensional partitioning)
So I think compound-key range partitioning is the right move. If there is a use for this I would say that this should be its own separate implementation.
I agree
| lower: Option<RangeBound>, | ||
| upper: Option<RangeBound>, |
There was a problem hiding this comment.
The problem with encoding these as intervals as opposed to points (as suggested here) is that in order to use a more efficient re-partitioning strategy based on a sorted representation, you need to start by converting this representation back into the points representation, which involves a bunch of validation that the ranges are not overlapping, sorted, contiguous (so that you can floor), etc.
I don't feel strongly about it, but I think that a point-based representation involves a lot fewer special cases.
A points representation must cover the entire set of valid values (by construction). That doesn't let you use the partitioning strategy to short circuit if the ranges "Partially" cover the valid values (in the sense of being a partial function... e.g. TryFrom vs From). But honestly, I don't think that allowing for partial partitioning is a good idea anyway: for example, the Repartition operator wouldn't actually know what to do with a row which didn't map to any partition: it can't discard rows, because it doesn't know what operator is consuming it... so it would have to error. So I think that in practice, all Range partitioning strategies would need to be complete anyway, and this extra generality is just complexity.
There was a problem hiding this comment.
The reason I leaned toward this was readability. I think we could make the documentation clear or even provide helpers to abstract this nicely so I am not concerned with this.
I am ok with dong split points as well as long as other maintainers think this is ok for public API 👍
There was a problem hiding this comment.
A points representation must cover the entire set of valid values (by construction).
I also prefer a split points representation for the same reason. Specifically, I think split points ensures that any particular row value is in EXACTLY one partition. We would prevent user errors that could lead to cases where there are rows that don't belong in any partition or in more than one partition.
This also would make the sorting semantics easier
|
Note there is more discussion here |
alamb
left a comment
There was a problem hiding this comment.
Thank you @stuhood , @Dandandan and @gene-bordegaray for your work on this
I think this looks good to me. I see a new update just got pushed so submitting y feedback now and will review what just got pushed
Suggestions:
- We wait until we branch datafusion 54 (#21080) -- should be today or tomorrow
| /// ```text | ||
| /// exprs = [date, city] | ||
| /// | ||
| /// partition 0: | ||
| /// date in [2021-01-01, 2022-01-01) | ||
| /// city in [Allston, Boston) | ||
| /// | ||
| /// partition 1: | ||
| /// date in [2021-01-01, 2022-01-01) | ||
| /// city in [Boston, NYC) | ||
| /// ``` |
There was a problem hiding this comment.
I foudn that they store physical partitioning metadata, but did not find anything like a “multi-dimensional repartition this row.”
I agree -- Influx's model is best modeled as "compound key" (it is not multi-dimensional partitioning)
So I think compound-key range partitioning is the right move. If there is a use for this I would say that this should be its own separate implementation.
I agree
| lower: Option<RangeBound>, | ||
| upper: Option<RangeBound>, |
There was a problem hiding this comment.
A points representation must cover the entire set of valid values (by construction).
I also prefer a split points representation for the same reason. Specifically, I think split points ensures that any particular row value is in EXACTLY one partition. We would prevent user errors that could lead to cases where there are rows that don't belong in any partition or in more than one partition.
This also would make the sorting semantics easier
| sort_exprs: Vec<PhysicalSortExpr>, | ||
| /// Boundaries between adjacent partitions. `N` split points define `N + 1` | ||
| /// lower-inclusive, upper-exclusive partitions. Values equal to a split | ||
| /// point belong to the partition after that split point. |
There was a problem hiding this comment.
See above for a potential more formal way of specifying this. i recommend making the docs on RangePartitioning detailed and just leave a pointer from split_points to the main docs
| /// Ordered partitioning key. Sort options are part of the partitioning | ||
| /// because `ASC`/`DESC` and null ordering decide which side of a split point | ||
| /// a row belongs to. | ||
| sort_exprs: Vec<PhysicalSortExpr>, |
There was a problem hiding this comment.
Instead of sort_exprs, what do you think about using pre-existing LexOrdering: https://docs.rs/datafusion/latest/datafusion/physical_expr/struct.LexOrdering.html ?
There was a problem hiding this comment.
yup much better 👍
| /// Boundaries between adjacent partitions. `N` split points define `N + 1` | ||
| /// lower-inclusive, upper-exclusive partitions. Values equal to a split | ||
| /// point belong to the partition after that split point. | ||
| split_points: Vec<Vec<ScalarValue>>, |
There was a problem hiding this comment.
For future API extenability I recommend wrapping this Vec in a struct
split_points: Vec<SpitPoint>,And then
struct SplitPoint {
points: Vec<ScalarValue>
}That woudl both give us a good place to add documentation and things like Display impls, but if we ever wanted to add additional types of split points (like maybe inf or expressions) we wouldn't have to make a bunch of API changes
There was a problem hiding this comment.
yes, good point thank you
Partitioning::Range enum variant
stuhood
left a comment
There was a problem hiding this comment.
This representation looks good to me!
As to the multi-dimensional partitioning decision: I am very fine either way, but I do think that it could be very cool to actually lean in there (especially with 3+ consumers doing multi-dimensional)... it could potentially allow more optimization passes in upstream around pruning partitions.
But it could also be a third variant at some point, so not blocking.
alamb
left a comment
There was a problem hiding this comment.
Thank you @gene-bordegaray and @stuhood -- I had a few small comments but I think this PR now looks (really) good to merge
We have also branched the 54 release so I think we are clear to merge it from a release perspective.
Since this is a fairly substantial / fundamental new feature, before merging I think we should
- Send an email to the dev list (and maybe discord channel) saying that we have a proposed new API for a new partitioning and invite anyone else that is interested to review and provide feedback
- Leave this open for several more days to allow more time to collect feedback
- Add a note in the 55 upgrade guide (to be written) explaining that we adding new range partitioning
All in all, this is great work -- thanks again
|
|
||
| #[test] | ||
| fn test_multi_partition_range_does_not_satisfy_hash_distribution() -> Result<()> { | ||
| let schema = Arc::new(Schema::new(vec![ |
There was a problem hiding this comment.
the setup for the schema and creating col_a and col_b and the quivalence properties and range partitioning is the same in a bunch of these cases -- maybe it could be moved into a helper function to reduce code repetition which would make it easier to verify what each test is checking)
For example, if you had something like this
struct TestFixture {
/// schema with columns a, b
schema: SchemaRef,
col: Arc<dyn PhysicalExpr>,
...
}You could write this test with a lot less boilerplate like
#[test]
fn test_multi_partition_range_does_not_satisfy_hash_distribution() -> Result<()> {
let fixture = TestFixture::new();
let required = Distribution::HashPartitioned(vec![fixture.col_a, fixture.col_b]);
assert_eq!(
range_partitioning.satisfaction(&required, &fixture.eq_properties, false),
PartitioningSatisfaction::NotSatisfied
);
}There was a problem hiding this comment.
added a fixture for all partitioning tests, eliminted lots of boiler plate 😄
| /// The caller is responsible for satisfying the contract documented on | ||
| /// [`RangePartitioning`]. | ||
| pub fn new(ordering: LexOrdering, split_points: Vec<SplitPoint>) -> Self { | ||
| Self { |
There was a problem hiding this comment.
Given there is an invariant that all the values in split_points are in the correct order compared to the ordering, it seems like we should at least offer a RangePartitioning::try_new that validates that invariant and document that new does not chekc
There was a problem hiding this comment.
Good point, added this and use now in the proto
Also added some unit tests covering this 👍
d1aa4c5 to
11c4c18
Compare
|
Feel free to ping when we are going to merge this guy and I can rebase it 👍 |
…05/expr_partitioning_enum_mechanical
|
I took the liberty of merging up from main to resolve a conflict. I'll plan to merge this tomorrow unless someone wants more time to review or add comments. I think we should drop a note to the mailing list (dev@datafusion.apache.org) I didn't see anything on https://lists.apache.org/list.html?dev@datafusion.apache.org Maybe just a quick note like this: https://lists.apache.org/thread/mbw6q0ccndn2xq0kq8f28jrj5wppzqdn I think we can merge this PR in first, and then send the note (as people will hvae plenty of time to comment before we release DataFusion 55) |
|
Here is a note that was sent to the dev mailing list: https://lists.apache.org/thread/14d9fthyoyq76xd3yb89swxclvw91jfp I put this PR in the merge queue -- thank you @gene-bordegaray -- very excited to see this make progress |
Which issue does this PR close?
ExprPartitioningas described in thread: [DISCUSSION] Extending Partitioning to Support More Variants #21992.Rationale for this change
DataFusion currently cannot truthfully represent range-partitioned physical data. Some sources may be range partitioned, but have to advertise another partitioning shape or fall back to unknown partitioning.
This PR introduces the metadata shape for range partitioning without implementing optimizer or execution behavior yet. The goal is to establish the public representation first, then implement planning, compatibility, and execution behavior incrementally in follow-up PRs.
What changes are included in this PR?
Partitioning::Range(RangePartitioning).RangePartitioningRangePartitionRangeIntervalRangeBoundnot_impl_err!handling for range partitioning at call sites.UnknownPartitioning.Are these changes tested?
Yes.
Are there any user-facing changes?
Yes. This adds new public physical partitioning API and proto for range partitioning.