diff --git a/docs/docs/flink-configuration.md b/docs/docs/flink-configuration.md index 42dc15f5b3d2..1ac16d7fc3e9 100644 --- a/docs/docs/flink-configuration.md +++ b/docs/docs/flink-configuration.md @@ -146,14 +146,56 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */ ... ``` -| Flink option | Default | Description | -| ---------------------- | ------------------------------------------ | ------------------------------------------------------------ | -| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc | -| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes | -| upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled | -| overwrite-enabled | false | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. | -| distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode | -| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write | -| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write | -| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write | -| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism | +| Flink option | Default | Description | +|-----------------------------------------|--------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------| +| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc | +| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes | +| upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled | +| overwrite-enabled | false | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. | +| distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode. RANGE distribution is in experimental status. | +| range-distribution-statistics-type | Auto | Range distribution data statistics collection type: Map, Sketch, Auto. See details [here](#range-distribution-statistics-type). | +| range-distribution-sort-key-base-weight | 0.0 (double) | Base weight for every sort key relative to target traffic weight per writer task. See details [here](#range-distribution-sort-key-base-weight). | +| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write | +| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write | +| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write | +| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism | + +#### Range distribution statistics type + +Config value is a enum type: `Map`, `Sketch`, `Auto`. + + +#### Range distribution sort key base weight + +`range-distribution-sort-key-base-weight`: `0.0`. + +If sort order contains partition columns, each sort key would map to one partition and data +file. This relative weight can avoid placing too many small files for sort keys with low +traffic. It is a double value that defines the minimal weight for each sort key. `0.02` means +each key has a base weight of `2%` of the targeted traffic weight per writer task. + +E.g. the sink Iceberg table is partitioned daily by event time. Assume the data stream +contains events from now up to 180 days ago. With event time, traffic weight distribution +across different days typically has a long tail pattern. Current day contains the most +traffic. The older days (long tail) contain less and less traffic. Assume writer parallelism +is `10`. The total weight across all 180 days is `10,000`. Target traffic weight per writer +task would be `1,000`. Assume the weight sum for the oldest 150 days is `1,000`. Normally, +the range partitioner would put all the oldest 150 days in one writer task. That writer task +would write to 150 small files (one per day). Keeping 150 open files can potentially consume +large amount of memory. Flushing and uploading 150 files (however small) at checkpoint time +can also be potentially slow. If this config is set to `0.02`. It means every sort key has a +base weight of `2%` of targeted weight of `1,000` for every write task. It would essentially +avoid placing more than `50` data files (one per day) on one writer task no matter how small +they are. + +This is only applicable to {@link StatisticsType#Map} for low-cardinality scenario. For +{@link StatisticsType#Sketch} high-cardinality sort columns, they are usually not used as +partition columns. Otherwise, too many partitions and small files may be generated during +write. Sketch range partitioner simply splits high-cardinality keys into ordered ranges. \ No newline at end of file diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md index b916a5f9b7b0..f53b5d832efe 100644 --- a/docs/docs/flink-writes.md +++ b/docs/docs/flink-writes.md @@ -262,6 +262,107 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */ Check out all the options here: [write-options](flink-configuration.md#write-options) +## Distribution mode + +Flink streaming writer supports both `HASH` and `RANGE` distribution mode. +You can enable it via `FlinkSink#Builder#distributionMode(DistributionMode)` +or via [write-options](flink-configuration.md#write-options). + +### Hash distribution + +HASH distribution shuffles data by partition key (partitioned table) or +equality fields (non-partitioned table). It simply leverages Flink's +`DataStream#keyBy` to distribute the data. + +HASH distribution has a few limitations. + + +### Range distribution (experimental) + +RANGE distribution shuffles data by partition key or sort order via a custom range partitioner. +Range distribution collects traffic statistics to guide the range partitioner to +evenly distribute traffic to writer tasks. + +Range distribution only shuffle the data via range partitioner. Rows are *not* sorted within +a data file, which Flink streaming writer doesn't support yet. + +#### Use cases + +RANGE distribution can be applied to an Iceberg table that either is partitioned or +has SortOrder defined. For a partitioned table without SortOrder, partition columns +are used as sort order. If SortOrder is explicitly defined for the table, it is used by +the range partitioner. + +Range distribution can handle skewed data. E.g. + + +Range distribution can also cluster data on non-partition columns. +E.g., table is partitioned hourly on ingestion time. Queries often include +predicate on a non-partition column like `device_id` or `country_code`. +Range partition would improve the query performance by clustering on the non-partition column +when table `SortOrder` is defined with the non-partition column. + +#### Traffic statistics + +Statistics are collected by every shuffle operator subtask and aggregated by the coordinator +for every checkpoint cycle. Aggregated statistics are broadcast to all subtasks and +applied to the range partitioner in the next checkpoint. So it may take up to two checkpoint +cycles to detect traffic distribution change and apply the new statistics to range partitioner. + +Range distribution can work with low cardinality (like `country_code`) +or high cardinality (like `device_id`) scenarios. + + +#### Usage + +Here is how to enable range distribution in Java. There are two optional advanced configs. Default should +work well for most cases. See [write-options](flink-configuration.md#write-options) for details. +```java +FlinkSink.forRowData(input) + ... + .distributionMode(DistributionMode.RANGE) + .rangeDistributionStatisticsType(StatisticsType.Auto) + .rangeDistributionSortKeyBaseWeight(0.0d) + .append(); +``` + +### Overhead + +Data shuffling (hash or range) has computational overhead of serialization/deserialization +and network I/O. Expect some increase of CPU utilization. + +Range distribution also collect and aggregate data distribution statistics. +That would also incur some CPU overhead. Memory overhead is typically +small if using default statistics type of `Auto`. Don't use `Map` statistics +type if key cardinality is high. That could result in significant memory footprint +and large network exchange for statistics aggregation. + ## Notes Flink streaming write jobs rely on snapshot summary to keep the last committed checkpoint ID, and diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java index 7167859e600c..d5eea6706b39 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java @@ -53,6 +53,10 @@ public LongConfParser longConf() { return new LongConfParser(); } + public DoubleConfParser doubleConf() { + return new DoubleConfParser(); + } + public > EnumConfParser enumConfParser(Class enumClass) { return new EnumConfParser<>(enumClass); } @@ -135,6 +139,29 @@ public Long parseOptional() { } } + class DoubleConfParser extends ConfParser { + private Double defaultValue; + + @Override + protected DoubleConfParser self() { + return this; + } + + public DoubleConfParser defaultValue(double value) { + this.defaultValue = value; + return self(); + } + + public double parse() { + Preconditions.checkArgument(defaultValue != null, "Default value cannot be null"); + return parse(Double::parseDouble, defaultValue); + } + + public Double parseOptional() { + return parse(Double::parseDouble, null); + } + } + class StringConfParser extends ConfParser { private String defaultValue; diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index ca7b1120bc81..a31902d49a8b 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -26,6 +26,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.flink.sink.shuffle.StatisticsType; /** * A class for common Iceberg configs for Flink writes. @@ -167,6 +168,26 @@ public DistributionMode distributionMode() { return DistributionMode.fromName(modeName); } + public StatisticsType rangeDistributionStatisticsType() { + String name = + confParser + .stringConf() + .option(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key()) + .flinkConfig(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE) + .defaultValue(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.defaultValue()) + .parse(); + return StatisticsType.valueOf(name); + } + + public double rangeDistributionSortKeyBaseWeight() { + return confParser + .doubleConf() + .option(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key()) + .flinkConfig(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT) + .defaultValue(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.defaultValue()) + .parse(); + } + public int workerPoolSize() { return confParser .intConf() diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index df73f2e09cac..c35286774874 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.flink.sink.shuffle.StatisticsType; /** Flink sink write options */ public class FlinkWriteOptions { @@ -60,6 +61,19 @@ private FlinkWriteOptions() {} public static final ConfigOption DISTRIBUTION_MODE = ConfigOptions.key("distribution-mode").stringType().noDefaultValue(); + public static final ConfigOption RANGE_DISTRIBUTION_STATISTICS_TYPE = + ConfigOptions.key("range-distribution-statistics-type") + .stringType() + .defaultValue(StatisticsType.Auto.name()) + .withDescription("Type of statistics collection: Auto, Map, Sketch"); + + public static final ConfigOption RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT = + ConfigOptions.key("range-distribution-sort-key-base-weight") + .doubleType() + .defaultValue(0.0d) + .withDescription( + "Base weight for every sort key relative to target weight per writer task"); + // Branch to write to public static final ConfigOption BRANCH = ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH); diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 769af7d77140..2256d1e874ce 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -53,13 +53,19 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Partitioning; import org.apache.iceberg.Schema; import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.FlinkWriteConf; import org.apache.iceberg.flink.FlinkWriteOptions; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperatorFactory; +import org.apache.iceberg.flink.sink.shuffle.RangePartitioner; +import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecord; +import org.apache.iceberg.flink.sink.shuffle.StatisticsType; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; @@ -233,15 +239,68 @@ public Builder flinkConf(ReadableConfig config) { * @return {@link Builder} to connect the iceberg table. */ public Builder distributionMode(DistributionMode mode) { - Preconditions.checkArgument( - !DistributionMode.RANGE.equals(mode), - "Flink does not support 'range' write distribution mode now."); if (mode != null) { writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName()); } return this; } + /** + * Range distribution needs to collect statistics about data distribution to properly shuffle + * the records in relatively balanced way. In general, low cardinality should use {@link + * StatisticsType#Map} and high cardinality should use {@link StatisticsType#Sketch} Refer to + * {@link StatisticsType} Javadoc for more details. + * + *

Default is {@link StatisticsType#Auto} where initially Map statistics is used. But if + * cardinality is higher than the threshold (currently 10K) as defined in {@code + * SketchUtil#OPERATOR_SKETCH_SWITCH_THRESHOLD}, statistics collection automatically switches to + * the sketch reservoir sampling. + * + *

Explicit set the statistics type if the default behavior doesn't work. + * + * @param type to specify the statistics type for range distribution. + * @return {@link Builder} to connect the iceberg table. + */ + public Builder rangeDistributionStatisticsType(StatisticsType type) { + if (type != null) { + writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key(), type.name()); + } + return this; + } + + /** + * If sort order contains partition columns, each sort key would map to one partition and data + * file. This relative weight can avoid placing too many small files for sort keys with low + * traffic. It is a double value that defines the minimal weight for each sort key. `0.02` means + * each key has a base weight of `2%` of the targeted traffic weight per writer task. + * + *

E.g. the sink Iceberg table is partitioned daily by event time. Assume the data stream + * contains events from now up to 180 days ago. With event time, traffic weight distribution + * across different days typically has a long tail pattern. Current day contains the most + * traffic. The older days (long tail) contain less and less traffic. Assume writer parallelism + * is `10`. The total weight across all 180 days is `10,000`. Target traffic weight per writer + * task would be `1,000`. Assume the weight sum for the oldest 150 days is `1,000`. Normally, + * the range partitioner would put all the oldest 150 days in one writer task. That writer task + * would write to 150 small files (one per day). Keeping 150 open files can potentially consume + * large amount of memory. Flushing and uploading 150 files (however small) at checkpoint time + * can also be potentially slow. If this config is set to `0.02`. It means every sort key has a + * base weight of `2%` of targeted weight of `1,000` for every write task. It would essentially + * avoid placing more than `50` data files (one per day) on one writer task no matter how small + * they are. + * + *

This is only applicable to {@link StatisticsType#Map} for low-cardinality scenario. For + * {@link StatisticsType#Sketch} high-cardinality sort columns, they are usually not used as + * partition columns. Otherwise, too many partitions and small files may be generated during + * write. Sketch range partitioner simply splits high-cardinality keys into ordered ranges. + * + *

Default is {@code 0.0%}. + */ + public Builder rangeDistributionSortKeyBaseWeight(double weight) { + writeOptions.put( + FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key(), Double.toString(weight)); + return this; + } + /** * Configuring the write parallel number for iceberg stream writer. * @@ -349,18 +408,20 @@ private DataStreamSink chainIcebergOperators() { // Find out the equality field id list based on the user-provided equality field column names. List equalityFieldIds = checkAndGetEqualityFieldIds(); - // Convert the requested flink table schema to flink row type. RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema); + int writerParallelism = + flinkWriteConf.writeParallelism() == null + ? rowDataInput.getParallelism() + : flinkWriteConf.writeParallelism(); // Distribute the records from input data stream based on the write.distribution-mode and // equality fields. DataStream distributeStream = - distributeDataStream( - rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType); + distributeDataStream(rowDataInput, equalityFieldIds, flinkRowType, writerParallelism); // Add parallel writers that append rows to files SingleOutputStreamOperator writerStream = - appendWriter(distributeStream, flinkRowType, equalityFieldIds); + appendWriter(distributeStream, flinkRowType, equalityFieldIds, writerParallelism); // Add single-parallelism committer that commits files // after successful checkpoint or end of input @@ -447,7 +508,10 @@ private SingleOutputStreamOperator appendCommitter( } private SingleOutputStreamOperator appendWriter( - DataStream input, RowType flinkRowType, List equalityFieldIds) { + DataStream input, + RowType flinkRowType, + List equalityFieldIds, + int writerParallelism) { // Validate the equality fields and partition fields if we enable the upsert mode. if (flinkWriteConf.upsertMode()) { Preconditions.checkState( @@ -481,17 +545,13 @@ private SingleOutputStreamOperator appendWriter( IcebergStreamWriter streamWriter = createStreamWriter(tableSupplier, flinkWriteConf, flinkRowType, equalityFieldIds); - int parallelism = - flinkWriteConf.writeParallelism() == null - ? input.getParallelism() - : flinkWriteConf.writeParallelism(); SingleOutputStreamOperator writerStream = input .transform( operatorName(ICEBERG_STREAM_WRITER_NAME), TypeInformation.of(WriteResult.class), streamWriter) - .setParallelism(parallelism); + .setParallelism(writerParallelism); if (uidPrefix != null) { writerStream = writerStream.uid(uidPrefix + "-writer"); } @@ -501,12 +561,15 @@ private SingleOutputStreamOperator appendWriter( private DataStream distributeDataStream( DataStream input, List equalityFieldIds, - PartitionSpec partitionSpec, - Schema iSchema, - RowType flinkRowType) { + RowType flinkRowType, + int writerParallelism) { DistributionMode writeMode = flinkWriteConf.distributionMode(); - LOG.info("Write distribution mode is '{}'", writeMode.modeName()); + + Schema iSchema = table.schema(); + PartitionSpec partitionSpec = table.spec(); + SortOrder sortOrder = table.sortOrder(); + switch (writeMode) { case NONE: if (equalityFieldIds.isEmpty()) { @@ -548,21 +611,52 @@ private DataStream distributeDataStream( } case RANGE: - if (equalityFieldIds.isEmpty()) { + // Ideally, exception should be thrown in the combination of range distribution and + // equality fields. Primary key case should use hash distribution mode. + // Keep the current behavior of falling back to keyBy for backward compatibility. + if (!equalityFieldIds.isEmpty()) { LOG.warn( - "Fallback to use 'none' distribution mode, because there are no equality fields set " - + "and {}=range is not supported yet in flink", - WRITE_DISTRIBUTION_MODE); - return input; - } else { - LOG.info( - "Distribute rows by equality fields, because there are equality fields set " - + "and{}=range is not supported yet in flink", + "Hash distribute rows by equality fields, even though {}=range is set. " + + "Range distribution for primary keys are not always safe in " + + "Flink streaming writer.", WRITE_DISTRIBUTION_MODE); return input.keyBy( new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds)); } + // range distribute by partition key or sort key if table has an SortOrder + Preconditions.checkState( + sortOrder.isSorted() || partitionSpec.isPartitioned(), + "Invalid write distribution mode: range. Need to define sort order or partition spec."); + if (sortOrder.isUnsorted()) { + sortOrder = Partitioning.sortOrderFor(partitionSpec); + LOG.info("Construct sort order from partition spec"); + } + + LOG.info("Range distribute rows by sort order: {}", sortOrder); + StatisticsType statisticsType = flinkWriteConf.rangeDistributionStatisticsType(); + SingleOutputStreamOperator shuffleStream = + input + .transform( + operatorName("range-shuffle"), + TypeInformation.of(StatisticsOrRecord.class), + new DataStatisticsOperatorFactory( + iSchema, + sortOrder, + writerParallelism, + statisticsType, + flinkWriteConf.rangeDistributionSortKeyBaseWeight())) + // Set the parallelism same as input operator to encourage chaining + .setParallelism(input.getParallelism()); + if (uidPrefix != null) { + shuffleStream = shuffleStream.uid(uidPrefix + "-shuffle"); + } + + return shuffleStream + .partitionCustom(new RangePartitioner(iSchema, sortOrder), r -> r) + .filter(StatisticsOrRecord::hasRecord) + .map(StatisticsOrRecord::record); + default: throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode); } @@ -577,12 +671,9 @@ static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) { TypeUtil.validateWriteSchema(schema, writeSchema, true, true); // We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will - // be promoted to - // iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT (backend by 1 - // 'byte'), we will - // read 4 bytes rather than 1 byte, it will mess up the byte array in BinaryRowData. So here - // we must use flink - // schema. + // be promoted to iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT + // (backend by 1 'byte'), we will read 4 bytes rather than 1 byte, it will mess up the + // byte array in BinaryRowData. So here we must use flink schema. return (RowType) requestedSchema.toRowDataType().getLogicalType(); } else { return FlinkSchemaUtil.convert(schema); diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java new file mode 100644 index 000000000000..dc147bf36d13 --- /dev/null +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; + +@Internal +public class DataStatisticsOperatorFactory extends AbstractStreamOperatorFactory + implements CoordinatedOperatorFactory, + OneInputStreamOperatorFactory { + + private final Schema schema; + private final SortOrder sortOrder; + private final int downstreamParallelism; + private final StatisticsType type; + private final double closeFileCostWeightPercentage; + + public DataStatisticsOperatorFactory( + Schema schema, + SortOrder sortOrder, + int downstreamParallelism, + StatisticsType type, + double closeFileCostWeightPercentage) { + this.schema = schema; + this.sortOrder = sortOrder; + this.downstreamParallelism = downstreamParallelism; + this.type = type; + this.closeFileCostWeightPercentage = closeFileCostWeightPercentage; + } + + @Override + public OperatorCoordinator.Provider getCoordinatorProvider( + String operatorName, OperatorID operatorID) { + return new DataStatisticsCoordinatorProvider( + operatorName, + operatorID, + schema, + sortOrder, + downstreamParallelism, + type, + closeFileCostWeightPercentage); + } + + @SuppressWarnings("unchecked") + @Override + public > T createStreamOperator( + StreamOperatorParameters parameters) { + OperatorID operatorId = parameters.getStreamConfig().getOperatorID(); + String operatorName = parameters.getStreamConfig().getOperatorName(); + OperatorEventGateway gateway = + parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId); + + DataStatisticsOperator rangeStatisticsOperator = + new DataStatisticsOperator( + operatorName, schema, sortOrder, gateway, downstreamParallelism, type); + + rangeStatisticsOperator.setup( + parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); + parameters + .getOperatorEventDispatcher() + .registerEventHandler(operatorId, rangeStatisticsOperator); + + return (T) rangeStatisticsOperator; + } + + @SuppressWarnings("rawtypes") + @Override + public Class getStreamOperatorClass(ClassLoader classLoader) { + return DataStatisticsOperator.class; + } +} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java index 482cfd110bde..b63547d433a4 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java @@ -20,6 +20,7 @@ import static org.apache.iceberg.flink.FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HADOOP; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.util.Arrays; @@ -46,6 +47,7 @@ import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; @@ -54,6 +56,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -241,4 +244,93 @@ public void testHashDistributeMode() throws Exception { sql("DROP TABLE IF EXISTS %s.%s", FLINK_DATABASE, tableName); } } + + @TestTemplate + public void testRangeDistributionPartitionColumn() { + // Range partitioner currently only works with streaming writes (with checkpoints) + assumeThat(isStreamingJob).isTrue(); + + // Initialize a BoundedSource table to precisely emit those rows in only one checkpoint. + List> rowsPerCheckpoint = + IntStream.range(1, 6) + .mapToObj( + checkpointId -> { + List charRows = Lists.newArrayList(); + // emit 26x10 rows for each checkpoint cycle + for (int i = 0; i < 10; ++i) { + for (char c = 'a'; c <= 'z'; c++) { + charRows.add(Row.of(c - 'a', String.valueOf(c))); + } + } + return charRows; + }) + .collect(Collectors.toList()); + List flattenedRows = + rowsPerCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList()); + + String dataId = BoundedTableFactory.registerDataSet(rowsPerCheckpoint); + sql( + "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)" + + " WITH ('connector'='BoundedSource', 'data-id'='%s')", + SOURCE_TABLE, dataId); + + assertThat(sql("SELECT * FROM %s", SOURCE_TABLE)) + .as("Should have the expected rows in source table.") + .containsExactlyInAnyOrderElementsOf(flattenedRows); + + Map tableProps = + ImmutableMap.of( + "write.format.default", + FileFormat.PARQUET.name(), + TableProperties.WRITE_DISTRIBUTION_MODE, + DistributionMode.RANGE.modeName()); + + String tableName = "test_hash_distribution_mode"; + sql( + "CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s", + tableName, toWithClause(tableProps)); + + try { + // Insert data set. + sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE); + + assertThat(sql("SELECT * FROM %s", tableName)) + .as("Should have the expected rows in sink table.") + .containsExactlyInAnyOrderElementsOf(flattenedRows); + + Table table = catalog.loadTable(TableIdentifier.of(ICEBERG_NAMESPACE, tableName)); + // ordered in reverse timeline from the newest snapshot to the oldest snapshot + List snapshots = Lists.newArrayList(table.snapshots().iterator()); + // only keep the snapshots with added data files + snapshots = + snapshots.stream() + .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext()) + .collect(Collectors.toList()); + + // Sometimes we will have more checkpoints than the bounded source if we pass the + // auto checkpoint interval. Thus producing multiple snapshots. + assertThat(snapshots).hasSizeGreaterThanOrEqualTo(5); + + // It takes 2 checkpoint cycle for statistics collection and application + // of the globally aggregated statistics in the range partitioner. + // The last two checkpoints should have range shuffle applied + List rangePartitionedCycles = + snapshots.subList(snapshots.size() - 2, snapshots.size()); + + for (Snapshot snapshot : rangePartitionedCycles) { + List addedDataFiles = + Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator()); + // range partition results in each partition only assigned to one writer task + // maybe less than 26 partitions as BoundedSource doesn't always precisely + // control the checkpoint boundary. + // It is hard to precisely control the test condition in SQL tests. + // Here only minimal safe assertions are applied to avoid flakiness. + // If there are no shuffling, the number of data files could be as high as + // 26 * 4 as the default parallelism is set to 4 for the mini cluster. + assertThat(addedDataFiles).hasSizeLessThanOrEqualTo(26); + } + } finally { + sql("DROP TABLE IF EXISTS %s.%s", FLINK_DATABASE, tableName); + } + } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java index 75e397d3f203..df8c3c79d3e3 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java @@ -20,28 +20,37 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.types.Row; +import org.apache.iceberg.DataFile; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.TableProperties; import org.apache.iceberg.flink.FlinkWriteOptions; import org.apache.iceberg.flink.HadoopCatalogExtension; import org.apache.iceberg.flink.MiniFlinkClusterExtension; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.sink.shuffle.StatisticsType; +import org.apache.iceberg.flink.source.BoundedTestSource; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -177,4 +186,309 @@ public void testOverrideWriteConfigWithUnknownDistributionMode() { .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid distribution mode: UNRECOGNIZED"); } + + @TestTemplate + public void testRangeDistributionWithoutSortOrderUnpartitioned() throws Exception { + assumeThat(partitioned).isFalse(); + + table + .updateProperties() + .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName()) + .commit(); + + int numOfCheckpoints = 6; + DataStream dataStream = + env.addSource( + createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)), + ROW_TYPE_INFO); + FlinkSink.Builder builder = + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) + .table(table) + .tableLoader(tableLoader) + .writeParallelism(parallelism); + + // Range distribution requires either sort order or partition spec defined + assertThatThrownBy(builder::append) + .isInstanceOf(IllegalStateException.class) + .hasMessage( + "Invalid write distribution mode: range. Need to define sort order or partition spec."); + } + + @TestTemplate + public void testRangeDistributionWithoutSortOrderPartitioned() throws Exception { + assumeThat(partitioned).isTrue(); + + table + .updateProperties() + .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName()) + .commit(); + + int numOfCheckpoints = 6; + DataStream dataStream = + env.addSource( + createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)), + ROW_TYPE_INFO); + FlinkSink.Builder builder = + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) + .table(table) + .tableLoader(tableLoader) + .writeParallelism(parallelism); + + // sort based on partition columns + builder.append(); + env.execute(getClass().getSimpleName()); + + table.refresh(); + // ordered in reverse timeline from the newest snapshot to the oldest snapshot + List snapshots = Lists.newArrayList(table.snapshots().iterator()); + // only keep the snapshots with added data files + snapshots = + snapshots.stream() + .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext()) + .collect(Collectors.toList()); + + // Sometimes we will have more checkpoints than the bounded source if we pass the + // auto checkpoint interval. Thus producing multiple snapshots. + assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints); + } + + @TestTemplate + public void testRangeDistributionWithSortOrder() throws Exception { + table + .updateProperties() + .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName()) + .commit(); + table.replaceSortOrder().asc("data").commit(); + + int numOfCheckpoints = 6; + DataStream dataStream = + env.addSource( + createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)), + ROW_TYPE_INFO); + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) + .table(table) + .tableLoader(tableLoader) + .writeParallelism(parallelism) + .rangeDistributionStatisticsType(StatisticsType.Map) + .append(); + env.execute(getClass().getSimpleName()); + + table.refresh(); + // ordered in reverse timeline from the newest snapshot to the oldest snapshot + List snapshots = Lists.newArrayList(table.snapshots().iterator()); + // only keep the snapshots with added data files + snapshots = + snapshots.stream() + .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext()) + .collect(Collectors.toList()); + + // Sometimes we will have more checkpoints than the bounded source if we pass the + // auto checkpoint interval. Thus producing multiple snapshots. + assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints); + + // It takes 2 checkpoint cycle for statistics collection and application + // of the globally aggregated statistics in the range partitioner. + // The last two checkpoints should have range shuffle applied + List rangePartitionedCycles = + snapshots.subList(snapshots.size() - 2, snapshots.size()); + + if (partitioned) { + for (Snapshot snapshot : rangePartitionedCycles) { + List addedDataFiles = + Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator()); + // up to 26 partitions + assertThat(addedDataFiles).hasSizeLessThanOrEqualTo(26); + } + } else { + for (Snapshot snapshot : rangePartitionedCycles) { + List addedDataFiles = + Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator()); + // each writer task should only write one file for non-partition sort column + assertThat(addedDataFiles).hasSize(parallelism); + // verify there is no overlap in min-max stats range + if (parallelism == 2) { + assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1)); + } + } + } + } + + @TestTemplate + public void testRangeDistributionSketchWithSortOrder() throws Exception { + table + .updateProperties() + .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName()) + .commit(); + table.replaceSortOrder().asc("id").commit(); + + int numOfCheckpoints = 6; + DataStream dataStream = + env.addSource( + createRangeDistributionBoundedSource(createIntRows(numOfCheckpoints, 1_000)), + ROW_TYPE_INFO); + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) + .table(table) + .tableLoader(tableLoader) + .writeParallelism(parallelism) + .rangeDistributionStatisticsType(StatisticsType.Sketch) + .append(); + env.execute(getClass().getSimpleName()); + + table.refresh(); + // ordered in reverse timeline from the newest snapshot to the oldest snapshot + List snapshots = Lists.newArrayList(table.snapshots().iterator()); + // only keep the snapshots with added data files + snapshots = + snapshots.stream() + .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext()) + .collect(Collectors.toList()); + + // Sometimes we will have more checkpoints than the bounded source if we pass the + // auto checkpoint interval. Thus producing multiple snapshots. + assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints); + + // It takes 2 checkpoint cycle for statistics collection and application + // of the globally aggregated statistics in the range partitioner. + // The last two checkpoints should have range shuffle applied + List rangePartitionedCycles = + snapshots.subList(snapshots.size() - 2, snapshots.size()); + + // since the input has a single value for the data column, + // it is always the same partition. Hence there is no difference + // for partitioned or not + for (Snapshot snapshot : rangePartitionedCycles) { + List addedDataFiles = + Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator()); + // each writer task should only write one file for non-partition sort column + assertThat(addedDataFiles).hasSize(parallelism); + // verify there is no overlap in min-max stats range + if (parallelism == 2) { + assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1)); + } + } + } + + /** Test migration from Map stats to Sketch stats */ + @TestTemplate + public void testRangeDistributionStatisticsMigration() throws Exception { + table + .updateProperties() + .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName()) + .commit(); + table.replaceSortOrder().asc("id").commit(); + + int numOfCheckpoints = 4; + List> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints); + for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) { + // checkpointId 2 would emit 11_000 records which is larger than + // the OPERATOR_SKETCH_SWITCH_THRESHOLD of 10_000. + // This should trigger the stats migration. + int maxId = checkpointId < 1 ? 1_000 : 11_000; + List rows = Lists.newArrayListWithCapacity(maxId); + for (int j = 0; j < maxId; ++j) { + // fixed value "a" for the data (possible partition column) + rows.add(Row.of(j, "a")); + } + + rowsPerCheckpoint.add(rows); + } + + DataStream dataStream = + env.addSource(createRangeDistributionBoundedSource(rowsPerCheckpoint), ROW_TYPE_INFO); + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) + .table(table) + .tableLoader(tableLoader) + .writeParallelism(parallelism) + .rangeDistributionStatisticsType(StatisticsType.Auto) + .append(); + env.execute(getClass().getSimpleName()); + + table.refresh(); + // ordered in reverse timeline from the newest snapshot to the oldest snapshot + List snapshots = Lists.newArrayList(table.snapshots().iterator()); + // only keep the snapshots with added data files + snapshots = + snapshots.stream() + .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext()) + .collect(Collectors.toList()); + + // Sometimes we will have more checkpoints than the bounded source if we pass the + // auto checkpoint interval. Thus producing multiple snapshots. + assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints); + + // It takes 2 checkpoint cycle for statistics collection and application + // of the globally aggregated statistics in the range partitioner. + // The last two checkpoints should have range shuffle applied + List rangePartitionedCycles = + snapshots.subList(snapshots.size() - 2, snapshots.size()); + + // since the input has a single value for the data column, + // it is always the same partition. Hence there is no difference + // for partitioned or not + for (Snapshot snapshot : rangePartitionedCycles) { + List addedDataFiles = + Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator()); + // each writer task should only write one file for non-partition sort column + // sometimes + assertThat(addedDataFiles).hasSize(parallelism); + // verify there is no overlap in min-max stats range + if (parallelism == 2) { + assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1)); + } + } + } + + private BoundedTestSource createRangeDistributionBoundedSource( + List> rowsPerCheckpoint) { + return new BoundedTestSource<>(rowsPerCheckpoint); + } + + private List> createCharRows(int numOfCheckpoints, int countPerChar) { + List> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints); + for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) { + List rows = Lists.newArrayListWithCapacity(26 * countPerChar); + for (int j = 0; j < countPerChar; ++j) { + for (char c = 'a'; c <= 'z'; ++c) { + rows.add(Row.of(1, String.valueOf(c))); + } + } + + rowsPerCheckpoint.add(rows); + } + + return rowsPerCheckpoint; + } + + private List> createIntRows(int numOfCheckpoints, int maxId) { + List> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints); + for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) { + List rows = Lists.newArrayListWithCapacity(maxId); + for (int j = 0; j < maxId; ++j) { + // fixed value "a" for the data (possible partition column) + rows.add(Row.of(j, "a")); + } + + rowsPerCheckpoint.add(rows); + } + + return rowsPerCheckpoint; + } + + private void assertIdColumnStatsNoRangeOverlap(DataFile file1, DataFile file2) { + // id column has fieldId 1 + int file1LowerBound = + Conversions.fromByteBuffer(Types.IntegerType.get(), file1.lowerBounds().get(1)); + int file1UpperBound = + Conversions.fromByteBuffer(Types.IntegerType.get(), file1.upperBounds().get(1)); + int file2LowerBound = + Conversions.fromByteBuffer(Types.IntegerType.get(), file2.lowerBounds().get(1)); + int file2UpperBound = + Conversions.fromByteBuffer(Types.IntegerType.get(), file2.upperBounds().get(1)); + + if (file1LowerBound < file2LowerBound) { + assertThat(file1UpperBound).isLessThanOrEqualTo(file2LowerBound); + } else { + assertThat(file2UpperBound).isLessThanOrEqualTo(file1LowerBound); + } + } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java index 577c54976b9a..b283b8390a2b 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java @@ -30,6 +30,7 @@ import org.apache.flink.types.Row; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.ParameterizedTestExtension; @@ -184,11 +185,21 @@ public void testUpsertModeCheck() throws Exception { .hasMessage( "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream."); - assertThatThrownBy( - () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append()) - .isInstanceOf(IllegalStateException.class) - .hasMessage( - "Equality field columns shouldn't be empty when configuring to use UPSERT data stream."); + if (writeDistributionMode.equals(DistributionMode.RANGE.modeName()) && !partitioned) { + // validation error thrown from distributeDataStream + assertThatThrownBy( + () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append()) + .isInstanceOf(IllegalStateException.class) + .hasMessage( + "Invalid write distribution mode: range. Need to define sort order or partition spec."); + } else { + // validation error thrown from appendWriter + assertThatThrownBy( + () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append()) + .isInstanceOf(IllegalStateException.class) + .hasMessage( + "Equality field columns shouldn't be empty when configuring to use UPSERT data stream."); + } } @TestTemplate