From 369f5ea145a29a98d55b134b6662dfab7c961125 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 24 Mar 2026 19:13:40 +0000 Subject: [PATCH 1/6] add iceberg_add_files yaml test --- .../yaml/tests/iceberg_add_files.yaml | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml diff --git a/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml b/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml new file mode 100644 index 000000000000..d1591a959319 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +fixtures: + - name: TEMP_DIR + type: "tempfile.TemporaryDirectory" + +pipelines: + # Pipeline 1: Write a dummy Parquet file + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {label: "11a", rank: 0, bool: true} + - {label: "37a", rank: 1, bool: false} + - {label: "389a", rank: 2, bool: false} + - {label: "3821b", rank: 3, bool: true} + - {label: "990c", rank: 4, bool: true} + - {label: "1024d", rank: 5, bool: false} + - type: WriteToParquet + config: + path: "{TEMP_DIR}/data/data" + + # Pipeline 2: Add our generated file to the Iceberg table + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + # By default, Beam writes a sharded file like -00000-of-00001 + - {file: "{TEMP_DIR}/data/data-00000-of-00001"} + - type: IcebergAddFiles + config: + table: "default.table" + location_prefix: "{TEMP_DIR}/data/" + catalog_properties: + type: "hadoop" + warehouse: "{TEMP_DIR}/dir" From 15d6012095fa953af948c7e3c542b5b8b9853d51 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 24 Mar 2026 19:14:20 +0000 Subject: [PATCH 2/6] add iceberg transform --- sdks/python/apache_beam/yaml/standard_io.yaml | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 6673222f19a0..be0948d510dc 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -403,3 +403,24 @@ 'ReadFromIcebergCDC': 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1' config: gradle_target: 'sdks:java:io:expansion-service:shadowJar' + +#IcebergAddFiles +- type: renaming + transforms: + 'IcebergAddFiles': 'IcebergAddFiles' + config: + mappings: + 'IcebergAddFiles': + table: 'table' + catalog_properties: 'catalog_properties' + config_properties: 'config_properties' + triggering_frequency_seconds: 'triggering_frequency_seconds' + append_batch_size: 'append_batch_size' + 
location_prefix: 'location_prefix' + underlying_provider: + type: beamJar + transforms: + 'IcebergAddFiles': 'beam:schematransform:iceberg_add_files:v1' + config: + gradle_target: 'sdks:java:io:expansion-service:shadowJar' + From 802f69229670fa05a6540a839ab7f06b6e358ded Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 25 Mar 2026 15:33:46 +0000 Subject: [PATCH 3/6] update pipelines with verification --- .../yaml/tests/iceberg_add_files.yaml | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml b/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml index d1591a959319..d246db577c6b 100644 --- a/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml +++ b/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml @@ -20,6 +20,21 @@ fixtures: type: "tempfile.TemporaryDirectory" pipelines: + # Pipeline 0: Initialize Iceberg table + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {label: "dummy", rank: -1, bool: true} + - type: WriteToIceberg + config: + table: "default.table" + catalog_properties: + type: "hadoop" + warehouse: "{TEMP_DIR}/dir" + # Pipeline 1: Write a dummy Parquet file - pipeline: type: chain @@ -36,6 +51,7 @@ pipelines: - type: WriteToParquet config: path: "{TEMP_DIR}/data/data" + file_name_suffix: ".parquet" # Pipeline 2: Add our generated file to the Iceberg table - pipeline: @@ -45,7 +61,7 @@ pipelines: config: elements: # By default, Beam writes a sharded file like -00000-of-00001 - - {file: "{TEMP_DIR}/data/data-00000-of-00001"} + - {file: "{TEMP_DIR}/data/data-00000-of-00001.parquet"} - type: IcebergAddFiles config: table: "default.table" @@ -53,3 +69,24 @@ pipelines: catalog_properties: type: "hadoop" warehouse: "{TEMP_DIR}/dir" + + # Pipeline 3: Read from Iceberg and verify the contents + - pipeline: + type: chain + transforms: + - type: ReadFromIceberg + config: + table: "default.table" + 
catalog_properties: + type: "hadoop" + warehouse: "{TEMP_DIR}/dir" + - type: AssertEqual + config: + elements: + - {label: "dummy", rank: -1, bool: true} + - {label: "11a", rank: 0, bool: true} + - {label: "37a", rank: 1, bool: false} + - {label: "389a", rank: 2, bool: false} + - {label: "3821b", rank: 3, bool: true} + - {label: "990c", rank: 4, bool: true} + - {label: "1024d", rank: 5, bool: false} From 0b99815054461755e169071ca2ec8a634ad78175 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 25 Mar 2026 17:53:23 +0000 Subject: [PATCH 4/6] fix comments --- sdks/python/apache_beam/yaml/standard_io.yaml | 3 +++ .../apache_beam/yaml/tests/iceberg_add_files.yaml | 15 --------------- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index be0948d510dc..0903d6dc8c34 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -417,6 +417,9 @@ triggering_frequency_seconds: 'triggering_frequency_seconds' append_batch_size: 'append_batch_size' location_prefix: 'location_prefix' + partition_fields: 'partition_fields' + table_properties: 'table_properties' + error_handling: 'error_handling' underlying_provider: type: beamJar transforms: diff --git a/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml b/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml index d246db577c6b..1b518f3794e5 100644 --- a/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml +++ b/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml @@ -20,20 +20,6 @@ fixtures: type: "tempfile.TemporaryDirectory" pipelines: - # Pipeline 0: Initialize Iceberg table - - pipeline: - type: chain - transforms: - - type: Create - config: - elements: - - {label: "dummy", rank: -1, bool: true} - - type: WriteToIceberg - config: - table: "default.table" - catalog_properties: - type: "hadoop" - warehouse: "{TEMP_DIR}/dir" # Pipeline 1: 
Write a dummy Parquet file - pipeline: @@ -83,7 +69,6 @@ pipelines: - type: AssertEqual config: elements: - - {label: "dummy", rank: -1, bool: true} - {label: "11a", rank: 0, bool: true} - {label: "37a", rank: 1, bool: false} - {label: "389a", rank: 2, bool: false} From 772176b548ce0f662c3974df6638449edc409024 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 25 Mar 2026 18:15:54 +0000 Subject: [PATCH 5/6] address dlq error --- .../yaml/tests/iceberg_add_files.yaml | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml b/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml index 1b518f3794e5..b2798a2b70c8 100644 --- a/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml +++ b/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml @@ -75,3 +75,36 @@ pipelines: - {label: "3821b", rank: 3, bool: true} - {label: "990c", rank: 4, bool: true} - {label: "1024d", rank: 5, bool: false} + + # Pipeline 4: Add an invalid file to trigger the DLQ + - pipeline: + type: composite + transforms: + - type: Create + config: + elements: + - {file: "gs://dummy-bucket/does-not-exist.txt"} + - type: IcebergAddFiles + name: AddInvalidFile + input: Create + config: + table: "default.table" + location_prefix: "gs://dummy-bucket/" + catalog_properties: + type: "hadoop" + warehouse: "{TEMP_DIR}/dir" + error_handling: + output: error_output + - type: WriteToJson + name: WriteErrorsToJson + input: AddInvalidFile.error_output + config: + path: "{TEMP_DIR}/error.json" + + # Pipeline 5: Ensure errors were written + - pipeline: + type: chain + transforms: + - type: ReadFromJson + config: + path: "{TEMP_DIR}/error.json*" From 59ba559308afebb82760fc6d79af73485850de35 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 25 Mar 2026 20:04:55 +0000 Subject: [PATCH 6/6] add an AssertEqual per comment --- sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml | 5 +++++ 1 file changed, 5 insertions(+) 
diff --git a/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml b/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml index b2798a2b70c8..1e089f19ac0e 100644 --- a/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml +++ b/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml @@ -108,3 +108,8 @@ pipelines: - type: ReadFromJson config: path: "{TEMP_DIR}/error.json*" + - type: AssertEqual + config: + elements: + - {file: "gs://dummy-bucket/does-not-exist.txt", error: "Could not determine the file's format"} +