Skip to content

Commit 4095102

Browse files
authored
Iceberg AddFiles -- handle FileNotFound exceptions (#37952)
* handle filenotfound exceptions * trigger ITs
1 parent 0868552 commit 4095102

File tree

3 files changed

+74
-16
lines changed

3 files changed

+74
-16
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 1
3+
"modification": 2
44
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2424
import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED;
2525

26+
import java.io.FileNotFoundException;
2627
import java.io.IOException;
2728
import java.nio.ByteBuffer;
2829
import java.nio.channels.SeekableByteChannel;
@@ -81,6 +82,7 @@
8182
import org.apache.iceberg.catalog.TableIdentifier;
8283
import org.apache.iceberg.data.Record;
8384
import org.apache.iceberg.exceptions.AlreadyExistsException;
85+
import org.apache.iceberg.exceptions.NoSuchTableException;
8486
import org.apache.iceberg.io.CloseableIterable;
8587
import org.apache.iceberg.io.InputFile;
8688
import org.apache.iceberg.mapping.MappingUtil;
@@ -240,7 +242,18 @@ public void process(@Element String filePath, MultiOutputReceiver output)
240242
}
241243

242244
if (table == null) {
243-
table = getOrCreateTable(getSchema(filePath, format));
245+
try {
246+
table = getOrCreateTable(filePath, format);
247+
} catch (FileNotFoundException e) {
248+
output
249+
.get(ERRORS)
250+
.output(
251+
Row.withSchema(ERROR_SCHEMA)
252+
.addValues(filePath, checkStateNotNull(e.getMessage()))
253+
.build());
254+
numErrorFiles.inc();
255+
return;
256+
}
244257
}
245258

246259
// Check if the file path contains the provided prefix
@@ -256,9 +269,24 @@ public void process(@Element String filePath, MultiOutputReceiver output)
256269

257270
InputFile inputFile = table.io().newInputFile(filePath);
258271

259-
Metrics metrics =
260-
getFileMetrics(
261-
inputFile, format, MetricsConfig.forTable(table), MappingUtil.create(table.schema()));
272+
Metrics metrics;
273+
try {
274+
metrics =
275+
getFileMetrics(
276+
inputFile,
277+
format,
278+
MetricsConfig.forTable(table),
279+
MappingUtil.create(table.schema()));
280+
} catch (FileNotFoundException e) {
281+
output
282+
.get(ERRORS)
283+
.output(
284+
Row.withSchema(ERROR_SCHEMA)
285+
.addValues(filePath, checkStateNotNull(e.getMessage()))
286+
.build());
287+
numErrorFiles.inc();
288+
return;
289+
}
262290

263291
// Figure out which partition this DataFile should go to
264292
String partitionPath;
@@ -304,16 +332,23 @@ private static <W, T> T transformValue(Transform<W, T> transform, Type type, Obj
304332
return transform.bind(type).apply((W) value);
305333
}
306334

307-
private Table getOrCreateTable(org.apache.iceberg.Schema schema) {
308-
PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema);
335+
private Table getOrCreateTable(String filePath, FileFormat format) throws IOException {
336+
TableIdentifier tableId = TableIdentifier.parse(identifier);
309337
try {
310-
return tableProps == null
311-
? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec)
312-
: catalogConfig
313-
.catalog()
314-
.createTable(TableIdentifier.parse(identifier), schema, spec, tableProps);
315-
} catch (AlreadyExistsException e) { // if table already exists, just load it
316-
return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
338+
return catalogConfig.catalog().loadTable(tableId);
339+
} catch (NoSuchTableException e) {
340+
try {
341+
org.apache.iceberg.Schema schema = getSchema(filePath, format);
342+
PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema);
343+
344+
return tableProps == null
345+
? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec)
346+
: catalogConfig
347+
.catalog()
348+
.createTable(TableIdentifier.parse(identifier), schema, spec, tableProps);
349+
} catch (AlreadyExistsException e2) { // if table already exists, just load it
350+
return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
351+
}
317352
}
318353
}
319354

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2424
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2525
import static org.hamcrest.MatcherAssert.assertThat;
26+
import static org.hamcrest.Matchers.containsString;
2627
import static org.hamcrest.Matchers.hasEntry;
2728
import static org.junit.Assert.assertEquals;
2829
import static org.junit.Assert.assertThrows;
@@ -422,8 +423,6 @@ public void testPartitionPrefixErrors() throws Exception {
422423

423424
@Test
424425
public void testRecognizesBucketPartitionMismatch() throws IOException {
425-
catalog.dropTable(tableId);
426-
427426
String file1 = root + "data1.parquet";
428427
wrapper.wrap(record(-1, "And", 30));
429428
DataWriter<Record> writer = createWriter(file1, wrapper.copy());
@@ -464,6 +463,30 @@ public void testRecognizesBucketPartitionMismatch() throws IOException {
464463
pipeline.run().waitUntilFinish();
465464
}
466465

466+
@Test
467+
public void testCatchFileNotFoundException() throws IOException {
468+
String file = root + "non-existent.parquet";
469+
470+
PCollectionRowTuple outputTuple =
471+
pipeline
472+
.apply("Create Input", Create.of(file))
473+
.apply(new AddFiles(catalogConfig, tableId.toString(), null, null, null, 1, null));
474+
475+
PAssert.that(outputTuple.get("errors"))
476+
.satisfies(
477+
rows -> {
478+
Row error = Iterables.getOnlyElement(rows);
479+
String errorFile = error.getString("file");
480+
String message = error.getString("error");
481+
482+
assertEquals(file, errorFile);
483+
assertThat(message, containsString("No files found"));
484+
assertThat(message, containsString(errorFile));
485+
return null;
486+
});
487+
pipeline.run().waitUntilFinish();
488+
}
489+
467490
@Test
468491
public void testGetPartitionFromMetrics() throws IOException, InterruptedException {
469492
PartitionSpec partitionSpec =

0 commit comments

Comments
 (0)