diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 6673222f19a0..0903d6dc8c34 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -403,3 +403,27 @@
         'ReadFromIcebergCDC': 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1'
       config:
         gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+
+# IcebergAddFiles
+- type: renaming
+  transforms:
+    'IcebergAddFiles': 'IcebergAddFiles'
+  config:
+    mappings:
+      'IcebergAddFiles':
+        table: 'table'
+        catalog_properties: 'catalog_properties'
+        config_properties: 'config_properties'
+        triggering_frequency_seconds: 'triggering_frequency_seconds'
+        append_batch_size: 'append_batch_size'
+        location_prefix: 'location_prefix'
+        partition_fields: 'partition_fields'
+        table_properties: 'table_properties'
+        error_handling: 'error_handling'
+    underlying_provider:
+      type: beamJar
+      transforms:
+        'IcebergAddFiles': 'beam:schematransform:org.apache.beam:iceberg_add_files:v1'
+      config:
+        gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+
diff --git a/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml b/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml
new file mode 100644
index 000000000000..1e089f19ac0e
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/iceberg_add_files.yaml
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+  - name: TEMP_DIR
+    type: "tempfile.TemporaryDirectory"
+
+pipelines:
+
+  # Pipeline 1: Write a dummy Parquet file
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {label: "11a", rank: 0, bool: true}
+              - {label: "37a", rank: 1, bool: false}
+              - {label: "389a", rank: 2, bool: false}
+              - {label: "3821b", rank: 3, bool: true}
+              - {label: "990c", rank: 4, bool: true}
+              - {label: "1024d", rank: 5, bool: false}
+        - type: WriteToParquet
+          config:
+            path: "{TEMP_DIR}/data/data"
+            file_name_suffix: ".parquet"
+
+  # Pipeline 2: Add our generated file to the Iceberg table
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              # By default, Beam writes a sharded file like -00000-of-00001
+              - {file: "{TEMP_DIR}/data/data-00000-of-00001.parquet"}
+        - type: IcebergAddFiles
+          config:
+            table: "default.table"
+            location_prefix: "{TEMP_DIR}/data/"
+            catalog_properties:
+              type: "hadoop"
+              warehouse: "{TEMP_DIR}/dir"
+
+  # Pipeline 3: Read from Iceberg and verify the contents
+  - pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromIceberg
+          config:
+            table: "default.table"
+            catalog_properties:
+              type: "hadoop"
+              warehouse: "{TEMP_DIR}/dir"
+        - type: AssertEqual
+          config:
+            elements:
+              - {label: "11a", rank: 0, bool: true}
+              - {label: "37a", rank: 1, bool: false}
+              - {label: "389a", rank: 2, bool: false}
+              - {label: "3821b", rank: 3, bool: true}
+              - {label: "990c", rank: 4, bool: true}
+              - {label: "1024d", rank: 5, bool: false}
+
+  # Pipeline 4: Add an invalid file to trigger the DLQ
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {file: "gs://dummy-bucket/does-not-exist.txt"}
+        - type: IcebergAddFiles
+          name: AddInvalidFile
+          input: Create
+          config:
+            table: "default.table"
+            location_prefix: "gs://dummy-bucket/"
+            catalog_properties:
+              type: "hadoop"
+              warehouse: "{TEMP_DIR}/dir"
+            error_handling:
+              output: error_output
+        - type: WriteToJson
+          name: WriteErrorsToJson
+          input: AddInvalidFile.error_output
+          config:
+            path: "{TEMP_DIR}/error.json"
+
+  # Pipeline 5: Ensure errors were written
+  - pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromJson
+          config:
+            path: "{TEMP_DIR}/error.json*"
+        - type: AssertEqual
+          config:
+            elements:
+              - {file: "gs://dummy-bucket/does-not-exist.txt", error: "Could not determine the file's format"}
+