Flink: handle rescale properly and refactor statistics#10457

Merged
stevenzwu merged 4 commits into
apache:mainfrom
stevenzwu:shuffle-rescale-handling
Jul 22, 2024

@stevenzwu
Contributor

Closes issue #10441.

@stevenzwu stevenzwu requested a review from pvary June 6, 2024 18:18
@github-actions github-actions Bot added the flink label Jun 6, 2024
private final StatisticsType type;
private final Map<SortKey, Long> keyFrequency;
private final SortKey[] rangeBounds;
private final SortKey[] keySamples;

Contributor Author

This rename is needed so that AggregatedStatistics can be used to store both the complete samples and the calculated range bounds.

Contributor

Would it be better to have separate classes for the global stats and the aggregated stats? It's a bit late here, and it's hard to follow when keySamples are actually range bounds and when they contain the full data...
Maybe tomorrow it will be easier to follow 😀

Contributor Author

@stevenzwu stevenzwu Jun 6, 2024

I definitely agree that it can be confusing. I also thought about it. We would need to duplicate the serializer too, which was the main reason I didn't go that route. I can make the change if you think it is better to separate them out.

Contributor

Let's talk about this offline. I don't see all the cons and pros ATM.

Contributor

Reading through the code again, I'm more and more convinced that we have 2 different objects here:

  • RangeBounds (former global statistics) - key-values of the weights used by the partitioner with hash
  • Statistics (former completed statistics) - Sketch or Map without hash, but full of data

I think we are just confusing them because of historical reasons.

Contributor Author

Yes, conceptually we have two types of objects here. For Map statistics, there is no difference between global statistics and completed statistics, as there is no further reduction in stats size. For sketch statistics, global statistics is a lot smaller because it holds only the range bounds.

We can introduce two types, CompletedStatistics and GlobalStatistics. We can also introduce a base type, AggregatedStatistics. I am trying to avoid duplicating the AggregatedStatisticsSerializer, as it can work for both types. Maybe generics can enable code reuse and remove most of the duplication.

Contributor Author

The last commit separated out the two types.

Contributor Author

I have removed the AggregatedStatistics base class. With the change to GlobalStatistics for the Map key assignment, the base class doesn't make much sense anymore. Now GlobalStatistics is used by the partitioner, and CompletedStatistics holds the raw aggregated stats.
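
To illustrate why the sketch case benefits from two separate types, here is a minimal sketch of reducing completed samples to a much smaller set of range bounds. The class and method names are hypothetical, keys are plain ints rather than SortKey, and the selection rule (evenly spaced picks from the sorted samples, skipping repeats) is an assumption for illustration, not necessarily what this PR implements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: not the Iceberg classes discussed in this PR.
final class CompletedToGlobalExample {
  private CompletedToGlobalExample() {}

  // Reduces the full set of samples ("completed" statistics) to at most
  // numPartitions - 1 range bounds ("global" statistics).
  static int[] rangeBounds(int numPartitions, int[] samples) {
    if (numPartitions < 1) {
      throw new IllegalArgumentException("numPartitions must be positive");
    }
    if (samples.length == 0) {
      return new int[0];
    }

    int[] sorted = samples.clone();
    Arrays.sort(sorted);

    // Assumed rule: take every step-th element of the sorted samples as a bound.
    int step = (int) Math.ceil((double) sorted.length / numPartitions);
    List<Integer> bounds = new ArrayList<>();
    for (int pos = step - 1;
        pos < sorted.length && bounds.size() < numPartitions - 1;
        pos += step) {
      int candidate = sorted[pos];
      // Skip a candidate equal to the previous bound so bounds stay strictly increasing.
      if (bounds.isEmpty() || bounds.get(bounds.size() - 1) != candidate) {
        bounds.add(candidate);
      }
    }
    return bounds.stream().mapToInt(Integer::intValue).toArray();
  }

  public static void main(String[] args) {
    int[] samples = {12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1};
    System.out.println(Arrays.toString(rangeBounds(4, samples)));
  }
}
```

With 12 samples and 4 partitions, only 3 bounds need to be shipped to the subtasks, whereas the map case has no comparable reduction.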

StatisticsUtil.deserializeAggregatedStatistics(
statisticsEvent.statisticsBytes(), aggregatedStatisticsSerializer);
checkStatisticsTypeMigration();
output.collect(new StreamRecord<>(StatisticsOrRecord.fromStatistics(globalStatistics)));

Contributor Author

This was a bug previously. We don't want to apply the new stats immediately during the normal aggregation and propagation phase; the switch happens at the checkpoint boundary.

The applyImmediately flag is added in this PR to distinguish stats requested during a rescale. In that case, immediate application is desired.

Contributor

Why not apply the new statistics immediately?

Contributor

Oh... I remember. We don't want to mess with the ongoing files.

Contributor Author

Yeah. Because the Iceberg sink flushes and commits at the checkpoint boundary, switching at the checkpoint boundary allows all subtasks to switch to the new stats in the same checkpoint cycle. Otherwise, some records would be shuffled based on old stats and some based on new stats.
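
The behavior described in this thread can be sketched as a small state holder. Only the applyImmediately flag name comes from the discussion; the class, its methods, and the choice to drop any pending stats on an immediate apply are hypothetical simplifications, not the operator code in this PR.

```java
// Hypothetical illustration of "switch at checkpoint boundary" versus "apply immediately".
final class StatsSwitchExample<S> {
  private S active;  // statistics currently used for shuffling
  private S pending; // statistics received but not yet in effect

  StatsSwitchExample(S initial) {
    this.active = initial;
  }

  // Called when new global statistics arrive from the coordinator.
  void onStatistics(S stats, boolean applyImmediately) {
    if (applyImmediately) {
      // Rescale case: the subtask has no usable stats yet, so switch right away.
      active = stats;
      pending = null; // assumption of this sketch: older pending stats are discarded
    } else {
      // Normal case: hold the stats until the next checkpoint boundary.
      pending = stats;
    }
  }

  // Called at the checkpoint boundary so every subtask switches in the same cycle.
  void onCheckpointBoundary() {
    if (pending != null) {
      active = pending;
      pending = null;
    }
  }

  S active() {
    return active;
  }

  public static void main(String[] args) {
    StatsSwitchExample<String> op = new StatsSwitchExample<>("old");
    op.onStatistics("new", false);
    System.out.println(op.active()); // still the old stats until the checkpoint
    op.onCheckpointBoundary();
    System.out.println(op.active());
  }
}
```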

// Asynchronously request the latest global statistics calculated with new downstream
// parallelism. It is possible events may have started flowing before coordinator responds
// with global statistics. In this case, range partitioner would just blindly shuffle
// records in round robin fashion.

Contributor

Maybe we would be better off using the old ranges with some heuristics?

  • If we have a higher number of subtasks, just use the old ranges, and leave idle tasks
  • If we have a lower number of subtasks, then use modulo?

Or is the communication fast enough that it isn't worth the complexity?

Contributor Author

Great question. I also thought about the fallback heuristic. Initially, I was thinking we could start simple and improve this part if it turns out to be a problem. Communication should be relatively fast (maybe a few ms to 10 ms) if parallelism/fan-out is not very high.

> If we have a higher number of subtasks, just use the old ranges, and leave idle tasks

Scale-up doesn't require any code change. It works like this already.

> If we have a lower number of subtasks, then use modulo?

I agree modulo could be a sensible strategy:

  • pro: better clustering than round-robin
  • con: uneven distribution. Some subtasks may get double the load of other subtasks. But if the stats refresh is fast (say, under a dozen ms), maybe this is not a concern.

Hence, I am in favor of implementing the fallback behavior for rescale
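
A minimal sketch of the modulo fallback and its load skew, assuming the old statistics assign each key to a subtask index computed for the previous parallelism. The class and method names are hypothetical; the fallback was not part of the code at this point in the review.

```java
import java.util.Arrays;

// Hypothetical illustration of the modulo fallback discussed above.
final class RescaleFallbackExample {
  private RescaleFallbackExample() {}

  // Maps a subtask index computed from old statistics onto the current parallelism.
  static int fallbackSubtask(int subtaskFromOldStats, int currentParallelism) {
    if (currentParallelism <= 0) {
      throw new IllegalArgumentException("currentParallelism must be positive");
    }
    return subtaskFromOldStats % currentParallelism;
  }

  // Counts how many old ranges land on each current subtask.
  static int[] load(int oldParallelism, int currentParallelism) {
    int[] counts = new int[currentParallelism];
    for (int old = 0; old < oldParallelism; old++) {
      counts[fallbackSubtask(old, currentParallelism)]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    // Scale-down from 6 to 4: subtasks 0 and 1 each receive two old ranges.
    System.out.println(Arrays.toString(load(6, 4)));
    // Scale-up from 4 to 6: old indexes are unchanged, subtasks 4 and 5 stay idle.
    System.out.println(Arrays.toString(load(4, 6)));
  }
}
```

The scale-down case shows the "double the load" concern directly, and the scale-up case shows why the old ranges keep working with idle subtasks.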

Contributor Author

BTW, we haven't added the SketchRangePartitioner yet. Fallback handling would be implemented there, so it is out of scope for this PR.

Contributor Author

Added the SketchRangePartitioner and RangePartitioner to this PR, so the scope is bigger now.

CHAR_KEYS.get("b"),
CHAR_KEYS.get("b"),
CHAR_KEYS.get("a"),
CHAR_KEYS.get("a"),

Contributor

So the key samples are not ordered, just randomly selected samples from the incoming records?

Contributor

I had expected a bit more (based on my tests with sketches over long values), but having arbitrary keys probably doesn't allow a better approximation.

Contributor Author

The sketch returns the array that captures the reservoir samples. Until the array is full, samples are added to it in arrival order.
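
For readers unfamiliar with reservoir sampling, here is a minimal, self-contained sketch of the classic algorithm. It is a hypothetical class written for illustration, not the sketch library used by this PR, but it shows the same property: the first samples are stored in arrival order, and only later items replace random slots.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical illustration of reservoir sampling; not the sketch implementation used by Iceberg.
final class ReservoirExample<T> {
  private final int capacity;
  private final List<T> samples;
  private final Random random;
  private long seen = 0;

  ReservoirExample(int capacity, long seed) {
    this.capacity = capacity;
    this.samples = new ArrayList<>(capacity);
    this.random = new Random(seed);
  }

  void update(T item) {
    seen++;
    if (samples.size() < capacity) {
      // Reservoir not full yet: append, which preserves arrival order.
      samples.add(item);
    } else {
      // Reservoir full: pick a slot in [0, seen) and replace only if it falls inside
      // the reservoir, so the item is kept with probability capacity / seen.
      long slot = (long) (random.nextDouble() * seen);
      if (slot < capacity) {
        samples.set((int) slot, item);
      }
    }
  }

  // Returns a copy of the current samples. The samples are not sorted.
  List<T> samples() {
    return new ArrayList<>(samples);
  }

  public static void main(String[] args) {
    ReservoirExample<String> reservoir = new ReservoirExample<>(4, 42L);
    for (String key : new String[] {"b", "b", "a", "a"}) {
      reservoir.update(key);
    }
    System.out.println(reservoir.samples()); // arrival order, as in the test data above
  }
}
```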

@stevenzwu stevenzwu force-pushed the shuffle-rescale-handling branch 3 times, most recently from 01ef84a to 42c0113 Compare June 7, 2024 03:51
@pvary
Contributor

pvary commented Jun 10, 2024

@stevenzwu: An unrelated question, but it came up while I was reading the PR:

  • What happens in CDC cases, where the partition of the record might end up on multiple ranges. Did we make sure that the records with the same id go to the same subtask, so we can create a positional delete if there are multiple changes for the same id?

@stevenzwu
Contributor Author

> • What happens in CDC cases, where the partition of the record might end up on multiple ranges. Did we make sure that the records with the same id go to the same subtask, so we can create a positional delete if there are multiple changes for the same id?

CDC/Upsert should use the existing hash distribution in FlinkSink. Range distribution shouldn't be used in this case.

@pvary
Contributor

pvary commented Jun 10, 2024

> > • What happens in CDC cases, where the partition of the record might end up on multiple ranges. Did we make sure that the records with the same id go to the same subtask, so we can create a positional delete if there are multiple changes for the same id?
>
> CDC/Upsert should use the existing hash distribution in FlinkSink. Range distribution shouldn't be used in this case.

I think we can overlay the hash distribution above the ranges, and we could make it work, but I understand your reluctance to try to grab too much in one go.

@stevenzwu
Contributor Author

> I think we can overlay the hash distribution above the ranges,

Not sure if we want that. Hash distribution (keyBy) is simple and has low overhead. Range distribution requires statistics collection.

@pvary
Contributor

pvary commented Jun 10, 2024

> > I think we can overlay the hash distribution above the ranges,
>
> Not sure if we want that. Hash distribution (keyBy) is simple and has low overhead. Range distribution requires statistics collection.

If you partition your orders by time, and need to update the order if it was canceled, then your key/partition is not equally distributed, and hashing is probably not a good option.

I would like to see the range partitioning as a precursor for writing ordered files with Flink. If we use similar constructs as the Flink SQL ORDER_BY then we can order the rows before writing them out. If we want to do this for CDC streams then we need to send the records with the same id to the same subtask.

Again, not something immediate, but it might be worth revisiting later.

@stevenzwu
Contributor Author

stevenzwu commented Jun 10, 2024

> > > I think we can overlay the hash distribution above the ranges,
>
> > Not sure if we want that. Hash distribution (keyBy) is simple and has low overhead. Range distribution requires statistics collection.
>
> If you partition your orders by time, and need to update the order if it was canceled, then your key/partition is not equally distributed, and hashing is probably not a good option.
>
> I would like to see the range partitioning as a precursor for writing ordered files with Flink. If we use similar constructs as the Flink SQL ORDER_BY then we can order the rows before writing them out. If we want to do this for CDC streams then we need to send the records with the same id to the same subtask.
>
> Again, not something immediate, but it might be worth revisiting later.

I am happy to discuss it here. Agree that it is not something we want to address in this PR.

ORDER_BY is essentially the SortOrder defined in table properties. Note that the Flink writer currently doesn't sort rows within a data file. The range partitioner only splits keys by range across files for better clustering.

In the example of an orders table partitioned by time (say hourly), the primary key would be the (hour(ts), order_id) tuple. Hash distribution (keyBy) can work and ensure correctness. You are saying range distribution would work better because of the better clustering of order_id within an hourly partition, correct? With hash distribution, each subtask writes a number of data files equal to the number of hours in a checkpoint cycle. Range distribution can handle the event time skew (recent hours have more data than the distant past). Yes, I agree with that assessment. However, there are some challenges with handling new values (like new hours as time advances). Right now, they are handled by round-robin, as there is no assumption that the SortOrder should be treated as primary keys. Primary keys are better and more safely handled with hash distribution.

@stevenzwu stevenzwu force-pushed the shuffle-rescale-handling branch 2 times, most recently from 39235fe to e6b3d9c Compare June 10, 2024 23:43
Comment on lines +32 to +33
Map<SortKey, Long> keyFrequency,
SortKey[] rangeBounds) {

Contributor

Don't we just send the rangeBounds back as a GlobalStatistics?

Contributor Author

Yes, we do. rangeBounds is also an array of SortKey.

Here is an example from SketchUtil:

   * To understand how range bounds are used in range partitioning, here is an example for human
   * ages with 4 partitions: [15, 32, 60]. The 4 ranges would be
   *
   * <ul>
   *   <li>age <= 15
   *   <li>age > 15 && age <= 32
   *   <li>age > 32 && age <= 60
   *   <li>age > 60
   * </ul>
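
The quoted example can be sketched as a lookup from a key to its range index. The helper below is hypothetical (it is not the SketchUtil API), uses plain ints instead of SortKey, and assumes the bounds are sorted and distinct.

```java
import java.util.Arrays;

// Hypothetical helper for illustration; not the Iceberg SketchUtil API.
final class RangeBoundsExample {
  private RangeBoundsExample() {}

  // Returns the index of the range containing key, where bound i is the
  // inclusive upper limit of range i and the last range has no upper limit.
  static int rangeIndex(int[] sortedBounds, int key) {
    int pos = Arrays.binarySearch(sortedBounds, key);
    // Hit: key equals bound pos, which closes range pos ("age <= bound").
    // Miss: binarySearch returns -(insertionPoint) - 1, and the insertion point
    // is exactly the index of the range the key falls into.
    return pos >= 0 ? pos : -(pos + 1);
  }

  public static void main(String[] args) {
    int[] bounds = {15, 32, 60};
    for (int age : new int[] {10, 15, 16, 32, 60, 61}) {
      System.out.println("age " + age + " -> range " + rangeIndex(bounds, age));
    }
  }
}
```

With 3 bounds there are 4 ranges, so ages 15, 32, and 60 map to ranges 0, 1, and 2, and anything above 60 maps to range 3.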

@stevenzwu stevenzwu force-pushed the shuffle-rescale-handling branch 3 times, most recently from 2bfd489 to 00fed5b Compare June 11, 2024 23:19
@stevenzwu stevenzwu changed the title Flink: handle rescale properly for range bounds in sketch statistics Flink: handle rescale properly and refactor statistics Jun 12, 2024
@stevenzwu stevenzwu force-pushed the shuffle-rescale-handling branch from 00fed5b to 163dbd7 Compare June 12, 2024 04:40
Comment on lines +72 to +81
private Partitioner<RowData> delegatePartitioner(GlobalStatistics statistics) {
  if (statistics.type() == StatisticsType.Map) {
    return new MapRangePartitioner(schema, sortOrder, statistics.mapAssignment());
  } else if (statistics.type() == StatisticsType.Sketch) {
    return new SketchRangePartitioner(schema, sortOrder, statistics.rangeBounds());
  } else {
    throw new IllegalArgumentException(
        String.format("Invalid statistics type: %s. Should be Map or Sketch", statistics.type()));
  }
}

Contributor

I'm still struggling a bit to build a good mental model for the GlobalStatistics distribution.

I feel that separating out the GlobalStatistics would be good if we could create a data structure which could be used by both MapRangePartitioner and SketchRangePartitioner, but sadly these partitioners require different input data.

Would it make sense to serialize/parametrize the partitioner on the JM side and send it in the records instead of the GlobalStatistics in the StatisticsOrRecord?
Something like PartitionerOrRecord, and on the partitioner side we just call partition and forget about it.

Would this be easier to understand for others?

Contributor Author

> I feel that separating out the GlobalStatistics would be good if we could create a data structure which could be used by both MapRangePartitioner and SketchRangePartitioner

GlobalStatistics is for that purpose. It contains either the map assignment or the range bounds.

> Would it make sense to serialize/parametrize the partitioner on the JM side and send it in the records instead of the GlobalStatistics in the StatisticsOrRecord?

Not sure if there is much benefit in serializing the delegate partitioner on the JM side and shipping it to the subtasks. That would require Java serialization and make checkpoint state more complex for schema evolution.

Contributor

My issue with the GlobalStatistics is that it is not really global. The main payload (mapAssignments or rangeBounds) is different for the different Partitioners. So basically we pretend that we send statistics, but in reality the data already describes the partitioner. Maybe it would be cleaner to accept this, and mirror it in the code, class names, etc.

Instead of writing a serializer for the GlobalStatistics, we can write a serializer for the respective partitioners. I don't see this as a blocker.

I don't have a very strong opinion on this, and I wanted you to understand why I feel that the code is a bit awkward in this case. What is usually a yellow flag for me is an object where some fields are mutually exclusive. I try to examine those again to understand whether we have a single object, or we just merged 2 objects into a single one.

Contributor Author

GlobalStatistics is global in the sense that it is the statistics aggregated by the coordinator. We have a facade/proxy partitioner that delegates the partition decision to the underlying partitioner based on the statistics type.

Contributor

After our offline discussion I understand your points better. I understand that the concept is:

  • We collect the statistics
  • Then we use the statistics to create a partitioner

and we try to stick to that concept.

I still think it is confusing that we currently collect 2 types of statistics and each of them is tightly coupled to one of the 2 types of partitioners we have (Type.Map is always used by MapRangePartitioner, and Type.Sketch is always used by SketchRangePartitioner). We could create the MapRangePartitioner and the SketchRangePartitioner on the coordinator side, and send them to a PartitionerExecutor which just deserializes them and runs them. So the real logic would be in a single place (DataStatisticsCoordinator).

That said, the improvement is just a coding style preference, as the Partitioners would still need to serialize the underlying statistics, so the performance would be the same.

So we can move forward with your proposed solution.
Thanks for the discussion!

@stevenzwu stevenzwu force-pushed the shuffle-rescale-handling branch from 163dbd7 to 7051b49 Compare June 18, 2024 23:39
Contributor

@pvary pvary left a comment

Hi Steven,
I think we discussed every comment.
Could we run the tests one more time before merging? It was a long time ago when they were running, and it might be good to double check before merging.

@stevenzwu
Contributor Author

> Hi Steven, I think we discussed every comment. Could we run the tests one more time before merging? It was a long time ago when they were running, and it might be good to double check before merging.

Let me rebase onto the latest main branch.

@stevenzwu stevenzwu force-pushed the shuffle-rescale-handling branch from 7051b49 to 7446959 Compare July 20, 2024 22:24
@stevenzwu stevenzwu merged commit 604b2bb into apache:main Jul 22, 2024
@stevenzwu
Contributor Author

Thanks @pvary for the review.

stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Jul 23, 2024
stevenzwu added a commit that referenced this pull request Jul 26, 2024
jasonf20 pushed a commit to jasonf20/iceberg that referenced this pull request Aug 4, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
czy006 pushed a commit to czy006/iceberg that referenced this pull request Apr 2, 2025