Skip to content

Commit b6d8f42

Browse files
authored
Iceberg Add Files (#37701)
* add files transform and schematransform
* minor fixes
* add tests
* create table if needed; determine partition spec from metrics
* spotless
* add batch route as well
* spotless
* extract SchemaTransform logic out
* add integration tests
* spotless
* spotless
* fix deps
* add a few more tests; add error handling output
* clarify comments
* add comment
1 parent 8e90c36 commit b6d8f42

File tree

9 files changed

+2095
-5
lines changed

9 files changed

+2095
-5
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 4
3+
"modification": 1
44
}

sdks/java/io/iceberg/build.gradle

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ dependencies {
5151
implementation library.java.joda_time
5252
implementation "org.apache.parquet:parquet-column:$parquet_version"
5353
implementation "org.apache.parquet:parquet-hadoop:$parquet_version"
54+
implementation "org.apache.parquet:parquet-common:$parquet_version"
55+
implementation project(":sdks:java:io:parquet")
5456
implementation "org.apache.orc:orc-core:$orc_version"
5557
implementation "org.apache.iceberg:iceberg-core:$iceberg_version"
5658
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
@@ -74,11 +76,13 @@ dependencies {
7476
testImplementation library.java.bigdataoss_gcsio
7577
testImplementation library.java.bigdataoss_util_hadoop
7678
testImplementation "org.apache.parquet:parquet-avro:$parquet_version"
77-
testImplementation "org.apache.parquet:parquet-common:$parquet_version"
7879
testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
7980
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
8081
testImplementation project(":sdks:java:extensions:google-cloud-platform-core")
8182
testImplementation library.java.junit
83+
testImplementation library.java.hamcrest
84+
testImplementation project(path: ":sdks:java:extensions:avro")
85+
testImplementation 'org.awaitility:awaitility:4.2.0'
8286

8387
// Hive catalog test dependencies
8488
testImplementation project(path: ":sdks:java:io:iceberg:hive")
@@ -95,6 +99,7 @@ dependencies {
9599
testImplementation project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow")
96100
testImplementation project(":sdks:java:io:google-cloud-platform")
97101
testImplementation library.java.google_api_services_bigquery
102+
testImplementation 'com.google.cloud:google-cloud-storage'
98103

99104
testImplementation library.java.google_auth_library_oauth2_http
100105
testRuntimeOnly library.java.slf4j_jdk14

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java

Lines changed: 671 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.iceberg;
19+
20+
import static org.apache.beam.sdk.io.iceberg.AddFiles.ERROR_TAG;
21+
import static org.apache.beam.sdk.io.iceberg.AddFiles.OUTPUT_TAG;
22+
import static org.apache.beam.sdk.io.iceberg.AddFilesSchemaTransformProvider.Configuration;
23+
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
24+
25+
import com.google.auto.service.AutoService;
26+
import com.google.auto.value.AutoValue;
27+
import java.util.List;
28+
import java.util.Map;
29+
import org.apache.beam.sdk.schemas.AutoValueSchema;
30+
import org.apache.beam.sdk.schemas.Schema;
31+
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
32+
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
33+
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
34+
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
35+
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
36+
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
37+
import org.apache.beam.sdk.transforms.Filter;
38+
import org.apache.beam.sdk.transforms.MapElements;
39+
import org.apache.beam.sdk.values.PCollectionRowTuple;
40+
import org.apache.beam.sdk.values.TypeDescriptors;
41+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
42+
import org.checkerframework.checker.nullness.qual.Nullable;
43+
import org.joda.time.Duration;
44+
45+
@AutoService(SchemaTransformProvider.class)
46+
public class AddFilesSchemaTransformProvider extends TypedSchemaTransformProvider<Configuration> {
47+
@Override
48+
public AddFilesSchemaTransform from(Configuration configuration) {
49+
return new AddFilesSchemaTransform(configuration);
50+
}
51+
52+
@Override
53+
public String identifier() {
54+
return "beam:schematransform:iceberg_add_files:v1";
55+
}
56+
57+
@DefaultSchema(AutoValueSchema.class)
58+
@AutoValue
59+
public abstract static class Configuration {
60+
public static Builder builder() {
61+
return new AutoValue_AddFilesSchemaTransformProvider_Configuration.Builder();
62+
}
63+
64+
@SchemaFieldDescription("A fully-qualified table identifier.")
65+
public abstract String getTable();
66+
67+
@SchemaFieldDescription("Properties used to set up the Iceberg catalog.")
68+
public abstract @Nullable Map<String, String> getCatalogProperties();
69+
70+
@SchemaFieldDescription("Properties passed to the Hadoop ")
71+
public abstract @Nullable Map<String, String> getConfigProperties();
72+
73+
@SchemaFieldDescription(
74+
"For a streaming pipeline, sets the frequency at which incoming files are appended. Defaults to 600 (10 minutes). "
75+
+ "A commit is triggered when either this or append batch size is reached.")
76+
public abstract @Nullable Integer getTriggeringFrequencySeconds();
77+
78+
@SchemaFieldDescription(
79+
"For a streaming pipeline, sets the desired number of appended files per commit. Defaults to 100,000 files. "
80+
+ "A commit is triggered when either this or append triggering interval is reached.")
81+
public abstract @Nullable Integer getAppendBatchSize();
82+
83+
@SchemaFieldDescription(
84+
"The prefix shared among all partitions. For example, a data file may have the following"
85+
+ " location:%n"
86+
+ "'gs://bucket/namespace/table/data/id=13/name=beam/data_file.parquet'%n%n"
87+
+ "The provided prefix should go up until the partition information:%n"
88+
+ "'gs://bucket/namespace/table/data/'.%n"
89+
+ "If not provided, will try determining each DataFile's partition from its metrics metadata.")
90+
public abstract @Nullable String getLocationPrefix();
91+
92+
@SchemaFieldDescription(
93+
"Fields used to create a partition spec that is applied when tables are created. For a field 'foo', "
94+
+ "the available partition transforms are:\n\n"
95+
+ "- `foo`\n"
96+
+ "- `truncate(foo, N)`\n"
97+
+ "- `bucket(foo, N)`\n"
98+
+ "- `hour(foo)`\n"
99+
+ "- `day(foo)`\n"
100+
+ "- `month(foo)`\n"
101+
+ "- `year(foo)`\n"
102+
+ "- `void(foo)`\n\n"
103+
+ "For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms.")
104+
public abstract @Nullable List<String> getPartitionFields();
105+
106+
@SchemaFieldDescription(
107+
"Iceberg table properties to be set on the table when it is created.\n"
108+
+ "For more information on table properties,"
109+
+ " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
110+
public abstract @Nullable Map<String, String> getTableProperties();
111+
112+
@SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
113+
public abstract @Nullable ErrorHandling getErrorHandling();
114+
115+
@AutoValue.Builder
116+
public abstract static class Builder {
117+
public abstract Builder setTable(String table);
118+
119+
public abstract Builder setCatalogProperties(Map<String, String> catalogProperties);
120+
121+
public abstract Builder setConfigProperties(Map<String, String> confProperties);
122+
123+
public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds);
124+
125+
public abstract Builder setAppendBatchSize(Integer size);
126+
127+
public abstract Builder setLocationPrefix(String prefix);
128+
129+
public abstract Builder setPartitionFields(List<String> fields);
130+
131+
public abstract Builder setTableProperties(Map<String, String> props);
132+
133+
public abstract Builder setErrorHandling(ErrorHandling errorHandling);
134+
135+
public abstract Configuration build();
136+
}
137+
138+
public IcebergCatalogConfig getIcebergCatalog() {
139+
return IcebergCatalogConfig.builder()
140+
.setCatalogProperties(getCatalogProperties())
141+
.setConfigProperties(getConfigProperties())
142+
.build();
143+
}
144+
}
145+
146+
public static class AddFilesSchemaTransform extends SchemaTransform {
147+
private final Configuration configuration;
148+
149+
public AddFilesSchemaTransform(Configuration configuration) {
150+
this.configuration = configuration;
151+
}
152+
153+
@Override
154+
public PCollectionRowTuple expand(PCollectionRowTuple input) {
155+
Schema inputSchema = input.getSinglePCollection().getSchema();
156+
Preconditions.checkState(
157+
inputSchema.getFieldCount() == 1
158+
&& inputSchema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING),
159+
"Incoming Row Schema must contain only one field of type String. Instead, got schema: %s",
160+
inputSchema);
161+
162+
@Nullable Integer frequency = configuration.getTriggeringFrequencySeconds();
163+
164+
PCollectionRowTuple result =
165+
input
166+
.getSinglePCollection()
167+
.apply("Filter empty paths", Filter.by(row -> row.getString(0) != null))
168+
.apply(
169+
"ExtractPaths",
170+
MapElements.into(TypeDescriptors.strings())
171+
.via(row -> checkStateNotNull(row.getString(0))))
172+
.apply(
173+
new AddFiles(
174+
configuration.getIcebergCatalog(),
175+
configuration.getTable(),
176+
configuration.getLocationPrefix(),
177+
configuration.getPartitionFields(),
178+
configuration.getTableProperties(),
179+
configuration.getAppendBatchSize(),
180+
frequency != null ? Duration.standardSeconds(frequency) : null));
181+
182+
PCollectionRowTuple output = PCollectionRowTuple.of("snapshots", result.get(OUTPUT_TAG));
183+
ErrorHandling errorHandling = configuration.getErrorHandling();
184+
if (errorHandling != null) {
185+
output = output.and(errorHandling.getOutput(), result.get(ERROR_TAG));
186+
}
187+
return output;
188+
}
189+
}
190+
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,13 @@ class PartitionUtils {
6868

6969
static PartitionSpec toPartitionSpec(
7070
@Nullable List<String> fields, org.apache.beam.sdk.schemas.Schema beamSchema) {
71+
return toPartitionSpec(fields, IcebergUtils.beamSchemaToIcebergSchema(beamSchema));
72+
}
73+
74+
static PartitionSpec toPartitionSpec(@Nullable List<String> fields, Schema schema) {
7175
if (fields == null) {
7276
return PartitionSpec.unpartitioned();
7377
}
74-
Schema schema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);
7578
PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
7679

7780
for (String field : fields) {

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@
4646
import org.apache.iceberg.encryption.EncryptedInputFile;
4747
import org.apache.iceberg.expressions.Evaluator;
4848
import org.apache.iceberg.expressions.Expression;
49+
import org.apache.iceberg.expressions.Expressions;
4950
import org.apache.iceberg.hadoop.HadoopInputFile;
5051
import org.apache.iceberg.io.CloseableIterable;
5152
import org.apache.iceberg.io.InputFile;
53+
import org.apache.iceberg.mapping.MappingUtil;
5254
import org.apache.iceberg.mapping.NameMapping;
5355
import org.apache.iceberg.mapping.NameMappingParser;
5456
import org.apache.iceberg.parquet.ParquetReader;
@@ -112,6 +114,34 @@ static ParquetReader<Record> createReader(FileScanTask task, Table table, Schema
112114
true);
113115
}
114116

117+
static ParquetReader<Record> createReader(InputFile inputFile, Schema schema) {
118+
ParquetReadOptions.Builder optionsBuilder;
119+
if (inputFile instanceof HadoopInputFile) {
120+
// remove read properties already set that may conflict with this read
121+
Configuration conf = new Configuration(((HadoopInputFile) inputFile).getConf());
122+
for (String property : READ_PROPERTIES_TO_REMOVE) {
123+
conf.unset(property);
124+
}
125+
optionsBuilder = HadoopReadOptions.builder(conf);
126+
} else {
127+
optionsBuilder = ParquetReadOptions.builder();
128+
}
129+
optionsBuilder =
130+
optionsBuilder
131+
.withRange(0, inputFile.getLength())
132+
.withMaxAllocationInBytes(MAX_FILE_BUFFER_SIZE);
133+
134+
return new ParquetReader<>(
135+
inputFile,
136+
schema,
137+
optionsBuilder.build(),
138+
fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema),
139+
MappingUtil.create(schema),
140+
Expressions.alwaysTrue(),
141+
false,
142+
true);
143+
}
144+
115145
static Map<Integer, ?> constantsMap(
116146
FileScanTask task,
117147
BiFunction<Type, Object, Object> converter,

0 commit comments

Comments
 (0)