From d8602eca467f5c50497b3dbf02e85edd7ec59ecf Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Thu, 8 Apr 2021 17:20:56 +0200 Subject: [PATCH 01/23] HIVE-XXX: Move Iceberg job commit to HS2 side --- .../iceberg/mr/hive/HiveIcebergMetaHook.java | 50 ++++++++++++++++++- .../mr/hive/HiveIcebergOutputCommitter.java | 6 +++ .../mr/hive/HiveIcebergStorageHandler.java | 3 ++ ...stHiveIcebergStorageHandlerWithEngine.java | 32 ------------ .../apache/iceberg/mr/hive/TestHiveShell.java | 11 ++++ pom.xml | 2 +- .../hadoop/hive/ql/exec/tez/DagUtils.java | 5 +- .../hadoop/hive/ql/exec/tez/TezTask.java | 30 ++++++++++- 8 files changed, 102 insertions(+), 37 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index d2c6996abc71..e24158ef9825 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -19,6 +19,7 @@ package org.apache.iceberg.mr.hive; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Properties; @@ -33,6 +34,14 @@ import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.mapred.OutputCommitter; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.PartitionSpec; @@ -56,7 +65,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HiveIcebergMetaHook implements HiveMetaHook { +public class HiveIcebergMetaHook extends DefaultHiveMetaHook { private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class); private static final Set PARAMETERS_TO_REMOVE = ImmutableSet .of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME); @@ -346,4 +355,43 @@ private class PreAlterTableProperties { private PartitionSpec spec; private List partitionKeys; } + + @Override + public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) + throws MetaException { + // construct the job context + JobConf jobConf = new JobConf(conf); + String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString(); + jobConf.set(InputFormatConfig.TABLE_IDENTIFIER, tableName); + jobConf.set(InputFormatConfig.TABLE_LOCATION, table.getSd().getLocation()); + JobID jobID = JobID.forName(jobConf.get(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName)); + JobContext jobContext = new JobContextImpl(jobConf, jobID, null); + + // commit (or abort) + OutputCommitter committer = new HiveIcebergOutputCommitter(); + try { + committer.commitJob(jobContext); + } catch (Exception commitExc) { + LOG.error("Error while trying to commit job. Will abort it now.", commitExc); + try { + committer.abortJob(jobContext, JobStatus.State.FAILED); + throw new MetaException("Unable to commit job: " + commitExc.getMessage()); + } catch (IOException abortExc) { + LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", abortExc); + throw new MetaException("Unable to commit and abort job: " + commitExc.getMessage()); + } + } + } + + @Override + public void preInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) + throws MetaException { + // do nothing + } + + @Override + public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) + throws MetaException { + // do nothing + } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index bf298ad35bed..71c390fa33a2 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.OutputCommitter; @@ -359,6 +360,11 @@ private static Collection dataFiles(ExecutorService executor, String l // If there are reducers, then every reducer will generate a result file. // If this is a map only task, then every mapper will generate a result file. int expectedFiles = conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks(); + // For Tez, we can only reliably get the number of tasks from the Tez AM, not the job conf + if ("tez".equals(conf.get(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname))) { + expectedFiles = conf.getInt(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + + "." + conf.get(InputFormatConfig.TABLE_IDENTIFIER), -1); + } Collection dataFiles = new ConcurrentLinkedQueue<>(); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 2bb0e0c0cf62..06429b8baa30 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -124,6 +124,9 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES); tables = tables == null ? tableName : tables + TABLE_NAME_SEPARATOR + tableName; jobConf.set("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName()); + // this will turn off job committing on the Tez AM side, so that we can commit the write jobs + // on HS2 side instead using the HiveIcebergMetaHook + jobConf.set("hive.tez.mapreduce.output.committer.class", ""); jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables); String catalogName = tableDesc.getProperties().getProperty(InputFormatConfig.CATALOG_NAME); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java index 38fef7ea057e..8308f2c000cd 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java @@ -49,7 +49,6 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; @@ -339,8 +338,6 @@ public void testSelectDistinctFromTable() throws IOException { @Test public void testInsert() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); - Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat, ImmutableList.of()); @@ -360,7 +357,6 @@ public void testInsert() throws IOException { @Test(timeout = 100000) public void testInsertSupportedTypes() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); for (int i = 0; i < SUPPORTED_TYPES.size(); i++) { Type type = SUPPORTED_TYPES.get(i); // TODO: remove this filter when issue #1881 is resolved @@ -389,8 +385,6 @@ public void testInsertSupportedTypes() throws IOException { */ @Test public void testInsertFromSelect() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); - Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS); @@ -408,8 +402,6 @@ public void testInsertFromSelect() throws IOException { */ @Test public void testInsertFromSelectWithOrderBy() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); - Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS); @@ -424,8 +416,6 @@ public void testInsertFromSelectWithOrderBy() throws IOException { @Test public void testInsertFromSelectWithProjection() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); - Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat, ImmutableList.of()); testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, ORDER_RECORDS); @@ -443,8 +433,6 @@ public void testInsertFromSelectWithProjection() throws IOException { @Test public void testInsertUsingSourceTableWithSharedColumnsNames() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); - List records = HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS; PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) .identity("last_name").build(); @@ -468,8 +456,6 @@ public void testInsertUsingSourceTableWithSharedColumnsNames() throws IOExceptio @Test public void testInsertFromJoiningTwoIcebergTables() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); - PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) .identity("last_name").build(); testTables.createTable(shell, "source_customers_1", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, @@ -487,7 +473,6 @@ public void testInsertFromJoiningTwoIcebergTables() throws IOException { @Test public void testWriteArrayOfPrimitivesInTable() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, "arrayofprimitives", Types.ListType.ofRequired(3, Types.StringType.get()))); @@ -497,7 +482,6 @@ public void testWriteArrayOfPrimitivesInTable() throws IOException { @Test public void testWriteArrayOfArraysInTable() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); Schema schema = new Schema( required(1, "id", Types.LongType.get()), @@ -509,7 +493,6 @@ public void testWriteArrayOfArraysInTable() throws IOException { @Test public void testWriteArrayOfMapsInTable() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, "arrayofmaps", Types.ListType @@ -521,7 +504,6 @@ public void testWriteArrayOfMapsInTable() throws IOException { @Test public void testWriteArrayOfStructsInTable() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, "arrayofstructs", Types.ListType.ofRequired(3, Types.StructType @@ -533,7 +515,6 @@ public void testWriteArrayOfStructsInTable() throws IOException { @Test public void testWriteMapOfPrimitivesInTable() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, "mapofprimitives", Types.MapType.ofRequired(3, 4, Types.StringType.get(), Types.StringType.get()))); @@ -543,7 +524,6 @@ public void testWriteMapOfPrimitivesInTable() throws IOException { @Test public void testWriteMapOfArraysInTable() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, "mapofarrays", Types.MapType.ofRequired(3, 4, Types.StringType.get(), Types.ListType.ofRequired(5, @@ -554,7 +534,6 @@ public void testWriteMapOfArraysInTable() throws IOException { @Test public void testWriteMapOfMapsInTable() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, "mapofmaps", Types.MapType.ofRequired(3, 4, Types.StringType.get(), Types.MapType.ofRequired(5, 6, Types.StringType.get(), Types.StringType.get())))); @@ -564,7 +543,6 @@ public void testWriteMapOfMapsInTable() throws IOException { @Test public void testWriteMapOfStructsInTable() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, "mapofstructs", Types.MapType.ofRequired(3, 4, Types.StringType.get(), Types.StructType.of(required(5, "something", Types.StringType.get()), @@ -576,7 +554,6 @@ public void testWriteMapOfStructsInTable() throws IOException { @Test public void testWriteStructOfPrimitivesInTable() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, "structofprimitives", Types.StructType.of(required(3, "key", Types.StringType.get()), required(4, "value", @@ -587,7 +564,6 @@ public void testWriteStructOfPrimitivesInTable() throws IOException { @Test public void testWriteStructOfArraysInTable() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, "structofarrays", Types.StructType .of(required(3, "names", Types.ListType.ofRequired(4, Types.StringType.get())), @@ -599,7 +575,6 @@ public void testWriteStructOfArraysInTable() throws IOException { @Test public void testWriteStructOfMapsInTable() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, "structofmaps", Types.StructType .of(required(3, "map1", Types.MapType.ofRequired(4, 5, @@ -612,7 +587,6 @@ public void testWriteStructOfMapsInTable() throws IOException { @Test public void testWriteStructOfStructsInTable() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, "structofstructs", Types.StructType.of(required(3, "struct1", Types.StructType .of(required(4, "key", Types.StringType.get()), required(5, "value", @@ -623,8 +597,6 @@ public void testWriteStructOfStructsInTable() throws IOException { @Test public void testPartitionedWrite() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); - PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) .bucket("customer_id", 3) .build(); @@ -639,8 +611,6 @@ public void testPartitionedWrite() throws IOException { @Test public void testIdentityPartitionedWrite() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); - PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) .identity("customer_id") .build(); @@ -655,8 +625,6 @@ public void testIdentityPartitionedWrite() throws IOException { @Test public void testMultilevelIdentityPartitionedWrite() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); - PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) .identity("customer_id") .identity("last_name") diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java index c4f1080e607f..7d3a396f667f 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; @@ -154,6 +155,13 @@ public List executeStatement(String statement) { } } + public String executeExplain(String statement) { + List objects = executeStatement(statement); + return objects.stream() + .map(o -> (String) o[0]) + .collect(Collectors.joining("\n")); + } + public Configuration getHiveConf() { if (session != null) { return session.getHiveConf(); @@ -200,6 +208,9 @@ private HiveConf initializeConf() { // set to true so that the Tez session will create an empty jar for localization hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_IDE, true); + // enables vectorization on Tez + hiveConf.set("tez.mrreader.config.update.properties", "hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids"); + return hiveConf; } } diff --git a/pom.xml b/pom.xml index b0141e6d7613..44b40d9bc626 100644 --- a/pom.xml +++ b/pom.xml @@ -199,7 +199,7 @@ 1.7.30 4.0.4 2.7.3-SNAPSHOT - 0.10.0 + 0.10.1-SNAPSHOT 2.2.0 2.4.5 2.12 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index c1ec1686b68c..fa12c8f771e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -1604,8 +1604,9 @@ public Vertex createVertex(JobConf conf, BaseWork workUnit, Path scratchDir, boolean endVertex = tezWork.getLeaves().contains(workUnit); if (endVertex) { OutputCommitterDescriptor ocd = null; - if (HiveConf.getVar(conf, ConfVars.TEZ_MAPREDUCE_OUTPUT_COMMITTER) != null) { - ocd = OutputCommitterDescriptor.create(HiveConf.getVar(conf, ConfVars.TEZ_MAPREDUCE_OUTPUT_COMMITTER)); + String committer = HiveConf.getVar(conf, ConfVars.TEZ_MAPREDUCE_OUTPUT_COMMITTER); + if (committer != null && !committer.isEmpty()) { + ocd = OutputCommitterDescriptor.create(committer); } vertex.addDataSink("out_"+workUnit.getName(), new DataSinkDescriptor( OutputDescriptor.create(outputKlass.getName()) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 0b6dffcee45d..bff0beac0e08 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -30,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import javax.annotation.Nullable; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -80,6 +81,7 @@ import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.client.Progress; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; @@ -98,6 +100,9 @@ @SuppressWarnings({"serial"}) public class TezTask extends Task { + public static final String HIVE_TEZ_COMMIT_JOB_ID = "hive.tez.commit.job.id"; + public static final String HIVE_TEZ_COMMIT_TASK_COUNT = "hive.tez.commit.task.count"; + private static final String CLASS_NAME = TezTask.class.getName(); private static transient Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private final PerfLogger perfLogger = SessionState.getPerfLogger(); @@ -250,9 +255,32 @@ public int execute() { this.setException(new HiveException(monitor.getDiagnostics())); } - // fetch the counters try { Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); + // save useful commit information into session conf, e.g. for custom commit hooks + List allWork = work.getAllWork(); + boolean hasReducer = allWork.stream().map(workToVertex::get).anyMatch(v -> v.getName().startsWith("Reducer")); + for (BaseWork baseWork : allWork) { + Vertex vertex = workToVertex.get(baseWork); + if (!hasReducer || vertex.getName().startsWith("Reducer")) { + // construct the parsable job id + VertexStatus status = dagClient.getVertexStatus(vertex.getName(), statusGetOpts); + String[] jobIdParts = status.getId().split("_"); + // status.getId() returns something like: vertex_1617722404520_0001_1_00 + // this should be transformed to a parsable JobID: job_16177224045200_0001 + int vertexId = Integer.parseInt(jobIdParts[jobIdParts.length - 1]); + String jobId = String.format("job_%s%d_%s", jobIdParts[1], vertexId, jobIdParts[2]); + // prefix with table name (for multi-table inserts), if available + String tableName = Optional.ofNullable(workToConf.get(baseWork)).map(c -> c.get("name")).orElse(null); + String jobIdKey = HIVE_TEZ_COMMIT_JOB_ID + (tableName == null ? "" : "." + tableName);; + String taskCountKey = HIVE_TEZ_COMMIT_TASK_COUNT + (tableName == null ? "" : "." + tableName); + // save info into session conf + HiveConf sessionConf = SessionState.get().getConf(); + sessionConf.set(jobIdKey, jobId); + sessionConf.setInt(taskCountKey, status.getProgress().getSucceededTaskCount()); + } + } + // fetch the counters TezCounters dagCounters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters(); // if initial counters exists, merge it with dag counters to get aggregated view TezCounters mergedCounters = counters == null ? dagCounters : Utils.mergeTezCounters(dagCounters, counters); From 2f029dc2ba12279b762aea6c69e1e3583503fac5 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Fri, 9 Apr 2021 16:01:43 +0200 Subject: [PATCH 02/23] Fix review comments --- .../org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java | 2 ++ .../apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java | 6 ------ .../apache/iceberg/mr/hive/HiveIcebergStorageHandler.java | 4 ++-- .../test/java/org/apache/iceberg/mr/hive/TestHiveShell.java | 5 +++++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java | 3 ++- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index e24158ef9825..952b8642dfed 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -365,6 +365,8 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, jobConf.set(InputFormatConfig.TABLE_IDENTIFIER, tableName); jobConf.set(InputFormatConfig.TABLE_LOCATION, table.getSd().getLocation()); JobID jobID = JobID.forName(jobConf.get(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName)); + int numTasks = conf.getInt(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName, -1); + jobConf.setNumReduceTasks(numTasks); JobContext jobContext = new JobContextImpl(jobConf, jobID, null); // commit (or abort) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index 71c390fa33a2..bf298ad35bed 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -33,7 +33,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.OutputCommitter; @@ -360,11 +359,6 @@ private static Collection dataFiles(ExecutorService executor, String l // If there are reducers, then every reducer will generate a result file. // If this is a map only task, then every mapper will generate a result file. int expectedFiles = conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks(); - // For Tez, we can only reliably get the number of tasks from the Tez AM, not the job conf - if ("tez".equals(conf.get(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname))) { - expectedFiles = conf.getInt(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + - "." + conf.get(InputFormatConfig.TABLE_IDENTIFIER), -1); - } Collection dataFiles = new ConcurrentLinkedQueue<>(); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 06429b8baa30..2f61665c0abf 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -26,6 +26,7 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; @@ -123,10 +124,9 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { "Can not handle table " + tableName + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'"); String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES); tables = tables == null ? tableName : tables + TABLE_NAME_SEPARATOR + tableName; - jobConf.set("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName()); // this will turn off job committing on the Tez AM side, so that we can commit the write jobs // on HS2 side instead using the HiveIcebergMetaHook - jobConf.set("hive.tez.mapreduce.output.committer.class", ""); + jobConf.set(HiveConf.ConfVars.TEZ_MAPREDUCE_OUTPUT_COMMITTER.varname, ""); jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables); String catalogName = tableDesc.getProperties().getProperty(InputFormatConfig.CATALOG_NAME); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java index 7d3a396f667f..711831656c94 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java @@ -155,6 +155,11 @@ public List executeStatement(String statement) { } } + /** + * Used for debugging. Please do not remove even if unused in the codebase. + * @param statement EXPLAIN statement + * @return EXPLAIN statement output in a single String which is IDE friendly for viewing + */ public String executeExplain(String statement) { List objects = executeStatement(statement); return objects.stream() diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index bff0beac0e08..d25696783619 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -104,6 +104,7 @@ public class TezTask extends Task { public static final String HIVE_TEZ_COMMIT_TASK_COUNT = "hive.tez.commit.task.count"; private static final String CLASS_NAME = TezTask.class.getName(); + private static final String JOB_ID_TEMPLATE = "job_%s%d_%s"; private static transient Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private final PerfLogger perfLogger = SessionState.getPerfLogger(); private static final String TEZ_MEMORY_RESERVE_FRACTION = "tez.task.scale.memory.reserve-fraction"; @@ -269,7 +270,7 @@ public int execute() { // status.getId() returns something like: vertex_1617722404520_0001_1_00 // this should be transformed to a parsable JobID: job_16177224045200_0001 int vertexId = Integer.parseInt(jobIdParts[jobIdParts.length - 1]); - String jobId = String.format("job_%s%d_%s", jobIdParts[1], vertexId, jobIdParts[2]); + String jobId = String.format(JOB_ID_TEMPLATE, jobIdParts[1], vertexId, jobIdParts[2]); // prefix with table name (for multi-table inserts), if available String tableName = Optional.ofNullable(workToConf.get(baseWork)).map(c -> c.get("name")).orElse(null); String jobIdKey = HIVE_TEZ_COMMIT_JOB_ID + (tableName == null ? "" : "." + tableName);; From 7acef0ca7ec57e11d15d538e517bbf4975ffaeb5 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Fri, 9 Apr 2021 16:16:12 +0200 Subject: [PATCH 03/23] Fix review comments 2 --- .../java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index d25696783619..f1fa58263c85 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -81,7 +81,6 @@ import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; -import org.apache.tez.dag.api.client.Progress; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; @@ -259,11 +258,9 @@ public int execute() { try { Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); // save useful commit information into session conf, e.g. for custom commit hooks - List allWork = work.getAllWork(); - boolean hasReducer = allWork.stream().map(workToVertex::get).anyMatch(v -> v.getName().startsWith("Reducer")); - for (BaseWork baseWork : allWork) { + for (BaseWork baseWork : work.getAllWork()) { Vertex vertex = workToVertex.get(baseWork); - if (!hasReducer || vertex.getName().startsWith("Reducer")) { + if (!vertex.getDataSinks().isEmpty()) { // construct the parsable job id VertexStatus status = dagClient.getVertexStatus(vertex.getName(), statusGetOpts); String[] jobIdParts = status.getId().split("_"); @@ -273,7 +270,7 @@ public int execute() { String jobId = String.format(JOB_ID_TEMPLATE, jobIdParts[1], vertexId, jobIdParts[2]); // prefix with table name (for multi-table inserts), if available String tableName = Optional.ofNullable(workToConf.get(baseWork)).map(c -> c.get("name")).orElse(null); - String jobIdKey = HIVE_TEZ_COMMIT_JOB_ID + (tableName == null ? "" : "." + tableName);; + String jobIdKey = HIVE_TEZ_COMMIT_JOB_ID + (tableName == null ? "" : "." + tableName); String taskCountKey = HIVE_TEZ_COMMIT_TASK_COUNT + (tableName == null ? "" : "." + tableName); // save info into session conf HiveConf sessionConf = SessionState.get().getConf(); From 8275934b4cc454c8752a754b23a5c9f28f990008 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Fri, 9 Apr 2021 17:27:17 +0200 Subject: [PATCH 04/23] Use listing temporarily until new Tez release --- ...stHiveIcebergStorageHandlerWithEngine.java | 4 ++ pom.xml | 2 +- .../hadoop/hive/ql/exec/tez/TezTask.java | 61 ++++++++++++------- 3 files changed, 44 insertions(+), 23 deletions(-) diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java index 8308f2c000cd..a0171f0b489d 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java @@ -51,6 +51,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -414,6 +415,7 @@ public void testInsertFromSelectWithOrderBy() throws IOException { HiveIcebergTestUtils.validateData(table, records, 0); } + @Ignore("Ignored until new Tez release has come out") @Test public void testInsertFromSelectWithProjection() throws IOException { Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, @@ -431,6 +433,7 @@ public void testInsertFromSelectWithProjection() throws IOException { HiveIcebergTestUtils.validateData(table, expected, 0); } + @Ignore("Ignored until new Tez release has come out") @Test public void testInsertUsingSourceTableWithSharedColumnsNames() throws IOException { List records = HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS; @@ -454,6 +457,7 @@ public void testInsertUsingSourceTableWithSharedColumnsNames() throws IOExceptio HiveIcebergTestUtils.validateData(table, expected, 0); } + @Ignore("Ignored until new Tez release has come out") @Test public void testInsertFromJoiningTwoIcebergTables() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) diff --git a/pom.xml b/pom.xml index 44b40d9bc626..b0141e6d7613 100644 --- a/pom.xml +++ b/pom.xml @@ -199,7 +199,7 @@ 1.7.30 4.0.4 2.7.3-SNAPSHOT - 0.10.1-SNAPSHOT + 0.10.0 2.2.0 2.4.5 2.12 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index f1fa58263c85..9711a2091cc9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.tez; +import org.apache.hadoop.fs.FileStatus; import org.apache.hive.common.util.Ref; import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; import java.io.IOException; @@ -30,7 +31,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import javax.annotation.Nullable; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -256,28 +256,10 @@ public int execute() { } try { - Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); // save useful commit information into session conf, e.g. for custom commit hooks - for (BaseWork baseWork : work.getAllWork()) { - Vertex vertex = workToVertex.get(baseWork); - if (!vertex.getDataSinks().isEmpty()) { - // construct the parsable job id - VertexStatus status = dagClient.getVertexStatus(vertex.getName(), statusGetOpts); - String[] jobIdParts = status.getId().split("_"); - // status.getId() returns something like: vertex_1617722404520_0001_1_00 - // this should be transformed to a parsable JobID: job_16177224045200_0001 - int vertexId = Integer.parseInt(jobIdParts[jobIdParts.length - 1]); - String jobId = String.format(JOB_ID_TEMPLATE, jobIdParts[1], vertexId, jobIdParts[2]); - // prefix with table name (for multi-table inserts), if available - String tableName = Optional.ofNullable(workToConf.get(baseWork)).map(c -> c.get("name")).orElse(null); - String jobIdKey = HIVE_TEZ_COMMIT_JOB_ID + (tableName == null ? "" : "." + tableName); - String taskCountKey = HIVE_TEZ_COMMIT_TASK_COUNT + (tableName == null ? "" : "." + tableName); - // save info into session conf - HiveConf sessionConf = SessionState.get().getConf(); - sessionConf.set(jobIdKey, jobId); - sessionConf.setInt(taskCountKey, status.getProgress().getSucceededTaskCount()); - } - } + // TODO: this is temporary, Iceberg-specific logic. Refactor when new Tez version has been released + collectCommitInformation(work); + Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); // fetch the counters TezCounters dagCounters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters(); // if initial counters exists, merge it with dag counters to get aggregated view @@ -364,6 +346,41 @@ public int execute() { return rc; } + private void collectCommitInformation(TezWork work) throws IOException, TezException { + String jobIdPrefix = dagClient.getDagIdentifierString().split("_")[1]; + for (BaseWork w : work.getAllWork()) { + JobConf jobConf = workToConf.get(w); + Vertex vertex = workToVertex.get(w); + // we should only consider jobs where an output committer is defined + if (!vertex.getDataSinks().isEmpty() && jobConf != null && "org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter" + .equals(jobConf.getOutputCommitter().getClass().getName())) { + String tableName = jobConf.get("name"); + String tableLocationRoot = jobConf.get("location"); + if (tableName != null && tableLocationRoot != null) { + Path path = new Path(tableLocationRoot + "/temp"); + LOG.debug("Table temp directory path is: " + path); + // list the directories inside the temp directory + FileStatus[] children = path.getFileSystem(jobConf).listStatus(path); + LOG.debug("Listing the table temp directory yielded these files: " + Arrays.toString(children)); + for (FileStatus child : children) { + // pick only directories that contain the correct jobID prefix + if (child.isDirectory() && child.getPath().getName().contains(jobIdPrefix)) { + // folder name pattern is queryID-jobID, we're removing the queryID part to get the jobID + String jobIdStr = child.getPath().getName().substring(jobConf.get("hive.query.id").length() + 1); + HiveConf sessionConf = SessionState.get().getConf(); + sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID + "." + tableName, jobIdStr); + VertexStatus status = dagClient.getVertexStatus(vertex.getName(), EnumSet.of(StatusGetOpts.GET_COUNTERS)); + sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName, + status.getProgress().getSucceededTaskCount()); + } + } + } else { + LOG.warn("Table location or table name not found in config for base work: " + w.getName()); + } + } + } + } + private void updateNumRows() { if (counters != null) { TezCounter counter = counters.findCounter( From d385145456ebd4f5505c0b9aba2658b0fe72b8af Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Sat, 10 Apr 2021 00:12:33 +0200 Subject: [PATCH 05/23] Unset config values after commit --- .../java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 952b8642dfed..2fc4cf8a55d0 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -382,6 +382,9 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", abortExc); throw new MetaException("Unable to commit and abort job: " + commitExc.getMessage()); } + } finally { + conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName); + conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName); } } From 8c2a33be63d0ef6942f3c63fa8528e3d885e240a Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Sun, 11 Apr 2021 14:17:14 +0200 Subject: [PATCH 06/23] Abort job in case query execution was unsuccessful --- .../iceberg/mr/hive/HiveIcebergMetaHook.java | 41 ++++++++++++++----- .../hadoop/hive/ql/exec/tez/TezTask.java | 9 ++-- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 2fc4cf8a55d0..bf0e6d9cdc30 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -359,8 +359,12 @@ private class PreAlterTableProperties { @Override public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) throws MetaException { - // construct the job context + // check status to determine whether we need to commit or to abort JobConf jobConf = new JobConf(conf); + String queryIdKey = jobConf.get("hive.query.id") + ".result"; + boolean success = jobConf.getBoolean(queryIdKey, false); + + // construct the job context String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString(); jobConf.set(InputFormatConfig.TABLE_IDENTIFIER, tableName); jobConf.set(InputFormatConfig.TABLE_LOCATION, table.getSd().getLocation()); @@ -369,25 +373,40 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, jobConf.setNumReduceTasks(numTasks); JobContext jobContext = new JobContextImpl(jobConf, jobID, null); - // commit (or abort) OutputCommitter committer = new HiveIcebergOutputCommitter(); - try { - committer.commitJob(jobContext); - } catch (Exception commitExc) { - LOG.error("Error while trying to commit job. Will abort it now.", commitExc); + if (success) { + try { + committer.commitJob(jobContext); + } catch (Exception commitExc) { + LOG.error("Error while trying to commit job. Will abort it now.", commitExc); + try { + committer.abortJob(jobContext, JobStatus.State.FAILED); + throw new MetaException("Unable to commit job: " + commitExc.getMessage()); + } catch (IOException abortExc) { + LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", abortExc); + throw new MetaException("Unable to commit and abort job: " + commitExc.getMessage()); + } + } finally { + cleanCommitConfig(queryIdKey, tableName); + } + } else { try { committer.abortJob(jobContext, JobStatus.State.FAILED); - throw new MetaException("Unable to commit job: " + commitExc.getMessage()); } catch (IOException abortExc) { LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", abortExc); - throw new MetaException("Unable to commit and abort job: " + commitExc.getMessage()); + throw new MetaException("Unable to abort job: " + abortExc.getMessage()); + } finally { + cleanCommitConfig(queryIdKey, tableName); } - } finally { - conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName); - conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName); } } + private void cleanCommitConfig(String queryIdKey, String tableName) { + conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName); + conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName); + conf.unset(queryIdKey); + } + @Override public void preInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) throws MetaException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 9711a2091cc9..f2838337ba3b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -258,7 +258,7 @@ public int execute() { try { // save useful commit information into session conf, e.g. for custom commit hooks // TODO: this is temporary, Iceberg-specific logic. Refactor when new Tez version has been released - collectCommitInformation(work); + collectCommitInformation(work, rc == 0); Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); // fetch the counters TezCounters dagCounters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters(); @@ -346,11 +346,13 @@ public int execute() { return rc; } - private void collectCommitInformation(TezWork work) throws IOException, TezException { - String jobIdPrefix = dagClient.getDagIdentifierString().split("_")[1]; + private void collectCommitInformation(TezWork work, boolean success) throws IOException, TezException { + HiveConf sessionConf = SessionState.get().getConf(); for (BaseWork w : work.getAllWork()) { JobConf jobConf = workToConf.get(w); Vertex vertex = workToVertex.get(w); + sessionConf.setBoolean(jobConf.get("hive.query.id") + ".result", success); + String jobIdPrefix = dagClient.getDagIdentifierString().split("_")[1]; // we should only consider jobs where an output committer is defined if (!vertex.getDataSinks().isEmpty() && jobConf != null && "org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter" .equals(jobConf.getOutputCommitter().getClass().getName())) { @@ -367,7 +369,6 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep if (child.isDirectory() && child.getPath().getName().contains(jobIdPrefix)) { // folder name pattern is queryID-jobID, we're removing the queryID part to get the jobID String jobIdStr = child.getPath().getName().substring(jobConf.get("hive.query.id").length() + 1); - HiveConf sessionConf = SessionState.get().getConf(); sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID + "." + tableName, jobIdStr); VertexStatus status = dagClient.getVertexStatus(vertex.getName(), EnumSet.of(StatusGetOpts.GET_COUNTERS)); sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName, From 67e69af18b798b38c0a2218220bb9474380df3b3 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Sun, 11 Apr 2021 14:29:49 +0200 Subject: [PATCH 07/23] Set query success state only for iceberg --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index f2838337ba3b..3ab0d2347680 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -351,7 +351,6 @@ private void collectCommitInformation(TezWork work, boolean success) throws IOEx for (BaseWork w : work.getAllWork()) { JobConf jobConf = workToConf.get(w); Vertex vertex = workToVertex.get(w); - sessionConf.setBoolean(jobConf.get("hive.query.id") + ".result", success); String jobIdPrefix = dagClient.getDagIdentifierString().split("_")[1]; // we should only consider jobs where an output committer is defined if (!vertex.getDataSinks().isEmpty() && jobConf != null && "org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter" @@ -373,6 +372,7 @@ private void collectCommitInformation(TezWork work, boolean success) throws IOEx VertexStatus status = dagClient.getVertexStatus(vertex.getName(), EnumSet.of(StatusGetOpts.GET_COUNTERS)); sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName, status.getProgress().getSucceededTaskCount()); + sessionConf.setBoolean(jobConf.get("hive.query.id") + ".result", success); } } } else { From c66922c45c2103606469bc1398fa3846041b0d7f Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Tue, 13 Apr 2021 11:14:05 +0200 Subject: [PATCH 08/23] Move commit info collection out of counter try-catch --- .../java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 3ab0d2347680..3c17d79a2699 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -256,11 +256,8 @@ public int execute() { } try { - // save useful commit information into session conf, e.g. for custom commit hooks - // TODO: this is temporary, Iceberg-specific logic. Refactor when new Tez version has been released - collectCommitInformation(work, rc == 0); - Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); // fetch the counters + Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); TezCounters dagCounters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters(); // if initial counters exists, merge it with dag counters to get aggregated view TezCounters mergedCounters = counters == null ? dagCounters : Utils.mergeTezCounters(dagCounters, counters); @@ -270,6 +267,10 @@ public int execute() { LOG.warn("Failed to get counters. Ignoring, summary info will be incomplete.", err); counters = null; } + + // save useful commit information into session conf, e.g. for custom commit hooks + // TODO: this is temporary, Iceberg-specific logic. Refactor when new Tez version has been released + collectCommitInformation(work, rc == 0); } finally { // Note: due to TEZ-3846, the session may actually be invalid in case of some errors. // Currently, reopen on an attempted reuse will take care of that; we cannot tell From b21ce9fc3bdf671a2e726aa55efa283923d09cf8 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Fri, 16 Apr 2021 21:20:40 +0200 Subject: [PATCH 09/23] fix assumes --- .../mr/hive/TestHiveIcebergStorageHandlerWithEngine.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java index a0171f0b489d..a3b4d4923896 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java @@ -49,6 +49,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; @@ -644,8 +645,6 @@ public void testMultilevelIdentityPartitionedWrite() throws IOException { @Test public void testMultiTableInsert() throws IOException { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); - testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS); @@ -685,11 +684,8 @@ public void testMultiTableInsert() throws IOException { @Test public void testWriteWithDefaultWriteFormat() { - Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr")); - Assume.assumeTrue("Testing the default file format is enough for a single scenario.", - executionEngine.equals("tez") && testTableType == TestTables.TestTableType.HIVE_CATALOG && - fileFormat == FileFormat.ORC); + testTableType == TestTables.TestTableType.HIVE_CATALOG && fileFormat == FileFormat.ORC); TableIdentifier identifier = TableIdentifier.of("default", "customers"); From 7654cf7f88d0f88e7183f5f67c98e29df1ee4d0e Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Sat, 17 Apr 2021 22:27:51 +0200 Subject: [PATCH 10/23] Copy iceberg props to session conf --- .../apache/iceberg/mr/hive/HiveIcebergStorageHandler.java | 2 +- .../java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 2f61665c0abf..40b4f4f41754 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -124,10 +124,10 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { "Can not handle table " + tableName + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'"); String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES); tables = tables == null ? tableName : tables + TABLE_NAME_SEPARATOR + tableName; + jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables); // this will turn off job committing on the Tez AM side, so that we can commit the write jobs // on HS2 side instead using the HiveIcebergMetaHook jobConf.set(HiveConf.ConfVars.TEZ_MAPREDUCE_OUTPUT_COMMITTER.varname, ""); - jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables); String catalogName = tableDesc.getProperties().getProperty(InputFormatConfig.CATALOG_NAME); if (catalogName != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 3c17d79a2699..e23665b633e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -373,9 +373,14 @@ private void collectCommitInformation(TezWork work, boolean success) throws IOEx VertexStatus status = dagClient.getVertexStatus(vertex.getName(), EnumSet.of(StatusGetOpts.GET_COUNTERS)); sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName, status.getProgress().getSucceededTaskCount()); - sessionConf.setBoolean(jobConf.get("hive.query.id") + ".result", success); } } + sessionConf.setBoolean(jobConf.get("hive.query.id") + ".result", success); + jobConf.forEach(e -> { + if (e.getKey().startsWith("iceberg.mr.")) { + sessionConf.set(e.getKey(), e.getValue()); + } + }); } else { LOG.warn("Table location or table name not found in config for base work: " + w.getName()); } From a495f2124cf651f481997bb191225be26114d8e2 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Mon, 19 Apr 2021 15:04:33 +0200 Subject: [PATCH 11/23] Fix multi-table committing --- .../iceberg/mr/hive/HiveIcebergMetaHook.java | 14 +++++++++---- .../mr/hive/HiveIcebergOutputCommitter.java | 19 +++++++++++------- ...stHiveIcebergStorageHandlerWithEngine.java | 15 ++++++++++++++ .../hadoop/hive/ql/exec/tez/TezTask.java | 20 +++++++++++++++---- 4 files changed, 53 insertions(+), 15 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index bf0e6d9cdc30..1e5a7cfc4bcb 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -359,20 +359,23 @@ private class PreAlterTableProperties { @Override public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) throws MetaException { + String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString(); + // check status to determine whether we need to commit or to abort JobConf jobConf = new JobConf(conf); - String queryIdKey = jobConf.get("hive.query.id") + ".result"; + String queryIdKey = jobConf.get("hive.query.id") + "." + tableName + ".result"; boolean success = jobConf.getBoolean(queryIdKey, false); // construct the job context - String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString(); - jobConf.set(InputFormatConfig.TABLE_IDENTIFIER, tableName); - jobConf.set(InputFormatConfig.TABLE_LOCATION, table.getSd().getLocation()); JobID jobID = JobID.forName(jobConf.get(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName)); int numTasks = conf.getInt(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName, -1); jobConf.setNumReduceTasks(numTasks); JobContext jobContext = new JobContextImpl(jobConf, jobID, null); + // we should only commit this current table because + // for multi-table inserts, this hook method will be called sequentially for each target table + jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName); + OutputCommitter committer = new HiveIcebergOutputCommitter(); if (success) { try { @@ -387,6 +390,7 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, throw new MetaException("Unable to commit and abort job: " + commitExc.getMessage()); } } finally { + // avoid config pollution with prefixed/suffixed keys cleanCommitConfig(queryIdKey, tableName); } } else { @@ -396,6 +400,7 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", abortExc); throw new MetaException("Unable to abort job: " + abortExc.getMessage()); } finally { + // avoid config pollution with prefixed/suffixed keys cleanCommitConfig(queryIdKey, tableName); } } @@ -404,6 +409,7 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, private void cleanCommitConfig(String queryIdKey, String tableName) { conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName); conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName); + conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName); conf.unset(queryIdKey); } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index bf298ad35bed..3b0e9544373f 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -105,13 +105,18 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException { .executeWith(tableExecutor) .run(output -> { Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output); - HiveIcebergRecordWriter writer = writers.get(output); - DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0]; - String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf, - attemptID.getJobID(), attemptID.getTaskID().getId()); - - // Creating the file containing the data files generated by this task for this table - createFileForCommit(closedFiles, fileForCommitLocation, table.io()); + if (table != null) { + HiveIcebergRecordWriter writer = writers.get(output); + DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0]; + String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf, + attemptID.getJobID(), attemptID.getTaskID().getId()); + + // Creating the file containing the data files generated by this task for this table + createFileForCommit(closedFiles, fileForCommitLocation, table.io()); + } else { + // most likely multi-table insert scenario + LOG.info("CommitTask found no serialized table in config for table: {}.", output); + } }, IOException.class); } finally { if (tableExecutor != null) { diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java index a3b4d4923896..e335714c720b 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java @@ -38,6 +38,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hive.HiveSchemaUtil; import org.apache.iceberg.mr.TestHelper; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -673,6 +674,7 @@ public void testMultiTableInsert() throws IOException { Table target1 = testTables.createTable(shell, "target1", target1Schema, fileFormat, ImmutableList.of()); Table target2 = testTables.createTable(shell, "target2", target2Schema, fileFormat, ImmutableList.of()); + // simple insert: should create a single vertex writing to both target tables shell.executeStatement("FROM customers " + "INSERT INTO target1 SELECT customer_id, first_name " + "INSERT INTO target2 SELECT last_name, customer_id"); @@ -680,6 +682,19 @@ public void testMultiTableInsert() throws IOException { // Check that everything is as expected HiveIcebergTestUtils.validateData(target1, target1Records, 0); HiveIcebergTestUtils.validateData(target2, target2Records, 1); + + // truncate the target tables + target1.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); + target2.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); + + // complex insert: should use a different vertex for each target table + shell.executeStatement("FROM customers " + + "INSERT INTO target1 SELECT customer_id, first_name ORDER BY first_name " + + "INSERT INTO target2 SELECT last_name, customer_id ORDER BY last_name"); + + // Check that everything is as expected + HiveIcebergTestUtils.validateData(target1, target1Records, 0); + HiveIcebergTestUtils.validateData(target2, target2Records, 1); } @Test diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index e23665b633e8..092e20a0ab8c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -22,6 +22,7 @@ import org.apache.hive.common.util.Ref; import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -369,13 +370,24 @@ private void collectCommitInformation(TezWork work, boolean success) throws IOEx if (child.isDirectory() && child.getPath().getName().contains(jobIdPrefix)) { // folder name pattern is queryID-jobID, we're removing the queryID part to get the jobID String jobIdStr = child.getPath().getName().substring(jobConf.get("hive.query.id").length() + 1); - sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID + "." + tableName, jobIdStr); VertexStatus status = dagClient.getVertexStatus(vertex.getName(), EnumSet.of(StatusGetOpts.GET_COUNTERS)); - sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName, - status.getProgress().getSucceededTaskCount()); + // get all target tables this vertex wrote to + List tables = new ArrayList<>(); + for (Map.Entry entry : jobConf) { + if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) { + tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length())); + } + } + // save information for each target table (jobID, task num, query state) + for (String table : tables) { + sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID + "." + table, jobIdStr); + sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT + "." + table, + status.getProgress().getSucceededTaskCount()); + sessionConf.setBoolean(jobConf.get("hive.query.id") + "." + table + ".result", success); + } } } - sessionConf.setBoolean(jobConf.get("hive.query.id") + ".result", success); + // save iceberg mr props as they can be needed during job commit (e.g. serialized table) jobConf.forEach(e -> { if (e.getKey().startsWith("iceberg.mr.")) { sessionConf.set(e.getKey(), e.getValue()); From f463e267f3caa3a00b0d233c0a3d5529acf0a66f Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Mon, 19 Apr 2021 15:31:42 +0200 Subject: [PATCH 12/23] Simplify commit logic in metahook --- .../iceberg/mr/hive/HiveIcebergMetaHook.java | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 1e5a7cfc4bcb..cb939034d6ce 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -377,31 +377,32 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName); OutputCommitter committer = new HiveIcebergOutputCommitter(); - if (success) { - try { - committer.commitJob(jobContext); - } catch (Exception commitExc) { - LOG.error("Error while trying to commit job. Will abort it now.", commitExc); + try { + if (success) { try { - committer.abortJob(jobContext, JobStatus.State.FAILED); + committer.commitJob(jobContext); + } catch (Exception commitExc) { + LOG.error("Error while trying to commit job (table: {}, jobID: {}). Will abort it now.", + tableName, jobID, commitExc); + abortJob(jobContext, committer, true); throw new MetaException("Unable to commit job: " + commitExc.getMessage()); - } catch (IOException abortExc) { - LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", abortExc); - throw new MetaException("Unable to commit and abort job: " + commitExc.getMessage()); } - } finally { - // avoid config pollution with prefixed/suffixed keys - cleanCommitConfig(queryIdKey, tableName); + } else { + abortJob(jobContext, committer, false); } - } else { - try { - committer.abortJob(jobContext, JobStatus.State.FAILED); - } catch (IOException abortExc) { - LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", abortExc); + } finally { + // avoid config pollution with prefixed/suffixed keys + cleanCommitConfig(queryIdKey, tableName); + } + } + + private void abortJob(JobContext jobContext, OutputCommitter committer, boolean suppressExc) throws MetaException { + try { + committer.abortJob(jobContext, JobStatus.State.FAILED); + } catch (IOException abortExc) { + LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", abortExc); + if (!suppressExc) { throw new MetaException("Unable to abort job: " + abortExc.getMessage()); - } finally { - // avoid config pollution with prefixed/suffixed keys - cleanCommitConfig(queryIdKey, tableName); } } } From 75c9133de2de207559a408a0dbfc6af3566e3401 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Mon, 19 Apr 2021 15:50:50 +0200 Subject: [PATCH 13/23] Query vertex status only once per vertex --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 092e20a0ab8c..b920e8583384 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -360,6 +360,7 @@ private void collectCommitInformation(TezWork work, boolean success) throws IOEx String tableName = jobConf.get("name"); String tableLocationRoot = jobConf.get("location"); if (tableName != null && tableLocationRoot != null) { + VertexStatus status = dagClient.getVertexStatus(vertex.getName(), EnumSet.of(StatusGetOpts.GET_COUNTERS)); Path path = new Path(tableLocationRoot + "/temp"); LOG.debug("Table temp directory path is: " + path); // list the directories inside the temp directory @@ -370,7 +371,6 @@ private void collectCommitInformation(TezWork work, boolean success) throws IOEx if (child.isDirectory() && child.getPath().getName().contains(jobIdPrefix)) { // folder name pattern is queryID-jobID, we're removing the queryID part to get the jobID String jobIdStr = child.getPath().getName().substring(jobConf.get("hive.query.id").length() + 1); - VertexStatus status = dagClient.getVertexStatus(vertex.getName(), EnumSet.of(StatusGetOpts.GET_COUNTERS)); // get all target tables this vertex wrote to List tables = new ArrayList<>(); for (Map.Entry entry : jobConf) { From 2848ebc1e379b0630093fd7ada725c83c5c5ceca Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Tue, 20 Apr 2021 16:51:20 +0200 Subject: [PATCH 14/23] Clean up prefix configs --- .../iceberg/mr/hive/HiveIcebergMetaHook.java | 17 ++++++++--------- .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 11 ++++++----- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index cb939034d6ce..78bb6426a3ac 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -363,12 +363,11 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, // check status to determine whether we need to commit or to abort JobConf jobConf = new JobConf(conf); - String queryIdKey = jobConf.get("hive.query.id") + "." + tableName + ".result"; - boolean success = jobConf.getBoolean(queryIdKey, false); + boolean success = jobConf.getBoolean(TezTask.HIVE_TEZ_COMMIT_JOB_RESULT_PREFIX + tableName, false); // construct the job context - JobID jobID = JobID.forName(jobConf.get(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName)); - int numTasks = conf.getInt(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName, -1); + JobID jobID = JobID.forName(jobConf.get(TezTask.HIVE_TEZ_COMMIT_JOB_ID_PREFIX + tableName)); + int numTasks = conf.getInt(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + tableName, -1); jobConf.setNumReduceTasks(numTasks); JobContext jobContext = new JobContextImpl(jobConf, jobID, null); @@ -392,7 +391,7 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, } } finally { // avoid config pollution with prefixed/suffixed keys - cleanCommitConfig(queryIdKey, tableName); + cleanCommitConfig(tableName); } } @@ -407,11 +406,11 @@ private void abortJob(JobContext jobContext, OutputCommitter committer, boolean } } - private void cleanCommitConfig(String queryIdKey, String tableName) { - conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName); - conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName); + private void cleanCommitConfig(String tableName) { + conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID_PREFIX + tableName); + conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + tableName); + conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_RESULT_PREFIX + tableName); conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName); - conf.unset(queryIdKey); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index b920e8583384..fd523fae6148 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -100,8 +100,9 @@ @SuppressWarnings({"serial"}) public class TezTask extends Task { - public static final String HIVE_TEZ_COMMIT_JOB_ID = "hive.tez.commit.job.id"; - public static final String HIVE_TEZ_COMMIT_TASK_COUNT = "hive.tez.commit.task.count"; + public static final String HIVE_TEZ_COMMIT_JOB_ID_PREFIX = "hive.tez.commit.job.id."; + public static final String HIVE_TEZ_COMMIT_JOB_RESULT_PREFIX = "hive.tez.commit.job.result."; + public static final String HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX = "hive.tez.commit.task.count."; private static final String CLASS_NAME = TezTask.class.getName(); private static final String JOB_ID_TEMPLATE = "job_%s%d_%s"; @@ -380,10 +381,10 @@ private void collectCommitInformation(TezWork work, boolean success) throws IOEx } // save information for each target table (jobID, task num, query state) for (String table : tables) { - sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID + "." + table, jobIdStr); - sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT + "." + table, + sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID_PREFIX + table, jobIdStr); + sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + table, status.getProgress().getSucceededTaskCount()); - sessionConf.setBoolean(jobConf.get("hive.query.id") + "." + table + ".result", success); + sessionConf.setBoolean(HIVE_TEZ_COMMIT_JOB_RESULT_PREFIX + table, success); } } } From 52f5e29b87b08aed5ef2787983ec9802fca93245 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Wed, 21 Apr 2021 11:54:23 +0200 Subject: [PATCH 15/23] Move abortJob to rollbackInsert --- .../iceberg/mr/hive/HiveIcebergMetaHook.java | 90 +++++++++---------- .../mr/hive/HiveIcebergOutputCommitter.java | 3 +- ...stHiveIcebergStorageHandlerWithEngine.java | 8 +- .../apache/iceberg/mr/hive/TestTables.java | 9 ++ 4 files changed, 59 insertions(+), 51 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 78bb6426a3ac..9509b5b4bda0 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -33,9 +33,10 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook; +import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.JobContextImpl; @@ -356,53 +357,49 @@ private class PreAlterTableProperties { private List partitionKeys; } + @Override + public void preInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) + throws MetaException { + // do nothing + } + @Override public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) throws MetaException { String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString(); - - // check status to determine whether we need to commit or to abort - JobConf jobConf = new JobConf(conf); - boolean success = jobConf.getBoolean(TezTask.HIVE_TEZ_COMMIT_JOB_RESULT_PREFIX + tableName, false); - - // construct the job context - JobID jobID = JobID.forName(jobConf.get(TezTask.HIVE_TEZ_COMMIT_JOB_ID_PREFIX + tableName)); - int numTasks = conf.getInt(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + tableName, -1); - jobConf.setNumReduceTasks(numTasks); - JobContext jobContext = new JobContextImpl(jobConf, jobID, null); - - // we should only commit this current table because - // for multi-table inserts, this hook method will be called sequentially for each target table - jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName); - - OutputCommitter committer = new HiveIcebergOutputCommitter(); + JobContext jobContext = getJobContextForCommitOrAbort(tableName); + boolean failure = false; try { - if (success) { - try { - committer.commitJob(jobContext); - } catch (Exception commitExc) { - LOG.error("Error while trying to commit job (table: {}, jobID: {}). Will abort it now.", - tableName, jobID, commitExc); - abortJob(jobContext, committer, true); - throw new MetaException("Unable to commit job: " + commitExc.getMessage()); - } - } else { - abortJob(jobContext, committer, false); - } + OutputCommitter committer = new HiveIcebergOutputCommitter(); + committer.commitJob(jobContext); + } catch (Exception e) { + failure = true; + LOG.error("Error while trying to commit job", e); + throw new MetaException(StringUtils.stringifyException(e)); } finally { - // avoid config pollution with prefixed/suffixed keys - cleanCommitConfig(tableName); + // if there's a failure, the configs will still be needed in rollbackInsertTable + if (!failure) { + // avoid config pollution with prefixed/suffixed keys + cleanCommitConfig(tableName); + } } } - private void abortJob(JobContext jobContext, OutputCommitter committer, boolean suppressExc) throws MetaException { + @Override + public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) + throws MetaException { + String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString(); + JobContext jobContext = getJobContextForCommitOrAbort(tableName); + OutputCommitter committer = new HiveIcebergOutputCommitter(); try { + LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName); committer.abortJob(jobContext, JobStatus.State.FAILED); - } catch (IOException abortExc) { - LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", abortExc); - if (!suppressExc) { - throw new MetaException("Unable to abort job: " + abortExc.getMessage()); - } + } catch (IOException e) { + LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e); + // no throwing here because the original commitInsertTable exception should be propagated + } finally { + // avoid config pollution with prefixed/suffixed keys + cleanCommitConfig(tableName); } } @@ -413,15 +410,16 @@ private void cleanCommitConfig(String tableName) { conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName); } - @Override - public void preInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) - throws MetaException { - // do nothing - } + private JobContext getJobContextForCommitOrAbort(String tableName) { + JobConf jobConf = new JobConf(conf); + JobID jobID = JobID.forName(jobConf.get(TezTask.HIVE_TEZ_COMMIT_JOB_ID_PREFIX + tableName)); + int numTasks = conf.getInt(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + tableName, -1); + jobConf.setNumReduceTasks(numTasks); - @Override - public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) - throws MetaException { - // do nothing + // we should only commit this current table because + // for multi-table inserts, this hook method will be called sequentially for each target table + jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName); + + return new JobContextImpl(jobConf, jobID, null); } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index 3b0e9544373f..ad279ea73197 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -114,7 +114,8 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException { // Creating the file containing the data files generated by this task for this table createFileForCommit(closedFiles, fileForCommitLocation, table.io()); } else { - // most likely multi-table insert scenario + // When using Tez multi-table inserts, we could have more output tables in config than + // the actual tables this task has written to and has serialized in its config LOG.info("CommitTask found no serialized table in config for table: {}.", output); } }, IOException.class); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java index e335714c720b..3d32fb8b6342 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java @@ -676,16 +676,16 @@ public void testMultiTableInsert() throws IOException { // simple insert: should create a single vertex writing to both target tables shell.executeStatement("FROM customers " + - "INSERT INTO target1 SELECT customer_id, first_name " + - "INSERT INTO target2 SELECT last_name, customer_id"); + "INSERT INTO target1 SELECT customer_id, first_name " + + "INSERT INTO target2 SELECT last_name, customer_id"); // Check that everything is as expected HiveIcebergTestUtils.validateData(target1, target1Records, 0); HiveIcebergTestUtils.validateData(target2, target2Records, 1); // truncate the target tables - target1.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); - target2.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); + testTables.truncateIcebergTable(target1); + testTables.truncateIcebergTable(target2); // complex insert: should use a different vertex for each target table shell.executeStatement("FROM customers " + diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java index 4d3a60e862a0..c28236051f78 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java @@ -45,6 +45,7 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.hive.HiveCatalog; @@ -275,6 +276,14 @@ public void appendIcebergTable(Configuration configuration, Table table, FileFor } } + /** + * Truncates an Iceberg table. + * @param table The iceberg table to truncate + */ + public void truncateIcebergTable(Table table) { + table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); + } + private static class CatalogToTables implements Tables { private final Catalog catalog; From c0dc336e77c4d0214fca81fc710f7fd2ddc208b2 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Wed, 21 Apr 2021 13:35:52 +0200 Subject: [PATCH 16/23] Use nojob committer for Tez AM to abort jobs --- .../iceberg/mr/hive/HiveIcebergMetaHook.java | 1 - .../mr/hive/HiveIcebergStorageHandler.java | 20 ++++++++++---- ...stHiveIcebergStorageHandlerWithEngine.java | 1 - .../hadoop/hive/ql/exec/tez/TezTask.java | 26 ++++++++++--------- 4 files changed, 29 insertions(+), 19 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 9509b5b4bda0..e8012443e712 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -406,7 +406,6 @@ public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table table private void cleanCommitConfig(String tableName) { conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID_PREFIX + tableName); conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + tableName); - conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_RESULT_PREFIX + tableName); conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName); } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 40b4f4f41754..616537b4de99 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -19,6 +19,7 @@ package org.apache.iceberg.mr.hive; +import java.io.IOException; import java.io.Serializable; import java.util.Collection; import java.util.HashMap; @@ -26,7 +27,6 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.OutputFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; @@ -95,7 +96,7 @@ public void configureInputJobProperties(TableDesc tableDesc, Map public void configureOutputJobProperties(TableDesc tableDesc, Map map) { overlayTableProperties(conf, tableDesc, map); // For Tez, setting the committer here is enough to make sure it'll be part of the jobConf - map.put("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName()); + map.put("mapred.output.committer.class", HiveIcebergNoJobCommitter.class.getName()); // For MR, the jobConf is set only in configureJobConf, so we're setting the write key here to detect it over there map.put(WRITE_KEY, "true"); // Putting the key into the table props as well, so that projection pushdown can be determined on a @@ -104,6 +105,18 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map map) { @@ -125,9 +138,6 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES); tables = tables == null ? tableName : tables + TABLE_NAME_SEPARATOR + tableName; jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables); - // this will turn off job committing on the Tez AM side, so that we can commit the write jobs - // on HS2 side instead using the HiveIcebergMetaHook - jobConf.set(HiveConf.ConfVars.TEZ_MAPREDUCE_OUTPUT_COMMITTER.varname, ""); String catalogName = tableDesc.getProperties().getProperty(InputFormatConfig.CATALOG_NAME); if (catalogName != null) { diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java index 3d32fb8b6342..9c7f8fb5af91 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java @@ -38,7 +38,6 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hive.HiveSchemaUtil; import org.apache.iceberg.mr.TestHelper; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index fd523fae6148..6479f069b178 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -32,6 +32,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import javax.annotation.Nullable; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -101,7 +102,6 @@ public class TezTask extends Task { public static final String HIVE_TEZ_COMMIT_JOB_ID_PREFIX = "hive.tez.commit.job.id."; - public static final String HIVE_TEZ_COMMIT_JOB_RESULT_PREFIX = "hive.tez.commit.job.result."; public static final String HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX = "hive.tez.commit.task.count."; private static final String CLASS_NAME = TezTask.class.getName(); @@ -270,9 +270,10 @@ public int execute() { counters = null; } - // save useful commit information into session conf, e.g. for custom commit hooks - // TODO: this is temporary, Iceberg-specific logic. Refactor when new Tez version has been released - collectCommitInformation(work, rc == 0); + // save useful commit information into session conf, e.g. for custom commit hooks, like Iceberg + if (rc == 0) { + collectCommitInformation(work); + } } finally { // Note: due to TEZ-3846, the session may actually be invalid in case of some errors. // Currently, reopen on an attempted reuse will take care of that; we cannot tell @@ -349,22 +350,24 @@ public int execute() { return rc; } - private void collectCommitInformation(TezWork work, boolean success) throws IOException, TezException { + private void collectCommitInformation(TezWork work) throws IOException, TezException { HiveConf sessionConf = SessionState.get().getConf(); for (BaseWork w : work.getAllWork()) { JobConf jobConf = workToConf.get(w); Vertex vertex = workToVertex.get(w); String jobIdPrefix = dagClient.getDagIdentifierString().split("_")[1]; - // we should only consider jobs where an output committer is defined - if (!vertex.getDataSinks().isEmpty() && jobConf != null && "org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter" - .equals(jobConf.getOutputCommitter().getClass().getName())) { - String tableName = jobConf.get("name"); + boolean hasIcebergCommitter = Optional.ofNullable(jobConf).map(JobConf::getOutputCommitter) + .map(Object::getClass).map(Class::getName) + .filter(name -> name.endsWith("HiveIcebergNoJobCommitter")).isPresent(); + // we should only consider jobs with Iceberg output committer and a data sink + if (hasIcebergCommitter && !vertex.getDataSinks().isEmpty()) { String tableLocationRoot = jobConf.get("location"); - if (tableName != null && tableLocationRoot != null) { + if (tableLocationRoot != null) { VertexStatus status = dagClient.getVertexStatus(vertex.getName(), EnumSet.of(StatusGetOpts.GET_COUNTERS)); Path path = new Path(tableLocationRoot + "/temp"); LOG.debug("Table temp directory path is: " + path); // list the directories inside the temp directory + // TODO: this is temporary, refactor when new Tez version has been released FileStatus[] children = path.getFileSystem(jobConf).listStatus(path); LOG.debug("Listing the table temp directory yielded these files: " + Arrays.toString(children)); for (FileStatus child : children) { @@ -384,7 +387,6 @@ private void collectCommitInformation(TezWork work, boolean success) throws IOEx sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID_PREFIX + table, jobIdStr); sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + table, status.getProgress().getSucceededTaskCount()); - sessionConf.setBoolean(HIVE_TEZ_COMMIT_JOB_RESULT_PREFIX + table, success); } } } @@ -395,7 +397,7 @@ private void collectCommitInformation(TezWork work, boolean success) throws IOEx } }); } else { - LOG.warn("Table location or table name not found in config for base work: " + w.getName()); + LOG.warn("Table location not found in config for base work: " + w.getName()); } } } From 15843111c5d8cdcf7baaf862f52dd33badf85332 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Wed, 21 Apr 2021 13:39:02 +0200 Subject: [PATCH 17/23] Fix indent --- .../mr/hive/TestHiveIcebergStorageHandlerWithEngine.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java index 9c7f8fb5af91..3b5f8247522c 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java @@ -675,8 +675,8 @@ public void testMultiTableInsert() throws IOException { // simple insert: should create a single vertex writing to both target tables shell.executeStatement("FROM customers " + - "INSERT INTO target1 SELECT customer_id, first_name " + - "INSERT INTO target2 SELECT last_name, customer_id"); + "INSERT INTO target1 SELECT customer_id, first_name " + + "INSERT INTO target2 SELECT last_name, customer_id"); // Check that everything is as expected HiveIcebergTestUtils.validateData(target1, target1Records, 0); From 29f0d44ba1780d91eadd4b2846934d70176db899 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Wed, 21 Apr 2021 14:16:34 +0200 Subject: [PATCH 18/23] Determine task num using listing during abortJob --- .../mr/hive/HiveIcebergOutputCommitter.java | 38 +++++++++++++------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index ad279ea73197..25930d101346 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -217,11 +218,15 @@ public void abortJob(JobContext originalContext, int status) throws IOException .executeWith(tableExecutor) .onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc)) .run(output -> { - LOG.info("Cleaning job for table {}", jobContext.getJobID(), output); + LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output); Table table = HiveIcebergStorageHandler.table(jobConf, output); - jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID())); - Collection dataFiles = dataFiles(fileExecutor, table.location(), jobContext, table.io(), false); + String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID()); + jobLocations.add(jobLocation); + // list jobLocation to get number of forCommit files + int numTasks = listForCommits(jobConf, jobLocation).length; + Collection dataFiles = + dataFiles(numTasks, fileExecutor, table.location(), jobContext, table.io(), false); // Check if we have files already committed and remove data files if there are any if (dataFiles.size() > 0) { @@ -232,7 +237,7 @@ public void abortJob(JobContext originalContext, int status) throws IOException .onFailure((file, exc) -> LOG.warn("Failed to remove data file {} on abort job", file.path(), exc)) .run(file -> table.io().deleteFile(file.path().toString())); } - }); + }, IOException.class); } finally { fileExecutor.shutdown(); if (tableExecutor != null) { @@ -245,6 +250,14 @@ public void abortJob(JobContext originalContext, int status) throws IOException cleanup(jobContext, jobLocations); } + private FileStatus[] listForCommits(JobConf jobConf, String jobLocation) throws IOException { + Path path = new Path(jobLocation); + LOG.debug("Listing job location to get forCommits for abort: {}", jobLocation); + FileStatus[] children = path.getFileSystem(jobConf).listStatus(path); + LOG.debug("Listing the table temp directory yielded these files: {}", Arrays.toString(children)); + return children; + } + /** * Collects the additions to a single table and adds/commits the new files to the Iceberg table. * @param io The io to read the forCommit files @@ -269,7 +282,10 @@ private void commitTable(FileIO io, ExecutorService executor, JobContext jobCont LOG.info("Committing job has started for table: {}, using location: {}", table, generateJobLocation(location, conf, jobContext.getJobID())); - Collection dataFiles = dataFiles(executor, location, jobContext, io, true); + // If there are reducers, then every reducer will generate a result file. + // If this is a map only task, then every mapper will generate a result file. + int numTasks = conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks(); + Collection dataFiles = dataFiles(numTasks, executor, location, jobContext, io, true); if (dataFiles.size() > 0) { // Appending data files to the table @@ -352,6 +368,8 @@ private static ExecutorService tableExecutor(Configuration conf, int maxThreadNu /** * Get the committed data files for this table and job. + * + * @param numTasks Number of writer tasks that produced a forCommit file * @param executor The executor used for reading the forCommit files parallel * @param location The location of the table * @param jobContext The job context @@ -359,18 +377,14 @@ private static ExecutorService tableExecutor(Configuration conf, int maxThreadNu * @param throwOnFailure If true then it throws an exception on failure * @return The list of the committed data files */ - private static Collection dataFiles(ExecutorService executor, String location, JobContext jobContext, - FileIO io, boolean throwOnFailure) { + private static Collection dataFiles(int numTasks, ExecutorService executor, String location, + JobContext jobContext, FileIO io, boolean throwOnFailure) { JobConf conf = jobContext.getJobConf(); - // If there are reducers, then every reducer will generate a result file. - // If this is a map only task, then every mapper will generate a result file. - int expectedFiles = conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks(); - Collection dataFiles = new ConcurrentLinkedQueue<>(); // Reading the committed files. The assumption here is that the taskIds are generated in sequential order // starting from 0. - Tasks.range(expectedFiles) + Tasks.range(numTasks) .throwFailureWhenFinished(throwOnFailure) .executeWith(executor) .retry(3) From 49f6afed5a25025f9b23ac65675c5d9892bbd38d Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Wed, 21 Apr 2021 14:22:10 +0200 Subject: [PATCH 19/23] When listing, filter only for forCommit files --- .../iceberg/mr/hive/HiveIcebergOutputCommitter.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index 25930d101346..7c14a82c65dc 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -26,9 +26,11 @@ import java.util.Collection; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -224,7 +226,7 @@ public void abortJob(JobContext originalContext, int status) throws IOException String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID()); jobLocations.add(jobLocation); // list jobLocation to get number of forCommit files - int numTasks = listForCommits(jobConf, jobLocation).length; + int numTasks = listForCommits(jobConf, jobLocation).size(); Collection dataFiles = dataFiles(numTasks, fileExecutor, table.location(), jobContext, table.io(), false); @@ -250,12 +252,14 @@ public void abortJob(JobContext originalContext, int status) throws IOException cleanup(jobContext, jobLocations); } - private FileStatus[] listForCommits(JobConf jobConf, String jobLocation) throws IOException { + private Set listForCommits(JobConf jobConf, String jobLocation) throws IOException { Path path = new Path(jobLocation); LOG.debug("Listing job location to get forCommits for abort: {}", jobLocation); FileStatus[] children = path.getFileSystem(jobConf).listStatus(path); - LOG.debug("Listing the table temp directory yielded these files: {}", Arrays.toString(children)); - return children; + LOG.debug("Listing the job location: {} yielded these files: {}", jobLocation, Arrays.toString(children)); + return Arrays.stream(children) + .filter(child -> !child.isDirectory() && child.getPath().getName().endsWith(FOR_COMMIT_EXTENSION)) + .collect(Collectors.toSet()); } /** From 58c0182f1f0cb4a69f6831dfbb1f6afe91e3b5a5 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Wed, 21 Apr 2021 14:52:23 +0200 Subject: [PATCH 20/23] Add javadoc and comment --- .../iceberg/mr/hive/HiveIcebergOutputCommitter.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index 7c14a82c65dc..82987dc1a4b0 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -226,6 +226,7 @@ public void abortJob(JobContext originalContext, int status) throws IOException String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID()); jobLocations.add(jobLocation); // list jobLocation to get number of forCommit files + // we do this because map/reduce num in jobConf is unreliable and we have no access to vertex status info int numTasks = listForCommits(jobConf, jobLocation).size(); Collection dataFiles = dataFiles(numTasks, fileExecutor, table.location(), jobContext, table.io(), false); @@ -252,6 +253,15 @@ public void abortJob(JobContext originalContext, int status) throws IOException cleanup(jobContext, jobLocations); } + /** + * Lists the forCommit files under a job location. This should only be used by {@link #abortJob(JobContext, int)}, + * since on the Tez AM-side it will have no access to the correct number of writer tasks otherwise. The commitJob + * should not need to use this listing as it should have access to the vertex status info on the HS2-side. + * @param jobConf jobConf used for getting the FS + * @param jobLocation The job location that we should list + * @return The set of forCommit files under the job location + * @throws IOException if the listing fails + */ private Set listForCommits(JobConf jobConf, String jobLocation) throws IOException { Path path = new Path(jobLocation); LOG.debug("Listing job location to get forCommits for abort: {}", jobLocation); From 266f27a58b68f439baaf19fa01c2c2abce8442dc Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Thu, 22 Apr 2021 19:50:17 +0200 Subject: [PATCH 21/23] Unset output tables from session conf --- .../java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java | 1 + 1 file changed, 1 insertion(+) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index e8012443e712..f94a4384ecd0 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -407,6 +407,7 @@ private void cleanCommitConfig(String tableName) { conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID_PREFIX + tableName); conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + tableName); conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName); + conf.unset(InputFormatConfig.OUTPUT_TABLES); } private JobContext getJobContextForCommitOrAbort(String tableName) { From aa7859caa539f0d8f94a65f714693bd691eaf156 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Fri, 23 Apr 2021 09:30:07 +0200 Subject: [PATCH 22/23] fix import order after rebase --- .../java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index f94a4384ecd0..2cf6601c491d 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -25,6 +25,7 @@ import java.util.Properties; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -33,7 +34,6 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; -import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook; import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.io.AcidUtils; From 3493405f907244d602154a7c0cd0229a2890707d Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Fri, 23 Apr 2021 09:41:00 +0200 Subject: [PATCH 23/23] change super calls after rebase --- .../java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 2cf6601c491d..2fb3d93403bb 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -201,7 +201,7 @@ public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, @Override public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context) throws MetaException { - HiveMetaHook.super.preAlterTable(hmsTable, context); + super.preAlterTable(hmsTable, context); catalogProperties = getCatalogProperties(hmsTable); try { icebergTable = Catalogs.loadTable(conf, catalogProperties); @@ -240,7 +240,7 @@ public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E @Override public void commitAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, PartitionSpecProxy partitionSpecProxy) throws MetaException { - HiveMetaHook.super.commitAlterTable(hmsTable, partitionSpecProxy); + super.commitAlterTable(hmsTable, partitionSpecProxy); if (canMigrateHiveTable) { catalogProperties = getCatalogProperties(hmsTable); catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(preAlterTableProperties.schema));