Skip to content

Commit 237b60f

Browse files
committed
Revert "Add support for Iceberg table identifiers with special characters (#33293)"
This reverts commit d6e0b0c.
1 parent b82bde8 commit 237b60f

File tree

11 files changed

+17
-97
lines changed

11 files changed

+17
-97
lines changed

.github/trigger_files/IO_Iceberg_Integration_Tests.json

Lines changed: 0 additions & 4 deletions
This file was deleted.

sdks/java/io/iceberg/build.gradle

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,8 @@ dependencies {
5555
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
5656
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
5757
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
58-
runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
5958
implementation library.java.hadoop_common
60-
implementation library.java.jackson_core
61-
implementation library.java.jackson_databind
59+
runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
6260

6361
testImplementation project(":sdks:java:managed")
6462
testImplementation library.java.hadoop_client

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.iceberg.Snapshot;
4848
import org.apache.iceberg.Table;
4949
import org.apache.iceberg.catalog.Catalog;
50+
import org.apache.iceberg.catalog.TableIdentifier;
5051
import org.apache.iceberg.io.FileIO;
5152
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
5253
import org.slf4j.Logger;
@@ -132,7 +133,7 @@ public void processElement(
132133
return;
133134
}
134135

135-
Table table = getCatalog().loadTable(IcebergUtils.parseTableIdentifier(element.getKey()));
136+
Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
136137

137138
// vast majority of the time, we will simply append data files.
138139
// in the rare case we get a batch that contains multiple partition specs, we will group

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.iceberg.DataFile;
2626
import org.apache.iceberg.PartitionSpec;
2727
import org.apache.iceberg.catalog.TableIdentifier;
28-
import org.apache.iceberg.catalog.TableIdentifierParser;
2928
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
3029

3130
@AutoValue
@@ -42,7 +41,7 @@ abstract class FileWriteResult {
4241
@SchemaIgnore
4342
public TableIdentifier getTableIdentifier() {
4443
if (cachedTableIdentifier == null) {
45-
cachedTableIdentifier = IcebergUtils.parseTableIdentifier(getTableIdentifierString());
44+
cachedTableIdentifier = TableIdentifier.parse(getTableIdentifierString());
4645
}
4746
return cachedTableIdentifier;
4847
}
@@ -68,7 +67,7 @@ abstract static class Builder {
6867

6968
@SchemaIgnore
7069
public Builder setTableIdentifier(TableIdentifier tableId) {
71-
return setTableIdentifierString(TableIdentifierParser.toJson(tableId));
70+
return setTableIdentifierString(tableId.toString());
7271
}
7372

7473
public abstract FileWriteResult build();

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.beam.sdk.values.PCollection;
3232
import org.apache.beam.sdk.values.PCollectionRowTuple;
3333
import org.apache.beam.sdk.values.Row;
34+
import org.apache.iceberg.catalog.TableIdentifier;
3435

3536
/**
3637
* SchemaTransform implementation for {@link IcebergIO#readRows}. Reads records from Iceberg and
@@ -85,7 +86,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
8586
.getPipeline()
8687
.apply(
8788
IcebergIO.readRows(configuration.getIcebergCatalog())
88-
.from(IcebergUtils.parseTableIdentifier(configuration.getTable())));
89+
.from(TableIdentifier.parse(configuration.getTable())));
8990

9091
return PCollectionRowTuple.of(OUTPUT_TAG, output);
9192
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
2424
import org.apache.iceberg.Table;
2525
import org.apache.iceberg.catalog.TableIdentifier;
26-
import org.apache.iceberg.catalog.TableIdentifierParser;
2726
import org.apache.iceberg.expressions.Expression;
2827
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
2928
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -52,9 +51,7 @@ public enum ScanType {
5251
public Table getTable() {
5352
if (cachedTable == null) {
5453
cachedTable =
55-
getCatalogConfig()
56-
.catalog()
57-
.loadTable(IcebergUtils.parseTableIdentifier(getTableIdentifier()));
54+
getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier()));
5855
}
5956
return cachedTable;
6057
}
@@ -129,7 +126,7 @@ public abstract static class Builder {
129126
public abstract Builder setTableIdentifier(String tableIdentifier);
130127

131128
public Builder setTableIdentifier(TableIdentifier tableIdentifier) {
132-
return this.setTableIdentifier(TableIdentifierParser.toJson(tableIdentifier));
129+
return this.setTableIdentifier(tableIdentifier.toString());
133130
}
134131

135132
public Builder setTableIdentifier(String... names) {

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@
1919

2020
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
2121

22-
import com.fasterxml.jackson.core.JsonProcessingException;
23-
import com.fasterxml.jackson.databind.JsonNode;
24-
import com.fasterxml.jackson.databind.ObjectMapper;
2522
import java.nio.ByteBuffer;
2623
import java.time.LocalDate;
2724
import java.time.LocalDateTime;
@@ -39,8 +36,6 @@
3936
import org.apache.beam.sdk.values.Row;
4037
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
4138
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
42-
import org.apache.iceberg.catalog.TableIdentifier;
43-
import org.apache.iceberg.catalog.TableIdentifierParser;
4439
import org.apache.iceberg.data.GenericRecord;
4540
import org.apache.iceberg.data.Record;
4641
import org.apache.iceberg.types.Type;
@@ -52,9 +47,6 @@
5247

5348
/** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */
5449
public class IcebergUtils {
55-
56-
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
57-
5850
private IcebergUtils() {}
5951

6052
private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
@@ -514,13 +506,4 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType
514506
// LocalDateTime, LocalDate, LocalTime
515507
return icebergValue;
516508
}
517-
518-
public static TableIdentifier parseTableIdentifier(String table) {
519-
try {
520-
JsonNode jsonNode = OBJECT_MAPPER.readTree(table);
521-
return TableIdentifierParser.fromJson(jsonNode);
522-
} catch (JsonProcessingException e) {
523-
return TableIdentifier.parse(table);
524-
}
525-
}
526509
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class OneTableDynamicDestinations implements DynamicDestinations, Externalizable
4141
@VisibleForTesting
4242
TableIdentifier getTableIdentifier() {
4343
if (tableId == null) {
44-
tableId = IcebergUtils.parseTableIdentifier(checkStateNotNull(tableIdString));
44+
tableId = TableIdentifier.parse(checkStateNotNull(tableIdString));
4545
}
4646
return tableId;
4747
}
@@ -86,6 +86,6 @@ public void writeExternal(ObjectOutput out) throws IOException {
8686
@Override
8787
public void readExternal(ObjectInput in) throws IOException {
8888
tableIdString = in.readUTF();
89-
tableId = IcebergUtils.parseTableIdentifier(tableIdString);
89+
tableId = TableIdentifier.parse(tableIdString);
9090
}
9191
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.beam.sdk.values.Row;
2525
import org.apache.beam.sdk.values.ValueInSingleWindow;
2626
import org.apache.iceberg.FileFormat;
27+
import org.apache.iceberg.catalog.TableIdentifier;
2728
import org.checkerframework.checker.nullness.qual.Nullable;
2829

2930
class PortableIcebergDestinations implements DynamicDestinations {
@@ -72,7 +73,7 @@ public String getTableStringIdentifier(ValueInSingleWindow<Row> element) {
7273
@Override
7374
public IcebergDestination instantiateDestination(String dest) {
7475
return IcebergDestination.builder()
75-
.setTableIdentifier(IcebergUtils.parseTableIdentifier(dest))
76+
.setTableIdentifier(TableIdentifier.parse(dest))
7677
.setTableCreateConfig(null)
7778
.setFileFormat(FileFormat.fromString(fileFormat))
7879
.build();

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323

2424
import java.io.File;
2525
import java.io.IOException;
26-
import java.util.Arrays;
27-
import java.util.Collection;
2826
import java.util.List;
2927
import java.util.Map;
3028
import java.util.UUID;
@@ -70,11 +68,11 @@
7068
import org.junit.Test;
7169
import org.junit.rules.TemporaryFolder;
7270
import org.junit.runner.RunWith;
73-
import org.junit.runners.Parameterized;
71+
import org.junit.runners.JUnit4;
7472
import org.slf4j.Logger;
7573
import org.slf4j.LoggerFactory;
7674

77-
@RunWith(Parameterized.class)
75+
@RunWith(JUnit4.class)
7876
public class IcebergIOReadTest {
7977

8078
private static final Logger LOG = LoggerFactory.getLogger(IcebergIOReadTest.class);
@@ -85,21 +83,6 @@ public class IcebergIOReadTest {
8583

8684
@Rule public TestPipeline testPipeline = TestPipeline.create();
8785

88-
@Parameterized.Parameters
89-
public static Collection<Object[]> data() {
90-
return Arrays.asList(
91-
new Object[][] {
92-
{String.format("{\"namespace\": [\"default\"], \"name\": \"%s\"}", tableId())},
93-
{String.format("default.%s", tableId())},
94-
});
95-
}
96-
97-
public static String tableId() {
98-
return "table" + Long.toString(UUID.randomUUID().hashCode(), 16);
99-
}
100-
101-
@Parameterized.Parameter public String tableStringIdentifier;
102-
10386
static class PrintRow extends DoFn<Row, Row> {
10487

10588
@ProcessElement
@@ -111,7 +94,8 @@ public void process(@Element Row row, OutputReceiver<Row> output) throws Excepti
11194

11295
@Test
11396
public void testSimpleScan() throws Exception {
114-
TableIdentifier tableId = IcebergUtils.parseTableIdentifier(tableStringIdentifier);
97+
TableIdentifier tableId =
98+
TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
11599
Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
116100
final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
117101

0 commit comments

Comments
 (0)