diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 647836a7eb65..90d9b608d6d8 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2240,7 +2240,7 @@ public static enum ConfVars {
"Use stats from iceberg table snapshot for query planning. This has two values metastore and iceberg"),
HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS("hive.iceberg.expire.snapshot.numthreads", 4,
"The number of threads to be used for deleting files during expire snapshot. If set to 0 or below it uses the" +
- " defult DirectExecutorService"),
+ " default DirectExecutorService"),
HIVE_ICEBERG_MASK_DEFAULT_LOCATION("hive.iceberg.mask.default.location", false,
"If this is set to true the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"),
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 3702174ca2d0..0a967cce6852 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -39,8 +39,6 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.MapUtils;
@@ -1133,7 +1131,8 @@ private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int num
try {
if (numThreads > 0) {
LOG.info("Executing delete orphan files on iceberg table {} with {} threads", icebergTable.name(), numThreads);
- deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads);
+ deleteExecutorService = IcebergTableUtil.newDeleteThreadPool(icebergTable.name(),
+ numThreads);
}
HiveIcebergDeleteOrphanFiles deleteOrphanFiles = new HiveIcebergDeleteOrphanFiles(conf, icebergTable);
@@ -1156,7 +1155,7 @@ private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnap
try {
if (numThreads > 0) {
LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads);
- deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads);
+ deleteExecutorService = IcebergTableUtil.newDeleteThreadPool(icebergTable.name(), numThreads);
}
if (expireSnapshotsSpec == null) {
expireSnapshotWithDefaultParams(icebergTable, deleteExecutorService);
@@ -1235,15 +1234,6 @@ private void expireSnapshotByIds(Table icebergTable, String[] idsToExpire,
}
}
- private ExecutorService getDeleteExecutorService(String completeName, int numThreads) {
- AtomicInteger deleteThreadsIndex = new AtomicInteger(0);
- return Executors.newFixedThreadPool(numThreads, runnable -> {
- Thread thread = new Thread(runnable);
- thread.setName("remove-snapshot-" + completeName + "-" + deleteThreadsIndex.getAndIncrement());
- return thread;
- });
- }
-
@Override
public void alterTableSnapshotRefOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
AlterTableSnapshotRefSpec alterTableSnapshotRefSpec) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 79829c177f0a..54b283527a13 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -27,6 +27,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -551,4 +554,12 @@ public static TransformSpec getTransformSpec(Table table, String transformName,
return spec;
}
+  /**
+   * Creates a fixed-size thread pool used for deleting files of an Iceberg table.
+   * Worker threads are named {@code remove-snapshot-<completeName>-<index>} and are daemon threads, so a pool
+   * that is kept around by a long-running service does not keep the JVM alive on shutdown.
+   *
+   * @param completeName identifier embedded into the worker thread names
+   * @param numThreads number of worker threads, has to be positive
+   * @return a new executor service; shutting it down is the responsibility of the caller
+   */
+  public static ExecutorService newDeleteThreadPool(String completeName, int numThreads) {
+    AtomicInteger deleteThreadsIndex = new AtomicInteger(0);
+    return Executors.newFixedThreadPool(numThreads, runnable -> {
+      Thread thread = new Thread(runnable);
+      thread.setName("remove-snapshot-" + completeName + "-" + deleteThreadsIndex.getAndIncrement());
+      thread.setDaemon(true);
+      return thread;
+    });
+  }
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
new file mode 100644
index 000000000000..8c2e010c2384
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive.metastore.task;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.NoMutex;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.TableFetcher;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.mr.hive.IcebergTableUtil;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IcebergHouseKeeperService implements MetastoreTaskThread {
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergHouseKeeperService.class);
+
+ private Configuration conf;
+ private TxnStore txnHandler;
+ private boolean shouldUseMutex;
+ private ExecutorService deleteExecutorService = null;
+
+  // Cache of loaded Iceberg tables keyed by table name: holds at most 1000 entries and drops an entry one day
+  // after it was written, so the same table is not requested from the metastore more than once per day.
+  private final Cache<TableName, Table> tableCache = Caffeine.newBuilder()
+      .maximumSize(1000)
+      .expireAfterWrite(1, TimeUnit.DAYS)
+      .build();
+
+ /**
+ * Returns how often this task runs, as configured by ICEBERG_TABLE_EXPIRY_INTERVAL.
+ *
+ * @param unit time unit the interval is converted to
+ * @return the configured interval expressed in the given unit
+ */
+ @Override
+ public long runFrequency(TimeUnit unit) {
+ return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_INTERVAL, unit);
+ }
+
+ /**
+ * Runs one housekeeping cycle: reads the catalog name and the database/table patterns from the
+ * metastore configuration and expires the snapshots of the matching tables while holding the
+ * IcebergHouseKeeper mutex.
+ *
+ * @throws RuntimeException wrapping any exception raised while acquiring the lock or expiring the tables
+ */
+ @Override
+ public void run() {
+ LOG.debug("Running IcebergHouseKeeperService...");
+
+ String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_CATALOG_NAME);
+ String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_DATABASE_PATTERN);
+ String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_TABLE_PATTERN);
+
+ // a NoMutex instance stands in for the transaction store mutex when mutex enforcement is disabled
+ TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();
+
+ // the lock handle is closed by try-with-resources once the expiry has finished or failed
+ try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.IcebergHouseKeeper.name())) {
+ expireTables(catalogName, dbPattern, tablePattern);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+  /**
+   * Expires the snapshots of every table returned by the table fetcher for the given catalog and patterns.
+   * A failure on a single table is logged and does not stop the processing of the remaining tables.
+   *
+   * @param catalogName name of the catalog to look for tables in
+   * @param dbPattern pattern of the databases to scan
+   * @param tablePattern pattern of the table names to match
+   * @throws RuntimeException if the metastore client cannot be used or the candidate tables cannot be listed
+   */
+  private void expireTables(String catalogName, String dbPattern, String tablePattern) {
+    try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) {
+      // TODO: HIVE-28952 – modify TableFetcher to return HMS Table API objects directly,
+      // avoiding the need for subsequent msc.getTable calls to fetch each matched table individually
+      List<TableName> tables = getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables();
+
+      LOG.debug("{} candidate tables found", tables.size());
+
+      for (TableName table : tables) {
+        try {
+          expireSnapshotsForTable(getIcebergTable(table, msc));
+        } catch (Exception e) {
+          // report the table that actually failed, not the configured search patterns
+          LOG.error("Exception while running iceberg expiry service on catalog/db/table: {}/{}/{}",
+              table.getCat(), table.getDb(), table.getTable(), e);
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Error while getting tables from metastore", e);
+    }
+  }
+
+ /**
+ * Builds the table fetcher used to find the candidate tables: it is restricted to the
+ * EXTERNAL_TABLE table type and carries an extra condition on the table_type table parameter.
+ *
+ * @param msc metastore client handed to the fetcher
+ * @param catalogName name of the catalog to look for tables in
+ * @param dbPattern pattern of the databases to scan
+ * @param tablePattern pattern of the table names to match
+ * @return the configured table fetcher
+ */
+ @VisibleForTesting
+ TableFetcher getTableFetcher(IMetaStoreClient msc, String catalogName, String dbPattern, String tablePattern) {
+ return new TableFetcher.Builder(msc, catalogName, dbPattern, tablePattern).tableTypes(
+ "EXTERNAL_TABLE")
+ .tableCondition(
+ hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "table_type like \"ICEBERG\" ")
+ .build();
+ }
+
+  /**
+   * Returns the Iceberg table for the given name, loading it through the metastore client only when it is not
+   * present in the table cache.
+   *
+   * @param tableName catalog, database and name of the table
+   * @param msc metastore client used to fetch the table on a cache miss
+   * @return the Iceberg table
+   * @throws RuntimeException wrapping the TException raised by the metastore call
+   */
+  private Table getIcebergTable(TableName tableName, IMetaStoreClient msc) {
+    return tableCache.get(tableName, key -> {
+      LOG.debug("Getting iceberg table from metastore as it's not present in table cache: {}", tableName);
+      GetTableRequest request = new GetTableRequest(tableName.getDb(), tableName.getTable());
+      // look the table up in its own catalog instead of whatever the client defaults to
+      request.setCatName(tableName.getCat());
+      try {
+        return IcebergTableUtil.getTable(conf, msc.getTable(request));
+      } catch (TException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  /**
+   * Expires the snapshots of an Iceberg table and commits the result. File deletion is handed over to the
+   * delete executor service when one has been created, i.e. when the Hive config
+   * HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS is positive.
+   * This is largely equivalent to the HiveIcebergStorageHandler.expireSnapshotWithDefaultParams method.
+   *
+   * @param icebergTable the iceberg Table reference
+   */
+  private void expireSnapshotsForTable(Table icebergTable) {
+    ExpireSnapshots expiry = icebergTable.expireSnapshots();
+    // executeDeleteWith returns the same builder, so its result can be used for the commit
+    ExpireSnapshots configured =
+        deleteExecutorService == null ? expiry : expiry.executeDeleteWith(deleteExecutorService);
+    configured.commit();
+  }
+
+ /**
+ * Selects whether run() takes the lock from the transaction store mutex or from a NoMutex instance.
+ *
+ * @param enableMutex true to use the transaction store mutex
+ */
+ @Override
+ public void enforceMutex(boolean enableMutex) {
+ this.shouldUseMutex = enableMutex;
+ }
+
+ /**
+ * @return the configuration last passed to setConf, or null if setConf has not been called yet
+ */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+  /**
+   * Stores the configuration, obtains the transaction store and (re)creates the executor service used for
+   * deleting files. The pool size is read from HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS; no executor service is
+   * created when the value is zero or negative.
+   *
+   * @param configuration configuration of this task
+   */
+  @Override
+  public void setConf(Configuration configuration) {
+    conf = configuration;
+    txnHandler = TxnUtils.getTxnStore(conf);
+
+    // release the pool created by an earlier call, otherwise its threads would be leaked
+    if (deleteExecutorService != null) {
+      deleteExecutorService.shutdown();
+      deleteExecutorService = null;
+    }
+
+    int numThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname,
+        HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
+    if (numThreads > 0) {
+      LOG.info("Will expire Iceberg snapshots using an executor service with {} threads", numThreads);
+      deleteExecutorService = IcebergTableUtil.newDeleteThreadPool("iceberg-housekeeper-service", numThreads);
+    }
+  }
+}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
new file mode 100644
index 000000000000..cd77beac66f2
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive.metastore.task;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.utils.TableFetcher;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.mr.hive.IcebergTableUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestIcebergHouseKeeperService {
+ private static final Logger LOG = LoggerFactory.getLogger(TestIcebergHouseKeeperService.class);
+
+ private static final HiveConf conf = new HiveConf(TestIcebergHouseKeeperService.class);
+ private static Hive db;
+
+ /**
+ * Prepares the shared configuration (authorization disabled, SQLStdConfOnlyAuthorizerFactory as
+ * authorization manager, iceberg.engine.hive.lock-enabled set to false) and obtains the Hive instance
+ * used by all tests of this class.
+ */
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ conf.set("hive.security.authorization.enabled", "false");
+ conf.set("hive.security.authorization.manager",
+ "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory");
+ conf.set("iceberg.engine.hive.lock-enabled", "false");
+
+ db = Hive.get(conf);
+ }
+
+ /**
+ * Closes the Hive instance obtained in beforeClass.
+ */
+ @AfterClass
+ public static void afterClass() {
+ // NOTE(review): the meaning of the boolean argument is not visible from this file - confirm against Hive.close
+ db.close(true);
+ }
+
+  /**
+   * Verifies that the table fetcher built by the service returns an Iceberg table of the default database.
+   */
+  @Test
+  public void testIcebergTableFetched() throws Exception {
+    String tableName = "iceberg_table";
+    createIcebergTable(tableName);
+    try {
+      IcebergHouseKeeperService service = new IcebergHouseKeeperService();
+      TableFetcher tableFetcher = service.getTableFetcher(db.getMSC(), null, "default", "*");
+
+      List<TableName> tables = tableFetcher.getTables();
+      Assert.assertEquals(new TableName("hive", "default", tableName), tables.get(0));
+    } finally {
+      // do not leave the table behind for other tests that use the default database
+      db.dropTable("default", tableName);
+    }
+  }
+
+  /**
+   * End-to-end check: creates two snapshots on a table whose maximum snapshot age is 500 ms, runs the service
+   * once the snapshots are old enough and expects a single snapshot file to remain.
+   */
+  @Test
+  public void testExpireSnapshotsByServiceRun() throws Exception {
+    String tableName = "iceberg_table_snapshot_expiry_e2e_test";
+    createIcebergTable(tableName);
+    try {
+      IcebergHouseKeeperService service = getServiceForTable("default", tableName);
+
+      GetTableRequest request = new GetTableRequest("default", tableName);
+      org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, db.getMSC().getTable(request));
+
+      // strip the URI scheme so the location can be used as a local file system path
+      String metadataDirectory = icebergTable.location().replaceAll("^[a-zA-Z]+:", "") + "/metadata";
+
+      DataFile datafile = DataFiles.builder(icebergTable.spec())
+          .withRecordCount(3)
+          .withPath("/tmp/file.parquet")
+          .withFileSizeInBytes(10)
+          .build();
+
+      icebergTable.newAppend().appendFile(datafile).commit();
+      assertSnapshotFiles(metadataDirectory, 1);
+      icebergTable.newAppend().appendFile(datafile).commit();
+      assertSnapshotFiles(metadataDirectory, 2);
+
+      Thread.sleep(1000); // allow snapshots that are 1000ms old to become eligible for snapshot expiry
+      service.run();
+
+      assertSnapshotFiles(metadataDirectory, 1);
+    } finally {
+      // clean up even when one of the assertions above fails
+      db.dropTable("default", tableName);
+    }
+  }
+
+  /**
+   * Creates an external table in the default database with a single string column, the table_type property
+   * set to ICEBERG and history.expire.max-snapshot-age-ms set to 500.
+   *
+   * @param name name of the table to create
+   */
+  private void createIcebergTable(String name) throws Exception {
+    Table table = new Table("default", name);
+    List<FieldSchema> columns = Lists.newArrayList();
+    columns.add(new FieldSchema("col", "string", "First column"));
+    table.setFields(columns);
+
+    table.setProperty("EXTERNAL", "TRUE");
+    table.setTableType(TableType.EXTERNAL_TABLE);
+    table.setProperty("table_type", "ICEBERG");
+
+    table.setProperty("history.expire.max-snapshot-age-ms", "500");
+
+    db.createTable(table);
+  }
+
+ /**
+ * Creates an IcebergHouseKeeperService whose configuration is a copy of the test configuration with the
+ * expiry database pattern and table pattern set to the given database and table name.
+ *
+ * @param dbName value used as the expiry database pattern
+ * @param tableName value used as the expiry table pattern, i.e. the table to be cleaned up
+ * @return the configured IcebergHouseKeeperService
+ */
+ private IcebergHouseKeeperService getServiceForTable(String dbName, String tableName) {
+ IcebergHouseKeeperService service = new IcebergHouseKeeperService();
+ HiveConf serviceConf = new HiveConf(conf);
+ serviceConf.set("hive.metastore.iceberg.table.expiry.database.pattern", dbName);
+ serviceConf.set("hive.metastore.iceberg.table.expiry.table.pattern", tableName);
+ service.setConf(serviceConf);
+ return service;
+ }
+
+  /**
+   * Asserts the number of files in the metadata directory whose name starts with "snap-".
+   *
+   * @param metadataDirectory local path of the table's metadata directory
+   * @param numberForSnapshotFiles expected number of snapshot files
+   */
+  private void assertSnapshotFiles(String metadataDirectory, int numberForSnapshotFiles) {
+    File[] matchingFiles = new File(metadataDirectory).listFiles((dir, name) -> name.startsWith("snap-"));
+    // listFiles returns null when the path is not a readable directory; that is treated as "no files"
+    List<File> files = Optional.ofNullable(matchingFiles).map(Arrays::asList).orElse(Collections.emptyList());
+    LOG.debug("Snapshot files found in directory({}): {}", metadataDirectory, files);
+    Assert.assertEquals(String.format("Unexpected no. of snapshot files in metadata directory: %s",
+        metadataDirectory), numberForSnapshotFiles, files.size());
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/StartMiniHS2Cluster.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/StartMiniHS2Cluster.java
index 6edd85f6172e..553053e9e5a7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/StartMiniHS2Cluster.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/StartMiniHS2Cluster.java
@@ -50,6 +50,7 @@ public void testRunCluster() throws Exception {
String confFilesProperty = System.getProperty("miniHS2.conf", "../../data/conf/hive-site.xml");
boolean usePortsFromConf = Boolean.parseBoolean(System.getProperty("miniHS2.usePortsFromConf", "false"));
boolean isMetastoreRemote = Boolean.getBoolean("miniHS2.isMetastoreRemote");
+ boolean withHouseKeepingThreads = Boolean.getBoolean("miniHS2.withHouseKeepingThreads");
boolean queryHistory = Boolean.getBoolean("miniHS2.queryHistory");
// Load conf files
@@ -77,7 +78,8 @@ public void testRunCluster() throws Exception {
}
miniHS2 = new MiniHS2.Builder().withConf(conf).withClusterType(clusterType).withPortsFromConf(usePortsFromConf)
- .withRemoteMetastore(isMetastoreRemote).withQueryHistory(queryHistory).build();
+ .withRemoteMetastore(isMetastoreRemote).withHouseKeepingThreads(withHouseKeepingThreads)
+ .withQueryHistory(queryHistory).build();
Map confOverlay = new HashMap();
miniHS2.start(confOverlay);
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 166ab90f642c..2c27daf29a03 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -86,6 +86,7 @@ public class MiniHS2 extends AbstractHiveService {
private MiniClusterType miniClusterType = MiniClusterType.LOCALFS_ONLY;
private boolean usePortsFromConf = false;
private PamAuthenticator pamAuthenticator;
+ private boolean withHouseKeepingThreads;
private boolean createTransactionalTables;
private int hmsPort = 0;
@@ -117,6 +118,7 @@ public static class Builder {
private String authType = "KERBEROS";
private boolean isHA = false;
private boolean cleanupLocalDirOnStartup = true;
+ private boolean withHouseKeepingThreads = false;
private boolean createTransactionalTables = true;
private boolean isMetastoreSecure;
private String metastoreServerPrincipal;
@@ -167,6 +169,11 @@ public Builder withRemoteMetastore(boolean isMetastoreRemote) {
return this;
}
+ /**
+ * Sets the flag that is handed to the MiniHS2 instance built by this builder.
+ * NOTE(review): the flag is presumably what decides whether a remote metastore is started with its
+ * housekeeping threads - confirm against MetaStoreTestUtils.startMetaStoreWithRetry.
+ *
+ * @param withHouseKeepingThreads value of the flag, false unless this method is called
+ * @return this builder
+ */
+ public Builder withHouseKeepingThreads(boolean withHouseKeepingThreads) {
+ this.withHouseKeepingThreads = withHouseKeepingThreads;
+ return this;
+ }
+
public Builder withPortsFromConf(boolean usePortsFromConf) {
this.usePortsFromConf = usePortsFromConf;
return this;
@@ -240,8 +247,8 @@ public MiniHS2 build() throws Exception {
hiveConf.setBoolVar(ConfVars.HIVE_QUERY_HISTORY_ENABLED, useQueryHistory);
return new MiniHS2(hiveConf, miniClusterType, useMiniKdc, serverPrincipal, serverKeytab,
- isMetastoreRemote, createTransactionalTables, usePortsFromConf, authType, isHA, cleanupLocalDirOnStartup,
- isMetastoreSecure, metastoreServerPrincipal, metastoreServerKeyTab, dataNodes);
+ isMetastoreRemote, withHouseKeepingThreads, createTransactionalTables, usePortsFromConf, authType, isHA,
+ cleanupLocalDirOnStartup, isMetastoreSecure, metastoreServerPrincipal, metastoreServerKeyTab, dataNodes);
}
}
@@ -278,7 +285,8 @@ public boolean isUseMiniKdc() {
}
private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useMiniKdc,
- String serverPrincipal, String serverKeytab, boolean isMetastoreRemote, boolean createTransactionalTables,
+ String serverPrincipal, String serverKeytab, boolean isMetastoreRemote,
+ boolean withHouseKeepingThreads, boolean createTransactionalTables,
boolean usePortsFromConf, String authType, boolean isHA, boolean cleanupLocalDirOnStartup,
boolean isMetastoreSecure, String metastoreServerPrincipal, String metastoreKeyTab,
int dataNodes) throws Exception {
@@ -306,6 +314,7 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM
this.isMetastoreSecure = isMetastoreSecure;
this.cleanupLocalDirOnStartup = cleanupLocalDirOnStartup;
this.usePortsFromConf = usePortsFromConf;
+ this.withHouseKeepingThreads = withHouseKeepingThreads;
this.createTransactionalTables = createTransactionalTables;
baseDir = getBaseDir();
localFS = FileSystem.getLocal(hiveConf);
@@ -405,14 +414,14 @@ public MiniHS2(HiveConf hiveConf, MiniClusterType clusterType) throws Exception
public MiniHS2(HiveConf hiveConf, MiniClusterType clusterType, boolean usePortsFromConf, boolean isMetastoreRemote)
throws Exception {
this(hiveConf, clusterType, false, null, null,
- isMetastoreRemote, true, usePortsFromConf, "KERBEROS", false, true,
+ isMetastoreRemote, false, true, usePortsFromConf, "KERBEROS", false, true,
false, null, null, DEFAULT_DATANODE_COUNT);
}
public void start(Map confOverlay) throws Exception {
if (isMetastoreRemote) {
hmsPort = MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), getHiveConf(),
- false, false, false, false, createTransactionalTables);
+ true, false, withHouseKeepingThreads, false, createTransactionalTables);
setWareHouseDir(MetastoreConf.getVar(getHiveConf(), MetastoreConf.ConfVars.WAREHOUSE));
}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 6f13260bd3a1..e2dc96d8d3b8 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -105,7 +105,9 @@ public class MetastoreConf {
@VisibleForTesting
static final String ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS =
"org.apache.hadoop.hive.metastore.txn.service.AcidOpenTxnsCounterService";
-
+ @VisibleForTesting
+ static final String ICEBERG_TABLE_SNAPSHOT_EXPIRY_SERVICE_CLASS =
+ "org.apache.iceberg.mr.hive.metastore.task.IcebergHouseKeeperService";
public static final String METASTORE_AUTHENTICATION_LDAP_USERMEMBERSHIPKEY_NAME =
"metastore.authentication.ldap.userMembershipKey";
public static final String METASTORE_RETRYING_HANDLER_CLASS =
@@ -916,6 +918,23 @@ public enum ConfVars {
HMS_HANDLER_PROXY_CLASS("metastore.hmshandler.proxy", "hive.metastore.hmshandler.proxy",
METASTORE_RETRYING_HANDLER_CLASS,
"The proxy class name of HMSHandler, default is RetryingHMSHandler."),
+    // Settings of the Iceberg table expiry service; both patterns default to an empty string, which matches
+    // nothing, so the service does no work until it is configured explicitly.
+    ICEBERG_TABLE_EXPIRY_INTERVAL("metastore.iceberg.table.expiry.interval",
+        "hive.metastore.iceberg.table.expiry.interval", 3600, TimeUnit.SECONDS,
+        "Time interval describing how often the iceberg table expiry service runs."),
+    // TODO: HIVE-28974: Implement pattern-based catalog retrieval in metastore client
+    ICEBERG_TABLE_EXPIRY_CATALOG_NAME("metastore.iceberg.table.expiry.catalog.name",
+        "hive.metastore.iceberg.table.expiry.catalog.name", "hive",
+        "Iceberg table expiry service looks for tables under the specified catalog name"),
+    ICEBERG_TABLE_EXPIRY_DATABASE_PATTERN("metastore.iceberg.table.expiry.database.pattern",
+        "hive.metastore.iceberg.table.expiry.database.pattern", "",
+        "Iceberg table expiry service searches for tables using the specified database pattern. " +
+            "By default, the pattern is set to empty string, which results in no matches (this is intentional " +
+            "to avoid expensive metastore calls unless explicitly configured by the user)."),
+    ICEBERG_TABLE_EXPIRY_TABLE_PATTERN("metastore.iceberg.table.expiry.table.pattern",
+        "hive.metastore.iceberg.table.expiry.table.pattern", "",
+        "Iceberg table expiry service searches for tables using the specified table pattern. " +
+            "By default, the pattern is set to empty string, which results in no matches (this is intentional " +
+            "to avoid expensive metastore calls unless explicitly configured by the user)."),
IDENTIFIER_FACTORY("datanucleus.identifierFactory",
"datanucleus.identifierFactory", "datanucleus1",
"Name of the identifier factory to use when generating table/column names etc. \n" +
@@ -1493,7 +1512,8 @@ public enum ConfVars {
ACID_TXN_CLEANER_SERVICE_CLASS + "," +
ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS + "," +
MATERIALZIATIONS_REBUILD_LOCK_CLEANER_TASK_CLASS + "," +
- PARTITION_MANAGEMENT_TASK_CLASS,
+ PARTITION_MANAGEMENT_TASK_CLASS + "," +
+ ICEBERG_TABLE_SNAPSHOT_EXPIRY_SERVICE_CLASS,
"Comma-separated list of tasks that will be started in separate threads. These will be" +
" started only when the metastore is running as a separate service. They must " +
"implement " + METASTORE_TASK_THREAD_CLASS),
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java
new file mode 100644
index 000000000000..0e10635bc743
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+public class TableFetcher {
+ private static final Logger LOG = LoggerFactory.getLogger(TableFetcher.class);
+
+  // mandatory client passed to this fetcher, has to be closed by the caller
+  private final IMetaStoreClient client;
+  // mandatory catalogName
+  private final String catalogName;
+  // mandatory dbPattern: use "*" to fetch all, empty to fetch none
+  private final String dbPattern;
+  // optional tableTypes: comma separated table types to fetch, fetcher result is empty list if this is empty
+  // typical value: "MANAGED_TABLE,EXTERNAL_TABLE"
+  @VisibleForTesting
+  final Set<String> tableTypes = new HashSet<>();
+  // tableFilter built from the input tablePattern and custom table conditions
+  @VisibleForTesting
+  String tableFilter;
+
+  /**
+   * Creates a fetcher from the builder's settings: a missing catalog name falls back to
+   * Warehouse.DEFAULT_CATALOG_NAME, missing patterns and table types fall back to an empty string, table types
+   * that are not a TableType constant are ignored with a warning, and the table filter is built once.
+   *
+   * @param builder source of the fetcher settings
+   */
+  private TableFetcher(Builder builder) {
+    this.client = builder.client;
+    if ("*".equalsIgnoreCase(builder.catalogName)) {
+      LOG.warn("Invalid wildcard '*' parameter for catalogName, exact catalog name is expected instead of regexp");
+    }
+    this.catalogName = Optional.ofNullable(builder.catalogName).orElse(Warehouse.DEFAULT_CATALOG_NAME);
+    this.dbPattern = Optional.ofNullable(builder.dbPattern).orElse("");
+    String tablePattern = Optional.ofNullable(builder.tablePattern).orElse("");
+    String stringTableTypes = Optional.ofNullable(builder.tableTypes).orElse("");
+
+    for (String type : stringTableTypes.split(",")) {
+      if (type.trim().isEmpty()) {
+        // no table type was given (or a stray comma was left in): nothing worth a warning
+        continue;
+      }
+      try {
+        tableTypes.add(TableType.valueOf(type.trim().toUpperCase()).name());
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Unknown table type: {}", type);
+      }
+    }
+
+    // hand over a copy: buildTableFilter appends conditions and must not alter the builder's own list,
+    // otherwise a second build() on the same builder would produce duplicated conditions
+    buildTableFilter(tablePattern, new ArrayList<String>(builder.tableConditions));
+  }
+
+  /**
+   * Builds the table filter by joining the given conditions with " and ". A table type condition is appended
+   * when exactly one of EXTERNAL_TABLE and MANAGED_TABLE was requested, and a table name condition is appended
+   * when the table pattern is not blank ("*" in the pattern is translated to ".*").
+   *
+   * @param tablePattern pattern of the table names, may be empty
+   * @param conditions custom conditions; the generated conditions are appended to this list
+   */
+  private void buildTableFilter(String tablePattern, List<String> conditions) {
+    boolean external = tableTypes.contains(TableType.EXTERNAL_TABLE.name());
+    boolean managed = tableTypes.contains(TableType.MANAGED_TABLE.name());
+    if (external != managed) {
+      // exactly one of the two types was requested: restrict the filter to that type
+      TableType onlyType = external ? TableType.EXTERNAL_TABLE : TableType.MANAGED_TABLE;
+      conditions.add(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE + " = \"" + onlyType.name() + "\" ");
+    }
+    if (!tablePattern.trim().isEmpty()) {
+      // literal replacement, no regular expression is needed to turn "*" into ".*"
+      conditions.add(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME + " like \"" +
+          tablePattern.replace("*", ".*") + "\"");
+    }
+    this.tableFilter = String.join(" and ", conditions);
+  }
+
+  /**
+   * Lists the tables that match this fetcher's catalog, database pattern and table filter. Databases for which
+   * MetaStoreUtils.checkIfDbNeedsToBeSkipped or MetaStoreUtils.isDbBeingPlannedFailedOver returns true are
+   * left out.
+   *
+   * @return names of the matching tables, an empty list when no valid table type was specified
+   * @throws Exception if a metastore call fails
+   */
+  public List<TableName> getTables() throws Exception {
+    List<TableName> candidates = new ArrayList<>();
+
+    // without any table type there is nothing to scan
+    if (tableTypes.isEmpty()) {
+      LOG.info("Table fetcher returns empty list as no table types specified");
+      return candidates;
+    }
+
+    List<String> databases = client.getDatabases(catalogName, dbPattern);
+
+    for (String db : databases) {
+      Database database = client.getDatabase(catalogName, db);
+      if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
+        LOG.debug("Skipping table under database: {}", db);
+        continue;
+      }
+      if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
+        LOG.info("Skipping table that belongs to database {} being failed over.", db);
+        continue;
+      }
+      List<String> tablesNames = client.listTableNamesByFilter(catalogName, db, tableFilter, -1);
+      tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
+    }
+    return candidates;
+  }
+
+  /**
+   * Builder of TableFetcher instances. The client, catalog name and patterns are fixed at construction time,
+   * table types and custom table conditions are optional.
+   */
+  public static class Builder {
+    private final IMetaStoreClient client;
+    private final String catalogName;
+    private final String dbPattern;
+    private final String tablePattern;
+    private final List<String> tableConditions = new ArrayList<>();
+    private String tableTypes;
+
+    /**
+     * @param client metastore client used by the fetcher, closing it is up to the caller
+     * @param catalogName exact catalog name, null selects the default catalog
+     * @param dbPattern pattern of the databases to scan, null is treated as empty
+     * @param tablePattern pattern of the table names to match, null is treated as empty
+     */
+    public Builder(IMetaStoreClient client, String catalogName, String dbPattern, String tablePattern) {
+      this.client = client;
+      this.catalogName = catalogName;
+      this.dbPattern = dbPattern;
+      this.tablePattern = tablePattern;
+    }
+
+    /**
+     * @param tableTypes comma separated TableType names, e.g. "MANAGED_TABLE,EXTERNAL_TABLE"
+     * @return this builder
+     */
+    public Builder tableTypes(String tableTypes) {
+      this.tableTypes = tableTypes;
+      return this;
+    }
+
+    /**
+     * Adds a custom condition that is joined with the other filter conditions using " and ".
+     *
+     * @param condition filter condition in the metastore table filter syntax
+     * @return this builder
+     */
+    public Builder tableCondition(String condition) {
+      this.tableConditions.add(condition);
+      return this;
+    }
+
+    /**
+     * @return a new TableFetcher configured with the current settings of this builder
+     */
+    public TableFetcher build() {
+      return new TableFetcher(this);
+    }
+  }
+}
\ No newline at end of file
diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/TestTableFetcher.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/TestTableFetcher.java
new file mode 100644
index 000000000000..eb07f34a2696
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/TestTableFetcher.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.utils;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class TestTableFetcher {
+
+ @Test
+ public void testEmpty() {
+ TableFetcher fetcher = new TableFetcher.Builder(mock(IMetaStoreClient.class), null, null, null).build();
+ // with null catalog and patterns, and no table types or conditions set, the table filter is expected to be empty
+ Assert.assertEquals("", fetcher.tableFilter);
+ }
+
+ @Test
+ public void testAsterisk() {
+ TableFetcher fetcher = new TableFetcher.Builder(mock(IMetaStoreClient.class), "hive", "*", "*").build();
+ // a "*" table pattern is expected to show up as a like ".*" clause on the table name
+ Assert.assertEquals("hive_filter_field_tableName__ like \".*\"", fetcher.tableFilter);
+ }
+
+ @Test
+ public void testCustomCondition() {
+ TableFetcher fetcher = new TableFetcher.Builder(mock(IMetaStoreClient.class), "hive", "*", "*")
+ .tableCondition(hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "table_param like \"some_value\" ")
+ .build();
+ // the custom condition is expected first, combined via "and" with the table name clause
+ Assert.assertEquals(
+ "hive_filter_field_params__table_param like \"some_value\" and hive_filter_field_tableName__ like \".*\"",
+ fetcher.tableFilter);
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
index ef0c4dcac47d..ca396fad70cb 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
@@ -18,11 +18,8 @@
package org.apache.hadoop.hive.metastore;
-import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -32,12 +29,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.TableName;
-import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.TimeValidator;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.TableFetcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,58 +96,12 @@ public void run() {
String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN);
String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN);
String tableTypes = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES);
- Set tableTypesSet = new HashSet<>();
- for (String type : tableTypes.split(",")) {
- try {
- tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase()).name());
- } catch (IllegalArgumentException e) {
- // ignore
- LOG.warn("Unknown table type: {}", type);
- }
- }
- // if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
- // specifying empty here is equivalent to disabling the partition discovery altogether as it scans no tables.
- if (tableTypesSet.isEmpty()) {
- LOG.info("Skipping partition management as no table types specified");
- return;
- }
-
- StringBuilder filterBuilder = new StringBuilder()
- .append(hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS)
- .append("discover__partitions").append(" like \"true\" ");
- boolean external = tableTypesSet.contains(TableType.EXTERNAL_TABLE.name());
- boolean managed = tableTypesSet.contains(TableType.MANAGED_TABLE.name());
- if (!managed && external) {
- // only for external tables
- filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
- .append(" = \"").append(TableType.EXTERNAL_TABLE.name()).append("\" ");
- } else if (managed && !external) {
- // only for managed tables
- filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
- .append(" = \"").append(TableType.MANAGED_TABLE.name()).append("\" ");
- }
- if (!tablePattern.trim().isEmpty()) {
- filterBuilder.append(" and ")
- .append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME)
- .append(" like \"").append(tablePattern.replaceAll("\\*", ".*")).append("\"");
- }
-
- List databases = msc.getDatabases(catalogName, dbPattern);
- List candidates = new ArrayList<>();
- for (String db : databases) {
- Database database = msc.getDatabase(catalogName, db);
- if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
- LOG.debug("Skipping table under database: {}", db);
- continue;
- }
- if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
- LOG.info("Skipping table belongs to database {} being failed over.", db);
- continue;
- }
- List tablesNames = msc.listTableNamesByFilter(catalogName, db,
- filterBuilder.toString(), -1);
- tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
- }
+ List candidates =
+ new TableFetcher.Builder(msc, catalogName, dbPattern, tablePattern).tableTypes(tableTypes)
+ .tableCondition(
+ hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "discover__partitions like \"true\" ")
+ .build()
+ .getTables();
if (candidates.isEmpty()) {
LOG.info("Got empty table list in catalog: {}, dbPattern: {}", catalogName, dbPattern);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index d5349e6c706a..a56fbcbbd818 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -104,7 +104,7 @@ public interface TxnStore extends Configurable {
enum MUTEX_KEY {
- Initiator, Cleaner, HouseKeeper, TxnCleaner,
+ Initiator, Cleaner, HouseKeeper, IcebergHouseKeeper, TxnCleaner,
CompactionScheduler, MaterializationRebuild
}
// Compactor states (Should really be enum)