diff --git a/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryTableOperations.java b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryTableOperations.java index e5f0a449574c..37728aa15794 100644 --- a/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryTableOperations.java +++ b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryTableOperations.java @@ -33,6 +33,7 @@ import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +49,9 @@ final class BigQueryTableOperations extends BaseMetastoreTableOperations { private final FileIO fileIO; private final TableReference tableReference; + /** Table loaded in doRefresh() for reuse in updateTable() to avoid redundant API call. */ + private volatile Table metastoreTable; + BigQueryTableOperations( BigQueryMetastoreClient client, FileIO fileIO, TableReference tableReference) { this.client = client; @@ -60,9 +64,11 @@ final class BigQueryTableOperations extends BaseMetastoreTableOperations { public void doRefresh() { // Must default to null. String metadataLocation = null; + this.metastoreTable = null; try { + this.metastoreTable = client.load(tableReference); metadataLocation = - loadMetadataLocationOrThrow(client.load(tableReference).getExternalCatalogTableOptions()); + loadMetadataLocationOrThrow(metastoreTable.getExternalCatalogTableOptions()); } catch (NoSuchTableException e) { if (currentMetadataLocation() != null) { // Re-throws the exception because the table must exist in this case. 
@@ -86,7 +92,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
       if (base == null) {
         createTable(newMetadataLocation, metadata);
       } else {
-        updateTable(base.metadataFileLocation(), newMetadataLocation, metadata);
+        updateTable(newMetadataLocation, metadata);
       }
       commitStatus = BaseMetastoreOperations.CommitStatus.SUCCESS;
     } catch (CommitFailedException | CommitStateUnknownException e) {
@@ -149,35 +155,38 @@ private void addConnectionIfProvided(Table tableBuilder, Map<String, String> met
   }
 
-  /** Update table properties with concurrent update detection using etag. */
-  private void updateTable(
-      String oldMetadataLocation, String newMetadataLocation, TableMetadata metadata) {
-    Table table = client.load(tableReference);
-    if (table.getEtag().isEmpty()) {
+  /**
+   * Updates the table in the metastore, relying on its etag for concurrent update detection.
+   *
+   * <p>Reuses the table cached by doRefresh() rather than loading it again, and clears that
+   * cache after the update call returns.
+   *
+   * @param newMetadataLocation metadata location passed to buildTableParameters()
+   * @param metadata table metadata; its properties are passed to addConnectionIfProvided()
+   * @throws IllegalStateException if doRefresh() has not cached a table for this commit
+   * @throws ValidationException if the cached table has a null or empty etag
+   */
+  private void updateTable(String newMetadataLocation, TableMetadata metadata) {
+    // Read the volatile field once: doRefresh() resets it to null, so checking the field and
+    // then reading it again could observe null and fail with a NullPointerException.
+    Table table = metastoreTable;
+    Preconditions.checkState(
+        table != null, "Table %s must be loaded during refresh before commit", tableName());
+
+    // Report a null etag like an empty one instead of failing with a NullPointerException.
+    String etag = table.getEtag();
+    if (etag == null || etag.isEmpty()) {
       throw new ValidationException(
           "Etag of legacy table %s is empty, manually update the table via the BigQuery API or"
               + " recreate and retry",
           tableName());
     }
     ExternalCatalogTableOptions options = table.getExternalCatalogTableOptions();
     addConnectionIfProvided(table, metadata.properties());
 
-    // If `metadataLocationFromMetastore` is different from metadata location of base, it means
-    // someone has updated metadata location in metastore, which is a conflict update.
-    String metadataLocationFromMetastore =
-        options.getParameters().getOrDefault(METADATA_LOCATION_PROP, "");
-    if (!metadataLocationFromMetastore.isEmpty()
-        && !metadataLocationFromMetastore.equals(oldMetadataLocation)) {
-      throw new CommitFailedException(
-          "Cannot commit base metadata location '%s' is not same as the current table metadata location '%s' for"
-              + " %s.%s",
-          oldMetadataLocation,
-          metadataLocationFromMetastore,
-          tableReference.getDatasetId(),
-          tableReference.getTableId());
-    }
-
     options.setParameters(buildTableParameters(newMetadataLocation, metadata));
     client.update(tableReference, table);
+    // Drop the cached table so that the next commit requires a fresh doRefresh().
+    this.metastoreTable = null;
   }
 
   // To make the table queryable from Hive, the user would likely be setting the HIVE_ENGINE_ENABLED
diff --git a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/FakeBigQueryMetastoreClient.java b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/FakeBigQueryMetastoreClient.java
index 0c6df15091a6..3619f7908c38 100644
--- a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/FakeBigQueryMetastoreClient.java
+++ b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/FakeBigQueryMetastoreClient.java
@@ -180,18 +180,11 @@ public Table update(TableReference tableReference, Table table) {
     String incomingEtag = table.getEtag();
     String requiredEtag = existingTable.getEtag();
 
-    // The real patch() uses an If-Match header which is passed separately,
-    // NOT on the incoming table object.
-    // The BigQueryTableOperations does NOT set the ETag on the Table object
-    // it passes to the client update() method.
-    // For a fake, we assume the ETag check needs to be simulated based on
-    // state, BUT the real client.update() expects the ETAG as a separate parameter
-    // (or implicitly via setIfMatch header, which this Fake doesn't see).
-    // To make the fake usable, we'll assume that if an ETag *is* present
-    // on the incoming table object, it must match.
+    // Simulate ETag-based optimistic locking: a non-null incoming ETag must match the ETag
+    // of the existing table; a null incoming ETag skips the check.
     if (incomingEtag != null && !incomingEtag.equals(requiredEtag)) {
       throw new CommitFailedException(
-          "Etag mismatch for table: %s. Required: %s, Found: %s",
+          "Cannot commit: Etag mismatch for table: %s. Required: %s, Found: %s",
           tableReference, requiredEtag, incomingEtag);
     }
 
diff --git a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/TestBigQueryTableOperations.java b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/TestBigQueryTableOperations.java
index 4666ec61f4d3..c5095aedb000 100644
--- a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/TestBigQueryTableOperations.java
+++ b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/TestBigQueryTableOperations.java
@@ -185,26 +185,25 @@ public void failWhenEtagMismatch() throws Exception {
   }
 
   @Test
-  public void failWhenMetadataLocationDiff() throws Exception {
+  public void failWhenConcurrentModificationDetected() throws Exception {
     Table tableWithEtag = createTestTable().setEtag("etag");
-    Table tableWithNewMetadata =
-        new Table()
-            .setEtag("etag")
-            .setExternalCatalogTableOptions(
-                new ExternalCatalogTableOptions()
-                    .setParameters(ImmutableMap.of(METADATA_LOCATION_PROP, "a/new/location")));
 
     reset(client);
-    // Two invocations, for loadTable and commit.
-    when(client.load(TABLE_REFERENCE)).thenReturn(tableWithEtag, tableWithNewMetadata);
+    when(client.load(TABLE_REFERENCE)).thenReturn(tableWithEtag);
 
     org.apache.iceberg.Table loadedTable = catalog.loadTable(IDENTIFIER);
 
-    when(client.update(any(), any())).thenReturn(tableWithEtag);
+    // Simulate a concurrent modification: the client rejects the update as on an ETag mismatch.
+    when(client.update(any(), any()))
+        .thenThrow(new CommitFailedException("Cannot commit: Etag mismatch"));
+
     assertThatThrownBy(
             () -> loadedTable.updateSchema().addColumn("n", Types.IntegerType.get()).commit())
         .isInstanceOf(CommitFailedException.class)
-        .hasMessageContaining("is not same as the current table metadata location");
+        .hasMessageContaining("Cannot commit");
+
+    // The commit must reuse the table loaded by loadTable(): only one load call is expected.
+    verify(client, times(1)).load(TABLE_REFERENCE);
   }
 
   @Test