Skip to content

Commit df45194

Browse files
tarak271 and zabetak
authored and committed
HIVE-27848: Refactor Initiator hierarchy into CompactorUtil and fix failure in TestCrudCompactorOnTez (Taraka Rama Rao Lethavadla reviewed by Stamatis Zampetakis)
The work started initially to fix the TestCrudCompactorOnTez.secondCompactionShouldBeRefusedBeforeEnqueueing. However, while changing the code to address the failure, the inheritance based design for the Initiator that was chosen in HIVE-27598 revealed some weaknesses briefly outlined below. Due to inheritance the InitiatorBase class becomes a Thread something that doesn't really make sense and it comes with additional overhead every time we instantiate it. Moreover, the only class that currently extends InitiatorBase is the Initiator and it's difficult to imagine how we can make other extensions from InitiatorBase; the code becomes complex and any subtle change in InitiatorBase may have unpredictable effects on Initiator. Having a "Base" class that is not really meant to be extended and no instructions on how to do so is problematic. For the reasons above the focus of the work changed from just re-enabling the test to improving and addressing the shortcomings of the inheritance based design of Initiator. Close #4859
1 parent cf49f0e commit df45194

File tree

6 files changed

+357
-344
lines changed

6 files changed

+357
-344
lines changed

itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@
7777
import org.apache.orc.TypeDescription;
7878
import org.apache.orc.impl.RecordReaderImpl;
7979
import org.junit.Assert;
80-
import org.junit.Ignore;
8180
import org.junit.Test;
8281
import org.mockito.ArgumentCaptor;
8382
import org.mockito.Mockito;
@@ -602,7 +601,6 @@ public void testCompactionShouldNotFailOnPartitionsWithBooleanField() throws Exc
602601
}
603602

604603
@Test
605-
@Ignore("HIVE-27848")
606604
public void secondCompactionShouldBeRefusedBeforeEnqueueing() throws Exception {
607605
conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
608606

@@ -648,6 +646,52 @@ public void secondCompactionShouldBeRefusedBeforeEnqueueing() throws Exception {
648646
Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
649647
}
650648

649+
@Test
650+
public void secondCompactionShouldBeRefusedBeforeEnqueueingForPartition() throws Exception {
651+
conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
652+
653+
final String dbName = "default";
654+
final String tableName = "compaction_test";
655+
executeStatementOnDriver("drop table if exists " + tableName, driver);
656+
executeStatementOnDriver("CREATE TABLE " + tableName + "(id string, value string) partitioned by(pt string) CLUSTERED BY(id) "
657+
+ "INTO 10 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
658+
executeStatementOnDriver("alter table " + tableName + " add partition(pt='test')",driver);
659+
executeStatementOnDriver("INSERT INTO TABLE " + tableName + " partition(pt='test') values ('1','one'),('2','two'),('3','three'),"
660+
+ "('4','four'),('5','five'),('6','six'),('7','seven'),('8','eight'),('9','nine'),('10','ten'),"
661+
+ "('11','eleven'),('12','twelve'),('13','thirteen'),('14','fourteen'),('15','fifteen'),('16','sixteen'),"
662+
+ "('17','seventeen'),('18','eighteen'),('19','nineteen'),('20','twenty')", driver);
663+
664+
executeStatementOnDriver("insert into " + tableName + " partition(pt='test') values ('21', 'value21'),('84', 'value84'),"
665+
+ "('66', 'value66'),('54', 'value54')", driver);
666+
executeStatementOnDriver(
667+
"insert into " + tableName + " partition(pt='test') values ('22', 'value22'),('34', 'value34')," + "('35', 'value35')", driver);
668+
executeStatementOnDriver("insert into " + tableName + " partition(pt='test') values ('75', 'value75'),('99', 'value99')", driver);
669+
670+
TxnStore txnHandler = TxnUtils.getTxnStore(conf);
671+
672+
//Do a compaction directly and wait for it to finish
673+
CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
674+
rqst.setPartitionname("pt=test");
675+
CompactionResponse resp = txnHandler.compact(rqst);
676+
runWorker(conf);
677+
678+
//Try to do a second compaction on the same table before the cleaner runs.
679+
try {
680+
driver.run("ALTER TABLE " + tableName + " partition(pt='test') COMPACT 'major'");
681+
} catch (CommandProcessorException e) {
682+
String errorMessage = ErrorMsg.COMPACTION_REFUSED.format(dbName, tableName, " partition(pt=test)",
683+
"Compaction is already scheduled with state='ready for cleaning' and id=" + resp.getId());
684+
Assert.assertEquals(errorMessage, e.getCauseMessage());
685+
Assert.assertEquals(ErrorMsg.COMPACTION_REFUSED.getErrorCode(), e.getErrorCode());
686+
}
687+
688+
//Check if the first compaction is in 'ready for cleaning'
689+
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
690+
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
691+
Assert.assertEquals(1, compacts.size());
692+
Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
693+
}
694+
651695
@Test
652696
public void testMinorCompactionShouldBeRefusedOnTablesWithOriginalFiles() throws Exception {
653697
conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);

ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.hive.ql.ddl.table.storage.compact;
2020

21+
import org.apache.hadoop.hive.common.ServerUtils;
2122
import org.apache.hadoop.hive.conf.HiveConf;
2223
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
2324

@@ -27,6 +28,7 @@
2728
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
2829
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
2930
import org.apache.hadoop.hive.metastore.txn.TxnStore;
31+
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
3032
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
3133
import org.apache.hadoop.hive.ql.ErrorMsg;
3234
import org.apache.hadoop.hive.ql.ddl.DDLOperation;
@@ -35,17 +37,21 @@
3537
import org.apache.hadoop.hive.ql.metadata.HiveException;
3638
import org.apache.hadoop.hive.ql.metadata.Partition;
3739
import org.apache.hadoop.hive.ql.metadata.Table;
38-
import org.apache.hadoop.hive.ql.txn.compactor.InitiatorBase;
40+
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
3941

40-
import java.util.*;
41-
import java.util.concurrent.atomic.AtomicBoolean;
42+
import java.util.List;
43+
import java.util.ArrayList;
44+
import java.util.Map;
45+
import java.util.LinkedHashMap;
46+
import java.util.Optional;
4247

4348
import static org.apache.hadoop.hive.ql.io.AcidUtils.compactionTypeStr2ThriftType;
4449

4550
/**
4651
* Operation process of compacting a table.
4752
*/
4853
public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDesc> {
54+
4955
public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompactDesc desc) {
5056
super(context, desc);
5157
}
@@ -56,6 +62,11 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
5662
throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, table.getDbName(), table.getTableName());
5763
}
5864

65+
Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
66+
convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
67+
68+
TxnStore txnHandler = TxnUtils.getTxnStore(context.getConf());
69+
5970
CompactionRequest compactionRequest = new CompactionRequest(table.getDbName(), table.getTableName(),
6071
compactionTypeStr2ThriftType(desc.getCompactionType()));
6172

@@ -69,40 +80,48 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
6980
compactionRequest.setNumberOfBuckets(desc.getNumberOfBuckets());
7081
}
7182

72-
InitiatorBase initiatorBase = new InitiatorBase();
73-
initiatorBase.setConf(context.getConf());
74-
initiatorBase.init(new AtomicBoolean());
75-
76-
Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
77-
convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
78-
79-
if(desc.getPartitionSpec() != null){
80-
Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
81-
partitionName.ifPresent(compactionRequest::setPartitionname);
82-
}
83-
List<CompactionResponse> compactionResponses =
84-
initiatorBase.initiateCompactionForTable(compactionRequest, table.getTTable(), partitionMap);
85-
for (CompactionResponse compactionResponse : compactionResponses) {
86-
if (!compactionResponse.isAccepted()) {
87-
String message;
88-
if (compactionResponse.isSetErrormessage()) {
89-
message = compactionResponse.getErrormessage();
90-
throw new HiveException(ErrorMsg.COMPACTION_REFUSED, table.getDbName(), table.getTableName(),
91-
"CompactionId: " + compactionResponse.getId(), message);
92-
}
93-
context.getConsole().printInfo(
94-
"Compaction already enqueued with id " + compactionResponse.getId() + "; State is "
95-
+ compactionResponse.getState());
96-
continue;
83+
//Will directly initiate compaction if an un-partitioned table/a partition is specified in the request
84+
if (desc.getPartitionSpec() != null || !table.isPartitioned()) {
85+
if (desc.getPartitionSpec() != null) {
86+
Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
87+
partitionName.ifPresent(compactionRequest::setPartitionname);
9788
}
98-
context.getConsole().printInfo("Compaction enqueued with id " + compactionResponse.getId());
99-
if (desc.isBlocking() && compactionResponse.isAccepted()) {
100-
waitForCompactionToFinish(compactionResponse, context);
89+
CompactionResponse compactionResponse = txnHandler.compact(compactionRequest);
90+
parseCompactionResponse(compactionResponse, table, compactionRequest.getPartitionname());
91+
} else { // Check for eligible partitions and initiate compaction
92+
for (Map.Entry<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMapEntry : partitionMap.entrySet()) {
93+
compactionRequest.setPartitionname(partitionMapEntry.getKey());
94+
CompactionResponse compactionResponse =
95+
CompactorUtil.initiateCompactionForPartition(table.getTTable(), partitionMapEntry.getValue(),
96+
compactionRequest, ServerUtils.hostname(), txnHandler, context.getConf());
97+
parseCompactionResponse(compactionResponse, table, partitionMapEntry.getKey());
10198
}
10299
}
103100
return 0;
104101
}
105102

103+
private void parseCompactionResponse(CompactionResponse compactionResponse, Table table, String partitionName)
104+
throws HiveException {
105+
if (compactionResponse == null) {
106+
context.getConsole().printInfo(
107+
"Not enough deltas to initiate compaction for table=" + table.getTableName() + "partition=" + partitionName);
108+
return;
109+
}
110+
if (!compactionResponse.isAccepted()) {
111+
if (compactionResponse.isSetErrormessage()) {
112+
throw new HiveException(ErrorMsg.COMPACTION_REFUSED, table.getDbName(), table.getTableName(),
113+
partitionName == null ? "" : " partition(" + partitionName + ")", compactionResponse.getErrormessage());
114+
}
115+
context.getConsole().printInfo(
116+
"Compaction already enqueued with id " + compactionResponse.getId() + "; State is " + compactionResponse.getState());
117+
return;
118+
}
119+
context.getConsole().printInfo("Compaction enqueued with id " + compactionResponse.getId());
120+
if (desc.isBlocking() && compactionResponse.isAccepted()) {
121+
waitForCompactionToFinish(compactionResponse, context);
122+
}
123+
}
124+
106125
private List<Partition> getPartitions(Table table, AlterTableCompactDesc desc, DDLOperationContext context)
107126
throws HiveException {
108127
List<Partition> partitions = new ArrayList<>();
@@ -117,7 +136,7 @@ private List<Partition> getPartitions(Table table, AlterTableCompactDesc desc, D
117136
partitions = context.getDb().getPartitions(table, partitionSpec);
118137
if (partitions.size() > 1) {
119138
throw new HiveException(ErrorMsg.TOO_MANY_COMPACTION_PARTITIONS);
120-
} else if (partitions.size() == 0) {
139+
} else if (partitions.isEmpty()) {
121140
throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
122141
}
123142
}

ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1417,6 +1417,13 @@ public static AcidDirectory getAcidState(FileSystem fileSystem, Path candidateDi
14171417
return directory;
14181418
}
14191419

1420+
public static AcidDirectory getAcidState(StorageDescriptor sd, ValidWriteIdList writeIds, HiveConf conf)
1421+
throws IOException {
1422+
Path location = new Path(sd.getLocation());
1423+
FileSystem fs = location.getFileSystem(conf);
1424+
return getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
1425+
}
1426+
14201427
private static void findBestWorkingDeltas(ValidWriteIdList writeIdList, AcidDirectory directory) {
14211428
Collections.sort(directory.getCurrentDirectories());
14221429
//so now, 'current directories' should be sorted like delta_5_20 delta_5_10 delta_11_20 delta_51_60 for example

0 commit comments

Comments (0)