Flink: put everything together for range distribution in Flink sink#10859
Conversation
| .defaultValue(StatisticsType.Auto.name()); | ||
|
|
||
| public static final ConfigOption<Double> CLOSE_FILE_COST_WEIGHT_PERCENTAGE = | ||
| ConfigOptions.key("close-file-cost-weight-percentage").doubleType().defaultValue(0.02d); |
There was a problem hiding this comment.
open to feedback on the config name, type, and default value. 0.02 means 2% of close file weight on the target weight per task. it avoids placing more than 50 files in one writer task.
There was a problem hiding this comment.
Also keeps open 50 writers (which means high memory footprint).
Does 0.02d mean 2 percent? In this case we can use close-file-cost-weight.
Or alternatively if percentage is in the name, then the value should be 2
There was a problem hiding this comment.
Yes, 0.02d meant 2%. I didn't go with the percentage in the naming and integer as value in order to get a bit more flexibility like maybe like 0.005d for 0.5%.
agree with the comment on naming that is probably not the best. maybe close-file-cost-weight. In the doc, we already explained 0.02d meant 2%.
There was a problem hiding this comment.
I renamed this config to RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT as I think it is more accurate. added more extensive Javadoc and explanation in the doc. hope that it is more clear to users.
Will follow up with a separate PR to change internal code from closeFileCost to sortKeyBaseWeight. it will touch a bunch of internal files and lines.
|
|
||
| // Convert the requested flink table schema to flink row type. | ||
| RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema); | ||
| int writerParallelism = |
There was a problem hiding this comment.
writer parallelism is also needed by distributeDataStream method as the downstream operator parallelism for range partitioner.
|
@rodmeneses: This will effect your PR as well. Please sync with @stevenzwu about the order of the commits |
I don't think we should worry about the order. we can integrate the range distribution with the v2 sink separately after the v2 sink is merged. |
65edd7e to
2aaa30c
Compare
2aaa30c to
46825f9
Compare
6e1cbc9 to
1c612da
Compare
|
|
||
| Config value is a enum type: `Map`, `Sketch`, `Auto`. | ||
| <ul> | ||
| <li>Map: collect accurate sampling count for every single key. |
|
|
||
| ### Range distribution (experimental) | ||
|
|
||
| RANGE distribution shuffle data by partition key or sort order via a custom range partitioner. |
|
|
||
| #### Use cases | ||
|
|
||
| RANGE distribution can be applied an Iceberg table that either is partitioned or |
There was a problem hiding this comment.
can be applied -> can be applied to
| .withDescription("Type of statistics collection: Auto, Map, Sketch"); | ||
|
|
||
| public static final ConfigOption<Double> RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT = | ||
| ConfigOptions.key("ange-distribution-sort-key-base-weight") |
There was a problem hiding this comment.
ange-distribution-sort-key-base-weight -> range-distribution-sort-key-base-weight
| <li>For low cardinality scenario (like hundreds or thousands), | ||
| HashMap is used to track traffic distribution for every key. | ||
| If a new sort key value shows up, range partitioner would just | ||
| round-robin it to the writer tasks before traffic distribution has been learned. |
There was a problem hiding this comment.
learned. -> learned, let's remove the extra .
…ight as it is more accurate and probably more user friendly.
1c612da to
f8d559f
Compare
…pache#10859) (cherry picked from commit ed07fd1)
…pache#10859) (cherry picked from commit ed07fd1)
(cherry picked from commit ce772a6)
last PR for put everything together from the project: [Priority 2] Flink: support range distribution (view)