From 2c6bc0b8940e5736d7ba2c7c9634d985027a6525 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 26 Mar 2026 14:05:08 -0400 Subject: [PATCH] add batch import yaml IT --- .../apache/beam/sdk/io/iceberg/AddFiles.java | 11 +-- .../yaml/tests/iceberg_add_files_batch.yaml | 88 +++++++++++++++++++ 2 files changed, 91 insertions(+), 8 deletions(-) create mode 100644 sdks/python/apache_beam/yaml/tests/iceberg_add_files_batch.yaml diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java index e250536382ed..904b1d1866dd 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -557,20 +557,15 @@ public CommitFilesDoFn(IcebergCatalogConfig catalogConfig, String identifier) { this.identifier = identifier; } - @StartBundle - public void start() { - if (table == null) { - table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); - } - } - @ProcessElement public void process( @Element KV> files, @AlwaysFetched @StateId("lastCommitTimestamp") ValueState lastCommitTimestamp, OutputReceiver output) { String commitId = commitHash(files.getValue()); - Table table = checkStateNotNull(this.table); + if (table == null) { + table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + } table.refresh(); if (shouldSkip(commitId, lastCommitTimestamp.read())) { diff --git a/sdks/python/apache_beam/yaml/tests/iceberg_add_files_batch.yaml b/sdks/python/apache_beam/yaml/tests/iceberg_add_files_batch.yaml new file mode 100644 index 000000000000..d9117201740a --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/iceberg_add_files_batch.yaml @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: TEMP_DIR + type: "tempfile.TemporaryDirectory" + + +pipelines: + # Pipeline 1: Write dummy Parquet files + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {label: "11a", rank: 0, bool: true} + - {label: "37a", rank: 1, bool: false} + - {label: "389a", rank: 2, bool: false} + - {label: "3821b", rank: 3, bool: true} + - {label: "990c", rank: 4, bool: true} + - {label: "1024d", rank: 5, bool: false} + - type: WriteToParquet + config: + path: "{TEMP_DIR}/data/data" + file_name_suffix: ".parquet" + num_shards: 6 + + # Pipeline 2: Register our generated files in our Iceberg table + - pipeline: + type: chain + transforms: + - type: ReadMatchFiles + config: + file_pattern: "{TEMP_DIR}/data/data*.parquet" + - type: MapToFields + config: + language: python + fields: + path: + callable: "lambda row: row.path" + output_type: string + - type: IcebergAddFiles + config: + table: "default.table" + catalog_properties: + type: "hadoop" + warehouse: "{TEMP_DIR}/dir" + + providers: + - type: python + config: { } + transforms: + ReadMatchFiles: 'apache_beam.io.fileio.MatchFiles' + + # Pipeline 3: Read from Iceberg and verify the contents + - pipeline: + type: chain + transforms: + - type: ReadFromIceberg + config: + table: "default.table" + catalog_properties: + type: "hadoop" + warehouse: "{TEMP_DIR}/dir" + - type: AssertEqual + config: + elements: + - {label: "11a", rank: 0, bool: true} + - {label: "37a", rank: 1, bool: false} + - {label: "389a", rank: 2, bool: false} + - {label: "3821b", rank: 3, bool: true} + - {label: "990c", rank: 4, bool: true} + - {label: "1024d", rank: 5, bool: false}