From 4fd138a5afe695ce5a6dd6c796265eeeca6065ab Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Fri, 25 Apr 2025 12:20:02 +0200 Subject: [PATCH 1/8] HIVE-28930: Implement a metastore service that expires iceberg table snapshots periodically --- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 +- .../mr/hive/HiveIcebergStorageHandler.java | 16 +- .../iceberg/mr/hive/HiveIcebergUtil.java | 39 +++++ .../task/IcebergHouseKeeperService.java | 131 ++++++++++++++++ .../task/TestIcebergHouseKeeperService.java | 143 +++++++++++++++++ .../jdbc/miniHS2/StartMiniHS2Cluster.java | 4 +- .../org/apache/hive/jdbc/miniHS2/MiniHS2.java | 19 ++- .../hive/metastore/conf/MetastoreConf.java | 23 ++- .../hive/metastore/utils/TableFetcher.java | 148 ++++++++++++++++++ .../metastore/utils/TestTableFetcher.java | 53 +++++++ .../metastore/PartitionManagementTask.java | 64 +------- 11 files changed, 563 insertions(+), 79 deletions(-) create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUtil.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java create mode 100644 iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java create mode 100644 standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java create mode 100644 standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/TestTableFetcher.java diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 647836a7eb65..90d9b608d6d8 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2240,7 +2240,7 @@ public static enum ConfVars { "Use stats from iceberg table snapshot for query planning. This has two values metastore and iceberg"), HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS("hive.iceberg.expire.snapshot.numthreads", 4, "The number of threads to be used for deleting files during expire snapshot. If set to 0 or below it uses the" + - " defult DirectExecutorService"), + " default DirectExecutorService"), HIVE_ICEBERG_MASK_DEFAULT_LOCATION("hive.iceberg.mask.default.location", false, "If this is set to true the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"), diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 3702174ca2d0..49e2e0ad63ba 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -39,8 +39,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.collections.MapUtils; @@ -1133,7 +1131,8 @@ private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int num try { if (numThreads > 0) { LOG.info("Executing delete orphan files on iceberg table {} with {} threads", icebergTable.name(), numThreads); - deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads); + deleteExecutorService = HiveIcebergUtil.getDeleteExecutorService(icebergTable.name(), + numThreads); } HiveIcebergDeleteOrphanFiles deleteOrphanFiles = new HiveIcebergDeleteOrphanFiles(conf, icebergTable); @@ -1156,7 +1155,7 @@ private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnap try { if (numThreads > 0) { LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads); - deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads); + deleteExecutorService = HiveIcebergUtil.getDeleteExecutorService(icebergTable.name(), numThreads); } if (expireSnapshotsSpec == null) { expireSnapshotWithDefaultParams(icebergTable, deleteExecutorService); @@ -1235,15 +1234,6 @@ private void expireSnapshotByIds(Table icebergTable, String[] idsToExpire, } } - private ExecutorService getDeleteExecutorService(String completeName, int numThreads) { - AtomicInteger deleteThreadsIndex = new AtomicInteger(0); - return Executors.newFixedThreadPool(numThreads, runnable -> { - Thread thread = new Thread(runnable); - thread.setName("remove-snapshot-" + completeName + "-" + deleteThreadsIndex.getAndIncrement()); - return thread; - }); - } - @Override public void alterTableSnapshotRefOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable, AlterTableSnapshotRefSpec alterTableSnapshotRefSpec) { diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUtil.java new file mode 100644 index 000000000000..76a324c1c4ab --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUtil.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +public class HiveIcebergUtil { + + private HiveIcebergUtil() { + } + + public static ExecutorService getDeleteExecutorService(String completeName, int numThreads) { + AtomicInteger deleteThreadsIndex = new AtomicInteger(0); + return Executors.newFixedThreadPool(numThreads, runnable -> { + Thread thread = new Thread(runnable); + thread.setName("remove-snapshot-" + completeName + "-" + deleteThreadsIndex.getAndIncrement()); + return thread; + }); + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java new file mode 100644 index 000000000000..a8158f3a7e92 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.metastore.task; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetastoreTaskThread; +import org.apache.hadoop.hive.metastore.api.GetTableRequest; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.utils.TableFetcher; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.Table; +import org.apache.iceberg.mr.hive.HiveIcebergUtil; +import org.apache.iceberg.mr.hive.IcebergTableUtil; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergHouseKeeperService implements MetastoreTaskThread { + private static final Logger LOG = LoggerFactory.getLogger(IcebergHouseKeeperService.class); + + private Configuration conf; + + @Override + public long runFrequency(TimeUnit unit) { + return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_INTERVAL, unit); + } + + @Override + public void run() { + LOG.debug("Running IcebergHouseKeeperService..."); + + String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_CATALOG_NAME); + String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_DATABASE_PATTERN); + String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_TABLE_PATTERN); + + try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) { + // TODO: Future improvement – modify TableFetcher to return HMS Table API objects directly, + // avoiding the need for subsequent msc.getTable calls to fetch each matched table individually + List tables = getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(); + + LOG.debug("{} candidate tables found", tables.size()); + + for (TableName table : tables) { + expireSnapshotsForTable(getIcebergTable(table, msc)); + } + } catch (Exception e) { + LOG.error("Exception while running iceberg expiry service on catalog/db/table: {}/{}/{}", catalogName, dbPattern, + tablePattern, e); + } + } + + @VisibleForTesting + TableFetcher getTableFetcher(IMetaStoreClient msc, String catalogName, String dbPattern, String tablePattern) { + return new TableFetcher.Builder(msc, catalogName, dbPattern, tablePattern).tableTypes( + "EXTERNAL_TABLE") + .tableCondition( + hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "table_type like \"ICEBERG\" ") + .build(); + } + + private Table getIcebergTable(TableName table, IMetaStoreClient msc) throws TException { + GetTableRequest request = new GetTableRequest(table.getDb(), table.getTable()); + return IcebergTableUtil.getTable(conf, msc.getTable(request)); + } + + /** + * Deletes snapshots of an Iceberg table, using the number of threads defined by the + * Hive config HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS. + * This is largely equivalent to the HiveIcebergStorageHandler.expireSnapshotWithDefaultParams method. + * + * @param icebergTable the iceberg Table reference + */ + private void expireSnapshotsForTable(Table icebergTable) { + LOG.info("Expire snapshots for: {}", icebergTable); + ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots(); + + int numThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname, + HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal); + + ExecutorService deleteExecutorService = null; + try { + if (numThreads > 0) { + LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads); + deleteExecutorService = HiveIcebergUtil.getDeleteExecutorService(icebergTable.name(), numThreads); + } + if (deleteExecutorService != null) { + expireSnapshots.executeDeleteWith(deleteExecutorService); + } + expireSnapshots.commit(); + } finally { + if (deleteExecutorService != null) { + deleteExecutorService.shutdown(); + } + } + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration configuration) { + conf = configuration; + } +} diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java new file mode 100644 index 000000000000..cd77beac66f2 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.metastore.task; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.GetTableRequest; +import org.apache.hadoop.hive.metastore.utils.TableFetcher; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.mr.hive.IcebergTableUtil; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestIcebergHouseKeeperService { + private static final Logger LOG = LoggerFactory.getLogger(TestIcebergHouseKeeperService.class); + + private static final HiveConf conf = new HiveConf(TestIcebergHouseKeeperService.class); + private static Hive db; + + @BeforeClass + public static void beforeClass() throws Exception { + conf.set("hive.security.authorization.enabled", "false"); + conf.set("hive.security.authorization.manager", + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory"); + conf.set("iceberg.engine.hive.lock-enabled", "false"); + + db = Hive.get(conf); + } + + @AfterClass + public static void afterClass() { + db.close(true); + } + + @Test + public void testIcebergTableFetched() throws Exception { + createIcebergTable("iceberg_table"); + + IcebergHouseKeeperService service = new IcebergHouseKeeperService(); + TableFetcher tableFetcher = service.getTableFetcher(db.getMSC(), null, "default", "*"); + + List tables = tableFetcher.getTables(); + Assert.assertEquals(new TableName("hive", "default", "iceberg_table"), tables.get(0)); + } + + @Test + public void testExpireSnapshotsByServiceRun() throws Exception { + String tableName = "iceberg_table_snapshot_expiry_e2e_test"; + createIcebergTable(tableName); + IcebergHouseKeeperService service = getServiceForTable("default", tableName); + + GetTableRequest request = new GetTableRequest("default", tableName); + org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, db.getMSC().getTable(request)); + + String metadataDirectory = icebergTable.location().replaceAll("^[a-zA-Z]+:", "") + "/metadata"; + + DataFile datafile = DataFiles.builder(icebergTable.spec()) + .withRecordCount(3) + .withPath("/tmp/file.parquet") + .withFileSizeInBytes(10) + .build(); + + icebergTable.newAppend().appendFile(datafile).commit(); + assertSnapshotFiles(metadataDirectory, 1); + icebergTable.newAppend().appendFile(datafile).commit(); + assertSnapshotFiles(metadataDirectory, 2); + + Thread.sleep(1000); // allow snapshots that are 1000ms old to become eligible for snapshot expiry + service.run(); + + assertSnapshotFiles(metadataDirectory, 1); + db.dropTable("default", "iceberg_table_snapshot_expiry_e2e_test"); + } + + private void createIcebergTable(String name) throws Exception { + Table table = new Table("default", name); + List columns = Lists.newArrayList(); + columns.add(new FieldSchema("col", "string", "First column")); + table.setFields(columns); // Set columns + + table.setProperty("EXTERNAL", "TRUE"); + table.setTableType(TableType.EXTERNAL_TABLE); + table.setProperty("table_type", "ICEBERG"); + + table.setProperty("history.expire.max-snapshot-age-ms", "500"); + + db.createTable(table); + } + + /** + * Creates IcebergHouseKeeperService that's configured to clean up a table by database and table name. + * + * @param tableName to be cleaned up + * @return IcebergHouseKeeperService + */ + private IcebergHouseKeeperService getServiceForTable(String dbName, String tableName) { + IcebergHouseKeeperService service = new IcebergHouseKeeperService(); + HiveConf serviceConf = new HiveConf(conf); + serviceConf.set("hive.metastore.iceberg.table.expiry.database.pattern", dbName); + serviceConf.set("hive.metastore.iceberg.table.expiry.table.pattern", tableName); + service.setConf(serviceConf); + return service; + } + + private void assertSnapshotFiles(String metadataDirectory, int numberForSnapshotFiles) { + File[] matchingFiles = new File(metadataDirectory).listFiles((dir, name) -> name.startsWith("snap-")); + List files = Optional.ofNullable(matchingFiles).map(Arrays::asList).orElse(Collections.emptyList()); + LOG.debug("Snapshot files found in directory({}): {}", metadataDirectory, files); + Assert.assertEquals(String.format("Unexpected no. of snapshot files in metadata directory: %s", + metadataDirectory), numberForSnapshotFiles, files.size()); + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/StartMiniHS2Cluster.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/StartMiniHS2Cluster.java index 6edd85f6172e..553053e9e5a7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/StartMiniHS2Cluster.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/StartMiniHS2Cluster.java @@ -50,6 +50,7 @@ public void testRunCluster() throws Exception { String confFilesProperty = System.getProperty("miniHS2.conf", "../../data/conf/hive-site.xml"); boolean usePortsFromConf = Boolean.parseBoolean(System.getProperty("miniHS2.usePortsFromConf", "false")); boolean isMetastoreRemote = Boolean.getBoolean("miniHS2.isMetastoreRemote"); + boolean withHouseKeepingThreads = Boolean.getBoolean("miniHS2.withHouseKeepingThreads"); boolean queryHistory = Boolean.getBoolean("miniHS2.queryHistory"); // Load conf files @@ -77,7 +78,8 @@ public void testRunCluster() throws Exception { } miniHS2 = new MiniHS2.Builder().withConf(conf).withClusterType(clusterType).withPortsFromConf(usePortsFromConf) - .withRemoteMetastore(isMetastoreRemote).withQueryHistory(queryHistory).build(); + .withRemoteMetastore(isMetastoreRemote).withHouseKeepingThreads(withHouseKeepingThreads) + .withQueryHistory(queryHistory).build(); Map confOverlay = new HashMap(); miniHS2.start(confOverlay); diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index 166ab90f642c..2c27daf29a03 100644 --- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -86,6 +86,7 @@ public class MiniHS2 extends AbstractHiveService { private MiniClusterType miniClusterType = MiniClusterType.LOCALFS_ONLY; private boolean usePortsFromConf = false; private PamAuthenticator pamAuthenticator; + private boolean withHouseKeepingThreads; private boolean createTransactionalTables; private int hmsPort = 0; @@ -117,6 +118,7 @@ public static class Builder { private String authType = "KERBEROS"; private boolean isHA = false; private boolean cleanupLocalDirOnStartup = true; + private boolean withHouseKeepingThreads = false; private boolean createTransactionalTables = true; private boolean isMetastoreSecure; private String metastoreServerPrincipal; @@ -167,6 +169,11 @@ public Builder withRemoteMetastore(boolean isMetastoreRemote) { return this; } + public Builder withHouseKeepingThreads(boolean withHouseKeepingThreads) { + this.withHouseKeepingThreads = withHouseKeepingThreads; + return this; + } + public Builder withPortsFromConf(boolean usePortsFromConf) { this.usePortsFromConf = usePortsFromConf; return this; @@ -240,8 +247,8 @@ public MiniHS2 build() throws Exception { hiveConf.setBoolVar(ConfVars.HIVE_QUERY_HISTORY_ENABLED, useQueryHistory); return new MiniHS2(hiveConf, miniClusterType, useMiniKdc, serverPrincipal, serverKeytab, - isMetastoreRemote, createTransactionalTables, usePortsFromConf, authType, isHA, cleanupLocalDirOnStartup, - isMetastoreSecure, metastoreServerPrincipal, metastoreServerKeyTab, dataNodes); + isMetastoreRemote, withHouseKeepingThreads, createTransactionalTables, usePortsFromConf, authType, isHA, + cleanupLocalDirOnStartup, isMetastoreSecure, metastoreServerPrincipal, metastoreServerKeyTab, dataNodes); } } @@ -278,7 +285,8 @@ public boolean isUseMiniKdc() { } private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useMiniKdc, - String serverPrincipal, String serverKeytab, boolean isMetastoreRemote, boolean createTransactionalTables, + String serverPrincipal, String serverKeytab, boolean isMetastoreRemote, + boolean withHouseKeepingThreads, boolean createTransactionalTables, boolean usePortsFromConf, String authType, boolean isHA, boolean cleanupLocalDirOnStartup, boolean isMetastoreSecure, String metastoreServerPrincipal, String metastoreKeyTab, int dataNodes) throws Exception { @@ -306,6 +314,7 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM this.isMetastoreSecure = isMetastoreSecure; this.cleanupLocalDirOnStartup = cleanupLocalDirOnStartup; this.usePortsFromConf = usePortsFromConf; + this.withHouseKeepingThreads = withHouseKeepingThreads; this.createTransactionalTables = createTransactionalTables; baseDir = getBaseDir(); localFS = FileSystem.getLocal(hiveConf); @@ -405,14 +414,14 @@ public MiniHS2(HiveConf hiveConf, MiniClusterType clusterType) throws Exception public MiniHS2(HiveConf hiveConf, MiniClusterType clusterType, boolean usePortsFromConf, boolean isMetastoreRemote) throws Exception { this(hiveConf, clusterType, false, null, null, - isMetastoreRemote, true, usePortsFromConf, "KERBEROS", false, true, + isMetastoreRemote, false, true, usePortsFromConf, "KERBEROS", false, true, false, null, null, DEFAULT_DATANODE_COUNT); } public void start(Map confOverlay) throws Exception { if (isMetastoreRemote) { hmsPort = MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), getHiveConf(), - false, false, false, false, createTransactionalTables); + true, false, withHouseKeepingThreads, false, createTransactionalTables); setWareHouseDir(MetastoreConf.getVar(getHiveConf(), MetastoreConf.ConfVars.WAREHOUSE)); } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 6f13260bd3a1..2c31d3958c9f 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -105,7 +105,9 @@ public class MetastoreConf { @VisibleForTesting static final String ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS = "org.apache.hadoop.hive.metastore.txn.service.AcidOpenTxnsCounterService"; - + @VisibleForTesting + static final String ICEBERG_TABLE_SNAPSHOT_EXPIRY_SERVICE_CLASS = + "org.apache.iceberg.mr.hive.metastore.task.IcebergHouseKeeperService"; public static final String METASTORE_AUTHENTICATION_LDAP_USERMEMBERSHIPKEY_NAME = "metastore.authentication.ldap.userMembershipKey"; public static final String METASTORE_RETRYING_HANDLER_CLASS = @@ -916,6 +918,22 @@ public enum ConfVars { HMS_HANDLER_PROXY_CLASS("metastore.hmshandler.proxy", "hive.metastore.hmshandler.proxy", METASTORE_RETRYING_HANDLER_CLASS, "The proxy class name of HMSHandler, default is RetryingHMSHandler."), + ICEBERG_TABLE_EXPIRY_INTERVAL("metastore.iceberg.table.expiry.interval", + "hive.metastore.iceberg.table.expiry.interval", 3600, TimeUnit.SECONDS, + "Time interval describing how often the iceberg table expiry service runs."), + ICEBERG_TABLE_EXPIRY_CATALOG_NAME("metastore.iceberg.table.expiry.catalog.name", + "hive.metastore.iceberg.table.expiry.catalog.name", "hive", + "Iceberg table expiry service looks for tables under the specified catalog name"), + ICEBERG_TABLE_EXPIRY_DATABASE_PATTERN("metastore.iceberg.table.expiry.database.pattern", + "hive.metastore.iceberg.table.expiry.database.pattern", "none", + "Iceberg table expiry service searches for tables using the specified database pattern. " + + "By default, the pattern is set to 'none', which results in no matches (this is intentional" + + "to avoid expensive metastore calls unless explicitly configured by the user)."), + ICEBERG_TABLE_EXPIRY_TABLE_PATTERN("metastore.iceberg.table.expiry.table.pattern", + "hive.metastore.iceberg.table.expiry.table.pattern", "none", + "Iceberg table expiry service tables for tables using the specified table pattern. " + + "By default, the pattern is set to 'none', which results in no matches (this is intentional" + + "to avoid expensive metastore calls unless explicitly configured by the user)."), IDENTIFIER_FACTORY("datanucleus.identifierFactory", "datanucleus.identifierFactory", "datanucleus1", "Name of the identifier factory to use when generating table/column names etc. \n" + @@ -1493,7 +1511,8 @@ public enum ConfVars { ACID_TXN_CLEANER_SERVICE_CLASS + "," + ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS + "," + MATERIALZIATIONS_REBUILD_LOCK_CLEANER_TASK_CLASS + "," + - PARTITION_MANAGEMENT_TASK_CLASS, + PARTITION_MANAGEMENT_TASK_CLASS + "," + + ICEBERG_TABLE_SNAPSHOT_EXPIRY_SERVICE_CLASS, "Comma-separated list of tasks that will be started in separate threads. These will be" + " started only when the metastore is running as a separate service. They must " + "implement " + METASTORE_TASK_THREAD_CLASS), diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java new file mode 100644 index 000000000000..65b19a8f562c --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.utils; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +public class TableFetcher { + private static final Logger LOG = LoggerFactory.getLogger(TableFetcher.class); + + // mandatory client passed to this fetcher, has to be closed from caller + private final IMetaStoreClient client; + // mandatory catalogName + private final String catalogName; + // mandatory dbPattern: use "*" to fetch all, empty to fetch none + private final String dbPattern; + // optional tableTypes: comma separated table types to fetch, fetcher result is empty list if this is empty + // typical value: "MANAGED_TABLE,EXTERNAL_TABLE" + @VisibleForTesting + final Set tableTypes = new HashSet<>(); + // tableFilter built from the input tablePattern and custom table conditions + @VisibleForTesting + String tableFilter; + + private TableFetcher(Builder builder) { + this.client = builder.client; + if ("*".equalsIgnoreCase(builder.catalogName)) { + LOG.warn("Invalid wildcard '*' parameter for catalogName, exact catalog name is expected instead of regexp"); + } + this.catalogName = Optional.ofNullable(builder.catalogName).orElse("hive"); + this.dbPattern = Optional.ofNullable(builder.dbPattern).orElse(""); + String tablePattern = Optional.ofNullable(builder.tablePattern).orElse(""); + String stringTableTypes = Optional.ofNullable(builder.tableTypes).orElse(""); + + for (String type : stringTableTypes.split(",")) { + try { + tableTypes.add(TableType.valueOf(type.trim().toUpperCase()).name()); + } catch (IllegalArgumentException e) { + LOG.warn("Unknown table type: {}", type); + } + } + + buildTableFilter(tablePattern, builder.tableConditions); + } + + private void buildTableFilter(String tablePattern, List conditions) { + boolean external = tableTypes.contains(TableType.EXTERNAL_TABLE.name()); + boolean managed = tableTypes.contains(TableType.MANAGED_TABLE.name()); + if (!managed && external) { + // only for external tables + conditions.add( + hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE + " = \"" + TableType.EXTERNAL_TABLE.name() + "\" "); + } else if (managed && !external) { + // only for managed tables + conditions.add( + hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE + " = \"" + TableType.MANAGED_TABLE.name() + "\" "); + } + if (!tablePattern.trim().isEmpty()) { + conditions.add(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME + " like \"" + + tablePattern.replaceAll("\\*", ".*") + "\""); + } + this.tableFilter = String.join(" and ", conditions); + } + + public List getTables() throws Exception { + List candidates = new ArrayList<>(); + + // if tableTypes is empty, then a list with single empty string has to specified to scan no tables. + if (tableTypes.isEmpty()) { + LOG.info("Table fetcher returns empty list as no table types specified"); + return candidates; + } + + List databases = client.getDatabases(catalogName, dbPattern); + + for (String db : databases) { + Database database = client.getDatabase(catalogName, db); + if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) { + LOG.debug("Skipping table under database: {}", db); + continue; + } + if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) { + LOG.info("Skipping table that belongs to database {} being failed over.", db); + continue; + } + List tablesNames = client.listTableNamesByFilter(catalogName, db, tableFilter, -1); + tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db))); + } + return candidates; + } + + public static class Builder { + private final IMetaStoreClient client; + private final String catalogName; + private final String dbPattern; + private final String tablePattern; + private final List tableConditions = new ArrayList<>(); + private String tableTypes; + + public Builder(IMetaStoreClient client, String catalogName, String dbPattern, String tablePattern) { + this.client = client; + this.catalogName = catalogName; + this.dbPattern = dbPattern; + this.tablePattern = tablePattern; + } + + public Builder tableTypes(String tableTypes) { + this.tableTypes = tableTypes; + return this; + } + + public Builder tableCondition(String condition) { + this.tableConditions.add(condition); + return this; + } + + public TableFetcher build() { + return new TableFetcher(this); + } + } +} \ No newline at end of file diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/TestTableFetcher.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/TestTableFetcher.java new file mode 100644 index 000000000000..eb07f34a2696 --- /dev/null +++ b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/TestTableFetcher.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.utils; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.junit.Assert; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +public class TestTableFetcher { + + @Test + public void testEmpty() { + TableFetcher fetcher = new TableFetcher.Builder(mock(IMetaStoreClient.class), null, null, null).build(); + // full empty parameter list leads to empty table filter + Assert.assertEquals("", fetcher.tableFilter); + } + + @Test + public void testAsterisk() { + TableFetcher fetcher = new TableFetcher.Builder(mock(IMetaStoreClient.class), "hive", "*", "*").build(); + // full empty parameter list leads to empty table filter + Assert.assertEquals("hive_filter_field_tableName__ like \".*\"", fetcher.tableFilter); + } + + @Test + public void testCustomCondition() { + TableFetcher fetcher = new TableFetcher.Builder(mock(IMetaStoreClient.class), "hive", "*", "*") + .tableCondition(hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "table_param like \"some_value\" ") + .build(); + // full empty parameter list leads to empty table filter + Assert.assertEquals( + "hive_filter_field_params__table_param like \"some_value\" and hive_filter_field_tableName__ like \".*\"", + fetcher.tableFilter); + } +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java index ef0c4dcac47d..ca396fad70cb 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -18,11 +18,8 @@ package org.apache.hadoop.hive.metastore; -import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -32,12 +29,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.TableName; -import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.TimeValidator; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.TableFetcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,58 +96,12 @@ public void run() { String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN); String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN); String tableTypes = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES); - Set tableTypesSet = new HashSet<>(); - for (String type : tableTypes.split(",")) { - try { - tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase()).name()); - } catch (IllegalArgumentException e) { - // ignore - LOG.warn("Unknown table type: {}", type); - } - } - // if tableTypes is empty, then a list with single empty string has to specified to scan no tables. - // specifying empty here is equivalent to disabling the partition discovery altogether as it scans no tables. - if (tableTypesSet.isEmpty()) { - LOG.info("Skipping partition management as no table types specified"); - return; - } - - StringBuilder filterBuilder = new StringBuilder() - .append(hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS) - .append("discover__partitions").append(" like \"true\" "); - boolean external = tableTypesSet.contains(TableType.EXTERNAL_TABLE.name()); - boolean managed = tableTypesSet.contains(TableType.MANAGED_TABLE.name()); - if (!managed && external) { - // only for external tables - filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE) - .append(" = \"").append(TableType.EXTERNAL_TABLE.name()).append("\" "); - } else if (managed && !external) { - // only for managed tables - filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE) - .append(" = \"").append(TableType.MANAGED_TABLE.name()).append("\" "); - } - if (!tablePattern.trim().isEmpty()) { - filterBuilder.append(" and ") - .append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME) - .append(" like \"").append(tablePattern.replaceAll("\\*", ".*")).append("\""); - } - - List databases = msc.getDatabases(catalogName, dbPattern); - List candidates = new ArrayList<>(); - for (String db : databases) { - Database database = msc.getDatabase(catalogName, db); - if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) { - LOG.debug("Skipping table under database: {}", db); - continue; - } - if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) { - LOG.info("Skipping table belongs to database {} being failed over.", db); - continue; - } - List tablesNames = msc.listTableNamesByFilter(catalogName, db, - filterBuilder.toString(), -1); - tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db))); - } + List candidates = + new TableFetcher.Builder(msc, catalogName, dbPattern, tablePattern).tableTypes(tableTypes) + .tableCondition( + hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "discover__partitions like \"true\" ") + .build() + .getTables(); if (candidates.isEmpty()) { LOG.info("Got empty table list in catalog: {}, dbPattern: {}", catalogName, dbPattern); From b88bb5073b60b057bd2994c7cb93ca930f896e72 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Tue, 13 May 2025 09:59:24 +0200 Subject: [PATCH 2/8] PR comments --- .../mr/hive/HiveIcebergStorageHandler.java | 4 +- .../iceberg/mr/hive/HiveIcebergUtil.java | 39 ------------ .../iceberg/mr/hive/IcebergTableUtil.java | 11 ++++ .../task/IcebergHouseKeeperService.java | 59 ++++++++++++++----- .../hadoop/hive/metastore/txn/TxnStore.java | 2 +- 5 files changed, 58 insertions(+), 57 deletions(-) delete mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUtil.java diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 49e2e0ad63ba..0a967cce6852 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -1131,7 +1131,7 @@ private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int num try { if (numThreads > 0) { LOG.info("Executing delete orphan files on iceberg table {} with {} threads", icebergTable.name(), numThreads); - deleteExecutorService = HiveIcebergUtil.getDeleteExecutorService(icebergTable.name(), + deleteExecutorService = IcebergTableUtil.newDeleteThreadPool(icebergTable.name(), numThreads); } @@ -1155,7 +1155,7 @@ private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnap try { if (numThreads > 0) { LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads); - deleteExecutorService = HiveIcebergUtil.getDeleteExecutorService(icebergTable.name(), numThreads); + deleteExecutorService = IcebergTableUtil.newDeleteThreadPool(icebergTable.name(), numThreads); } if (expireSnapshotsSpec == null) { expireSnapshotWithDefaultParams(icebergTable, deleteExecutorService); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUtil.java deleted file mode 100644 index 76a324c1c4ab..000000000000 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUtil.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.hive; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; - -public class HiveIcebergUtil { - - private HiveIcebergUtil() { - } - - public static ExecutorService getDeleteExecutorService(String completeName, int numThreads) { - AtomicInteger deleteThreadsIndex = new AtomicInteger(0); - return Executors.newFixedThreadPool(numThreads, runnable -> { - Thread thread = new Thread(runnable); - thread.setName("remove-snapshot-" + completeName + "-" + deleteThreadsIndex.getAndIncrement()); - return thread; - }); - } -} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index 79829c177f0a..54b283527a13 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -27,6 +27,9 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.stream.Collectors; @@ -551,4 +554,12 @@ public static TransformSpec getTransformSpec(Table table, String transformName, return spec; } + public static ExecutorService newDeleteThreadPool(String completeName, int numThreads) { + AtomicInteger deleteThreadsIndex = new AtomicInteger(0); + return Executors.newFixedThreadPool(numThreads, runnable -> { + Thread thread = new Thread(runnable); + thread.setName("remove-snapshot-" + completeName + "-" + deleteThreadsIndex.getAndIncrement()); + return thread; + }); + } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java index a8158f3a7e92..913cfce83093 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java @@ -18,6 +18,8 @@ package org.apache.iceberg.mr.hive.metastore.task; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -30,10 +32,12 @@ import org.apache.hadoop.hive.metastore.api.GetTableRequest; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.NoMutex; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.utils.TableFetcher; import org.apache.iceberg.ExpireSnapshots; import org.apache.iceberg.Table; -import org.apache.iceberg.mr.hive.HiveIcebergUtil; import org.apache.iceberg.mr.hive.IcebergTableUtil; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.thrift.TException; @@ -44,6 +48,13 @@ public class IcebergHouseKeeperService implements MetastoreTaskThread { private static final Logger LOG = LoggerFactory.getLogger(IcebergHouseKeeperService.class); private Configuration conf; + private TxnStore txnHandler; + private boolean shouldUseMutex; + + // table cache to avoid making repeated requests for the same Iceberg tables more than once per day + private final Cache tableCache = Caffeine.newBuilder() + .expireAfterWrite(1, TimeUnit.DAYS) + .build(); @Override public long runFrequency(TimeUnit unit) { @@ -58,19 +69,25 @@ public void run() { String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_DATABASE_PATTERN); String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_TABLE_PATTERN); - try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) { - // TODO: Future improvement – modify TableFetcher to return HMS Table API objects directly, - // avoiding the need for subsequent msc.getTable calls to fetch each matched table individually - List tables = getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex(); + + try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.IcebergHouseKeeper.name())) { + try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) { + // TODO: HIVE-28952 – modify TableFetcher to return HMS Table API objects directly, + // avoiding the need for subsequent msc.getTable calls to fetch each matched table individually + List tables = getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(); - LOG.debug("{} candidate tables found", tables.size()); + LOG.debug("{} candidate tables found", tables.size()); - for (TableName table : tables) { - expireSnapshotsForTable(getIcebergTable(table, msc)); + for (TableName table : tables) { + expireSnapshotsForTable(getIcebergTable(table, msc)); + } + } catch (Exception e) { + LOG.error("Exception while running iceberg expiry service on catalog/db/table: {}/{}/{}", + catalogName, dbPattern, tablePattern, e); } } catch (Exception e) { - LOG.error("Exception while running iceberg expiry service on catalog/db/table: {}/{}/{}", catalogName, dbPattern, - tablePattern, e); + throw new RuntimeException(e); } } @@ -83,9 +100,16 @@ TableFetcher getTableFetcher(IMetaStoreClient msc, String catalogName, String db .build(); } - private Table getIcebergTable(TableName table, IMetaStoreClient msc) throws TException { - GetTableRequest request = new GetTableRequest(table.getDb(), table.getTable()); - return IcebergTableUtil.getTable(conf, msc.getTable(request)); + private Table getIcebergTable(TableName tableName, IMetaStoreClient msc) { + return tableCache.get(tableName, key -> { + LOG.debug("Getting iceberg table from metastore as it's not present in table cache: {}", tableName); + GetTableRequest request = new GetTableRequest(tableName.getDb(), tableName.getTable()); + try { + return IcebergTableUtil.getTable(conf, msc.getTable(request)); + } catch (TException e) { + throw new RuntimeException(e); + } + }); } /** @@ -96,7 +120,6 @@ private Table getIcebergTable(TableName table, IMetaStoreClient msc) throws TExc * @param icebergTable the iceberg Table reference */ private void expireSnapshotsForTable(Table icebergTable) { - LOG.info("Expire snapshots for: {}", icebergTable); ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots(); int numThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname, @@ -106,7 +129,7 @@ private void expireSnapshotsForTable(Table icebergTable) { try { if (numThreads > 0) { LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads); - deleteExecutorService = HiveIcebergUtil.getDeleteExecutorService(icebergTable.name(), numThreads); + deleteExecutorService = IcebergTableUtil.newDeleteThreadPool(icebergTable.name(), numThreads); } if (deleteExecutorService != null) { expireSnapshots.executeDeleteWith(deleteExecutorService); @@ -119,6 +142,11 @@ private void expireSnapshotsForTable(Table icebergTable) { } } + @Override + public void enforceMutex(boolean enableMutex) { + this.shouldUseMutex = enableMutex; + } + @Override public Configuration getConf() { return conf; @@ -127,5 +155,6 @@ public Configuration getConf() { @Override public void setConf(Configuration configuration) { conf = configuration; + txnHandler = TxnUtils.getTxnStore(conf); } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index d5349e6c706a..a56fbcbbd818 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -104,7 +104,7 @@ public interface TxnStore extends Configurable { enum MUTEX_KEY { - Initiator, Cleaner, HouseKeeper, TxnCleaner, + Initiator, Cleaner, HouseKeeper, IcebergHouseKeeper, TxnCleaner, CompactionScheduler, MaterializationRebuild } // Compactor states (Should really be enum) From 3bc702f304f8d9945167fe11e6a7b853259dddbe Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Wed, 14 May 2025 14:28:18 +0200 Subject: [PATCH 3/8] PR comments 2 + sonarqube method refactor --- .../task/IcebergHouseKeeperService.java | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java index 913cfce83093..4fa2d1bf99b7 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java @@ -53,6 +53,8 @@ public class IcebergHouseKeeperService implements MetastoreTaskThread { // table cache to avoid making repeated requests for the same Iceberg tables more than once per day private final Cache tableCache = Caffeine.newBuilder() + .maximumSize(1000) + .softValues() .expireAfterWrite(1, TimeUnit.DAYS) .build(); @@ -72,25 +74,29 @@ public void run() { TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex(); try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.IcebergHouseKeeper.name())) { - try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) { - // TODO: HIVE-28952 – modify TableFetcher to return HMS Table API objects directly, - // avoiding the need for subsequent msc.getTable calls to fetch each matched table individually - List tables = getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(); - - LOG.debug("{} candidate tables found", tables.size()); - - for (TableName table : tables) { - expireSnapshotsForTable(getIcebergTable(table, msc)); - } - } catch (Exception e) { - LOG.error("Exception while running iceberg expiry service on catalog/db/table: {}/{}/{}", - catalogName, dbPattern, tablePattern, e); - } + expireTables(catalogName, dbPattern, tablePattern); } catch (Exception e) { throw new RuntimeException(e); } } + private void expireTables(String catalogName, String dbPattern, String tablePattern) { + try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) { + // TODO: HIVE-28952 – modify TableFetcher to return HMS Table API objects directly, + // avoiding the need for subsequent msc.getTable calls to fetch each matched table individually + List tables = getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(); + + LOG.debug("{} candidate tables found", tables.size()); + + for (TableName table : tables) { + expireSnapshotsForTable(getIcebergTable(table, msc)); + } + } catch (Exception e) { + LOG.error("Exception while running iceberg expiry service on catalog/db/table: {}/{}/{}", + catalogName, dbPattern, tablePattern, e); + } + } + @VisibleForTesting TableFetcher getTableFetcher(IMetaStoreClient msc, String catalogName, String dbPattern, String tablePattern) { return new TableFetcher.Builder(msc, catalogName, dbPattern, tablePattern).tableTypes( From ded91ae3589bc9f7b4f6ffd6cf94cad4eca331c3 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Wed, 14 May 2025 15:11:19 +0200 Subject: [PATCH 4/8] removed softValues --- .../mr/hive/metastore/task/IcebergHouseKeeperService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java index 4fa2d1bf99b7..17e58ba96b17 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java @@ -54,7 +54,6 @@ public class IcebergHouseKeeperService implements MetastoreTaskThread { // table cache to avoid making repeated requests for the same Iceberg tables more than once per day private final Cache tableCache = Caffeine.newBuilder() .maximumSize(1000) - .softValues() .expireAfterWrite(1, TimeUnit.DAYS) .build(); From bc146cfeeb318b9064b810295b3c18073ad38408 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Thu, 29 May 2025 12:05:13 +0200 Subject: [PATCH 5/8] PR comments 3 --- .../task/IcebergHouseKeeperService.java | 29 +++++++------------ .../hive/metastore/conf/MetastoreConf.java | 5 ++-- .../hive/metastore/utils/TableFetcher.java | 3 +- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java index 17e58ba96b17..aab274cded28 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java @@ -50,6 +50,7 @@ public class IcebergHouseKeeperService implements MetastoreTaskThread { private Configuration conf; private TxnStore txnHandler; private boolean shouldUseMutex; + private ExecutorService deleteExecutorService = null; // table cache to avoid making repeated requests for the same Iceberg tables more than once per day private final Cache tableCache = Caffeine.newBuilder() @@ -126,25 +127,10 @@ private Table getIcebergTable(TableName tableName, IMetaStoreClient msc) { */ private void expireSnapshotsForTable(Table icebergTable) { ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots(); - - int numThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname, - HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal); - - ExecutorService deleteExecutorService = null; - try { - if (numThreads > 0) { - LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads); - deleteExecutorService = IcebergTableUtil.newDeleteThreadPool(icebergTable.name(), numThreads); - } - if (deleteExecutorService != null) { - expireSnapshots.executeDeleteWith(deleteExecutorService); - } - expireSnapshots.commit(); - } finally { - if (deleteExecutorService != null) { - deleteExecutorService.shutdown(); - } + if (deleteExecutorService != null) { + expireSnapshots.executeDeleteWith(deleteExecutorService); } + expireSnapshots.commit(); } @Override @@ -161,5 +147,12 @@ public Configuration getConf() { public void setConf(Configuration configuration) { conf = configuration; txnHandler = TxnUtils.getTxnStore(conf); + + int numThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname, + HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal); + if (numThreads > 0) { + LOG.info("Will expire Iceberg snapshots using an executor service with {} threads", numThreads); + deleteExecutorService = IcebergTableUtil.newDeleteThreadPool("iceberg-housekeeper-service", numThreads); + } } } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 2c31d3958c9f..5519b9ee20b1 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -921,13 +921,14 @@ public enum ConfVars { ICEBERG_TABLE_EXPIRY_INTERVAL("metastore.iceberg.table.expiry.interval", "hive.metastore.iceberg.table.expiry.interval", 3600, TimeUnit.SECONDS, "Time interval describing how often the iceberg table expiry service runs."), + // TODO: HIVE-28974: Implement pattern-based catalog retrieval in metastore client ICEBERG_TABLE_EXPIRY_CATALOG_NAME("metastore.iceberg.table.expiry.catalog.name", "hive.metastore.iceberg.table.expiry.catalog.name", "hive", "Iceberg table expiry service looks for tables under the specified catalog name"), ICEBERG_TABLE_EXPIRY_DATABASE_PATTERN("metastore.iceberg.table.expiry.database.pattern", - "hive.metastore.iceberg.table.expiry.database.pattern", "none", + "hive.metastore.iceberg.table.expiry.database.pattern", "", "Iceberg table expiry service searches for tables using the specified database pattern. " + - "By default, the pattern is set to 'none', which results in no matches (this is intentional" + + "By default, the pattern is set to empty string, which results in no matches (this is intentional" + "to avoid expensive metastore calls unless explicitly configured by the user)."), ICEBERG_TABLE_EXPIRY_TABLE_PATTERN("metastore.iceberg.table.expiry.table.pattern", "hive.metastore.iceberg.table.expiry.table.pattern", "none", diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java index 65b19a8f562c..0e10635bc743 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.slf4j.Logger; @@ -54,7 +55,7 @@ private TableFetcher(Builder builder) { if ("*".equalsIgnoreCase(builder.catalogName)) { LOG.warn("Invalid wildcard '*' parameter for catalogName, exact catalog name is expected instead of regexp"); } - this.catalogName = Optional.ofNullable(builder.catalogName).orElse("hive"); + this.catalogName = Optional.ofNullable(builder.catalogName).orElse(Warehouse.DEFAULT_CATALOG_NAME); this.dbPattern = Optional.ofNullable(builder.dbPattern).orElse(""); String tablePattern = Optional.ofNullable(builder.tablePattern).orElse(""); String stringTableTypes = Optional.ofNullable(builder.tableTypes).orElse(""); From aa13d6365c57e682a5dfa2e39171e0c35741c488 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Mon, 2 Jun 2025 13:56:23 +0200 Subject: [PATCH 6/8] PR comments 4 --- .../metastore/task/IcebergHouseKeeperService.java | 13 +++++++++---- .../hadoop/hive/metastore/conf/MetastoreConf.java | 4 ++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java index aab274cded28..03f1f6d4d89a 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.api.GetTableRequest; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.NoMutex; @@ -89,11 +90,15 @@ private void expireTables(String catalogName, String dbPattern, String tablePatt LOG.debug("{} candidate tables found", tables.size()); for (TableName table : tables) { - expireSnapshotsForTable(getIcebergTable(table, msc)); + try { + expireSnapshotsForTable(getIcebergTable(table, msc)); + } catch (Exception e) { + LOG.error("Exception while running iceberg expiry service on catalog/db/table: {}/{}/{}", + catalogName, dbPattern, tablePattern, e); + } } - } catch (Exception e) { - LOG.error("Exception while running iceberg expiry service on catalog/db/table: {}/{}/{}", - catalogName, dbPattern, tablePattern, e); + } catch (MetaException e) { + throw new RuntimeException("Error while opening metastore client", e); } } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 5519b9ee20b1..e2dc96d8d3b8 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -931,9 +931,9 @@ public enum ConfVars { "By default, the pattern is set to empty string, which results in no matches (this is intentional" + "to avoid expensive metastore calls unless explicitly configured by the user)."), ICEBERG_TABLE_EXPIRY_TABLE_PATTERN("metastore.iceberg.table.expiry.table.pattern", - "hive.metastore.iceberg.table.expiry.table.pattern", "none", + "hive.metastore.iceberg.table.expiry.table.pattern", "", "Iceberg table expiry service tables for tables using the specified table pattern. " + - "By default, the pattern is set to 'none', which results in no matches (this is intentional" + + "By default, the pattern is set to empty string, which results in no matches (this is intentional" + "to avoid expensive metastore calls unless explicitly configured by the user)."), IDENTIFIER_FACTORY("datanucleus.identifierFactory", "datanucleus.identifierFactory", "datanucleus1", From a5b25d9037863f12ed1bbef97b862d9c538b6a87 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Tue, 3 Jun 2025 08:30:44 +0200 Subject: [PATCH 7/8] changed to generic Exception --- .../mr/hive/metastore/task/IcebergHouseKeeperService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java index 03f1f6d4d89a..d31befaf3582 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java @@ -97,8 +97,8 @@ private void expireTables(String catalogName, String dbPattern, String tablePatt catalogName, dbPattern, tablePattern, e); } } - } catch (MetaException e) { - throw new RuntimeException("Error while opening metastore client", e); + } catch (Exception e) { + throw new RuntimeException("Error while getting tables from metastore", e); } } From ba9771bbc53938f2f4238e045fa8536a85141131 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Tue, 3 Jun 2025 10:10:11 +0200 Subject: [PATCH 8/8] spotless --- .../mr/hive/metastore/task/IcebergHouseKeeperService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java index d31befaf3582..8c2e010c2384 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.api.GetTableRequest; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.NoMutex;