Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ public static void testMultiCFGetImpl(Map.Entry<String, List<String>> entry) thr
Result r = hTable.get(get);
Assert.assertEquals(tableNames.size() * columns.length, r.raw().length);
int cur = 0;
for (int i = 0; i < columns.length; i++) {
for (String tableName : tableNames) {
for (String tableName : tableNames) {
for (int i = 0; i < columns.length; i++) {
AssertKeyValue(key, columns[i], lastTs, latestValue, r.raw()[cur]);
cur++;
}
Expand Down Expand Up @@ -246,10 +246,11 @@ public static void testMultiCFGetImpl(Map.Entry<String, List<String>> entry) thr
Result r = hTable.get(get);
Assert.assertEquals(tableNames.size() * columns.length * ts.length, r.raw().length);
int cur = 0;
for (int i = 0; i < columns.length; i++) {
for (String tableName : tableNames) {
for (int k = ts.length-1; k >= 0; k--) {
AssertKeyValue(key, columns[i], ts[k], values[k], r.raw()[cur]);
for (String tableName : tableNames) {
String family = getColumnFamilyName(tableName);
for (int i = 0; i < columns.length; i++) {
for (int k = ts.length - 1; k >= 0; k--) {
AssertKeyValue(key, family, columns[i], ts[k], values[k], r.raw()[cur]);
cur++;
}
}
Expand All @@ -264,9 +265,10 @@ public static void testMultiCFGetImpl(Map.Entry<String, List<String>> entry) thr
Result r = hTable.get(get);
Assert.assertEquals(tableNames.size() * columns.length, r.raw().length);
int cur = 0;
for (int i = 0; i < columns.length; i++) {
for (String tableName : tableNames) {
AssertKeyValue(key, columns[i], ts[1], values[1], r.raw()[cur]);
for (String tableName : tableNames) {
String family = getColumnFamilyName(tableName);
for (int i = 0; i < columns.length; i++) {
AssertKeyValue(key, family, columns[i], ts[1], values[1], r.raw()[cur]);
cur++;
}
}
Expand All @@ -282,9 +284,10 @@ public static void testMultiCFGetImpl(Map.Entry<String, List<String>> entry) thr
Result r = hTable.get(get);
Assert.assertEquals(tableNames.size() * columns.length, r.raw().length);
int cur = 0;
for (int i = 0; i < columns.length; i++) {
for (String tableName : tableNames) {
AssertKeyValue(key, columns[i], ts[0], values[0], r.raw()[cur]);
for (String tableName : tableNames) {
String family = getColumnFamilyName(tableName);
for (int i = 0; i < columns.length; i++) {
AssertKeyValue(key, family, columns[i], ts[0], values[0], r.raw()[cur]);
cur++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void prepareCase() throws Exception {
}

public static void testTTLImpl(List<String> tableNames) throws Exception {
closeTTLExecute();
disableTTL();
// 0. prepare data
String keys[] = {"putKey1", "putKey2", "putKey3"};
String endKey = "putKey4";
Expand Down Expand Up @@ -94,7 +94,8 @@ public static void testTTLImpl(List<String> tableNames) throws Exception {
}

// 4. open ttl knob to delete expired hbase data
openTTLExecute();
enableTTL();
triggerTTL();

// 5. check util expired hbase data is deleted by ttl tasks
checkUtilTimeout(()-> {
Expand All @@ -110,11 +111,11 @@ public static void testTTLImpl(List<String> tableNames) throws Exception {
}

// 6. close ttl knob
closeTTLExecute();
disableTTL();
}

public static void testMultiCFTTLImpl(Map<String, List<String>> group2tableNames) throws Exception {
closeTTLExecute();
disableTTL();
List<String> allTableNames = group2tableNames.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
// 0. prepare data
String keys[] = {"putKey1", "putKey2", "putKey3"};
Expand All @@ -127,7 +128,6 @@ public static void testMultiCFTTLImpl(Map<String, List<String>> group2tableNames
OHTableClient hTable = ObHTableTestUtil.newOHTableClient(groupName);
hTable.init();
List<String> tableNames = entry.getValue();

for (String tableName : tableNames) {
String family = getColumnFamilyName(tableName);
for (String key : keys) {
Expand Down Expand Up @@ -168,7 +168,8 @@ public static void testMultiCFTTLImpl(Map<String, List<String>> group2tableNames
}

// 4. open ttl knob to delete expired hbase data
openTTLExecute();
enableTTL();
triggerTTL();

// 5. check util expired hbase data is deleted by ttl tasks
checkUtilTimeout(()-> {
Expand All @@ -184,7 +185,159 @@ public static void testMultiCFTTLImpl(Map<String, List<String>> group2tableNames
}

// 6. close ttl knob
closeTTLExecute();
disableTTL();
}

void testRowkeyTTL(List<String> tableNames, Boolean useScan, Boolean isReversed) throws Exception {
disableTTL();
// 0. prepare data
String keys[] = {"putKey1", "putKey2", "putKey3"};
String endKey = "putKey4";
String columns[] = {"putColumn1", "putColumn2"};
String values[] = {"putValue1", "putValue2", "putValue3", "putValue4"};
for (String tableName : tableNames) {
OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(tableName));
hTable.init();
String family = getColumnFamilyName(tableName);
for (String key : keys) {
for (String column : columns) {
for (int i = 0; i < values.length; i++) {
Put put = new Put(toBytes(key));
put.add(family.getBytes(), column.getBytes(), values[i].getBytes());
hTable.put(put);
}
}
}
}

// 1. sleep util data expired
sleep(12000);

// 2. enable kv ttl
enableTTL();

// 3. SQL can scan expired but not delete yet hbase data
for (String tableName : tableNames) {
Assert.assertEquals(keys.length * columns.length * values.length,
getSQLTableRowCnt(tableName));
}

// 4. use Hbase scan/get expired data to trigger ttl
for (String tableName : tableNames) {
OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(tableName));
hTable.init();
String family = getColumnFamilyName(tableName);
if (useScan) {
Scan scan = new Scan(keys[0].getBytes(), endKey.getBytes());
scan.addFamily(family.getBytes());
scan.setReversed(isReversed);
ResultScanner scanner = hTable.getScanner(scan);
List<Cell> cells = getCellsFromScanner(scanner);
assertEquals(0, cells.size());
} else {
for (String key : keys) {
Get get = new Get(key.getBytes());
get.addFamily(family.getBytes());
Result result = hTable.get(get);
assertEquals(0, result.rawCells().length);
}
}
}

// 5. wait to disable
checkUtilTimeout(()-> {
try {
Boolean passed = true;
for (int i = 0; passed && i < tableNames.size(); i++) {
if (getSQLTableRowCnt(tableNames.get(i)) > 0) {
passed = false;
}
}
return passed;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 50000, 3000);

// 6. disable ttl
disableTTL();
}

void testMultiCFRowkeyTTL(Map<String, List<String>> group2tableNames, Boolean useScan, Boolean isReversed) throws Exception {
disableTTL();
List<String> allTableNames = group2tableNames.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
// 0. prepare data
String keys[] = {"putKey1", "putKey2", "putKey3"};
String endKey = "putKey4";
String columns[] = {"putColumn1", "putColumn2"};
String values[] = {"putValue1", "putValue2", "putValue3", "putValue4"};
for (Map.Entry<String, List<String>> entry : group2tableNames.entrySet()) {
String groupName = getTableName(entry.getKey());
OHTableClient hTable = ObHTableTestUtil.newOHTableClient(groupName);
hTable.init();
for (String tableName : entry.getValue()) {
String family = getColumnFamilyName(tableName);
for (String key : keys) {
for (String column : columns) {
for (int i = 0; i < values.length; i++) {
Put put = new Put(toBytes(key));
put.add(family.getBytes(), column.getBytes(), values[i].getBytes());
hTable.put(put);
}
}
}
}
}

// 1. sleep util data expired
sleep(12000);

// 2. enable kv ttl
enableTTL();

// 3. SQL can scan expired but not delete yet hbase data
for (String tableName : allTableNames) {
Assert.assertEquals(keys.length * columns.length * values.length,
getSQLTableRowCnt(tableName));
}

// 4. use Hbase scan expired data to trigger ttl
for (Map.Entry<String, List<String>> entry : group2tableNames.entrySet()) {
String groupName = getTableName(entry.getKey());
OHTableClient hTable = ObHTableTestUtil.newOHTableClient(groupName);
hTable.init();
if (useScan) {
Scan scan = new Scan(keys[0].getBytes(), endKey.getBytes());
scan.setReversed(isReversed);
ResultScanner scanner = hTable.getScanner(scan);
List<Cell> cells = getCellsFromScanner(scanner);
assertEquals(0, cells.size());
} else {
for (String key : keys) {
Get get = new Get(key.getBytes());
Result result = hTable.get(get);
assertEquals(0, result.rawCells().length);
}
}
}

// 5. wait to disable
checkUtilTimeout(()-> {
try {
Boolean passed = true;
for (int i = 0; passed && i < allTableNames.size(); i++) {
if (getSQLTableRowCnt(allTableNames.get(i)) > 0) {
passed = false;
}
}
return passed;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 70000, 5000);

// 6. disable ttl
disableTTL();
}


Expand All @@ -197,4 +350,20 @@ public void testTTL() throws Throwable {
public void testMultiCFTTL() throws Throwable {
testMultiCFTTLImpl(group2tableNames);
}

@Test
public void testRowkeyTTL() throws Exception {
testRowkeyTTL(tableNames, true, false);
testRowkeyTTL(tableNames, false, false);
// TODO: open the test after reverse scan is ok
// testRowkeyTTL(tableNames, true);
}

@Test
public void testMultiCFRowkeyTTL() throws Exception {
testMultiCFRowkeyTTL(group2tableNames, true, false);
testMultiCFRowkeyTTL(group2tableNames, false, false);
// TODO: open the test after reverse scan is ok
// testMultiCFRowkeyTTL(group2tableNames, true, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,15 +273,19 @@ public static int getRunningNormalTTLTaskCnt() throws Exception {
return rowCnt;
}

public static void openTTLExecute() throws Exception {
public static void enableTTL() throws Exception {
Connection conn = ObHTableTestUtil.getConnection();
String stmt1 = "ALTER SYSTEM set enable_kv_ttl = true;";
String stmt2 = "ALTER SYSTEM trigger TTL;";
conn.createStatement().execute(stmt1);
conn.createStatement().execute(stmt2);
String stmt = "ALTER SYSTEM set enable_kv_ttl = true;";
conn.createStatement().execute(stmt);
}

public static void triggerTTL() throws Exception {
Connection conn = ObHTableTestUtil.getConnection();
String stmt = "ALTER SYSTEM trigger TTL;";
conn.createStatement().execute(stmt);
}

public static void closeTTLExecute() throws Exception {
public static void disableTTL() throws Exception {
Connection conn = ObHTableTestUtil.getConnection();
String stmt = "ALTER SYSTEM set enable_kv_ttl = false;";
conn.createStatement().execute(stmt);
Expand Down