64 changes: 53 additions & 11 deletions docs/docs/flink-configuration.md
@@ -146,14 +146,56 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...
```

| Flink option | Default | Description |
| ---------------------- | ------------------------------------------ | ------------------------------------------------------------ |
| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
| upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled |
| overwrite-enabled | false | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
| distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode |
| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |
| Flink option | Default | Description |
|-----------------------------------------|--------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
| upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled |
| overwrite-enabled | false | Overwrite the table's data. Overwrite mode shouldn't be enabled when the sink is configured to use an UPSERT data stream. |
| distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode. RANGE distribution is experimental. |
| range-distribution-statistics-type | Auto | Range distribution data statistics collection type: Map, Sketch, Auto. See details [here](#range-distribution-statistics-type). |
| range-distribution-sort-key-base-weight | 0.0 (double) | Base weight for every sort key relative to target traffic weight per writer task. See details [here](#range-distribution-sort-key-base-weight). |
| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |

#### Range distribution statistics type

Config value is an enum type: `Map`, `Sketch`, `Auto`.
<ul>
<li>Map: collects an accurate sample count for every single key.
It should be used for low-cardinality scenarios (like hundreds or thousands of keys).
<li>Sketch: constructs a uniform random sample via reservoir sampling.
It fits high-cardinality scenarios (like millions of keys) well, because the memory footprint is kept low.
<li>Auto: starts with Map statistics. If the detected cardinality is higher
than a threshold (currently 10,000), statistics are automatically switched to Sketch.
</ul>
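
The `Auto` behavior described above can be pictured with a small, self-contained sketch. This is not Iceberg's implementation: the class `AutoStatisticsSketch`, its `reservoirSize` parameter, and the choice to drop the exact counts at the switch point are all assumptions made for illustration. Only the idea (exact per-key counts first, reservoir sampling once cardinality exceeds a threshold) comes from the text.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Illustrative only; names and parameters are hypothetical, not Iceberg's API.
class AutoStatisticsSketch {
  enum Mode { MAP, SKETCH }

  private final int switchThreshold; // the doc cites 10,000 as the current threshold
  private final int reservoirSize;   // hypothetical parameter
  private final Random random;
  private final Map<String, Long> counts = new HashMap<>();
  private final List<String> reservoir = new ArrayList<>();
  private int sampled = 0;           // keys seen since switching to SKETCH
  private Mode mode = Mode.MAP;

  AutoStatisticsSketch(int switchThreshold, int reservoirSize, long seed) {
    this.switchThreshold = switchThreshold;
    this.reservoirSize = reservoirSize;
    this.random = new Random(seed);
  }

  void add(String key) {
    if (mode == Mode.MAP) {
      counts.merge(key, 1L, Long::sum); // exact count per key
      if (counts.size() > switchThreshold) {
        // Too many distinct keys: stop tracking exact counts.
        // Simplification: the counts learned so far are dropped here.
        counts.clear();
        mode = Mode.SKETCH;
      }
      return;
    }
    // Reservoir sampling (Algorithm R): every key seen in SKETCH mode
    // ends up in the sample with equal probability.
    sampled++;
    if (reservoir.size() < reservoirSize) {
      reservoir.add(key);
    } else {
      int slot = random.nextInt(sampled);
      if (slot < reservoirSize) {
        reservoir.set(slot, key);
      }
    }
  }

  Mode mode() { return mode; }
  Map<String, Long> counts() { return counts; }
  List<String> reservoir() { return reservoir; }

  public static void main(String[] args) {
    AutoStatisticsSketch stats = new AutoStatisticsSketch(10_000, 1_000, 42L);
    for (int i = 0; i < 50_000; i++) {
      stats.add("device-" + i); // 50,000 distinct keys exceed the threshold
    }
    System.out.println(stats.mode() + ", sample size " + stats.reservoir().size());
  }
}
```

The point of the sketch is the memory trade-off: the map grows with the number of distinct keys, while the reservoir stays at a fixed size regardless of cardinality.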

#### Range distribution sort key base weight

The default value of `range-distribution-sort-key-base-weight` is `0.0`.

If the sort order contains partition columns, each sort key maps to one partition and one data
file. This relative weight helps avoid assigning too many small files (for sort keys with low
traffic) to a single writer task. It is a double value that defines the minimal weight for each
sort key. `0.02` means each key has a base weight of `2%` of the targeted traffic weight per writer task.

For example, suppose the sink Iceberg table is partitioned daily by event time, and the data
stream contains events from now back to 180 days ago. With event time, the traffic weight
distribution across days typically has a long-tail pattern: the current day contains the most
traffic, and older days (the long tail) contain less and less. Assume the writer parallelism
is `10` and the total weight across all 180 days is `10,000`, so the target traffic weight per
writer task is `1,000`. Assume the weight sum for the oldest 150 days is `1,000`. Normally,
the range partitioner would put all of the oldest 150 days in one writer task. That writer task
would write to 150 small files (one per day). Keeping 150 files open can consume a large amount
of memory, and flushing and uploading 150 files (however small) at checkpoint time can be slow.
If this config is set to `0.02`, every sort key has a base weight of `2%` of the target weight
of `1,000` per writer task, i.e. `20`. That effectively avoids placing more than `50` data files
(one per day) on one writer task, no matter how small they are.
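
The arithmetic in this example can be checked with a few lines of Java. This is a back-of-the-envelope sketch, not the range partitioner's actual algorithm: the class `BaseWeightSketch` and its method names are hypothetical, and the bound assumes every sort key counts for at least the base weight when keys are packed into a writer task.

```java
// Illustrative arithmetic only; names are hypothetical, not Iceberg's API.
class BaseWeightSketch {
  /** Target traffic weight per writer task = total weight / writer parallelism. */
  static double targetWeightPerTask(double totalWeight, int writerParallelism) {
    return totalWeight / writerParallelism;
  }

  /** Base weight of one sort key in absolute terms. */
  static double baseWeightPerKey(double targetWeightPerTask, double baseWeightFraction) {
    return targetWeightPerTask * baseWeightFraction;
  }

  /**
   * Upper bound on the number of sort keys per writer task, assuming each key
   * counts for at least the base weight. A fraction of 0.0 means no cap.
   */
  static long maxKeysPerTask(double targetWeightPerTask, double baseWeightFraction) {
    if (baseWeightFraction <= 0.0) {
      return Long.MAX_VALUE;
    }
    double base = baseWeightPerKey(targetWeightPerTask, baseWeightFraction);
    return (long) Math.floor(targetWeightPerTask / base);
  }

  public static void main(String[] args) {
    double target = targetWeightPerTask(10_000, 10);     // 1000.0
    System.out.println(baseWeightPerKey(target, 0.02));  // 20.0
    System.out.println(maxKeysPerTask(target, 0.02));    // 50
  }
}
```

With the default of `0.0` there is no cap, which matches the "normal" case above where all 150 low-traffic days land on one writer task.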

This is only applicable to the `Map` statistics type for low-cardinality scenarios.
High-cardinality sort columns, which use the `Sketch` statistics type, are usually not used as
partition columns; otherwise, too many partitions and small files may be generated during
write. The Sketch range partitioner simply splits high-cardinality keys into ordered ranges.
101 changes: 101 additions & 0 deletions docs/docs/flink-writes.md
@@ -262,6 +262,107 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */

Check out all the options here: [write-options](flink-configuration.md#write-options)

## Distribution mode

The Flink streaming writer supports both `HASH` and `RANGE` distribution modes.
You can select one via `FlinkSink#Builder#distributionMode(DistributionMode)`
or via [write-options](flink-configuration.md#write-options).

### Hash distribution

HASH distribution shuffles data by partition key (partitioned table) or
equality fields (non-partitioned table). It simply leverages Flink's
`DataStream#keyBy` to distribute the data.

HASH distribution has a few limitations.
<ul>
<li>It doesn't handle skewed data well. E.g. some partitions have a lot more data than others.
<li>It can result in unbalanced traffic distribution if cardinality of the partition key or
equality fields is low as demonstrated by [PR 4228](https://github.com/apache/iceberg/pull/4228).
<li>Writer parallelism is limited to the cardinality of the hash key.
If the cardinality is 10, at most 10 writer tasks would get traffic.
Having higher writer parallelism (even if traffic volume requires it) won't help.
</ul>
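
The parallelism limit in the last bullet follows directly from how hash shuffling works: every row with the same key goes to the same subtask, so the number of busy subtasks can never exceed the number of distinct keys. The sketch below illustrates that with a plain modulo hash. It is a simplification under stated assumptions: the class `HashDistributionSketch` is hypothetical, and Flink's actual `keyBy` assignment differs in detail, but the upper bound holds for any deterministic key-to-subtask mapping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Illustrative only; a simplified stand-in for hash-based shuffling.
class HashDistributionSketch {
  /** Deterministically maps a key to one of `parallelism` subtasks. */
  static int subtaskFor(String key, int parallelism) {
    return Math.floorMod(key.hashCode(), parallelism);
  }

  /** Returns the set of subtasks that receive at least one key. */
  static Set<Integer> busySubtasks(List<String> keys, int parallelism) {
    Set<Integer> busy = new TreeSet<>();
    for (String key : keys) {
      busy.add(subtaskFor(key, parallelism));
    }
    return busy;
  }

  public static void main(String[] args) {
    List<String> keys = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      keys.add("partition-" + i); // hash key cardinality of 10
    }
    Set<Integer> busy = busySubtasks(keys, 32);
    // At most 10 of the 32 subtasks can be busy; collisions can make it fewer.
    System.out.println(busy.size() + " of 32 subtasks receive traffic: " + busy);
  }
}
```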

### Range distribution (experimental)

RANGE distribution shuffles data by partition key or sort order via a custom range partitioner.
Range distribution collects traffic statistics to guide the range partitioner to
evenly distribute traffic to writer tasks.

Range distribution only shuffles the data via the range partitioner. Rows are *not* sorted within
a data file; the Flink streaming writer doesn't support that yet.

#### Use cases

RANGE distribution can be applied to an Iceberg table that either is partitioned or
has SortOrder defined. For a partitioned table without SortOrder, partition columns
are used as sort order. If SortOrder is explicitly defined for the table, it is used by
the range partitioner.

Range distribution can handle skewed data. E.g.
<ul>
<li>Table is partitioned by event time. Typically, recent hours have more data,
while the long-tail hours have less and less data.
<li>Table is partitioned by country code, where some countries (like US) have
a lot more traffic and smaller countries have a lot less data.
<li>Table is partitioned by event type, where some types have a lot more data than others.
</ul>

Range distribution can also cluster data on non-partition columns.
E.g., a table is partitioned hourly on ingestion time, and queries often include
a predicate on a non-partition column like `device_id` or `country_code`.
Range distribution can improve query performance by clustering on the non-partition column
when the table `SortOrder` is defined on that column.

#### Traffic statistics

Statistics are collected by every shuffle operator subtask and aggregated by the coordinator
for every checkpoint cycle. Aggregated statistics are broadcast to all subtasks and
applied to the range partitioner in the next checkpoint. So it may take up to two checkpoint
cycles to detect traffic distribution change and apply the new statistics to range partitioner.

Range distribution can work with low cardinality (like `country_code`)
or high cardinality (like `device_id`) scenarios.
<ul>
<li>For low-cardinality scenarios (like hundreds or thousands of keys),
a HashMap is used to track the traffic distribution for every key.
If a new sort key value shows up, the range partitioner simply
round-robins it across the writer tasks until the traffic distribution
for the new key has been learned.
<li>For high-cardinality scenarios (like millions or billions of keys),
uniform random sampling (reservoir sampling) is used to compute range bounds
that split the sort key space evenly.
It keeps the memory footprint and network exchange low.
Reservoir sampling works well if the key distribution is relatively even.
If a single hot key has a disproportionately large share of the traffic,
range splitting by uniform sampling probably won't work very well.
</ul>
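
The high-cardinality path above (sample uniformly, then derive range bounds that split the key space evenly) can be sketched in plain Java. This is a minimal illustration, not the sink's implementation: the class `RangeBoundsSketch`, the use of `long` keys, and the sample size are assumptions chosen to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Illustrative only; names and key type are hypothetical.
class RangeBoundsSketch {
  /** Uniform random sample of size k via reservoir sampling (Algorithm R). */
  static List<Long> reservoirSample(long[] keys, int k, Random random) {
    List<Long> sample = new ArrayList<>(k);
    for (int i = 0; i < keys.length; i++) {
      if (i < k) {
        sample.add(keys[i]);
      } else {
        int slot = random.nextInt(i + 1);
        if (slot < k) {
          sample.set(slot, keys[i]);
        }
      }
    }
    return sample;
  }

  /** Picks numTasks - 1 bounds that cut the sorted sample into equal-sized ranges. */
  static long[] rangeBounds(List<Long> sample, int numTasks) {
    if (sample.size() < numTasks) {
      throw new IllegalArgumentException("sample is smaller than the number of tasks");
    }
    List<Long> sorted = new ArrayList<>(sample);
    Collections.sort(sorted);
    long[] bounds = new long[numTasks - 1];
    for (int i = 0; i < bounds.length; i++) {
      bounds[i] = sorted.get((i + 1) * sorted.size() / numTasks - 1);
    }
    return bounds;
  }

  /** Task i receives keys that are greater than bounds[i - 1] and at most bounds[i]. */
  static int taskFor(long key, long[] bounds) {
    int idx = Arrays.binarySearch(bounds, key);
    return idx >= 0 ? idx : -(idx + 1);
  }

  public static void main(String[] args) {
    Random random = new Random(7L);
    long[] keys = new long[100_000];
    for (int i = 0; i < keys.length; i++) {
      keys[i] = random.nextInt(1_000_000); // roughly even key distribution
    }
    long[] bounds = rangeBounds(reservoirSample(keys, 1_000, random), 4);
    int[] perTask = new int[4];
    for (long key : keys) {
      perTask[taskFor(key, bounds)]++;
    }
    System.out.println("bounds=" + Arrays.toString(bounds)
        + " rows per task=" + Arrays.toString(perTask));
  }
}
```

With evenly distributed keys the four tasks should receive similar row counts. If one hot key dominated the stream, several bounds would collapse onto that key's value and the split would become uneven, which is the weakness the text points out.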

#### Usage

Here is how to enable range distribution in Java. There are two optional advanced configs. The defaults should
work well for most cases. See [write-options](flink-configuration.md#write-options) for details.
```java
FlinkSink.forRowData(input)
...
.distributionMode(DistributionMode.RANGE)
.rangeDistributionStatisticsType(StatisticsType.Auto)
.rangeDistributionSortKeyBaseWeight(0.0d)
.append();
```

### Overhead

Data shuffling (hash or range) has the computational overhead of serialization/deserialization
and network I/O. Expect some increase in CPU utilization.

Range distribution also collects and aggregates data distribution statistics,
which incurs some additional CPU overhead. Memory overhead is typically
small when using the default statistics type of `Auto`. Don't use the `Map` statistics
type if key cardinality is high. That could result in a significant memory footprint
and large network exchange for statistics aggregation.

## Notes

Flink streaming write jobs rely on snapshot summary to keep the last committed checkpoint ID, and
@@ -53,6 +53,10 @@ public LongConfParser longConf() {
return new LongConfParser();
}

public DoubleConfParser doubleConf() {
return new DoubleConfParser();
}

public <E extends Enum<E>> EnumConfParser<E> enumConfParser(Class<E> enumClass) {
return new EnumConfParser<>(enumClass);
}
@@ -135,6 +139,29 @@ public Long parseOptional() {
}
}

class DoubleConfParser extends ConfParser<DoubleConfParser, Double> {
private Double defaultValue;

@Override
protected DoubleConfParser self() {
return this;
}

public DoubleConfParser defaultValue(double value) {
this.defaultValue = value;
return self();
}

public double parse() {
Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
return parse(Double::parseDouble, defaultValue);
}

public Double parseOptional() {
return parse(Double::parseDouble, null);
}
}

class StringConfParser extends ConfParser<StringConfParser, String> {
private String defaultValue;

@@ -26,6 +26,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.sink.shuffle.StatisticsType;

/**
* A class for common Iceberg configs for Flink writes.
@@ -167,6 +168,26 @@ public DistributionMode distributionMode() {
return DistributionMode.fromName(modeName);
}

public StatisticsType rangeDistributionStatisticsType() {
String name =
confParser
.stringConf()
.option(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key())
.flinkConfig(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE)
.defaultValue(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.defaultValue())
.parse();
return StatisticsType.valueOf(name);
}

public double rangeDistributionSortKeyBaseWeight() {
return confParser
.doubleConf()
.option(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key())
.flinkConfig(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT)
.defaultValue(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.defaultValue())
.parse();
}

public int workerPoolSize() {
return confParser
.intConf()
@@ -23,6 +23,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.flink.sink.shuffle.StatisticsType;

/** Flink sink write options */
public class FlinkWriteOptions {
@@ -60,6 +61,19 @@ private FlinkWriteOptions() {}
public static final ConfigOption<String> DISTRIBUTION_MODE =
ConfigOptions.key("distribution-mode").stringType().noDefaultValue();

public static final ConfigOption<String> RANGE_DISTRIBUTION_STATISTICS_TYPE =
ConfigOptions.key("range-distribution-statistics-type")
.stringType()
.defaultValue(StatisticsType.Auto.name())
.withDescription("Type of statistics collection: Auto, Map, Sketch");

public static final ConfigOption<Double> RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT =
ConfigOptions.key("range-distribution-sort-key-base-weight")
.doubleType()
.defaultValue(0.0d)
.withDescription(
"Base weight for every sort key relative to target weight per writer task");

// Branch to write to
public static final ConfigOption<String> BRANCH =
ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);