From 9e769a9d15242e7fbdac209e3dcef03f9e21c852 Mon Sep 17 00:00:00 2001 From: yejunhao Date: Wed, 10 May 2023 11:27:50 +0800 Subject: [PATCH 1/3] Revert "[core] limit parallelly read file memory usage, extract some methods (#1072)" This reverts commit 5d23c7da2ae4973c7d70f9e89a4b83220264142c. --- .../generated/core_configuration.html | 18 +- .../apache/paimon/AppendOnlyFileStore.java | 3 +- .../java/org/apache/paimon/CoreOptions.java | 13 -- .../org/apache/paimon/KeyValueFileStore.java | 3 +- .../manifest/AbstractManifestEntry.java | 189 ------------------ .../apache/paimon/manifest/ManifestEntry.java | 122 ++++++++++- .../operation/AbstractFileStoreScan.java | 100 ++++----- .../operation/AppendOnlyFileStoreScan.java | 6 +- .../operation/KeyValueFileStoreScan.java | 6 +- .../paimon/utils/ParallellyExecuteUtils.java | 86 -------- .../utils/ParallellyExecuteUtilsTest.java | 162 --------------- 11 files changed, 173 insertions(+), 535 deletions(-) delete mode 100644 paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java delete mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java delete mode 100644 paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 418149ae3c3d..72128e48c378 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -200,6 +200,12 @@

Enum

Specify the merge engine for table with primary key.

Possible values: + +
sort-engine
+ loser-tree +

Enum

+ Specify the sort engine for table with primary key.

Possible values: +
num-levels
(none) @@ -296,12 +302,6 @@ Long End condition "watermark" for bounded streaming mode. Stream reading will end when a larger watermark snapshot is encountered. - -
scan.manifest.parallelism
- (none) - Integer - The parallelism of scanning manifest files, default value is the size of cpu processor.Note: Scale-up this parameter will increase memory usage while scanning manifest files.We can consider downsize it when we encounter an out of memory exception while scanning -
scan.mode
default @@ -350,12 +350,6 @@ Duration The maximum time of completed snapshots to retain. - -
sort-engine
- loser-tree -

Enum

- Specify the sort engine for table with primary key.

Possible values: -
source.split.open-file-cost
4 mb diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 41abd3cae2e6..8e28b5b2dd84 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -96,8 +96,7 @@ private AppendOnlyFileStoreScan newScan(boolean forWrite) { manifestFileFactory(forWrite), manifestListFactory(forWrite), options.bucket(), - forWrite, - options.scanManifestParallelism()); + forWrite); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java index ad7ccda11a55..d75702f6ef51 100644 --- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java @@ -394,15 +394,6 @@ public class CoreOptions implements Serializable { "End condition \"watermark\" for bounded streaming mode. Stream" + " reading will end when a larger watermark snapshot is encountered."); - public static final ConfigOption SCAN_MANIFEST_PARALLELISM = - key("scan.manifest.parallelism") - .intType() - .noDefaultValue() - .withDescription( - "The parallelism of scanning manifest files, default value is the size of cpu processor." - + "Note: Scale-up this parameter will increase memory usage while scanning manifest files." - + "We can consider downsize it when we encounter an out of memory exception while scanning"); - public static final ConfigOption LOG_CONSISTENCY = key("log.consistency") .enumType(LogConsistency.class) @@ -864,10 +855,6 @@ public Long scanSnapshotId() { return options.get(SCAN_SNAPSHOT_ID); } - public Integer scanManifestParallelism() { - return options.get(SCAN_MANIFEST_PARALLELISM); - } - public Optional sequenceField() { return options.getOptional(SEQUENCE_FIELD); } diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 079b00ace234..be257efa43d9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -125,8 +125,7 @@ private KeyValueFileStoreScan newScan(boolean forWrite) { manifestFileFactory(forWrite), manifestListFactory(forWrite), options.bucket(), - forWrite, - options.scanManifestParallelism()); + forWrite); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java deleted file mode 100644 index cdd54cc0a9de..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.manifest; - -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.utils.FileStorePathFactory; -import org.apache.paimon.utils.Preconditions; - -import java.util.Collection; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Objects; - -/** Abstract a simplest model of manifest file. */ -public abstract class AbstractManifestEntry { - protected final FileKind kind; - protected final String fileName; - // for tables without partition this field should be a row with 0 columns (not null) - protected final BinaryRow partition; - protected final int bucket; - protected final int totalBuckets; - protected final int level; - - public AbstractManifestEntry( - FileKind kind, - String fileName, - BinaryRow partition, - int bucket, - int totalBuckets, - int level) { - this.kind = kind; - this.fileName = fileName; - this.partition = partition; - this.bucket = bucket; - this.totalBuckets = totalBuckets; - this.level = level; - } - - public FileKind kind() { - return kind; - } - - public BinaryRow partition() { - return partition; - } - - public int bucket() { - return bucket; - } - - public int totalBuckets() { - return totalBuckets; - } - - public int level() { - return level; - } - - public Identifier identifier() { - return new Identifier(partition, bucket, level, fileName); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof AbstractManifestEntry)) { - return false; - } - AbstractManifestEntry that = (AbstractManifestEntry) o; - return Objects.equals(kind, that.kind) - && Objects.equals(partition, that.partition) - && bucket == that.bucket - && level == that.level - && Objects.equals(fileName, that.fileName); - } - - @Override - public int hashCode() { - return Objects.hash(kind, partition, bucket, level, fileName); - } - - @Override - public String toString() { - return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, level, fileName); - } - - public static Collection mergeEntries( - Iterable entries) { - LinkedHashMap map = new LinkedHashMap<>(); - mergeEntries(entries, map); - return map.values(); - } - - public static void mergeEntries( - Iterable entries, Map map) { - for (T entry : entries) { - Identifier identifier = entry.identifier(); - switch (entry.kind()) { - case ADD: - Preconditions.checkState( - !map.containsKey(identifier), - "Trying to add file %s which is already added. Manifest might be corrupted.", - identifier); - map.put(identifier, entry); - break; - case DELETE: - // each dataFile will only be added once and deleted once, - // if we know that it is added before then both add and delete entry can be - // removed because there won't be further operations on this file, - // otherwise we have to keep the delete entry because the add entry must be - // in the previous manifest files - if (map.containsKey(identifier)) { - map.remove(identifier); - } else { - map.put(identifier, entry); - } - break; - default: - throw new UnsupportedOperationException( - "Unknown value kind " + entry.kind().name()); - } - } - } - - /** - * The same {@link Identifier} indicates that the {@link AbstractManifestEntry} refers to the - * same data file. - */ - public static class Identifier { - public final BinaryRow partition; - public final int bucket; - public final int level; - public final String fileName; - - private Identifier(BinaryRow partition, int bucket, int level, String fileName) { - this.partition = partition; - this.bucket = bucket; - this.level = level; - this.fileName = fileName; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof Identifier)) { - return false; - } - Identifier that = (Identifier) o; - return Objects.equals(partition, that.partition) - && bucket == that.bucket - && level == that.level - && Objects.equals(fileName, that.fileName); - } - - @Override - public int hashCode() { - return Objects.hash(partition, bucket, level, fileName); - } - - @Override - public String toString() { - return String.format("{%s, %d, %d, %s}", partition, bucket, level, fileName); - } - - public String toString(FileStorePathFactory pathFactory) { - return pathFactory.getPartitionString(partition) - + ", bucket " - + bucket - + ", level " - + level - + ", file " - + fileName; - } - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index 5ad9f9972fc2..84af1081963f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -24,30 +24,61 @@ import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Preconditions; import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import static org.apache.paimon.utils.SerializationUtils.newBytesType; /** Entry of a manifest file, representing an addition / deletion of a data file. */ -public class ManifestEntry extends AbstractManifestEntry { +public class ManifestEntry { + private final FileKind kind; + // for tables without partition this field should be a row with 0 columns (not null) + private final BinaryRow partition; + private final int bucket; + private final int totalBuckets; private final DataFileMeta file; public ManifestEntry( FileKind kind, BinaryRow partition, int bucket, int totalBuckets, DataFileMeta file) { - super(kind, file.fileName(), partition, bucket, totalBuckets, file.level()); + this.kind = kind; + this.partition = partition; + this.bucket = bucket; + this.totalBuckets = totalBuckets; this.file = file; } + public FileKind kind() { + return kind; + } + + public BinaryRow partition() { + return partition; + } + + public int bucket() { + return bucket; + } + + public int totalBuckets() { + return totalBuckets; + } + public DataFileMeta file() { return file; } + public Identifier identifier() { + return new Identifier(partition, bucket, file.level(), file.fileName()); + } + public static RowType schema() { List fields = new ArrayList<>(); fields.add(new DataField(0, "_KIND", new TinyIntType(false))); @@ -81,6 +112,43 @@ public String toString() { return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, totalBuckets, file); } + public static Collection mergeEntries(List entries) { + LinkedHashMap map = new LinkedHashMap<>(); + mergeEntries(entries, map); + return map.values(); + } + + public static void mergeEntries( + List entries, Map map) { + for (ManifestEntry entry : entries) { + ManifestEntry.Identifier identifier = entry.identifier(); + switch (entry.kind()) { + case ADD: + Preconditions.checkState( + !map.containsKey(identifier), + "Trying to add file %s which is already added. Manifest might be corrupted.", + identifier); + map.put(identifier, entry); + break; + case DELETE: + // each dataFile will only be added once and deleted once, + // if we know that it is added before then both add and delete entry can be + // removed because there won't be further operations on this file, + // otherwise we have to keep the delete entry because the add entry must be + // in the previous manifest files + if (map.containsKey(identifier)) { + map.remove(identifier); + } else { + map.put(identifier, entry); + } + break; + default: + throw new UnsupportedOperationException( + "Unknown value kind " + entry.kind().name()); + } + } + } + public static void assertNoDelete(Collection entries) { for (ManifestEntry entry : entries) { Preconditions.checkState( @@ -89,4 +157,54 @@ public static void assertNoDelete(Collection entries) { entry.file().fileName()); } } + + /** + * The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data + * file. + */ + public static class Identifier { + public final BinaryRow partition; + public final int bucket; + public final int level; + public final String fileName; + + private Identifier(BinaryRow partition, int bucket, int level, String fileName) { + this.partition = partition; + this.bucket = bucket; + this.level = level; + this.fileName = fileName; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Identifier)) { + return false; + } + Identifier that = (Identifier) o; + return Objects.equals(partition, that.partition) + && bucket == that.bucket + && level == that.level + && Objects.equals(fileName, that.fileName); + } + + @Override + public int hashCode() { + return Objects.hash(partition, bucket, level, fileName); + } + + @Override + public String toString() { + return String.format("{%s, %d, %d, %s}", partition, bucket, level, fileName); + } + + public String toString(FileStorePathFactory pathFactory) { + return pathFactory.getPartitionString(partition) + + ", bucket " + + bucket + + ", level " + + level + + ", file " + + fileName; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index f81a5cdc50d5..0650d9a5554d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -21,7 +21,6 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.manifest.AbstractManifestEntry; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestEntrySerializer; @@ -36,9 +35,8 @@ import org.apache.paimon.stats.FieldStatsArraySerializer; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.FileUtils; import org.apache.paimon.utils.Filter; -import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.ParallellyExecuteUtils; import org.apache.paimon.utils.RowDataToObjectArrayConverter; import org.apache.paimon.utils.SnapshotManager; @@ -49,6 +47,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; @@ -79,7 +78,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private Filter levelFilter = null; private ManifestCacheFilter manifestCacheFilter = null; - private Integer scanManifestParallelism; public AbstractFileStoreScan( RowType partitionType, @@ -89,8 +87,7 @@ public AbstractFileStoreScan( ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, int numOfBuckets, - boolean checkNumOfBuckets, - Integer scanManifestParallelism) { + boolean checkNumOfBuckets) { this.partitionStatsConverter = new FieldStatsArraySerializer(partitionType); this.partitionConverter = new RowDataToObjectArrayConverter(partitionType); checkArgument(bucketKeyType.getFieldCount() > 0, "The bucket keys should not be empty."); @@ -102,7 +99,6 @@ public AbstractFileStoreScan( this.numOfBuckets = numOfBuckets; this.checkNumOfBuckets = checkNumOfBuckets; this.tableSchemas = new ConcurrentHashMap<>(); - this.scanManifestParallelism = scanManifestParallelism; } @Override @@ -199,28 +195,6 @@ public FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter) @Override public Plan plan() { - - Pair> planResult = doPlan(this::readManifestFileMeta); - - final Long readSnapshotId = planResult.getLeft(); - final List files = planResult.getRight(); - - return new Plan() { - @Nullable - @Override - public Long snapshotId() { - return readSnapshotId; - } - - @Override - public List files() { - return files; - } - }; - } - - private Pair> doPlan( - Function> readManifest) { List manifests = specifiedManifests; Long snapshotId = specifiedSnapshotId; if (manifests == null) { @@ -235,21 +209,28 @@ private Pair> doPlan( } } + final Long readSnapshot = snapshotId; final List readManifests = manifests; - Iterable entries = - ParallellyExecuteUtils.parallelismBatchIterable( - files -> - files.parallelStream() - .filter(this::filterManifestFileMeta) - .flatMap(m -> readManifest.apply(m).stream()) - .filter(this::filterByStats) - .collect(Collectors.toList()), - readManifests, - scanManifestParallelism); - - List files = new ArrayList<>(); - for (T file : AbstractManifestEntry.mergeEntries(entries)) { + List entries; + try { + entries = + FileUtils.COMMON_IO_FORK_JOIN_POOL + .submit( + () -> + readManifests + .parallelStream() + .filter(this::filterManifestFileMeta) + .flatMap(m -> readManifestFileMeta(m).stream()) + .filter(this::filterByStats) + .collect(Collectors.toList())) + .get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Failed to read ManifestEntry list concurrently", e); + } + + List files = new ArrayList<>(); + for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) { if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) { String partInfo = partitionConverter.getArity() > 0 @@ -268,8 +249,7 @@ private Pair> doPlan( } // bucket filter should not be applied along with partition filter - // because the specifiedBucket is computed against the current - // numOfBuckets + // because the specifiedBucket is computed against the current numOfBuckets // however entry.bucket() was computed against the old numOfBuckets // and thus the filtered manifest entries might be empty // which renders the bucket check invalid @@ -277,7 +257,19 @@ private Pair> doPlan( files.add(file); } } - return Pair.of(snapshotId, files); + + return new Plan() { + @Nullable + @Override + public Long snapshotId() { + return readSnapshot; + } + + @Override + public List files() { + return files; + } + }; } private List readManifests(Snapshot snapshot) { @@ -324,29 +316,19 @@ private boolean filterManifestFileMeta(ManifestFileMeta manifest) { } /** Note: Keep this thread-safe. */ - private boolean filterByBucket(AbstractManifestEntry entry) { + private boolean filterByBucket(ManifestEntry entry) { return (specifiedBucket == null || entry.bucket() == specifiedBucket); } /** Note: Keep this thread-safe. */ - private boolean filterByBucketSelector(AbstractManifestEntry entry) { + private boolean filterByBucketSelector(ManifestEntry entry) { return (bucketSelector == null || bucketSelector.select(entry.bucket(), entry.totalBuckets())); } /** Note: Keep this thread-safe. */ - private boolean filterByLevel(AbstractManifestEntry entry) { - return (levelFilter == null || levelFilter.test(entry.level())); - } - - /** Note: Keep this thread-safe. */ - private boolean filterByStats(AbstractManifestEntry entry) { - // filterByStats is an action that is completed as much as possible and does not have an - // impact if it is not done. - if (entry instanceof ManifestEntry) { - return filterByStats((ManifestEntry) entry); - } - return true; + private boolean filterByLevel(ManifestEntry entry) { + return (levelFilter == null || levelFilter.test(entry.file().level())); } /** Note: Keep this thread-safe. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 97d761d106ba..fdab1742340f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -52,8 +52,7 @@ public AppendOnlyFileStoreScan( ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, int numOfBuckets, - boolean checkNumOfBuckets, - Integer scanManifestParallelism) { + boolean checkNumOfBuckets) { super( partitionType, bucketKeyType, @@ -62,8 +61,7 @@ public AppendOnlyFileStoreScan( manifestFileFactory, manifestListFactory, numOfBuckets, - checkNumOfBuckets, - scanManifestParallelism); + checkNumOfBuckets); this.rowType = rowType; this.fieldStatsConverters = new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schemaId); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 5f85ba0288f2..a35177b006f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -54,8 +54,7 @@ public KeyValueFileStoreScan( ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, int numOfBuckets, - boolean checkNumOfBuckets, - Integer scanManifestParallelism) { + boolean checkNumOfBuckets) { super( partitionType, bucketKeyType, @@ -64,8 +63,7 @@ public KeyValueFileStoreScan( manifestFileFactory, manifestListFactory, numOfBuckets, - checkNumOfBuckets, - scanManifestParallelism); + checkNumOfBuckets); this.fieldStatsConverters = new FieldStatsConverters( sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schemaId); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java deleted file mode 100644 index 93f52ad14841..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.utils; - -import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; - -import java.util.ArrayDeque; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Queue; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -/** - * This class is a parallel execution util class, which mainly aim to process tasks parallelly with - * memory control. - */ -public class ParallellyExecuteUtils { - - // reduce memory usage by batch iterable process, the cached result in memory will be queueSize - public static Iterable parallelismBatchIterable( - Function, List> processor, List input, Integer queueSize) { - if (queueSize == null) { - queueSize = FileUtils.COMMON_IO_FORK_JOIN_POOL.getParallelism(); - } else if (queueSize <= 0) { - throw new NegativeArraySizeException("queue size should not be negetive"); - } - - final Queue> stack = new ArrayDeque<>(Lists.partition(input, queueSize)); - - return () -> - new Iterator() { - List activeList = null; - private int index = 0; - - @Override - public boolean hasNext() { - advanceIfNeeded(); - return activeList != null && index < activeList.size(); - } - - @Override - public T next() { - advanceIfNeeded(); - if (activeList == null || index >= activeList.size()) { - throw new NoSuchElementException(); - } - return activeList.get(index++); - } - - private void advanceIfNeeded() { - while ((activeList == null || index >= activeList.size()) - && stack.size() > 0) { - // reset index - index = 0; - try { - activeList = - CompletableFuture.supplyAsync( - () -> processor.apply(stack.poll()), - FileUtils.COMMON_IO_FORK_JOIN_POOL) - .get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - }; - } -} diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java deleted file mode 100644 index 6d1fd13214b0..000000000000 --- a/paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.utils; - -import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -/** This test mainly test for the methods in {@link ParallellyExecuteUtils}. */ -public class ParallellyExecuteUtilsTest { - - @Test - public void testParallelismBatchIterable() { - List nums = new ArrayList<>(); - - for (int i = 0; i < 10000; i++) { - nums.add(i); - } - - Iterable re = - ParallellyExecuteUtils.parallelismBatchIterable( - l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()), - nums, - null); - - AtomicInteger atomicInteger = new AtomicInteger(0); - re.forEach( - i -> - Assertions.assertThat(i) - .isEqualTo(nums.get(atomicInteger.getAndIncrement()) + 1)); - } - - @Test - public void testParallelismBatchIterable2() { - List nums = new ArrayList<>(); - - for (int i = 0; i < 12345; i++) { - nums.add(i); - } - - Iterable re = - ParallellyExecuteUtils.parallelismBatchIterable( - l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()), - nums, - null); - - AtomicInteger atomicInteger = new AtomicInteger(0); - re.forEach( - i -> - Assertions.assertThat(i) - .isEqualTo(nums.get(atomicInteger.getAndIncrement()) + 1)); - } - - @Test - public void testParallelismBatchIterable3() { - List nums = new ArrayList<>(); - - for (int i = 0; i < 10000; i++) { - nums.add(i); - } - - Iterable re = - ParallellyExecuteUtils.parallelismBatchIterable( - l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()), - nums, - null); - - Iterator iterator = re.iterator(); - for (int i = 0; i < 100; i++) { - iterator.hasNext(); - } - - AtomicInteger atomicInteger = new AtomicInteger(0); - while (iterator.hasNext()) { - Integer i = iterator.next(); - Assertions.assertThat(i).isEqualTo(nums.get(atomicInteger.getAndIncrement()) + 1); - } - } - - @Test - public void testParallelismBatchIterable4() { - List nums = new ArrayList<>(); - - for (int i = 0; i < 12345; i++) { - nums.add(i); - } - - Iterable re = - ParallellyExecuteUtils.parallelismBatchIterable( - l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()), - nums, - null); - - Iterator iterator = re.iterator(); - for (int i = 0; i < 123; i++) { - iterator.hasNext(); - } - - AtomicInteger atomicInteger = new AtomicInteger(0); - while (iterator.hasNext()) { - Integer i = iterator.next(); - Assertions.assertThat(i).isEqualTo(nums.get(atomicInteger.getAndIncrement()) + 1); - } - } - - @Test - public void testForEmptyInput() { - Iterable re = - ParallellyExecuteUtils.parallelismBatchIterable( - l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()), - (List) Collections.EMPTY_LIST, - null); - Assertions.assertThat(!re.iterator().hasNext()).isTrue(); - } - - @Test - public void testForSingletonInput() { - Iterable re = - ParallellyExecuteUtils.parallelismBatchIterable( - l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()), - Collections.singletonList(1), - null); - re.forEach(i -> Assertions.assertThat(i).isEqualTo(2)); - } - - @Test - public void testDifferentQueueSizeWithFilterElement() { - for (int queueSize = 1; queueSize < 20; queueSize++) { - Iterable re = - ParallellyExecuteUtils.parallelismBatchIterable( - l -> l.parallelStream().filter(i -> i > 5).collect(Collectors.toList()), - Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), - queueSize); - Integer[] result = new Integer[] {6, 7, 8, 9, 10}; - - Assertions.assertThat(re).hasSameElementsAs(Arrays.asList(result)); - } - } -} From a0a53f4ed00391edb9249bcf8ac6fbdfa0fad0a6 Mon Sep 17 00:00:00 2001 From: yejunhao Date: Wed, 10 May 2023 13:20:45 +0800 Subject: [PATCH 2/3] [core] limit parallelly read file memory usage (cherry-pick part of reverted code, we don't need AbstractEntry, but we still need memory control) (#1072) --- .../generated/core_configuration.html | 24 +-- .../apache/paimon/AppendOnlyFileStore.java | 3 +- .../java/org/apache/paimon/CoreOptions.java | 13 ++ .../org/apache/paimon/KeyValueFileStore.java | 3 +- .../apache/paimon/manifest/ManifestEntry.java | 4 +- .../operation/AbstractFileStoreScan.java | 75 ++++---- .../operation/AppendOnlyFileStoreScan.java | 6 +- .../operation/KeyValueFileStoreScan.java | 6 +- .../paimon/utils/ParallellyExecuteUtils.java | 86 ++++++++++ .../utils/ParallellyExecuteUtilsTest.java | 162 ++++++++++++++++++ 10 files changed, 328 insertions(+), 54 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 72128e48c378..25c05792638c 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -32,12 +32,6 @@

Enum

Whether to double write to a changelog file. This changelog file keeps the details of data changes, it can be read directly during stream reads.

Possible values:
  • "none": No changelog file.
  • "input": Double write to a changelog file when flushing memory table, the changelog is from input.
  • "full-compaction": Generate changelog files with each full compaction.
  • "lookup": Generate changelog files through 'lookup' before committing the data writing.
- -
changelog-producer.row-deduplicate
- false -

Boolean

- Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction. -
commit.force-compact
false @@ -200,12 +194,6 @@

Enum

Specify the merge engine for table with primary key.

Possible values:
  • "deduplicate": De-duplicate and keep the last row.
  • "partial-update": Partial update non-null fields.
  • "aggregation": Aggregate fields with same primary key.
- -
sort-engine
- loser-tree -

Enum

- Specify the sort engine for table with primary key.

Possible values:
  • "min-heap": Use min-heap for multiway sorting.
  • "loser-tree": Use loser-tree for multiway sorting. Compared with heapsort, loser-tree has fewer comparisons and is more efficient.
-
num-levels
(none) @@ -302,6 +290,12 @@ Long End condition "watermark" for bounded streaming mode. Stream reading will end when a larger watermark snapshot is encountered. + +
scan.manifest.parallelism
+ (none) + Integer + The parallelism of scanning manifest files, default value is the size of cpu processor.Note: Scale-up this parameter will increase memory usage while scanning manifest files.We can consider downsize it when we encounter an out of memory exception while scanning +
scan.mode
default @@ -350,6 +344,12 @@ Duration The maximum time of completed snapshots to retain. + +
sort-engine
+ loser-tree +

Enum

+ Specify the sort engine for table with primary key.

Possible values:
  • "min-heap": Use min-heap for multiway sorting.
  • "loser-tree": Use loser-tree for multiway sorting. Compared with heapsort, loser-tree has fewer comparisons and is more efficient.
+
source.split.open-file-cost
4 mb diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 8e28b5b2dd84..41abd3cae2e6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -96,7 +96,8 @@ private AppendOnlyFileStoreScan newScan(boolean forWrite) { manifestFileFactory(forWrite), manifestListFactory(forWrite), options.bucket(), - forWrite); + forWrite, + options.scanManifestParallelism()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java index d75702f6ef51..c49833e0b97c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java @@ -394,6 +394,15 @@ public class CoreOptions implements Serializable { "End condition \"watermark\" for bounded streaming mode. Stream" + " reading will end when a larger watermark snapshot is encountered."); + public static final ConfigOption SCAN_MANIFEST_PARALLELISM = + key("scan.manifest.parallelism") + .intType() + .noDefaultValue() + .withDescription( + "The parallelism of scanning manifest files, default value is the size of cpu processor. " + + "Note: Scale-up this parameter will increase memory usage while scanning manifest files. " + + "We can consider downsize it when we encounter an out of memory exception while scanning"); + public static final ConfigOption LOG_CONSISTENCY = key("log.consistency") .enumType(LogConsistency.class) @@ -855,6 +864,10 @@ public Long scanSnapshotId() { return options.get(SCAN_SNAPSHOT_ID); } + public Integer scanManifestParallelism() { + return options.get(SCAN_MANIFEST_PARALLELISM); + } + public Optional sequenceField() { return options.getOptional(SEQUENCE_FIELD); } diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index be257efa43d9..079b00ace234 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -125,7 +125,8 @@ private KeyValueFileStoreScan newScan(boolean forWrite) { manifestFileFactory(forWrite), manifestListFactory(forWrite), options.bucket(), - forWrite); + forWrite, + options.scanManifestParallelism()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index 84af1081963f..1c175e589be5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -112,14 +112,14 @@ public String toString() { return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, totalBuckets, file); } - public static Collection mergeEntries(List entries) { + public static Collection mergeEntries(Iterable entries) { LinkedHashMap map = new LinkedHashMap<>(); mergeEntries(entries, map); return map.values(); } public static void mergeEntries( - List entries, Map map) { + Iterable entries, Map map) { for (ManifestEntry entry : entries) { ManifestEntry.Identifier identifier = entry.identifier(); switch (entry.kind()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 0650d9a5554d..4f3b3e3ba1e1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -35,8 +35,9 @@ import org.apache.paimon.stats.FieldStatsArraySerializer; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; -import org.apache.paimon.utils.FileUtils; import org.apache.paimon.utils.Filter; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.ParallellyExecuteUtils; import org.apache.paimon.utils.RowDataToObjectArrayConverter; import org.apache.paimon.utils.SnapshotManager; @@ -47,7 +48,6 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; @@ -78,6 +78,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private Filter levelFilter = null; private ManifestCacheFilter manifestCacheFilter = null; + private Integer scanManifestParallelism; public AbstractFileStoreScan( RowType partitionType, @@ -87,7 +88,8 @@ public AbstractFileStoreScan( ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, int numOfBuckets, - boolean checkNumOfBuckets) { + boolean checkNumOfBuckets, + Integer scanManifestParallelism) { this.partitionStatsConverter = new FieldStatsArraySerializer(partitionType); this.partitionConverter = new RowDataToObjectArrayConverter(partitionType); checkArgument(bucketKeyType.getFieldCount() > 0, "The bucket keys should not be empty."); @@ -99,6 +101,7 @@ public AbstractFileStoreScan( this.numOfBuckets = numOfBuckets; this.checkNumOfBuckets = checkNumOfBuckets; this.tableSchemas = new ConcurrentHashMap<>(); + this.scanManifestParallelism = scanManifestParallelism; } @Override @@ -195,6 +198,28 @@ public FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter) @Override public Plan plan() { + + Pair> planResult = doPlan(this::readManifestFileMeta); + + final Long readSnapshotId = planResult.getLeft(); + final List files = planResult.getRight(); + + return new Plan() { + @Nullable + @Override + public Long snapshotId() { + return readSnapshotId; + } + + @Override + public List files() { + return files; + } + }; + } + + private Pair> doPlan( + Function> readManifest) { List manifests = specifiedManifests; Long snapshotId = specifiedSnapshotId; if (manifests == null) { @@ -209,25 +234,18 @@ public Plan plan() { } } - final Long readSnapshot = snapshotId; final List readManifests = manifests; - List entries; - try { - entries = - FileUtils.COMMON_IO_FORK_JOIN_POOL - .submit( - () -> - readManifests - .parallelStream() - .filter(this::filterManifestFileMeta) - .flatMap(m -> readManifestFileMeta(m).stream()) - .filter(this::filterByStats) - .collect(Collectors.toList())) - .get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Failed to read ManifestEntry list concurrently", e); - } + Iterable entries = + ParallellyExecuteUtils.parallelismBatchIterable( + files -> + files.parallelStream() + .filter(this::filterManifestFileMeta) + .flatMap(m -> readManifest.apply(m).stream()) + .filter(this::filterByStats) + .collect(Collectors.toList()), + readManifests, + scanManifestParallelism); List files = new ArrayList<>(); for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) { @@ -249,7 +267,8 @@ public Plan plan() { } // bucket filter should not be applied along with partition filter - // because the specifiedBucket is computed against the current numOfBuckets + // because the specifiedBucket is computed against the current + // numOfBuckets // however entry.bucket() was computed against the old numOfBuckets // and thus the filtered manifest entries might be empty // which renders the bucket check invalid @@ -257,19 +276,7 @@ public Plan plan() { files.add(file); } } - - return new Plan() { - @Nullable - @Override - public Long snapshotId() { - return readSnapshot; - } - - @Override - public List files() { - return files; - } - }; + return Pair.of(snapshotId, files); } private List readManifests(Snapshot snapshot) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index fdab1742340f..97d761d106ba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -52,7 +52,8 @@ public AppendOnlyFileStoreScan( ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, int numOfBuckets, - boolean checkNumOfBuckets) { + boolean checkNumOfBuckets, + Integer scanManifestParallelism) { super( partitionType, bucketKeyType, @@ -61,7 +62,8 @@ public AppendOnlyFileStoreScan( manifestFileFactory, manifestListFactory, numOfBuckets, - checkNumOfBuckets); + checkNumOfBuckets, + scanManifestParallelism); this.rowType = rowType; this.fieldStatsConverters = new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schemaId); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index a35177b006f6..5f85ba0288f2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -54,7 +54,8 @@ public KeyValueFileStoreScan( ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, int numOfBuckets, - boolean checkNumOfBuckets) { + boolean checkNumOfBuckets, + Integer scanManifestParallelism) { super( partitionType, bucketKeyType, @@ -63,7 +64,8 @@ public KeyValueFileStoreScan( manifestFileFactory, manifestListFactory, numOfBuckets, - checkNumOfBuckets); + checkNumOfBuckets, + scanManifestParallelism); this.fieldStatsConverters = new FieldStatsConverters( sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schemaId); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java new file mode 100644 index 000000000000..93f52ad14841 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +/** + * This class is a parallel execution util class, which mainly aim to process tasks parallelly with + * memory control. + */ +public class ParallellyExecuteUtils { + + // reduce memory usage by batch iterable process, the cached result in memory will be queueSize + public static Iterable parallelismBatchIterable( + Function, List> processor, List input, Integer queueSize) { + if (queueSize == null) { + queueSize = FileUtils.COMMON_IO_FORK_JOIN_POOL.getParallelism(); + } else if (queueSize <= 0) { + throw new NegativeArraySizeException("queue size should not be negetive"); + } + + final Queue> stack = new ArrayDeque<>(Lists.partition(input, queueSize)); + + return () -> + new Iterator() { + List activeList = null; + private int index = 0; + + @Override + public boolean hasNext() { + advanceIfNeeded(); + return activeList != null && index < activeList.size(); + } + + @Override + public T next() { + advanceIfNeeded(); + if (activeList == null || index >= activeList.size()) { + throw new NoSuchElementException(); + } + return activeList.get(index++); + } + + private void advanceIfNeeded() { + while ((activeList == null || index >= activeList.size()) + && stack.size() > 0) { + // reset index + index = 0; + try { + activeList = + CompletableFuture.supplyAsync( + () -> processor.apply(stack.poll()), + FileUtils.COMMON_IO_FORK_JOIN_POOL) + .get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + }; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java new file mode 100644 index 000000000000..6d1fd13214b0 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** This test mainly test for the methods in {@link ParallellyExecuteUtils}. */ +public class ParallellyExecuteUtilsTest { + + @Test + public void testParallelismBatchIterable() { + List nums = new ArrayList<>(); + + for (int i = 0; i < 10000; i++) { + nums.add(i); + } + + Iterable re = + ParallellyExecuteUtils.parallelismBatchIterable( + l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()), + nums, + null); + + AtomicInteger atomicInteger = new AtomicInteger(0); + re.forEach( + i -> + Assertions.assertThat(i) + .isEqualTo(nums.get(atomicInteger.getAndIncrement()) + 1)); + } + + @Test + public void testParallelismBatchIterable2() { + List nums = new ArrayList<>(); + + for (int i = 0; i < 12345; i++) { + nums.add(i); + } + + Iterable re = + ParallellyExecuteUtils.parallelismBatchIterable( + l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()), + nums, + null); + + AtomicInteger atomicInteger = new AtomicInteger(0); + re.forEach( + i -> + Assertions.assertThat(i) + .isEqualTo(nums.get(atomicInteger.getAndIncrement()) + 1)); + } + + @Test + public void testParallelismBatchIterable3() { + List nums = new ArrayList<>(); + + for (int i = 0; i < 10000; i++) { + nums.add(i); + } + + Iterable re = + ParallellyExecuteUtils.parallelismBatchIterable( + l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()), + nums, + null); + + Iterator iterator = re.iterator(); + for (int i = 0; i < 100; i++) { + iterator.hasNext(); + } + + AtomicInteger atomicInteger = new AtomicInteger(0); + while (iterator.hasNext()) { + Integer i = iterator.next(); + Assertions.assertThat(i).isEqualTo(nums.get(atomicInteger.getAndIncrement()) + 1); + } + } + + @Test + public void testParallelismBatchIterable4() { + List nums = new ArrayList<>(); + + for (int i = 0; i < 12345; i++) { + nums.add(i); + } + + Iterable re = + ParallellyExecuteUtils.parallelismBatchIterable( + l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()), + nums, + null); + + Iterator iterator = re.iterator(); + for (int i = 0; i < 123; i++) { + iterator.hasNext(); + } + + AtomicInteger atomicInteger = new AtomicInteger(0); + while (iterator.hasNext()) { + Integer i = iterator.next(); + Assertions.assertThat(i).isEqualTo(nums.get(atomicInteger.getAndIncrement()) + 1); + } + } + + @Test + public void testForEmptyInput() { + Iterable re = + ParallellyExecuteUtils.parallelismBatchIterable( + l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()), + (List) Collections.EMPTY_LIST, + null); + Assertions.assertThat(!re.iterator().hasNext()).isTrue(); + } + + @Test + public void testForSingletonInput() { + Iterable re = + ParallellyExecuteUtils.parallelismBatchIterable( + l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()), + Collections.singletonList(1), + null); + re.forEach(i -> Assertions.assertThat(i).isEqualTo(2)); + } + + @Test + public void testDifferentQueueSizeWithFilterElement() { + for (int queueSize = 1; queueSize < 20; queueSize++) { + Iterable re = + ParallellyExecuteUtils.parallelismBatchIterable( + l -> l.parallelStream().filter(i -> i > 5).collect(Collectors.toList()), + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + queueSize); + Integer[] result = new Integer[] {6, 7, 8, 9, 10}; + + Assertions.assertThat(re).hasSameElementsAs(Arrays.asList(result)); + } + } +} From 689fdac50a12a14492c39a003393901b7186d334 Mon Sep 17 00:00:00 2001 From: yejunhao Date: Wed, 10 May 2023 13:43:24 +0800 Subject: [PATCH 3/3] [fix] fix docs --- docs/layouts/shortcodes/generated/core_configuration.html | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 25c05792638c..65f084aa2da9 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -32,6 +32,12 @@

Enum

Whether to double write to a changelog file. This changelog file keeps the details of data changes, it can be read directly during stream reads.

Possible values:
  • "none": No changelog file.
  • "input": Double write to a changelog file when flushing memory table, the changelog is from input.
  • "full-compaction": Generate changelog files with each full compaction.
  • "lookup": Generate changelog files through 'lookup' before committing the data writing.
+ +
changelog-producer.row-deduplicate
+ false + Boolean + Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction. +
commit.force-compact
false @@ -294,7 +300,7 @@
scan.manifest.parallelism
(none) Integer - The parallelism of scanning manifest files, default value is the size of cpu processor.Note: Scale-up this parameter will increase memory usage while scanning manifest files.We can consider downsize it when we encounter an out of memory exception while scanning + The parallelism of scanning manifest files, default value is the size of cpu processor. Note: Scale-up this parameter will increase memory usage while scanning manifest files. We can consider downsize it when we encounter an out of memory exception while scanning
scan.mode