Flink: Adjust the configuration precedence for the dynamic sink#13609

Merged
pvary merged 4 commits into
apache:mainfrom
Guosmilesmile:change_precedence
Jul 24, 2025
Conversation

@Guosmilesmile
Contributor

Currently, in DynamicWriter, the table configuration has higher priority and overrides the sink configuration. Generally, however, user-provided options should take precedence over table properties.

This PR adjusts the configuration precedence for the dynamic sink accordingly.
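
To illustrate the intended precedence, here is a minimal sketch using plain `java.util` maps rather than the actual DynamicWriter code. The class and method names are hypothetical; only the merge order matters: the map copied first has lower priority, and the map applied with `putAll` afterwards wins on conflicting keys.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class, not part of Iceberg.
class WritePropertyPrecedence {

  // Copies the lower-priority map first, then lets the higher-priority map overwrite it.
  static Map<String, String> merge(
      Map<String, String> lowPriority, Map<String, String> highPriority) {
    Map<String, String> merged = new HashMap<>(lowPriority);
    merged.putAll(highPriority);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> tableProperties =
        Map.of(
            "write.format.default", "parquet",
            "write.target-file-size-bytes", "536870912");
    Map<String, String> sinkProperties = Map.of("write.target-file-size-bytes", "134217728");

    // Table properties are the base; sink (user-provided) properties override them.
    Map<String, String> resolved = merge(tableProperties, sinkProperties);

    System.out.println(resolved.get("write.target-file-size-bytes")); // prints 134217728
    System.out.println(resolved.get("write.format.default")); // prints parquet
  }
}
```

Keys that only the table defines are kept, so the sink configuration only needs to list what the user wants to change.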

Contributor

@mxm mxm left a comment

Thanks for addressing this TODO @Guosmilesmile!

Comment on lines 109 to 111
// TODO: Handle precedence correctly for the write properties coming from
// the sink conf and from the table defaults
Map<String, String> tableWriteProperties =
Contributor

If we addressed this, we should remove the comment.

-    Maps.newHashMap(commonWriteProperties);
-tableWriteProperties.putAll(table.properties());
+    Maps.newHashMap(table.properties());
+tableWriteProperties.putAll(commonWriteProperties);
Contributor

I wonder, should we use

here?

Contributor Author

The Map<String, String> tableWriteProperties is passed multiple times to other classes within RowDataTaskWriterFactory. Changing it to use FlinkConfParser would involve significant modifications, especially since some places have special logic for tableWriteProperties.

Alternatively, could we add a new method Map<String, String> properties() in FlinkConfParser to handle priority uniformly?

Do you have any better suggestions?

Contributor Author

I made a version using FlinkConfParser. Please take a look and see if it’s appropriate.

Contributor

Sorry, I think the old version was better. The Flink related configs are resolved earlier. At this point, we only have the general Iceberg write properties, for which it doesn't make sense to use the Flink conf parser.

Contributor Author

Thank you for your suggestion. I will revert the changes.

@pvary
Contributor

pvary commented Jul 24, 2025

Is this a behavioral change? Do we need to document it? Is there a release already with this feature?

Contributor

@pvary pvary left a comment

+1 so it could be merged to 1.10

Please create a test for this in follow up PR

@pvary pvary merged commit bc15453 into apache:main Jul 24, 2025
18 checks passed
@pvary
Contributor

pvary commented Jul 24, 2025

Merged to main.
Thanks @Guosmilesmile for the PR and @mxm for the review!

Please in a follow-up PR create a test case which checks the config precedence.
Thanks,
Peter

@mxm
Contributor

mxm commented Jul 24, 2025

> Is this a behavioral change? Do we need to document it? Is there a release already with this feature?

Yes, but there is no official release of Dynamic Sink yet.

+1 for the test case.

@Guosmilesmile
Contributor Author

Thanks for the review! I will prepare a new PR for the test case soon.

@b-rick

b-rick commented Jul 31, 2025

This change results in unintuitive behaviour for parquet compression codec:

This is flink-write-conf:

FlinkWriteConf

  public String parquetCompressionCodec() {
    return confParser
        .stringConf()
        .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
        .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
        .tableProperty(TableProperties.PARQUET_COMPRESSION)
        .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT)
        .parse();
  }

It defaults to TableProperties.PARQUET_COMPRESSION_DEFAULT (which is gzip, which seems wrong, but that is unrelated)

Now, in my table properties, I have set write.parquet.compression-codec to zstd explicitly.

In DynamicWriter:

  Map<String, String> tableWriteProperties =
      Maps.newHashMap(table.properties());
  tableWriteProperties.putAll(commonWriteProperties);

So, as a result of this change, my explicit setting of write.parquet.compression-codec is overridden by a default in FlinkWriteConf, which seems like bad behaviour. Is this intended?
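
The effect described above can be reproduced with a small, self-contained sketch. It does not use the real FlinkWriteConf or DynamicWriter classes; `sinkWriteProperties` is a hypothetical stand-in that, like the behaviour reported here, writes the fallback default into the sink properties even when the user never set a codec.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class, not part of Iceberg.
class CompressionOverrideDemo {
  static final String PARQUET_COMPRESSION = "write.parquet.compression-codec";
  static final String FALLBACK_CODEC = "gzip";

  // Stand-in for resolving sink write properties without access to the table:
  // if the user sets no codec, the fallback default is written into the map anyway.
  static Map<String, String> sinkWriteProperties(String userCodec) {
    Map<String, String> props = new HashMap<>();
    props.put(PARQUET_COMPRESSION, userCodec != null ? userCodec : FALLBACK_CODEC);
    return props;
  }

  // Same merge order as the snippet above: table properties first, sink properties on top.
  static Map<String, String> tableWriteProperties(
      Map<String, String> tableProps, Map<String, String> sinkProps) {
    Map<String, String> merged = new HashMap<>(tableProps);
    merged.putAll(sinkProps);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> tableProps = Map.of(PARQUET_COMPRESSION, "zstd");
    // The user did not configure a codec on the sink.
    Map<String, String> merged = tableWriteProperties(tableProps, sinkWriteProperties(null));
    System.out.println(merged.get(PARQUET_COMPRESSION)); // prints gzip
  }
}
```

The merge itself cannot tell a value the user chose from a value that was filled in as a default, which is why the explicit table setting is lost.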

@Guosmilesmile
Contributor Author

@b-rick Thank you very much for pointing this out. The configuration for parquet's default compression codec is somewhat special.

Since version 1.4.0, the default Parquet compression codec has changed from gzip to zstd. This is done by explicitly setting defaults in the table properties, and those defaults apply only to new tables.

persistedProperties.put(
    TableProperties.PARQUET_COMPRESSION,
    TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);

Common sinks can obtain the table information in advance, but dynamic sinks can only get the table information at runtime. So, in common sinks, the table is passed to SinkUtil.writeProperties and the compression codec is obtained as zstd, but in DynamicIcebergSink, null is passed, so it defaults to gzip.

Map<String, String> writeProperties =
    SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, null);

In my opinion, instead of passing null to SinkUtil.writeProperties, we could pass a Map with the property write.parquet.compression-codec: zstd configured. This would solve the issue in this scenario. However, if new default table properties are added in the future, this place would also need to be updated accordingly, which is not very convenient.
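
A rough sketch of that idea, using plain maps instead of the real SinkUtil or FlinkWriteConf classes: `NEW_TABLE_DEFAULTS` and `resolveCodec` are hypothetical names, and the map stands in for the defaults that are persisted on newly created tables.

```java
import java.util.Map;

// Hypothetical demo class, not part of Iceberg.
class NewTableDefaultsFallback {
  static final String PARQUET_COMPRESSION = "write.parquet.compression-codec";

  // Assumed stand-in for the defaults persisted on new tables since 1.4.0.
  // Every future default of this kind would have to be added here by hand.
  static final Map<String, String> NEW_TABLE_DEFAULTS = Map.of(PARQUET_COMPRESSION, "zstd");

  // A user-provided codec wins; otherwise use the fallback properties; otherwise gzip.
  static String resolveCodec(String userCodec, Map<String, String> fallbackTableProps) {
    if (userCodec != null) {
      return userCodec;
    }
    return fallbackTableProps.getOrDefault(PARQUET_COMPRESSION, "gzip");
  }

  public static void main(String[] args) {
    System.out.println(resolveCodec(null, NEW_TABLE_DEFAULTS)); // prints zstd
    System.out.println(resolveCodec(null, Map.of())); // prints gzip
    System.out.println(resolveCodec("snappy", NEW_TABLE_DEFAULTS)); // prints snappy
  }
}
```

The hard-coded map is exactly the maintenance burden mentioned above: it has to be kept in sync with whatever defaults new tables receive.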

@pvary @mxm Do you have any other suggestions?

@mxm
Contributor

mxm commented Jul 31, 2025

I think we should be moving configuration code like

FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig);
into the runtime path, e.g. into the writer, where we can correctly resolve the write properties based on the user-provided properties, Flink config, table properties, and Iceberg-level defaults. It would be nice to use to resolve the write options.
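
As a sketch of what resolving at runtime could look like, the following uses plain maps and a hypothetical `LayeredWriteConfig` class; it is not the existing Iceberg API. It assumes the lookup order named above: user-provided write options, then Flink config, then table properties, then the Iceberg-level default. The option key used in `main` is illustrative.

```java
import java.util.Map;

// Hypothetical demo class, not part of Iceberg.
class LayeredWriteConfig {
  private final Map<String, String> writeOptions;
  private final Map<String, String> flinkConfig;
  private final Map<String, String> tableProperties;

  LayeredWriteConfig(
      Map<String, String> writeOptions,
      Map<String, String> flinkConfig,
      Map<String, String> tableProperties) {
    this.writeOptions = writeOptions;
    this.flinkConfig = flinkConfig;
    this.tableProperties = tableProperties;
  }

  // Returns the first value found, walking from highest to lowest priority.
  String resolve(String optionKey, String tablePropertyKey, String defaultValue) {
    String value = writeOptions.get(optionKey);
    if (value == null) {
      value = flinkConfig.get(optionKey);
    }
    if (value == null) {
      value = tableProperties.get(tablePropertyKey);
    }
    return value != null ? value : defaultValue;
  }

  public static void main(String[] args) {
    // Built inside the writer, once the table (and its properties) is known.
    LayeredWriteConfig conf =
        new LayeredWriteConfig(
            Map.of(), Map.of(), Map.of("write.parquet.compression-codec", "zstd"));
    String codec = conf.resolve("compression-codec", "write.parquet.compression-codec", "gzip");
    System.out.println(codec); // prints zstd
  }
}
```

Because the default is applied only after the table properties have been consulted, an explicit table setting is no longer masked by a default that was resolved before the table was known.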

@pvary
Contributor

pvary commented Jul 31, 2025

I was playing around with getting the config to the DynamicWriter, and found some serialization issues. If we can solve those, then it would be nice to use the same mechanism to resolve the configuration as in the normal sink.

aiborodin pushed a commit to aiborodin/iceberg that referenced this pull request Sep 17, 2025
…e#13662)

These changes introduce a bug where flink default configuration (which
is incorrect) overrides table configuration, resulting in the
parquet compression type becoming gzip instead of zstd.

Revert "Flink: Adjust the configuration precedence for the dynamic
sink (apache#13609)"

This reverts commit bc15453.

Revert "Flink: Add test for adjust the configuration precedence in the
Dynamic Sink (apache#13662)"

This reverts commit 83da920.

Change-Id: I9ffaca3deed73b3160c2e21e2a632cb56213ba3f
Reviewed-on: https://gerrit.trading.imc.intra/c/data-engineering/iceberg/+/625661
Static-Analysis: Teamcity
Reviewed-by: Alexander Borodin <alex.borodin@imc.com>
Tested-by: Teamcity