[flink] add coordinate and worker operator for small changelog files compaction#4380
Conversation
# Conflicts: # paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
| } | ||
|
|
||
| private void emitPartitionChangelogCompactTask(BinaryRow partition) { | ||
| PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); |
There was a problem hiding this comment.
partitionChangelog may be null or not?
| private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles; | ||
| private final Map<Integer, List<DataFileMeta>> compactChangelogFiles; | ||
|
|
||
| public long totalFileSize() { |
There was a problem hiding this comment.
This mothod has not be called, delete it?
|
|
||
| private static class PartitionChangelog { | ||
| private long totalFileSize; | ||
| private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles; |
| partitionChangelogs.remove(partition); | ||
| } | ||
|
|
||
| private void emitAllPartitionsChanglogCompactTask() { |
There was a problem hiding this comment.
partitionChangelogs.keySet().forEach(this::emitPartitionChangelogCompactTask);
| } | ||
|
|
||
| private void emitPartitionChangelogCompactTask(BinaryRow partition) { | ||
| PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); |
There was a problem hiding this comment.
partitionChangelog may be null or not?
| public class ChangelogCompactTask implements Serializable { | ||
| private final long checkpointId; | ||
| private final BinaryRow partition; | ||
| private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles; |
| public List<Committable> doCompact(FileStoreTable table) throws Exception { | ||
| FileStorePathFactory pathFactory = table.store().pathFactory(); | ||
|
|
||
| // copy all changelog files to a new big file |
There was a problem hiding this comment.
Two for statement has lots of some code, you can avoid this.
|
|
||
| // copy all changelog files to a new big file | ||
| for (Map.Entry<Integer, List<DataFileMeta>> entry : newFileChangelogFiles.entrySet()) { | ||
| Integer bucket = entry.getKey(); |
| + CompactedChangelogReadOnlyFormat.getIdentifier( | ||
| baseResult.meta.fileFormat()))); | ||
|
|
||
| List<Committable> newCommittables = new ArrayList<>(); |
There was a problem hiding this comment.
List newCommittables = new ArrayList<>(bucketedResults.entrySet().size());
| import org.apache.flink.api.common.typeutils.TypeSerializer; | ||
|
|
||
| /** Type information for {@link ChangelogCompactTask}. */ | ||
| public class ChangelogTaskTypeInfo extends TypeInformation<ChangelogCompactTask> { |
| private void copyFile( | ||
| FileStoreTable table, Path path, int bucket, boolean isCompactResult, DataFileMeta meta) | ||
| throws Exception { | ||
| if (outputStream == null) { |
There was a problem hiding this comment.
copyFile is only called in doCompact, so outputStream can be a local variable instead of a class member.
| assertThat(compactedChangelogs2).hasSize(2); | ||
| assertThat(listAllFilesWithPrefix("changelog-")).isEmpty(); | ||
|
|
||
| // write update data |
| + "'changelog-producer' = 'lookup', " | ||
| + "'lookup-wait' = '%s', " | ||
| + "'deletion-vectors.enabled' = '%s', " | ||
| + "'changelog.compact.parallelism' = '%s'", |
There was a problem hiding this comment.
What is this table option? Also why do you change write buffer size?
…compaction (apache#4380) This closes apache#4380. --------- Co-authored-by: tsreaper <tsreaper96@gmail.com>
Purpose
Linked issue: close #xxx
Add a Coordinator node to small changelog files compaction pipeline to decide how to concatenate it into a target file size result file, which can be one or multiple files, and add a worker node to merge those small files.
Tests
API and Format
Documentation