Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +49,9 @@ final class BigQueryTableOperations extends BaseMetastoreTableOperations {
private final FileIO fileIO;
private final TableReference tableReference;

/** Table loaded in doRefresh() and reused in updateTable() to avoid a redundant API call. */
private volatile Table metastoreTable;

BigQueryTableOperations(
BigQueryMetastoreClient client, FileIO fileIO, TableReference tableReference) {
this.client = client;
Expand All @@ -60,9 +64,11 @@ final class BigQueryTableOperations extends BaseMetastoreTableOperations {
public void doRefresh() {
// Must default to null.
String metadataLocation = null;
this.metastoreTable = null;
try {
this.metastoreTable = client.load(tableReference);
metadataLocation =
loadMetadataLocationOrThrow(client.load(tableReference).getExternalCatalogTableOptions());
loadMetadataLocationOrThrow(metastoreTable.getExternalCatalogTableOptions());
} catch (NoSuchTableException e) {
if (currentMetadataLocation() != null) {
// Re-throws the exception because the table must exist in this case.
Expand All @@ -86,7 +92,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
if (base == null) {
createTable(newMetadataLocation, metadata);
} else {
updateTable(base.metadataFileLocation(), newMetadataLocation, metadata);
updateTable(newMetadataLocation, metadata);
}
commitStatus = BaseMetastoreOperations.CommitStatus.SUCCESS;
} catch (CommitFailedException | CommitStateUnknownException e) {
Expand Down Expand Up @@ -149,35 +155,24 @@ private void addConnectionIfProvided(Table tableBuilder, Map<String, String> met
}

/** Update table properties with concurrent update detection using etag. */
private void updateTable(
String oldMetadataLocation, String newMetadataLocation, TableMetadata metadata) {
Table table = client.load(tableReference);
if (table.getEtag().isEmpty()) {
private void updateTable(String newMetadataLocation, TableMetadata metadata) {
Preconditions.checkState(
metastoreTable != null,
"Table %s must be loaded during refresh before commit",
tableName());

if (metastoreTable.getEtag().isEmpty()) {
throw new ValidationException(
"Etag of legacy table %s is empty, manually update the table via the BigQuery API or"
+ " recreate and retry",
tableName());
}
ExternalCatalogTableOptions options = table.getExternalCatalogTableOptions();
addConnectionIfProvided(table, metadata.properties());

// If `metadataLocationFromMetastore` is different from metadata location of base, it means

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this check removed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this check removed?

Thank you for your review, Manu.

This check becomes redundant with caching.

Before:

  1. doRefresh() loads table -> metadata location = "v1"
  2. Someone else commits -> metadata location = "v2"
  3. updateTable() loads table again -> sees "v2"
  4. Check catches: "v1" != "v2" -> fail

With caching:

  1. doRefresh() loads table -> metadata location = "v1", cached
  2. Someone else commits -> metadata location = "v2"
  3. updateTable() uses cached table -> still sees "v1"
  4. Check passes: "v1" == "v1" (compares against itself)
  5. tables.patch fails with HTTP 412 (ETag mismatch) -> Iceberg retries

The ETag check in tables.patch catches the same conflict, so this check no longer adds value.

// someone has updated metadata location in metastore, which is a conflict update.
String metadataLocationFromMetastore =
options.getParameters().getOrDefault(METADATA_LOCATION_PROP, "");
if (!metadataLocationFromMetastore.isEmpty()
&& !metadataLocationFromMetastore.equals(oldMetadataLocation)) {
throw new CommitFailedException(
"Cannot commit base metadata location '%s' is not same as the current table metadata location '%s' for"
+ " %s.%s",
oldMetadataLocation,
metadataLocationFromMetastore,
tableReference.getDatasetId(),
tableReference.getTableId());
}
ExternalCatalogTableOptions options = metastoreTable.getExternalCatalogTableOptions();
addConnectionIfProvided(metastoreTable, metadata.properties());

options.setParameters(buildTableParameters(newMetadataLocation, metadata));
client.update(tableReference, table);
client.update(tableReference, metastoreTable);
this.metastoreTable = null;
}

// To make the table queryable from Hive, the user would likely be setting the HIVE_ENGINE_ENABLED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,11 @@ public Table update(TableReference tableReference, Table table) {
String incomingEtag = table.getEtag();
String requiredEtag = existingTable.getEtag();

// The real patch() uses an If-Match header which is passed separately,
// NOT on the incoming table object.
// The BigQueryTableOperations does NOT set the ETag on the Table object
// it passes to the client update() method.
// For a fake, we assume the ETag check needs to be simulated based on
// state, BUT the real client.update() expects the ETAG as a separate parameter
// (or implicitly via setIfMatch header, which this Fake doesn't see).
// To make the fake usable, we'll assume that if an ETag *is* present
// on the incoming table object, it must match.
// Simulate ETag-based optimistic locking. If the incoming table has an ETag,
// it must match the current ETag in the store.
if (incomingEtag != null && !incomingEtag.equals(requiredEtag)) {
throw new CommitFailedException(
"Etag mismatch for table: %s. Required: %s, Found: %s",
"Cannot commit: Etag mismatch for table: %s. Required: %s, Found: %s",
tableReference, requiredEtag, incomingEtag);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,26 +185,25 @@ public void failWhenEtagMismatch() throws Exception {
}

@Test
public void failWhenMetadataLocationDiff() throws Exception {
public void failWhenConcurrentModificationDetected() throws Exception {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you verify that the table is only loaded once?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review, Manu. Sorry about that; in this commit I have added a verification to confirm that the table is loaded only once.

Table tableWithEtag = createTestTable().setEtag("etag");
Table tableWithNewMetadata =
new Table()
.setEtag("etag")
.setExternalCatalogTableOptions(
new ExternalCatalogTableOptions()
.setParameters(ImmutableMap.of(METADATA_LOCATION_PROP, "a/new/location")));

reset(client);
// Two invocations, for loadTable and commit.
when(client.load(TABLE_REFERENCE)).thenReturn(tableWithEtag, tableWithNewMetadata);
when(client.load(TABLE_REFERENCE)).thenReturn(tableWithEtag);

org.apache.iceberg.Table loadedTable = catalog.loadTable(IDENTIFIER);

when(client.update(any(), any())).thenReturn(tableWithEtag);
// Simulate concurrent modification detected via ETag mismatch
when(client.update(any(), any()))
.thenThrow(new CommitFailedException("Cannot commit: Etag mismatch"));

assertThatThrownBy(
() -> loadedTable.updateSchema().addColumn("n", Types.IntegerType.get()).commit())
.isInstanceOf(CommitFailedException.class)
.hasMessageContaining("is not same as the current table metadata location");
.hasMessageContaining("Cannot commit");

// Verify table is loaded only once
verify(client, times(1)).load(TABLE_REFERENCE);
}

@Test
Expand Down