From 6efb3f85ed6ff341263738939f4d157067b3ab43 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Tue, 25 Mar 2025 14:42:02 +0800 Subject: [PATCH 1/7] Core, Hive: Double check commit status in case of commit conflict for NoLock --- .../iceberg/BaseMetastoreOperations.java | 35 ++++++++++-- .../iceberg/BaseMetastoreTableOperations.java | 2 +- .../iceberg/hive/HiveTableOperations.java | 55 +++++++++++++------ .../iceberg/hive/HiveMetastoreExtension.java | 12 +--- .../iceberg/hive/TestHiveCommitLocks.java | 40 +++++++++++++- .../iceberg/hive/TestHiveMetastore.java | 15 +++++ .../resources/hive-schema-3.1.0.derby.sql | 2 +- 7 files changed, 127 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java index 09c2249046f4..bc8dd4981a13 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java @@ -28,6 +28,7 @@ import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import org.apache.iceberg.util.PropertyUtil; @@ -63,6 +64,31 @@ protected CommitStatus checkCommitStatus( String newMetadataLocation, Map properties, Supplier commitStatusSupplier) { + if (metadataLocationCommitted( + tableOrViewName, newMetadataLocation, properties, commitStatusSupplier) + .orElse(false)) { + return CommitStatus.SUCCESS; + } + return CommitStatus.UNKNOWN; + } + + /** + * Attempt to load the content and see if any current or past metadata location matches the one we + * were attempting to set. + * + * @param tableOrViewName full name of the Table/View + * @param newMetadataLocation the path of the new commit file + * @param properties properties for retry + * @param commitStatusSupplier check if the latest metadata presents or not using metadata + * location for table. + * @return Empty if locations cannot be checked, e.g. unable to refresh. True if the new location + * is committed, false otherwise. + */ + protected Optional metadataLocationCommitted( + String tableOrViewName, + String newMetadataLocation, + Map properties, + Supplier commitStatusSupplier) { int maxAttempts = PropertyUtil.propertyAsInt( properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT); @@ -78,7 +104,7 @@ protected CommitStatus checkCommitStatus( COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS, COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT); - AtomicReference status = new AtomicReference<>(CommitStatus.UNKNOWN); + AtomicReference res = new AtomicReference<>(null); Tasks.foreach(newMetadataLocation) .retry(maxAttempts) @@ -96,23 +122,24 @@ protected CommitStatus checkCommitStatus( "Commit status check: Commit to {} of {} succeeded", tableOrViewName, newMetadataLocation); - status.set(CommitStatus.SUCCESS); + res.set(true); } else { LOG.warn( "Commit status check: Commit to {} of {} unknown, new metadata location is not current " + "or in history", tableOrViewName, newMetadataLocation); + res.set(false); } }); - if (status.get() == CommitStatus.UNKNOWN) { + if (res.get() == null) { LOG.error( "Cannot determine commit state to {}. Failed during checking {} times. " + "Treating commit state as unknown.", tableOrViewName, maxAttempts); } - return status.get(); + return Optional.ofNullable(res.get()); } } diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index dbab9e813966..8e16476270e6 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -310,7 +310,7 @@ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetada * @return true if the new metadata location is the current metadata location or present within * previous metadata files. */ - private boolean checkCurrentMetadataLocation(String newMetadataLocation) { + protected boolean checkCurrentMetadataLocation(String newMetadataLocation) { TableMetadata metadata = refresh(); String currentMetadataFileLocation = metadata.metadataFileLocation(); return currentMetadataFileLocation.equals(newMetadataLocation) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 619f20ab87a3..4357e139d8a1 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -268,16 +268,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { throw e; } catch (Throwable e) { - if (e.getMessage() != null - && e.getMessage() - .contains( - "The table has been modified. The parameter value for key '" - + HiveTableOperations.METADATA_LOCATION_PROP - + "' is")) { - throw new CommitFailedException( - e, "The table %s.%s has been modified concurrently", database, tableName); - } - if (e.getMessage() != null && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) { throw new RuntimeException( @@ -287,15 +277,25 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { e); } - LOG.error( - "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", - database, - tableName, - e); commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN; - commitStatus = - BaseMetastoreOperations.CommitStatus.valueOf( - checkCommitStatus(newMetadataLocation, metadata).name()); + if (e.getMessage() != null + && e.getMessage() + .contains( + "The table has been modified. The parameter value for key '" + + HiveTableOperations.METADATA_LOCATION_PROP + + "' is")) { + commitStatus = handleConcurrentModification(e, newMetadataLocation, metadata); + } else { + LOG.error( + "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", + database, + tableName, + e); + commitStatus = + BaseMetastoreOperations.CommitStatus.valueOf( + checkCommitStatus(newMetadataLocation, metadata).name()); + } + switch (commitStatus) { case SUCCESS: break; @@ -324,6 +324,25 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { "Committed to table {} with the new metadata location {}", fullName, newMetadataLocation); } + private BaseMetastoreOperations.CommitStatus handleConcurrentModification( + Throwable throwable, String newMetadataLocation, TableMetadata metadata) { + Optional locationCommitted = + metadataLocationCommitted( + tableName(), + newMetadataLocation, + metadata.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); + if (locationCommitted.isPresent()) { + if (locationCommitted.get()) { + return BaseMetastoreOperations.CommitStatus.SUCCESS; + } else { + throw new CommitFailedException( + throwable, "The table %s.%s has been modified concurrently", database, tableName); + } + } + return BaseMetastoreOperations.CommitStatus.UNKNOWN; + } + private void setHmsTableParameters( String newMetadataLocation, Table tbl, diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java index c750ff4de62e..e76b4474fc80 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java @@ -40,16 +40,10 @@ private HiveMetastoreExtension(String databaseName, Map hiveConf @Override public void beforeAll(ExtensionContext extensionContext) throws Exception { - metastore = new TestHiveMetastore(); - HiveConf hiveConfWithOverrides = new HiveConf(TestHiveMetastore.class); - if (hiveConfOverride != null) { - for (Map.Entry kv : hiveConfOverride.entrySet()) { - hiveConfWithOverrides.set(kv.getKey(), kv.getValue()); - } - } + metastore = new TestHiveMetastore(hiveConfOverride); - metastore.start(hiveConfWithOverrides); - metastoreClient = new HiveMetaStoreClient(hiveConfWithOverrides); + metastore.start(new HiveConf(TestHiveMetastore.class)); + metastoreClient = new HiveMetaStoreClient(metastore.hiveConf()); if (null != databaseName) { String dbPath = metastore.getDatabasePath(databaseName); Database db = new Database(databaseName, "description", dbPath, Maps.newHashMap()); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java index d12a8503313b..447904aaa5b3 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java @@ -22,6 +22,7 @@ import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; @@ -63,6 +64,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.hadoop.ConfigProperties; @@ -109,7 +111,12 @@ public class TestHiveCommitLocks { private static final HiveMetastoreExtension HIVE_METASTORE_EXTENSION = HiveMetastoreExtension.builder() .withDatabase(DB_NAME) - .withConfig(ImmutableMap.of(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname, "1s")) + .withConfig( + ImmutableMap.of( + HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname, + "1s", + HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, + "true")) .build(); private static HiveCatalog catalog; @@ -205,6 +212,37 @@ public static void cleanup() { } } + @Test + public void testMultipleAlterTableForNoLock() throws Exception { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + table.updateProperties().set(TableProperties.HIVE_LOCK_ENABLED, "false").commit(); + spyOps.refresh(); + TableMetadata metadataV3 = spyOps.current(); + AtomicReference alterTableException = new AtomicReference<>(null); + doAnswer( + i -> { + try { + // mock a situation where alter table is unexpectedly invoked more than once + i.callRealMethod(); + return i.callRealMethod(); + } catch (Throwable e) { + alterTableException.compareAndSet(null, e); + throw e; + } + }) + .when(spyClient) + .alter_table_with_environmentContext(anyString(), anyString(), any(), any()); + spyOps.commit(metadataV3, metadataV1); + verify(spyClient, times(1)) + .alter_table_with_environmentContext(anyString(), anyString(), any(), any()); + assertThat(alterTableException) + .as("Expecting to trigger an exception indicating table has been modified") + .hasValueMatching( + t -> + t.getMessage() + .contains("The table has been modified. The parameter value for key '")); + } + @Test public void testLockAcquisitionAtFirstTime() throws TException, InterruptedException { doReturn(acquiredLockResponse).when(spyClient).lock(any()); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java index c141f0cced02..310577231f0b 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java @@ -31,6 +31,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.hadoop.conf.Configuration; @@ -128,6 +129,15 @@ public class TestHiveMetastore { private TServer server; private HiveMetaStore.HMSHandler baseHandler; private HiveClientPool clientPool; + private final Map hiveConfOverride; + + public TestHiveMetastore() { + this(null); + } + + public TestHiveMetastore(Map hiveConfOverride) { + this.hiveConfOverride = hiveConfOverride; + } /** * Starts a TestHiveMetastore with the default connection pool size (5) and the default HiveConf. @@ -271,6 +281,11 @@ private void initConf(HiveConf conf, int port) { // Setting this to avoid thrift exception during running Iceberg tests outside Iceberg. conf.set( HiveConf.ConfVars.HIVE_IN_TEST.varname, HiveConf.ConfVars.HIVE_IN_TEST.getDefaultValue()); + if (hiveConfOverride != null) { + for (Map.Entry kv : hiveConfOverride.entrySet()) { + conf.set(kv.getKey(), kv.getValue()); + } + } } private static void setupMetastoreDB(String dbURL) throws SQLException, IOException { diff --git a/hive-metastore/src/test/resources/hive-schema-3.1.0.derby.sql b/hive-metastore/src/test/resources/hive-schema-3.1.0.derby.sql index 55097d6639f2..fde283491596 100644 --- a/hive-metastore/src/test/resources/hive-schema-3.1.0.derby.sql +++ b/hive-metastore/src/test/resources/hive-schema-3.1.0.derby.sql @@ -130,7 +130,7 @@ CREATE TABLE "APP"."TAB_COL_STATS"( "BIT_VECTOR" BLOB ); -CREATE TABLE "APP"."TABLE_PARAMS" ("TBL_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB); +CREATE TABLE "APP"."TABLE_PARAMS" ("TBL_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" VARCHAR(32672)); CREATE TABLE "APP"."BUCKETING_COLS" ("SD_ID" BIGINT NOT NULL, "BUCKET_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL); From ed60da35d1ff007e8ff26f5505d3f0f268d7c6c2 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Tue, 25 Mar 2025 17:06:27 +0800 Subject: [PATCH 2/7] fix test --- .../main/java/org/apache/iceberg/hive/HiveTableOperations.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 4357e139d8a1..b88e1dca507d 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -277,7 +277,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { e); } - commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN; if (e.getMessage() != null && e.getMessage() .contains( @@ -286,6 +285,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { + "' is")) { commitStatus = handleConcurrentModification(e, newMetadataLocation, metadata); } else { + commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN; LOG.error( "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", database, From 3fe077ae89e893c1836418920bf6b55d9fc8fbae Mon Sep 17 00:00:00 2001 From: Rui Li Date: Tue, 25 Mar 2025 18:39:40 +0800 Subject: [PATCH 3/7] address comments --- .../iceberg/hive/HiveTableOperations.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index b88e1dca507d..48cc28fcede7 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -277,15 +277,22 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { e); } + commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN; if (e.getMessage() != null && e.getMessage() .contains( "The table has been modified. The parameter value for key '" + HiveTableOperations.METADATA_LOCATION_PROP + "' is")) { - commitStatus = handleConcurrentModification(e, newMetadataLocation, metadata); + // It's possible the HMS client incorrectly retries a successful operation, due to network + // issue for example, and triggers this exception. So we need double-check to + // make sure this is really a concurrent modification + commitStatus = handlePossibleConcurrentModification(newMetadataLocation, metadata); + if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) { + throw new CommitFailedException( + e, "The table %s.%s has been modified concurrently", database, tableName); + } } else { - commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN; LOG.error( "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", database, @@ -324,8 +331,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { "Committed to table {} with the new metadata location {}", fullName, newMetadataLocation); } - private BaseMetastoreOperations.CommitStatus handleConcurrentModification( - Throwable throwable, String newMetadataLocation, TableMetadata metadata) { + private BaseMetastoreOperations.CommitStatus handlePossibleConcurrentModification( + String newMetadataLocation, TableMetadata metadata) { Optional locationCommitted = metadataLocationCommitted( tableName(), @@ -336,8 +343,7 @@ private BaseMetastoreOperations.CommitStatus handleConcurrentModification( if (locationCommitted.get()) { return BaseMetastoreOperations.CommitStatus.SUCCESS; } else { - throw new CommitFailedException( - throwable, "The table %s.%s has been modified concurrently", database, tableName); + return BaseMetastoreOperations.CommitStatus.FAILURE; } } return BaseMetastoreOperations.CommitStatus.UNKNOWN; From 991a7838e00a696193a29fad9390e4bb979b36f6 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Wed, 26 Mar 2025 12:34:56 +0800 Subject: [PATCH 4/7] add checkCommitStatusStrict --- .../iceberg/BaseMetastoreOperations.java | 57 ++++++++++++++----- .../iceberg/BaseMetastoreTableOperations.java | 30 +++++++++- .../iceberg/hive/HiveTableOperations.java | 25 ++------ 3 files changed, 73 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java index bc8dd4981a13..7969b332afbd 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java @@ -50,41 +50,73 @@ public enum CommitStatus { * were attempting to set. This is used as a last resort when we are dealing with exceptions that * may indicate the commit has failed but don't have proof that this is the case. Note that all * the previous locations must also be searched on the chance that a second committer was able to - * successfully commit on top of our commit. + * successfully commit on top of our commit. When the {@code newMetadataLocation} is not in the + * history or the {@code commitStatusSupplier} fails repeatedly the method returns {@link + * CommitStatus#UNKNOWN}, because possible pending retries might still commit the change. * * @param tableOrViewName full name of the Table/View * @param newMetadataLocation the path of the new commit file * @param properties properties for retry * @param commitStatusSupplier check if the latest metadata presents or not using metadata * location for table. - * @return Commit Status of Success, Failure or Unknown + * @return Commit Status of Success or Unknown */ protected CommitStatus checkCommitStatus( String tableOrViewName, String newMetadataLocation, Map properties, Supplier commitStatusSupplier) { - if (metadataLocationCommitted( - tableOrViewName, newMetadataLocation, properties, commitStatusSupplier) - .orElse(false)) { - return CommitStatus.SUCCESS; + Optional committed = + metadataLocationCommitted( + tableOrViewName, newMetadataLocation, properties, commitStatusSupplier); + if (committed.isPresent()) { + if (committed.get()) { + return CommitStatus.SUCCESS; + } + LOG.warn( + "Commit status check: Commit to {} of {} unknown, new metadata location is not current " + + "or in history", + tableOrViewName, + newMetadataLocation); } return CommitStatus.UNKNOWN; } /** * Attempt to load the content and see if any current or past metadata location matches the one we - * were attempting to set. + * were attempting to set. This is used as a last resort when we are dealing with exceptions that + * may indicate the commit has failed and don't have proof that this is the case, but we can be + * sure that no retry attempts for the commit will be successful later. Note that all the previous + * locations must also be searched on the chance that a second committer was able to successfully + * commit on top of our commit. When the {@code newMetadataLocation} is not in the history the + * method returns {@link CommitStatus#FAILURE}, when the {@code commitStatusSupplier} fails + * repeatedly the method returns {@link CommitStatus#UNKNOWN}. * * @param tableOrViewName full name of the Table/View * @param newMetadataLocation the path of the new commit file * @param properties properties for retry * @param commitStatusSupplier check if the latest metadata presents or not using metadata * location for table. - * @return Empty if locations cannot be checked, e.g. unable to refresh. True if the new location - * is committed, false otherwise. + * @return Commit Status of Success, Failure or Unknown */ - protected Optional metadataLocationCommitted( + protected CommitStatus checkCommitStatusStrict( + String tableOrViewName, + String newMetadataLocation, + Map properties, + Supplier commitStatusSupplier) { + Optional committed = + metadataLocationCommitted( + tableOrViewName, newMetadataLocation, properties, commitStatusSupplier); + if (committed.isPresent()) { + if (committed.get()) { + return CommitStatus.SUCCESS; + } + return CommitStatus.FAILURE; + } + return CommitStatus.UNKNOWN; + } + + private Optional metadataLocationCommitted( String tableOrViewName, String newMetadataLocation, Map properties, @@ -124,11 +156,6 @@ protected Optional metadataLocationCommitted( newMetadataLocation); res.set(true); } else { - LOG.warn( - "Commit status check: Commit to {} of {} unknown, new metadata location is not current " - + "or in history", - tableOrViewName, - newMetadataLocation); res.set(false); } }); diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 8e16476270e6..e56f2b20051f 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; +import org.apache.iceberg.BaseMetastoreOperations.CommitStatus; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; @@ -286,11 +287,12 @@ public long newSnapshotId() { * were attempting to set. This is used as a last resort when we are dealing with exceptions that * may indicate the commit has failed but are not proof that this is the case. Past locations must * also be searched on the chance that a second committer was able to successfully commit on top - * of our commit. + * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#UNKNOWN}. * * @param newMetadataLocation the path of the new commit file * @param config metadata to use for configuration - * @return Commit Status of Success, Failure or Unknown + * @return Commit Status of Success, Unknown */ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) { return CommitStatus.valueOf( @@ -302,6 +304,28 @@ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetada .name()); } + /** + * Attempt to load the table and see if any current or past metadata location matches the one we + * were attempting to set. This is used as a last resort when we are dealing with exceptions that + * may indicate the commit has failed but are not proof that this is the case. Past locations must + * also be searched on the chance that a second committer was able to successfully commit on top + * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#FAILURE}. + * + * @param newMetadataLocation the path of the new commit file + * @param config metadata to use for configuration + * @return Commit Status of Success, Failure or Unknown + */ + protected CommitStatus checkCommitStatusStrict(String newMetadataLocation, TableMetadata config) { + return CommitStatus.valueOf( + checkCommitStatusStrict( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)) + .name()); + } + /** * Validate if the new metadata location is the current metadata location or present within * previous metadata files. @@ -310,7 +334,7 @@ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetada * @return true if the new metadata location is the current metadata location or present within * previous metadata files. */ - protected boolean checkCurrentMetadataLocation(String newMetadataLocation) { + private boolean checkCurrentMetadataLocation(String newMetadataLocation) { TableMetadata metadata = refresh(); String currentMetadataFileLocation = metadata.metadataFileLocation(); return currentMetadataFileLocation.equals(newMetadataLocation) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 48cc28fcede7..6e636e8afcbd 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -285,9 +285,10 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { + HiveTableOperations.METADATA_LOCATION_PROP + "' is")) { // It's possible the HMS client incorrectly retries a successful operation, due to network - // issue for example, and triggers this exception. So we need double-check to - // make sure this is really a concurrent modification - commitStatus = handlePossibleConcurrentModification(newMetadataLocation, metadata); + // issue for example, and triggers this exception. So we need double-check to make sure + // this is really a concurrent modification. Hitting this exception means no pending + // requests, if any, can succeed later, so it's safe to check status in strict mode + commitStatus = checkCommitStatusStrict(newMetadataLocation, metadata); if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) { throw new CommitFailedException( e, "The table %s.%s has been modified concurrently", database, tableName); @@ -331,24 +332,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { "Committed to table {} with the new metadata location {}", fullName, newMetadataLocation); } - private BaseMetastoreOperations.CommitStatus handlePossibleConcurrentModification( - String newMetadataLocation, TableMetadata metadata) { - Optional locationCommitted = - metadataLocationCommitted( - tableName(), - newMetadataLocation, - metadata.properties(), - () -> checkCurrentMetadataLocation(newMetadataLocation)); - if (locationCommitted.isPresent()) { - if (locationCommitted.get()) { - return BaseMetastoreOperations.CommitStatus.SUCCESS; - } else { - return BaseMetastoreOperations.CommitStatus.FAILURE; - } - } - return BaseMetastoreOperations.CommitStatus.UNKNOWN; - } - private void setHmsTableParameters( String newMetadataLocation, Table tbl, From bf871bc1e4b212c47a604361edc4fcbd1e7af683 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Wed, 26 Mar 2025 14:56:01 +0800 Subject: [PATCH 5/7] refactor and enable direct sql globally --- .../iceberg/BaseMetastoreOperations.java | 40 +++++-------------- .../iceberg/hive/TestHiveMetastore.java | 2 +- 2 files changed, 11 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java index 7969b332afbd..0635b56a7fba 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java @@ -28,7 +28,6 @@ import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT; import java.util.Map; -import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import org.apache.iceberg.util.PropertyUtil; @@ -66,20 +65,18 @@ protected CommitStatus checkCommitStatus( String newMetadataLocation, Map properties, Supplier commitStatusSupplier) { - Optional committed = - metadataLocationCommitted( + CommitStatus strictStatus = + checkCommitStatusStrict( tableOrViewName, newMetadataLocation, properties, commitStatusSupplier); - if (committed.isPresent()) { - if (committed.get()) { - return CommitStatus.SUCCESS; - } + if (strictStatus == CommitStatus.FAILURE) { LOG.warn( "Commit status check: Commit to {} of {} unknown, new metadata location is not current " + "or in history", tableOrViewName, newMetadataLocation); + return CommitStatus.UNKNOWN; } - return CommitStatus.UNKNOWN; + return strictStatus; } /** @@ -104,23 +101,6 @@ protected CommitStatus checkCommitStatusStrict( String newMetadataLocation, Map properties, Supplier commitStatusSupplier) { - Optional committed = - metadataLocationCommitted( - tableOrViewName, newMetadataLocation, properties, commitStatusSupplier); - if (committed.isPresent()) { - if (committed.get()) { - return CommitStatus.SUCCESS; - } - return CommitStatus.FAILURE; - } - return CommitStatus.UNKNOWN; - } - - private Optional metadataLocationCommitted( - String tableOrViewName, - String newMetadataLocation, - Map properties, - Supplier commitStatusSupplier) { int maxAttempts = PropertyUtil.propertyAsInt( properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT); @@ -136,7 +116,7 @@ private Optional metadataLocationCommitted( COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS, COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT); - AtomicReference res = new AtomicReference<>(null); + AtomicReference status = new AtomicReference<>(CommitStatus.UNKNOWN); Tasks.foreach(newMetadataLocation) .retry(maxAttempts) @@ -154,19 +134,19 @@ private Optional metadataLocationCommitted( "Commit status check: Commit to {} of {} succeeded", tableOrViewName, newMetadataLocation); - res.set(true); + status.set(CommitStatus.SUCCESS); } else { - res.set(false); + status.set(CommitStatus.FAILURE); } }); - if (res.get() == null) { + if (status.get() == CommitStatus.UNKNOWN) { LOG.error( "Cannot determine commit state to {}. Failed during checking {} times. " + "Treating commit state as unknown.", tableOrViewName, maxAttempts); } - return Optional.ofNullable(res.get()); + return status.get(); } } diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java index 310577231f0b..3a222f451867 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java @@ -275,7 +275,7 @@ private void initConf(HiveConf conf, int port) { conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port); conf.set( HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + HIVE_LOCAL_DIR.getAbsolutePath()); - conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false"); + conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "true"); conf.set(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false"); conf.set("iceberg.hive.client-pool-size", "2"); // Setting this to avoid thrift exception during running Iceberg tests outside Iceberg. From f5dae6b6ea98b37a475f999b0485def971bb175a Mon Sep 17 00:00:00 2001 From: Rui Li Date: Wed, 26 Mar 2025 19:53:15 +0800 Subject: [PATCH 6/7] avoid affecting spark tests --- .../iceberg/hive/HiveMetastoreExtension.java | 12 +++++-- .../iceberg/hive/TestHiveCommitLocks.java | 7 +--- .../iceberg/hive/TestHiveMetastore.java | 32 ++++++++----------- .../resources/hive-schema-3.1.0.derby.sql | 8 ++--- 4 files changed, 28 insertions(+), 31 deletions(-) diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java index e76b4474fc80..fe37223423fa 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java @@ -40,10 +40,16 @@ private HiveMetastoreExtension(String databaseName, Map hiveConf @Override public void beforeAll(ExtensionContext extensionContext) throws Exception { - metastore = new TestHiveMetastore(hiveConfOverride); + metastore = new TestHiveMetastore(); + HiveConf hiveConfWithOverrides = new HiveConf(TestHiveMetastore.class); + if (hiveConfOverride != null) { + for (Map.Entry kv : hiveConfOverride.entrySet()) { + hiveConfWithOverrides.set(kv.getKey(), kv.getValue()); + } + } - metastore.start(new HiveConf(TestHiveMetastore.class)); - metastoreClient = new HiveMetaStoreClient(metastore.hiveConf()); + metastore.start(hiveConfWithOverrides, 5, true); + metastoreClient = new HiveMetaStoreClient(hiveConfWithOverrides); if (null != databaseName) { String dbPath = metastore.getDatabasePath(databaseName); Database db = new Database(databaseName, "description", dbPath, Maps.newHashMap()); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java index 447904aaa5b3..0ffcb057095f 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java @@ -111,12 +111,7 @@ public class TestHiveCommitLocks { private static final HiveMetastoreExtension HIVE_METASTORE_EXTENSION = HiveMetastoreExtension.builder() .withDatabase(DB_NAME) - .withConfig( - ImmutableMap.of( - HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname, - "1s", - HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, - "true")) + .withConfig(ImmutableMap.of(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname, "1s")) .build(); private static HiveCatalog catalog; diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java index 3a222f451867..9736b32e8727 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java @@ -31,7 +31,6 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.hadoop.conf.Configuration; @@ -129,15 +128,6 @@ public class TestHiveMetastore { private TServer server; private HiveMetaStore.HMSHandler baseHandler; private HiveClientPool clientPool; - private final Map hiveConfOverride; - - public TestHiveMetastore() { - this(null); - } - - public TestHiveMetastore(Map hiveConfOverride) { - this.hiveConfOverride = hiveConfOverride; - } /** * Starts a TestHiveMetastore with the default connection pool size (5) and the default HiveConf. @@ -163,10 +153,21 @@ public void start(HiveConf conf) { * @param poolSize The number of threads in the executor pool */ public void start(HiveConf conf, int poolSize) { + start(conf, poolSize, false); + } + + /** + * Starts a TestHiveMetastore with a provided connection pool size and HiveConf. + * + * @param conf The hive configuration to use + * @param poolSize The number of threads in the executor pool + * @param directSql Used to turn on directSql + */ + public void start(HiveConf conf, int poolSize, boolean directSql) { try { TServerSocket socket = new TServerSocket(0); int port = socket.getServerSocket().getLocalPort(); - initConf(conf, port); + initConf(conf, port, directSql); this.hiveConf = conf; this.server = newThriftServer(socket, poolSize, hiveConf); @@ -271,21 +272,16 @@ private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf con return new TThreadPoolServer(args); } - private void initConf(HiveConf conf, int port) { + private void initConf(HiveConf conf, int port, boolean directSql) { conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port); conf.set( HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + HIVE_LOCAL_DIR.getAbsolutePath()); - conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "true"); + conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, String.valueOf(directSql)); conf.set(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false"); conf.set("iceberg.hive.client-pool-size", "2"); // Setting this to avoid thrift exception during running Iceberg tests outside Iceberg. conf.set( HiveConf.ConfVars.HIVE_IN_TEST.varname, HiveConf.ConfVars.HIVE_IN_TEST.getDefaultValue()); - if (hiveConfOverride != null) { - for (Map.Entry kv : hiveConfOverride.entrySet()) { - conf.set(kv.getKey(), kv.getValue()); - } - } } private static void setupMetastoreDB(String dbURL) throws SQLException, IOException { diff --git a/hive-metastore/src/test/resources/hive-schema-3.1.0.derby.sql b/hive-metastore/src/test/resources/hive-schema-3.1.0.derby.sql index fde283491596..b7b095c81ac1 100644 --- a/hive-metastore/src/test/resources/hive-schema-3.1.0.derby.sql +++ b/hive-metastore/src/test/resources/hive-schema-3.1.0.derby.sql @@ -52,9 +52,9 @@ CREATE TABLE "APP"."DATABASE_PARAMS" ("DB_ID" BIGINT NOT NULL, "PARAM_KEY" VARCH CREATE TABLE "APP"."TBL_COL_PRIVS" ("TBL_COLUMN_GRANT_ID" BIGINT NOT NULL, "COLUMN_NAME" VARCHAR(767), "CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "TBL_COL_PRIV" VARCHAR(128), "TBL_ID" BIGINT, "AUTHORIZER" VARCHAR(128)); -CREATE TABLE "APP"."SERDE_PARAMS" ("SERDE_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB); +CREATE TABLE "APP"."SERDE_PARAMS" ("SERDE_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" VARCHAR(32672)); -CREATE TABLE "APP"."COLUMNS_V2" ("CD_ID" BIGINT NOT NULL, "COMMENT" VARCHAR(4000), "COLUMN_NAME" VARCHAR(767) NOT NULL, "TYPE_NAME" CLOB, "INTEGER_IDX" INTEGER NOT NULL); +CREATE TABLE "APP"."COLUMNS_V2" ("CD_ID" BIGINT NOT NULL, "COMMENT" VARCHAR(4000), "COLUMN_NAME" VARCHAR(767) NOT NULL, "TYPE_NAME" VARCHAR(32672), "INTEGER_IDX" INTEGER NOT NULL); CREATE TABLE "APP"."SORT_COLS" ("SD_ID" BIGINT NOT NULL, "COLUMN_NAME" VARCHAR(767), "ORDER" INTEGER NOT NULL, "INTEGER_IDX" INTEGER NOT NULL); @@ -138,7 +138,7 @@ CREATE TABLE "APP"."TYPE_FIELDS" ("TYPE_NAME" BIGINT NOT NULL, "COMMENT" VARCHAR CREATE TABLE "APP"."NUCLEUS_TABLES" ("CLASS_NAME" VARCHAR(128) NOT NULL, "TABLE_NAME" VARCHAR(128) NOT NULL, "TYPE" VARCHAR(4) NOT NULL, "OWNER" VARCHAR(2) NOT NULL, "VERSION" VARCHAR(20) NOT NULL, "INTERFACE_NAME" VARCHAR(256) DEFAULT NULL); -CREATE TABLE "APP"."SD_PARAMS" ("SD_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB); +CREATE TABLE "APP"."SD_PARAMS" ("SD_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" VARCHAR(32672)); CREATE TABLE "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID" BIGINT NOT NULL); @@ -218,7 +218,7 @@ CREATE TABLE "APP"."MV_CREATION_METADATA" ( "CAT_NAME" VARCHAR(256) NOT NULL, "DB_NAME" VARCHAR(128) NOT NULL, "TBL_NAME" VARCHAR(256) NOT NULL, - "TXN_LIST" CLOB, + "TXN_LIST" VARCHAR(32672), "MATERIALIZATION_TIME" BIGINT NOT NULL ); From 5c5f6724ac4c8434925623a74e7ec61cc12c44d3 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Wed, 2 Apr 2025 17:58:41 +0800 Subject: [PATCH 7/7] remove unnecessary parsing of CommitStatus --- .../iceberg/BaseMetastoreTableOperations.java | 24 ++++++++----------- .../iceberg/hive/HiveTableOperations.java | 4 +--- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index e56f2b20051f..9fa52d52ea5d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -295,13 +295,11 @@ public long newSnapshotId() { * @return Commit Status of Success, Unknown */ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) { - return CommitStatus.valueOf( - checkCommitStatus( - tableName(), - newMetadataLocation, - config.properties(), - () -> checkCurrentMetadataLocation(newMetadataLocation)) - .name()); + return checkCommitStatus( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); } /** @@ -317,13 +315,11 @@ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetada * @return Commit Status of Success, Failure or Unknown */ protected CommitStatus checkCommitStatusStrict(String newMetadataLocation, TableMetadata config) { - return CommitStatus.valueOf( - checkCommitStatusStrict( - tableName(), - newMetadataLocation, - config.properties(), - () -> checkCurrentMetadataLocation(newMetadataLocation)) - .name()); + return checkCommitStatusStrict( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); } /** diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 6e636e8afcbd..0e801b57e5eb 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -299,9 +299,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { database, tableName, e); - commitStatus = - BaseMetastoreOperations.CommitStatus.valueOf( - checkCommitStatus(newMetadataLocation, metadata).name()); + commitStatus = checkCommitStatus(newMetadataLocation, metadata); } switch (commitStatus) {