From d1d5b52d79d85c2df27c0e73ae478e7ddce5f77b Mon Sep 17 00:00:00 2001 From: stuBirdFly <84010733+stuBirdFly@users.noreply.github.com> Date: Tue, 22 Oct 2024 20:35:40 +0800 Subject: [PATCH 1/6] hbase support batch (#84) --- .../com/alipay/oceanbase/hbase/OHTable.java | 384 ++++++++---------- .../oceanbase/hbase/util/BatchError.java | 40 ++ .../oceanbase/hbase/HTableTestBase.java | 16 +- .../hbase/OHTableMultiColumnFamilyTest.java | 113 +++++- 4 files changed, 336 insertions(+), 217 deletions(-) create mode 100644 src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 8cb29cb6..a2daecfa 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -24,6 +24,7 @@ import com.alipay.oceanbase.hbase.result.ClientStreamScanner; import com.alipay.oceanbase.hbase.util.*; import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.exception.ObTableException; import com.alipay.oceanbase.rpc.mutation.BatchOperation; import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; @@ -131,7 +132,7 @@ public class OHTable implements Table { /** * the buffer of put request */ - private final ArrayList writeBuffer = new ArrayList(); + private final ArrayList writeBuffer = new ArrayList<>(); /** * when the put request reach the write buffer size the do put will * flush commits automatically @@ -336,7 +337,7 @@ public static ThreadPoolExecutor createDefaultThreadPoolExecutor(int coreSize, i System.setProperty(SOFA_THREAD_POOL_LOGGING_CAPABILITY, "false"); } SofaThreadPoolExecutor executor = new SofaThreadPoolExecutor(coreSize, maxThreads, - keepAliveTime, SECONDS, new SynchronousQueue(), "OHTableDefaultExecutePool", + keepAliveTime, SECONDS, new SynchronousQueue<>(), "OHTableDefaultExecutePool", TABLE_HBASE_LOGGER_SPACE); executor.allowCoreThreadTimeOut(true); return executor; @@ -463,8 +464,56 @@ public boolean[] exists(List gets) throws IOException { } @Override - public void batch(List actions, Object[] results) { - throw new FeatureNotSupportedException("not supported yet."); + public void batch(final List actions, final Object[] results) throws IOException { + BatchError batchError = new BatchError(); + try { + List resultMapSingleOp = new LinkedList<>(); + String realTableName = getTargetTableName(actions); + obTableClient.setRuntimeBatchExecutor(executePool); + BatchOperation batch = buildBatchOperation(realTableName, actions, tableNameString.equals(realTableName), resultMapSingleOp); + BatchOperationResult tmpResults = batch.execute(); + if (results != null) { + if (results.length != actions.size()) { + throw new AssertionError("results.length"); + } + int index = 0; + for (int i = 0; i != results.length; ++i) { + results[i] = tmpResults.getResults().get(index); + index += resultMapSingleOp.get(i); + if (results[i] instanceof ObTableException) { + batchError.add((ObTableException) results[i], actions.get(i), null); + } + } + if (batchError.hasErrors()) { + throw batchError.makeException(); + } + } + } catch (Exception e) { + logger.error(LCD.convert("01-000010"), tableNameString, actions, e); + throw new IOException("batch table " + tableNameString + " error", e); + } + } + + private String getTargetTableName(List actions) { + byte[] family = null; + for (Row action : actions) { + if (action instanceof RowMutations || action instanceof RegionCoprocessorServiceExec) { + throw new FeatureNotSupportedException("not supported yet'"); + } else { + Mutation mutation = (Mutation) action; + if (mutation.getFamilyCellMap().size() != 1) { + return getTargetTableName(tableNameString); + } else { + byte[] nextFamily = mutation.getFamilyCellMap().keySet().iterator().next(); + if (family != null && !Arrays.equals(family, nextFamily)) { + return getTargetTableName(tableNameString); + } else if (family == null) { + family = nextFamily; + } + } + } + } + return getTargetTableName(tableNameString, Bytes.toString(family), configuration); } @Override @@ -561,7 +610,7 @@ public Result get(final Get get) throws IOException { ServerCallable serverCallable = new ServerCallable(configuration, obTableClient, tableNameString, get.getRow(), get.getRow(), operationTimeout) { public Result call() throws IOException { - List keyValueList = new ArrayList(); + List keyValueList = new ArrayList<>(); byte[] family = new byte[] {}; ObTableClientQueryAsyncStreamResult clientQueryStreamResult; ObTableQueryAsyncRequest request; @@ -837,57 +886,13 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, private void innerDelete(Delete delete) throws IOException { checkArgument(delete.getRow() != null, "row is null"); - List errorCodeList = new ArrayList(); - BatchOperationResult results = null; - try { - checkFamilyViolation(delete.getFamilyCellMap().keySet(), false); - if (delete.getFamilyCellMap().isEmpty()) { - // For a Delete operation without any qualifiers, we construct a DeleteFamily request. - // The server then performs the operation on all column families. - KeyValue kv = new KeyValue(delete.getRow(), delete.getTimeStamp(), - KeyValue.Type.DeleteFamily); - - BatchOperation batch = buildBatchOperation(tableNameString, Arrays.asList(kv), false, null); - results = batch.execute(); - } else if (delete.getFamilyCellMap().size() > 1) { - // Currently, the Delete Family operation type cannot transmit qualifiers to the server. - // As a result, the server cannot identify which families need to be deleted. - // Therefore, this process is handled sequentially. - boolean has_delete_family = delete.getFamilyCellMap().entrySet().stream() - .flatMap(entry -> entry.getValue().stream()) - .anyMatch(kv -> kv.getType() == Cell.Type.DeleteFamily); - if (!has_delete_family) { - BatchOperation batch = buildBatchOperation(tableNameString, - delete.getFamilyCellMap(), false, null); - results = batch.execute(); - } else { - for (Map.Entry> entry : delete.getFamilyCellMap().entrySet()) { - BatchOperation batch = buildBatchOperation( - getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration), - entry.getValue(), false, null); - results = batch.execute(); - } - } - } else { - Map.Entry> entry = delete.getFamilyCellMap().entrySet().iterator() - .next(); - - BatchOperation batch = buildBatchOperation( - getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration), - entry.getValue(), false, null); - results = batch.execute(); - } - - errorCodeList = results.getErrorCodeList(); - boolean hasError = results.hasError(); - if (hasError) { - throw results.getFirstException(); - } + List actions = Collections.singletonList(delete); + Object[] results = new Object[actions.size()]; + batch(actions, results); } catch (Exception e) { - logger.error(LCD.convert("01-00004"), tableNameString, errorCodeList, e); - throw new IOException("delete table " + tableNameString + " error codes " - + errorCodeList, e); + logger.error(LCD.convert("01-00004"), tableNameString, e); + throw new IOException("delete table " + tableNameString + "error" , e); } } @@ -1151,77 +1156,34 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo public void flushCommits() throws IOException { try { + if (writeBuffer.isEmpty()){ + return; + } + Map exceptionRowMap = new LinkedHashMap(); boolean[] resultSuccess = new boolean[writeBuffer.size()]; try { - Map, List>> familyMap = new HashMap, List>>(); - for (int i = 0; i < writeBuffer.size(); i++) { - Put aPut = writeBuffer.get(i); - Map> innerFamilyMap = aPut.getFamilyCellMap(); - if (innerFamilyMap.size() > 1) { - // Bypass logic: directly construct BatchOperation for puts with family map size > 1 - try { - BatchOperation batch = buildBatchOperation(this.tableNameString, - innerFamilyMap, false, null); - BatchOperationResult results = batch.execute(); - - boolean hasError = results.hasError(); - resultSuccess[i] = !hasError; - if (hasError) { - throw results.getFirstException(); - } - } catch (Exception e) { - logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush, - writeBuffer.size(), e); - throw new IOException("put table " + tableNameString + " error codes " - + null + "auto flush " + autoFlush - + " current buffer size " + writeBuffer.size(), e); - } - } else { - // Existing logic for puts with family map size = 1 - for (Map.Entry> entry : innerFamilyMap.entrySet()) { - String family = Bytes.toString(entry.getKey()); - Pair, List> keyValueWithIndex = familyMap - .get(family); - if (keyValueWithIndex == null) { - keyValueWithIndex = new Pair, List>( - new ArrayList(), new ArrayList()); - familyMap.put(family, keyValueWithIndex); - } - keyValueWithIndex.getFirst().add(i); - keyValueWithIndex.getSecond().addAll(entry.getValue()); - } - } - } - for (Map.Entry, List>> entry : familyMap - .entrySet()) { - List errorCodeList = new ArrayList(entry.getValue() - .getSecond().size()); - try { - String targetTableName = getTargetTableName(this.tableNameString, - entry.getKey(), configuration); - - BatchOperation batch = buildBatchOperation(targetTableName, entry - .getValue().getSecond(), false, null); - BatchOperationResult results = batch.execute(); - - errorCodeList = results.getErrorCodeList(); - boolean hasError = results.hasError(); - for (Integer index : entry.getValue().getFirst()) { - resultSuccess[index] = !hasError; - } - if (hasError) { - throw results.getFirstException(); + String realTableName = getTargetTableName(writeBuffer); + List resultMapSingleOp = new LinkedList<>(); + BatchOperation batch = buildBatchOperation(realTableName, writeBuffer, tableNameString.equals(realTableName), resultMapSingleOp); + BatchOperationResult results = batch.execute(); + if (results != null) { + int index = 0; + for (int i = 0; i != resultSuccess.length; ++i) { + if (results.getResults().get(index) instanceof ObTableException) { + resultSuccess[i] = false; + exceptionRowMap.put((ObTableException)results.getResults().get(index), writeBuffer.get(i)); + } else { + resultSuccess[i] = true; } - } catch (Exception e) { - logger.error(LCD.convert("01-00008"), tableNameString, errorCodeList, - autoFlush, writeBuffer.size(), e); - throw new IOException("put table " + tableNameString + " error codes " - + errorCodeList + "auto flush " + autoFlush - + " current buffer size " + writeBuffer.size(), e); + index += resultMapSingleOp.get(i); } - } - + } catch (Exception e) { + logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush, + writeBuffer.size(), e); + throw new IOException("put table " + tableNameString + " error codes " + null + + "auto flush " + autoFlush + " current buffer size " + + writeBuffer.size(), e); } finally { // mutate list so that it is empty for complete success, or contains // only failed records results are returned in the same order as the @@ -1233,6 +1195,12 @@ public void flushCommits() throws IOException { writeBuffer.remove(i); } } + if (!exceptionRowMap.isEmpty()) { + exceptionRowMap.forEach((e, row)->{ + logger.error(LCD.convert("01-00008"), row, tableNameString, autoFlush, + writeBuffer.size(), e); + }); + } } } finally { if (clearBufferOnFail) { @@ -1562,65 +1530,51 @@ public static ObTableBatchOperation buildObTableBatchOperation(List keyVal return batch; } - private ObTableBatchOperation buildObTableBatchOperation(Map> familyMap, - boolean putToAppend, - List qualifiers) { - ObTableBatchOperation batch = new ObTableBatchOperation(); - for (Map.Entry> entry : familyMap.entrySet()) { - byte[] family = entry.getKey(); - List keyValueList = entry.getValue(); - for (Cell kv : keyValueList) { - if (qualifiers != null) { - qualifiers - .add((Bytes.toString(family) + "." + Bytes.toString(CellUtil.cloneQualifier(kv))) - .getBytes()); - } - KeyValue new_kv = modifyQualifier(kv, - (Bytes.toString(family) + "." + Bytes.toString(CellUtil.cloneQualifier(kv))).getBytes()); - batch.addTableOperation(buildObTableOperation(new_kv, putToAppend)); - } - } - batch.setSameType(true); - batch.setSamePropertiesNames(true); - return batch; - } - private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(Cell kv, - boolean putToAppend) { - Cell.Type kvType = kv.getType(); - switch (kvType) { - case Put: - ObTableOperationType operationType; - if (putToAppend) { - operationType = APPEND; - } else { - operationType = INSERT_OR_UPDATE; - } + ObTableOperationType operationType, boolean isTableGroup) { + KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType().getCode()); + switch (operationType) { + case INSERT_OR_UPDATE: + case APPEND: return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(operationType, ROW_KEY_COLUMNS, new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp() }, V_COLUMNS, new Object[] { CellUtil.cloneValue(kv) }); - case Delete: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, ROW_KEY_COLUMNS, - new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp() }, null, null); - case DeleteColumn: - return com.alipay.oceanbase.rpc.mutation.Mutation - .getInstance(DEL, ROW_KEY_COLUMNS, - new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), -kv.getTimestamp() }, null, - null); - case DeleteFamily: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, ROW_KEY_COLUMNS, - new Object[] { CellUtil.cloneRow(kv), null, -kv.getTimestamp() }, null, null); + case DEL: + switch (kvType) { + case Delete: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, + ROW_KEY_COLUMNS, + new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp() }, + null, null); + case Maximum: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, + ROW_KEY_COLUMNS, + new Object[] { CellUtil.cloneRow(kv), null, -kv.getTimestamp() }, + null, null); + case DeleteColumn: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, + ROW_KEY_COLUMNS, + new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), -kv.getTimestamp() }, + null, null); + case DeleteFamily: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, + ROW_KEY_COLUMNS, + new Object[] { CellUtil.cloneRow(kv), isTableGroup?CellUtil.cloneQualifier(kv):null, -kv.getTimestamp() }, + null, null); + default: + throw new IllegalArgumentException("illegal mutation type " + kvType); + } default: - throw new IllegalArgumentException("illegal mutation type " + kvType); + throw new IllegalArgumentException("illegal mutation type " + operationType); } } private KeyValue modifyQualifier(Cell original, byte[] newQualifier) { - // Extract existing components - byte[] row = original.getRowArray(); - byte[] family = original.getFamilyArray(); - byte[] value = original.getValueArray(); + // Extract existing components + byte[] row = CellUtil.cloneRow(original); + byte[] family = CellUtil.cloneFamily(original); + byte[] value = CellUtil.cloneValue(original); long timestamp = original.getTimestamp(); KeyValue.Type type = KeyValue.Type.codeToType(original.getType().getCode()); // Create a new KeyValue with the modified qualifier @@ -1628,37 +1582,66 @@ private KeyValue modifyQualifier(Cell original, byte[] newQualifier) { value); } - private BatchOperation buildBatchOperation(String tableName, - Map> familyMap, - boolean putToAppend, List qualifiers) { + private BatchOperation buildBatchOperation(String tableName, List actions, boolean isTableGroup, List resultMapSingleOp) { BatchOperation batch = obTableClient.batchOperation(tableName); - - for (Map.Entry> entry : familyMap.entrySet()) { - byte[] family = entry.getKey(); - List keyValueList = entry.getValue(); - for (Cell kv : keyValueList) { - if (qualifiers != null) { - qualifiers.add(CellUtil.cloneQualifier(kv)); + if (actions != null) { + int posInList = -1; + int singleOpResultNum; + for (Row row : actions) { + singleOpResultNum = 0; + posInList++; + if (!(row instanceof Put) && !(row instanceof Delete)) { + throw new FeatureNotSupportedException( + "not supported other type in batch yet,only support put and delete"); + } else if (row instanceof Put) { + Put put = (Put) row; + if (put.isEmpty()) { + throw new IllegalArgumentException("No columns to insert for #" + + (posInList + 1) + " item"); + } + for (Map.Entry> entry : put.getFamilyCellMap().entrySet()) { + byte[] family = entry.getKey(); + List keyValueList = entry.getValue(); + for (Cell kv : keyValueList) { + singleOpResultNum++; + if(isTableGroup){ + KeyValue new_kv = modifyQualifier(kv,(Bytes.toString(family) + "." + Bytes.toString(CellUtil.cloneQualifier(kv))) + .getBytes()); + batch.addOperation(buildMutation(new_kv, INSERT_OR_UPDATE, isTableGroup)); + } else { + batch.addOperation(buildMutation(kv, INSERT_OR_UPDATE, isTableGroup)); + } + } + } + } else { + Delete delete = (Delete) row; + if (delete.isEmpty()) { + singleOpResultNum++; + KeyValue kv = new KeyValue(delete.getRow(), delete.getTimeStamp(), + KeyValue.Type.Maximum); + batch.addOperation(buildMutation(kv, DEL, isTableGroup)); + } else { + for (Map.Entry> entry : delete.getFamilyCellMap() + .entrySet()) { + byte[] family = entry.getKey(); + List keyValueList = entry.getValue(); + List mutations = new LinkedList<>(); + for (Cell kv : keyValueList) { + singleOpResultNum++; + if(isTableGroup){ + KeyValue new_kv = modifyQualifier(kv,(Bytes.toString(family) + "." + Bytes.toString(CellUtil.cloneQualifier(kv))) + .getBytes()); + batch.addOperation(buildMutation(new_kv, DEL, true)); + } else { + batch.addOperation(buildMutation(kv, DEL, false)); + } + } + } + } } - KeyValue new_kv = modifyQualifier(kv, - (Bytes.toString(family) + "." + Bytes.toString(CellUtil.cloneQualifier(kv))).getBytes()); - batch.addOperation(buildMutation(new_kv, putToAppend)); + resultMapSingleOp.add(singleOpResultNum); } } - - batch.setEntityType(ObTableEntityType.HKV); - return batch; - } - - private BatchOperation buildBatchOperation(String tableName, List keyValueList, - boolean putToAppend, List qualifiers) { - BatchOperation batch = obTableClient.batchOperation(tableName); - for (Cell kv : keyValueList) { - if (qualifiers != null) { - qualifiers.add(CellUtil.cloneQualifier(kv)); - } - batch.addOperation(buildMutation(kv, putToAppend)); - } batch.setEntityType(ObTableEntityType.HKV); return batch; } @@ -1690,15 +1673,6 @@ public static ObTableOperation buildObTableOperation(Cell kv, boolean putToAppen } } - private ObTableQueryRequest buildObTableQueryRequest(ObTableQuery obTableQuery, - String targetTableName) { - ObTableQueryRequest request = new ObTableQueryRequest(); - request.setEntityType(ObTableEntityType.HKV); - request.setTableQuery(obTableQuery); - request.setTableName(targetTableName); - return request; - } - private ObTableQueryAsyncRequest buildObTableQueryAsyncRequest(ObTableQuery obTableQuery, String targetTableName) { ObTableQueryRequest request = new ObTableQueryRequest(); diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java b/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java new file mode 100644 index 00000000..640e432d --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java @@ -0,0 +1,40 @@ +package com.alipay.oceanbase.hbase.util; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.client.Row; + +import java.util.ArrayList; +import java.util.List; + +public class BatchError { + private final List throwables = new ArrayList(); + private final List actions = new ArrayList(); + private final List addresses = new ArrayList(); + + public synchronized void add(Throwable ex, Row row, ServerName serverName) { + if (row == null){ + throw new IllegalArgumentException("row cannot be null. location=" + serverName); + } + + throwables.add(ex); + actions.add(row); + addresses.add(serverName != null ? serverName.toString() : "null"); + } + + public boolean hasErrors() { + return !throwables.isEmpty(); + } + + public synchronized RetriesExhaustedWithDetailsException makeException() { + return new RetriesExhaustedWithDetailsException( + new ArrayList(throwables), + new ArrayList(actions), new ArrayList(addresses)); + } + + public synchronized void clear() { + throwables.clear(); + actions.clear(); + addresses.clear(); + } +} diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index 2148302d..5413c6d0 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -3986,7 +3986,7 @@ public void testReversedScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 6); + Assert.assertEquals(6, res_count); scanner.close(); // reverse scan with MaxVersion @@ -4004,7 +4004,7 @@ public void testReversedScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 3); + Assert.assertEquals(3, res_count); scanner.close(); // reverse scan with pageFilter @@ -4024,7 +4024,7 @@ public void testReversedScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 6); + Assert.assertEquals(6, res_count); scanner.close(); // reverse scan with not_exist_start_row @@ -4042,7 +4042,7 @@ public void testReversedScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 6); + Assert.assertEquals(6, res_count); scanner.close(); // reverse scan with abnormal range @@ -4060,7 +4060,7 @@ public void testReversedScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 0); + Assert.assertEquals(0, res_count); scanner.close(); // reverse scan with abnormal range @@ -4078,7 +4078,7 @@ public void testReversedScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 13); + Assert.assertEquals(13, res_count); scanner.close(); hTable.delete(deleteKey1Family); @@ -4427,7 +4427,7 @@ public void testPartitionScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 7); + Assert.assertEquals(7, res_count); scanner.close(); // scan with prefixFilter @@ -4447,7 +4447,7 @@ public void testPartitionScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 2); + Assert.assertEquals(2, res_count); scanner.close(); // scan with singleColumnValueFilter diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java index 3af773bf..5a2e1c15 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java @@ -26,9 +26,7 @@ import org.junit.rules.ExpectedException; import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import static org.apache.hadoop.hbase.util.Bytes.toBytes; import static org.junit.Assert.*; @@ -50,6 +48,112 @@ public void finish() throws IOException { hTable.close(); } + @Test + public void testMultiColumnFamilyBatch() throws Exception { + byte[] family1 = "family_with_group1".getBytes(); + byte[] family2 = "family_with_group2".getBytes(); + byte[] family3 = "family_with_group3".getBytes(); + + byte[] family1_column1 = "family1_column1".getBytes(); + byte[] family1_column2 = "family1_column2".getBytes(); + byte[] family1_column3 = "family1_column3".getBytes(); + byte[] family2_column1 = "family2_column1".getBytes(); + byte[] family2_column2 = "family2_column2".getBytes(); + byte[] family3_column1 = "family3_column1".getBytes(); + byte[] family1_value = "VVV1".getBytes(); + byte[] family2_value = "VVV2".getBytes(); + byte[] family3_value = "VVV3".getBytes(); + + int rows = 10; + List batchLsit = new LinkedList<>(); + for (int i = 0; i < rows; ++i) { + Put put = new Put(toBytes("Key" + i)); + Delete delete = new Delete(toBytes("Key" + i)); + batchLsit.add(delete); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family1, family1_column2, family1_value); + put.addColumn(family1, family1_column3, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family2, family2_column2, family2_value); + put.addColumn(family3, family3_column1, family3_value); + batchLsit.add(put); + } + + // f1c1 f1c2 f1c3 f2c1 f2c2 f3c1 + Delete delete = new Delete(toBytes("Key1")); + delete.addColumns(family1, family1_column1); + delete.addColumns(family2, family2_column1); + batchLsit.add(delete); + Object[] results = new Object[batchLsit.size()]; + hTable.batch(batchLsit, results); + // f1c2 f1c3 f2c2 f3c1 + Get get = new Get(toBytes("Key1")); + Result result = hTable.get(get); + Cell[] keyValues = result.rawCells(); + assertEquals(4, keyValues.length); + assertFalse(result.containsColumn(family1, family1_column1)); + assertFalse(result.containsColumn(family2, family2_column1)); + + assertTrue(result.containsColumn(family1, family1_column2)); + assertArrayEquals(result.getValue(family1, family1_column2), family1_value); + assertTrue(result.containsColumn(family1, family1_column3)); + assertArrayEquals(result.getValue(family1, family1_column3), family1_value); + assertTrue(result.containsColumn(family2, family2_column2)); + assertArrayEquals(result.getValue(family2, family2_column2), family2_value); + assertTrue(result.containsColumn(family3, family3_column1)); + assertArrayEquals(result.getValue(family3, family3_column1), family3_value); + + // f1c1 f2c1 f2c2 + delete = new Delete(toBytes("Key2")); + delete.addColumns(family1, family1_column2); + delete.addColumns(family1, family1_column3); + delete.addColumns(family3, family3_column1); + batchLsit.add(delete); + // null + results = new Object[batchLsit.size()]; + hTable.batch(batchLsit, results); + get = new Get(toBytes("Key2")); + result = hTable.get(get); + keyValues = result.rawCells(); + assertEquals(3, keyValues.length); + batchLsit.clear(); + for (int i = 0; i < rows; ++i) { + Put put = new Put(toBytes("Key" + i)); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family1, family1_column2, family1_value); + put.addColumn(family1, family1_column3, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family2, family2_column2, family2_value); + put.addColumn(family3, family3_column1, family3_value); + batchLsit.add(put); + } + + delete = new Delete(toBytes("Key3")); + delete.addColumn(family1, family1_column2); + delete.addColumn(family2, family2_column1); + batchLsit.add(delete); + results = new Object[batchLsit.size()]; + hTable.batch(batchLsit, results); + get = new Get(toBytes("Key3")); + result = hTable.get(get); + keyValues = result.rawCells(); + assertEquals(6, keyValues.length); + + batchLsit.clear(); + delete = new Delete(toBytes("Key4")); + delete.addColumns(family1, family1_column2); + delete.addColumns(family2, family2_column1); + delete.addFamily(family3); + batchLsit.add(delete); + results = new Object[batchLsit.size()]; + hTable.batch(batchLsit, results); + get = new Get(toBytes("Key4")); + get.setMaxVersions(10); + result = hTable.get(get); + keyValues = result.rawCells(); + assertEquals(6, keyValues.length); + } + @Test public void testMultiColumnFamilyPut() throws Exception { byte[] family1 = "family_with_group1".getBytes(); @@ -486,7 +590,6 @@ public void testMultiColumnFamilyGet() throws Exception { assertEquals(expectedValue, CellUtil.cloneValue(keyValues[i])); } } - System.out.println(Arrays.toString(result2.rawCells())); assertEquals(3, keyValues.length); //f2c1 f2c2 @@ -528,6 +631,8 @@ public void testMultiColumnFamilyDelete() throws Exception { for (int i = 0; i < rows; ++i) { Put put = new Put(toBytes("Key" + i)); + Delete delete = new Delete(toBytes("Key" + i)); + hTable.delete(delete); put.addColumn(family1, family1_column1, family1_value); put.addColumn(family1, family1_column2, family1_value); put.addColumn(family1, family1_column3, family1_value); From 3884e08e8769cc968a9fdd1576cc0022b79fd07d Mon Sep 17 00:00:00 2001 From: miyuan-ljr Date: Thu, 24 Oct 2024 15:05:06 +0800 Subject: [PATCH 2/6] fix test --- .../com/alipay/oceanbase/hbase/OHTable.java | 89 +++++++++++-------- 1 file changed, 54 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index a2daecfa..8c8c204a 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -1531,40 +1531,38 @@ public static ObTableBatchOperation buildObTableBatchOperation(List keyVal } private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(Cell kv, - ObTableOperationType operationType, boolean isTableGroup) { - KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType().getCode()); + OHOpType operationType, boolean isTableGroup) { switch (operationType) { case INSERT_OR_UPDATE: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(INSERT_OR_UPDATE, + ROW_KEY_COLUMNS, + new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp() }, V_COLUMNS, + new Object[] { CellUtil.cloneValue(kv) }); case APPEND: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(operationType, + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(APPEND, ROW_KEY_COLUMNS, new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp() }, V_COLUMNS, new Object[] { CellUtil.cloneValue(kv) }); - case DEL: - switch (kvType) { - case Delete: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, - ROW_KEY_COLUMNS, - new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp() }, - null, null); - case Maximum: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, - ROW_KEY_COLUMNS, - new Object[] { CellUtil.cloneRow(kv), null, -kv.getTimestamp() }, - null, null); - case DeleteColumn: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, - ROW_KEY_COLUMNS, - new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), -kv.getTimestamp() }, - null, null); - case DeleteFamily: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, - ROW_KEY_COLUMNS, - new Object[] { CellUtil.cloneRow(kv), isTableGroup?CellUtil.cloneQualifier(kv):null, -kv.getTimestamp() }, - null, null); - default: - throw new IllegalArgumentException("illegal mutation type " + kvType); - } + case Delete: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, + ROW_KEY_COLUMNS, + new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp() }, + null, null); + case DeleteAll: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, + ROW_KEY_COLUMNS, + new Object[] { CellUtil.cloneRow(kv), null, -kv.getTimestamp() }, + null, null); + case DeleteColumn: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, + ROW_KEY_COLUMNS, + new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), -kv.getTimestamp() }, + null, null); + case DeleteFamily: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, + ROW_KEY_COLUMNS, + new Object[] { CellUtil.cloneRow(kv), isTableGroup?CellUtil.cloneQualifier(kv):null, -kv.getTimestamp() }, + null, null); default: throw new IllegalArgumentException("illegal mutation type " + operationType); } @@ -1607,9 +1605,9 @@ private BatchOperation buildBatchOperation(String tableName, List if(isTableGroup){ KeyValue new_kv = modifyQualifier(kv,(Bytes.toString(family) + "." + Bytes.toString(CellUtil.cloneQualifier(kv))) .getBytes()); - batch.addOperation(buildMutation(new_kv, INSERT_OR_UPDATE, isTableGroup)); + batch.addOperation(buildMutation(new_kv, OHOpType.INSERT_OR_UPDATE, isTableGroup)); } else { - batch.addOperation(buildMutation(kv, INSERT_OR_UPDATE, isTableGroup)); + batch.addOperation(buildMutation(kv, OHOpType.INSERT_OR_UPDATE, isTableGroup)); } } } @@ -1617,9 +1615,8 @@ private BatchOperation buildBatchOperation(String tableName, List Delete delete = (Delete) row; if (delete.isEmpty()) { singleOpResultNum++; - KeyValue kv = new KeyValue(delete.getRow(), delete.getTimeStamp(), - KeyValue.Type.Maximum); - batch.addOperation(buildMutation(kv, DEL, isTableGroup)); + KeyValue kv = new KeyValue(delete.getRow(), delete.getTimeStamp()); + batch.addOperation(buildMutation(kv, OHOpType.DeleteAll, isTableGroup)); } else { for (Map.Entry> entry : delete.getFamilyCellMap() .entrySet()) { @@ -1631,9 +1628,9 @@ private BatchOperation buildBatchOperation(String tableName, List if(isTableGroup){ KeyValue new_kv = modifyQualifier(kv,(Bytes.toString(family) + "." + Bytes.toString(CellUtil.cloneQualifier(kv))) .getBytes()); - batch.addOperation(buildMutation(new_kv, DEL, true)); + batch.addOperation(buildMutation(new_kv, getDeleteType(new_kv.getType()), true)); } else { - batch.addOperation(buildMutation(kv, DEL, false)); + batch.addOperation(buildMutation(kv, getDeleteType(kv.getType()), false)); } } } @@ -1780,4 +1777,26 @@ public byte[][] getEndKeys() throws IOException { public Pair getStartEndKeys() throws IOException { return new Pair<>(getStartKeys(), getEndKeys()); } + + public static enum OHOpType { + INSERT_OR_UPDATE, + APPEND, + Delete, + DeleteAll, + DeleteColumn, + DeleteFamily, + } + + public static OHOpType getDeleteType(Cell.Type type) { + switch (type) { + case Delete: + return OHOpType.Delete; + case DeleteColumn: + return OHOpType.DeleteColumn; + case DeleteFamily: + return OHOpType.DeleteFamily; + default: + throw new IllegalArgumentException("illegal mutation type " + type); + } + } } \ No newline at end of file From 676ec51dfee1570eb809d528f828da20f9fa3fd4 Mon Sep 17 00:00:00 2001 From: Ziyu Shi <57038180+JackShi148@users.noreply.github.com> Date: Wed, 23 Oct 2024 10:49:17 +0800 Subject: [PATCH 3/6] Add DeleteFamilyVersion function and corresponding test cases (#85) * add DepentdentFilter and SingleColumnValueExcludeFilter * add singleColumnValueExcludeFilter and DependentColumnFilter; add test cases and optimize bufferedMutator test cases * add single cf timerange setting in Get * single cf setColumnFamilyTimeRange in Get and Scan * optimize code * add DeleteFamilyVersion and test cases * add DeleteFamilyVersion; optimize test cases * add DeleteFamilyVersion test case and pass * format code * delete useless self-defined table --- .../com/alipay/oceanbase/hbase/OHTable.java | 6 + .../hbase/filter/HBaseFilterUtils.java | 19 +- .../oceanbase/hbase/util/BatchError.java | 11 +- .../oceanbase/hbase/HTableTestBase.java | 89 ++--- .../oceanbase/hbase/OHConnectionTest.java | 131 ++++--- .../hbase/OHTableDeleteFamilyVersionTest.java | 350 ++++++++++++++++++ .../hbase/filter/HBaseFilterUtilsTest.java | 51 ++- 7 files changed, 524 insertions(+), 133 deletions(-) create mode 100644 src/test/java/com/alipay/oceanbase/hbase/OHTableDeleteFamilyVersionTest.java diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 8c8c204a..e0df60b8 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -1563,6 +1563,11 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(Cell kv, ROW_KEY_COLUMNS, new Object[] { CellUtil.cloneRow(kv), isTableGroup?CellUtil.cloneQualifier(kv):null, -kv.getTimestamp() }, null, null); + case DeleteFamilyVersion: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, + ROW_KEY_COLUMNS, + new Object[] { CellUtil.cloneRow(kv), isTableGroup ? CellUtil.cloneQualifier(kv) : null, kv.getTimestamp() }, + null, null); default: throw new IllegalArgumentException("illegal mutation type " + operationType); } @@ -1785,6 +1790,7 @@ public static enum OHOpType { DeleteAll, DeleteColumn, DeleteFamily, + DeleteFamilyVersion, } public static OHOpType getDeleteType(Cell.Type type) { diff --git a/src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java b/src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java index 85c4f593..ed98d1b3 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java +++ b/src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java @@ -169,7 +169,8 @@ private static void toParseableByteArray(ByteArrayOutputStream byteStream, // SingleColumnValueExcludeFilter('cf1','col1',=,'binary:123',true,true) private static void toParseableByteArray(ByteArrayOutputStream byteStream, - SingleColumnValueExcludeFilter filter) throws IOException { + SingleColumnValueExcludeFilter filter) + throws IOException { byteStream.write(filter.getClass().getSimpleName().getBytes()); byteStream.write("('".getBytes()); writeBytesWithEscape(byteStream, filter.getFamily()); @@ -329,12 +330,13 @@ private static void toParseableByteArray(ByteArrayOutputStream byteStream, Times } // MultiRowRangeFilter('a',true,'b',false,'c',true,'d',false); - private static void toParseableByteArray(ByteArrayOutputStream byteStream, MultiRowRangeFilter filter) throws IOException { + private static void toParseableByteArray(ByteArrayOutputStream byteStream, + MultiRowRangeFilter filter) throws IOException { byteStream.write(filter.getClass().getSimpleName().getBytes()); byteStream.write('('); List ranges = filter.getRowRanges(); - for (int i = 0; i < ranges.size(); i ++) { + for (int i = 0; i < ranges.size(); i++) { MultiRowRangeFilter.RowRange range = ranges.get(i); byteStream.write("'".getBytes()); byteStream.write(range.getStartRow()); @@ -354,7 +356,8 @@ private static void toParseableByteArray(ByteArrayOutputStream byteStream, Multi } // InclusiveStopFilter('aaa'); - private static void toParseableByteArray(ByteArrayOutputStream byteStream, InclusiveStopFilter filter) throws IOException { + private static void toParseableByteArray(ByteArrayOutputStream byteStream, + InclusiveStopFilter filter) throws IOException { byteStream.write(filter.getClass().getSimpleName().getBytes()); byteStream.write('('); byteStream.write('\''); @@ -364,7 +367,8 @@ private static void toParseableByteArray(ByteArrayOutputStream byteStream, Inclu } // ColumnRangeFilter('a',true,'b',false); - private static void toParseableByteArray(ByteArrayOutputStream byteStream, ColumnRangeFilter filter) throws IOException { + private static void toParseableByteArray(ByteArrayOutputStream byteStream, + ColumnRangeFilter filter) throws IOException { byteStream.write(filter.getClass().getSimpleName().getBytes()); byteStream.write('('); @@ -382,12 +386,13 @@ private static void toParseableByteArray(ByteArrayOutputStream byteStream, Colum } // MultipleColumnPrefixFilter('a','b','d'); - private static void toParseableByteArray(ByteArrayOutputStream byteStream, MultipleColumnPrefixFilter filter) throws IOException { + private static void toParseableByteArray(ByteArrayOutputStream byteStream, + MultipleColumnPrefixFilter filter) throws IOException { byteStream.write(filter.getClass().getSimpleName().getBytes()); byteStream.write('('); byte[][] ranges = filter.getPrefix(); - for (int i = 0; i < ranges.length; i ++) { + for (int i = 0; i < ranges.length; i++) { byte[] range = ranges[i]; byteStream.write("'".getBytes()); byteStream.write(range); diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java b/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java index 640e432d..49fe17e9 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java @@ -9,11 +9,11 @@ public class BatchError { private final List throwables = new ArrayList(); - private final List actions = new ArrayList(); - private final List addresses = new ArrayList(); + private final List actions = new ArrayList(); + private final List addresses = new ArrayList(); public synchronized void add(Throwable ex, Row row, ServerName serverName) { - if (row == null){ + if (row == null) { throw new IllegalArgumentException("row cannot be null. location=" + serverName); } @@ -27,9 +27,8 @@ public boolean hasErrors() { } public synchronized RetriesExhaustedWithDetailsException makeException() { - return new RetriesExhaustedWithDetailsException( - new ArrayList(throwables), - new ArrayList(actions), new ArrayList(addresses)); + return new RetriesExhaustedWithDetailsException(new ArrayList(throwables), + new ArrayList(actions), new ArrayList(addresses)); } public synchronized void clear() { diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index 5413c6d0..3a350d04 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -523,24 +523,24 @@ public void testFilter() throws Exception { tryPut(hTable, putKey2Column2Value1); tryPut(hTable, putKey2Column2Value2); -// time may be different -// +---------+-----+----------------+--------+ -// | K | Q | T | V | -// +---------+-----+----------------+--------+ -// | getKey1 | abc | -1728834971469 | value1 | -// | getKey1 | abc | -1728834971399 | value2 | -// | getKey1 | abc | -1728834971330 | value1 | -// | getKey1 | def | -1728834971748 | value2 | -// | getKey1 | def | -1728834971679 | value1 | -// | getKey1 | def | -1728834971609 | value2 | -// | getKey1 | def | -1728834971540 | value1 | -// | getKey2 | def | -1728834971887 | value2 | -// | getKey2 | def | -1728834971818 | value1 | -// +---------+-----+----------------+--------+ + // time may be different + // +---------+-----+----------------+--------+ + // | K | Q | T | V | + // +---------+-----+----------------+--------+ + // | getKey1 | abc | -1728834971469 | value1 | + // | getKey1 | abc | -1728834971399 | value2 | + // | getKey1 | abc | -1728834971330 | value1 | + // | getKey1 | def | -1728834971748 | value2 | + // | getKey1 | def | -1728834971679 | value1 | + // | getKey1 | def | -1728834971609 | value2 | + // | getKey1 | def | -1728834971540 | value1 | + // | getKey2 | def | -1728834971887 | value2 | + // | getKey2 | def | -1728834971818 | value1 | + // +---------+-----+----------------+--------+ SingleColumnValueFilter singleColumnValueFilter; singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(family), - Bytes.toBytes(column1), CompareFilter.CompareOp.EQUAL, new BinaryComparator( + Bytes.toBytes(column1), CompareFilter.CompareOp.EQUAL, new BinaryComparator( toBytes(value1))); get = new Get(toBytes(key1)); get.setMaxVersions(10); @@ -551,7 +551,7 @@ public void testFilter() throws Exception { SingleColumnValueExcludeFilter singleColumnValueExcludeFilter; singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(Bytes.toBytes(family), - Bytes.toBytes(column1), CompareFilter.CompareOp.EQUAL, new BinaryComparator( + Bytes.toBytes(column1), CompareFilter.CompareOp.EQUAL, new BinaryComparator( toBytes(value1))); get = new Get(toBytes(key1)); get.setMaxVersions(10); @@ -560,8 +560,8 @@ public void testFilter() throws Exception { r = hTable.get(get); Assert.assertEquals(4, r.rawCells().length); - DependentColumnFilter dependentColumnFilter = new DependentColumnFilter(Bytes.toBytes(family), - Bytes.toBytes(column1), false); + DependentColumnFilter dependentColumnFilter = new DependentColumnFilter( + Bytes.toBytes(family), Bytes.toBytes(column1), false); get = new Get(toBytes(key1)); get.setMaxVersions(10); get.addFamily(toBytes(family)); @@ -570,7 +570,7 @@ public void testFilter() throws Exception { Assert.assertEquals(3, r.rawCells().length); dependentColumnFilter = new DependentColumnFilter(Bytes.toBytes(family), - Bytes.toBytes(column1), true); + Bytes.toBytes(column1), true); get = new Get(toBytes(key1)); get.setMaxVersions(10); get.addFamily(toBytes(family)); @@ -579,7 +579,7 @@ public void testFilter() throws Exception { Assert.assertEquals(0, r.rawCells().length); dependentColumnFilter = new DependentColumnFilter(Bytes.toBytes(family), - Bytes.toBytes(column1), false, CompareFilter.CompareOp.EQUAL, new BinaryComparator( + Bytes.toBytes(column1), false, CompareFilter.CompareOp.EQUAL, new BinaryComparator( toBytes(value2))); get = new Get(toBytes(key2)); get.setMaxVersions(10); @@ -589,7 +589,7 @@ public void testFilter() throws Exception { Assert.assertEquals(0, r.rawCells().length); dependentColumnFilter = new DependentColumnFilter(Bytes.toBytes(family), - Bytes.toBytes(column2), false, CompareFilter.CompareOp.EQUAL, new BinaryComparator( + Bytes.toBytes(column2), false, CompareFilter.CompareOp.EQUAL, new BinaryComparator( toBytes(value2))); get = new Get(toBytes(key2)); get.setMaxVersions(10); @@ -598,8 +598,6 @@ public void testFilter() throws Exception { r = hTable.get(get); Assert.assertEquals(1, r.rawCells().length); - - filter = new ColumnPrefixFilter(Bytes.toBytes("e")); get = new Get(toBytes(key1)); get.setMaxVersions(10); @@ -947,7 +945,7 @@ public void testFilter() throws Exception { tryPut(hTable, putKey2Column2Value2); dependentColumnFilter = new DependentColumnFilter(Bytes.toBytes(family), - Bytes.toBytes(column1), false, CompareFilter.CompareOp.EQUAL, new BinaryComparator( + Bytes.toBytes(column1), false, CompareFilter.CompareOp.EQUAL, new BinaryComparator( toBytes(value1))); scan = new Scan(); scan.addFamily(family.getBytes()); @@ -957,7 +955,7 @@ public void testFilter() throws Exception { scan.setFilter(dependentColumnFilter); scanner = hTable.getScanner(scan); - long prevTimestamp = - 1; + long prevTimestamp = -1; for (Result result : scanner) { for (Cell keyValue : result.rawCells()) { if (prevTimestamp == -1) { @@ -1293,7 +1291,8 @@ public void testColumnRangeFilter() throws Exception { scan = new Scan(); scan.addFamily(family.getBytes()); scan.setMaxVersions(10); - ColumnRangeFilter filter = new ColumnRangeFilter(Bytes.toBytes("a"), true, Bytes.toBytes("b"), false); + ColumnRangeFilter filter = new ColumnRangeFilter(Bytes.toBytes("a"), true, + Bytes.toBytes("b"), false); scan.setFilter(filter); ResultScanner scanner = hTable.getScanner(scan); @@ -1363,11 +1362,7 @@ public void testColumnRangeFilter() throws Exception { scan = new Scan(); scan.addFamily(family.getBytes()); scan.setMaxVersions(10); - byte[][] range = { - Bytes.toBytes("g"), - Bytes.toBytes("3"), - Bytes.toBytes("d"), - }; + byte[][] range = { Bytes.toBytes("g"), Bytes.toBytes("3"), Bytes.toBytes("d"), }; MultipleColumnPrefixFilter iFilter = new MultipleColumnPrefixFilter(range); scan.setFilter(iFilter); scanner = hTable.getScanner(scan); @@ -1391,14 +1386,8 @@ public void testColumnRangeFilter() throws Exception { scan = new Scan(); scan.addFamily(family.getBytes()); scan.setMaxVersions(10); - range = new byte[][]{ - Bytes.toBytes("de"), - Bytes.toBytes("bg"), - Bytes.toBytes("nc"), - Bytes.toBytes("aa"), - Bytes.toBytes("abcd"), - Bytes.toBytes("dea"), - }; + range = new byte[][] { Bytes.toBytes("de"), Bytes.toBytes("bg"), Bytes.toBytes("nc"), + Bytes.toBytes("aa"), Bytes.toBytes("abcd"), Bytes.toBytes("dea"), }; iFilter = new MultipleColumnPrefixFilter(range); scan.setFilter(iFilter); scanner = hTable.getScanner(scan); @@ -2367,8 +2356,8 @@ public void testGetFilter() throws Exception { get = new Get(toBytes(key1)); get.setMaxVersions(10); get.addFamily(toBytes(family)); - DependentColumnFilter dependentColumnFilter = new DependentColumnFilter(Bytes.toBytes(family), - Bytes.toBytes(column1)); + DependentColumnFilter dependentColumnFilter = new DependentColumnFilter( + Bytes.toBytes(family), Bytes.toBytes(column1)); get.setFilter(dependentColumnFilter); r = hTable.get(get); Assert.assertEquals(3, r.rawCells().length); @@ -2555,7 +2544,7 @@ public void testGetFilter() throws Exception { filterList = new FilterList(); filterList.addFilter(new SingleColumnValueExcludeFilter(Bytes.toBytes(family), Bytes - .toBytes(column1), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(value1))); + .toBytes(column1), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(value1))); get = new Get(toBytes(key1)); get.setMaxVersions(10); get.addFamily(toBytes(family)); @@ -2565,7 +2554,7 @@ public void testGetFilter() throws Exception { filterList = new FilterList(); filterList.addFilter(new DependentColumnFilter(Bytes.toBytes(family), Bytes - .toBytes(column1), false)); + .toBytes(column1), false)); get = new Get(toBytes(key1)); get.setMaxVersions(10); get.addFamily(toBytes(family)); @@ -2575,7 +2564,7 @@ public void testGetFilter() throws Exception { filterList = new FilterList(); filterList.addFilter(new DependentColumnFilter(Bytes.toBytes(family), Bytes - .toBytes(column2), false)); + .toBytes(column2), false)); get = new Get(toBytes(key1)); get.setMaxVersions(10); get.addFamily(toBytes(family)); @@ -2585,7 +2574,7 @@ public void testGetFilter() throws Exception { filterList = new FilterList(); filterList.addFilter(new DependentColumnFilter(Bytes.toBytes(family), Bytes - .toBytes(column2))); + .toBytes(column2))); get = new Get(toBytes(key2)); get.setMaxVersions(10); get.addFamily(toBytes(family)); @@ -2595,7 +2584,7 @@ public void testGetFilter() throws Exception { filterList = new FilterList(); filterList.addFilter(new DependentColumnFilter(Bytes.toBytes(family), Bytes - .toBytes(column2), true)); + .toBytes(column2), true)); get = new Get(toBytes(key2)); get.setMaxVersions(10); get.addFamily(toBytes(family)); @@ -2605,8 +2594,8 @@ public void testGetFilter() throws Exception { filterList = new FilterList(); filterList.addFilter(new DependentColumnFilter(Bytes.toBytes(family), Bytes - .toBytes(column2), false, CompareFilter.CompareOp.EQUAL, - new BinaryComparator(toBytes(value2)))); + .toBytes(column2), false, CompareFilter.CompareOp.EQUAL, new BinaryComparator( + toBytes(value2)))); get = new Get(toBytes(key2)); get.setMaxVersions(10); get.addFamily(toBytes(family)); @@ -2739,10 +2728,9 @@ public void testGetFilter() throws Exception { r = hTable.get(get); Assert.assertEquals(7, r.rawCells().length); - SingleColumnValueExcludeFilter singleColumnValueExcludeFilter; singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(Bytes.toBytes(family), - Bytes.toBytes(column1), CompareFilter.CompareOp.EQUAL, new BinaryComparator( + Bytes.toBytes(column1), CompareFilter.CompareOp.EQUAL, new BinaryComparator( toBytes(value1))); get = new Get(toBytes(key1)); get.setMaxVersions(10); @@ -2751,7 +2739,6 @@ public void testGetFilter() throws Exception { r = hTable.get(get); Assert.assertEquals(4, r.rawCells().length); - singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column1), CompareFilter.CompareOp.EQUAL, new BinaryComparator( toBytes(value2))); diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java index 1a6875b1..2909bcb6 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java @@ -207,22 +207,25 @@ public void testBufferedMutatorWithFlush() throws Exception { conf.set(SOCKET_TIMEOUT_CONNECT, "15000"); BufferedMutator putBufferMutator = null; BufferedMutator delBufferedMutator = null; + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + String family = "family_group"; try { TableName tableName = TableName.valueOf("test"); connection = ConnectionFactory.createConnection(conf); hTable = connection.getTable(tableName); + + Delete delete= new Delete(toBytes(key)); + delete.addFamily(toBytes(family)); + hTable.delete(delete); + // use defualt params putBufferMutator = connection.getBufferedMutator(tableName); delBufferedMutator = connection.getBufferedMutator(tableName); - String key = "putKey"; - String column1 = "putColumn1"; - String value = "value333444"; - long timestamp = System.currentTimeMillis(); - Delete delete= new Delete(toBytes(key)); - delete.addFamily(toBytes("family_group")); - hTable.delete(delete); + long timestamp = System.currentTimeMillis(); // only support Put and Delete // for other type of operations, BufferedMutator will not set its type for them @@ -236,10 +239,10 @@ public void testBufferedMutatorWithFlush() throws Exception { List mutations = new ArrayList<>(); // test Put Put put1 = new Put(Bytes.toBytes(key)); - put1.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); + put1.addColumn(Bytes.toBytes(family), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); mutations.add(put1); Put put2 = new Put(Bytes.toBytes(key)); - put2.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); + put2.addColumn(Bytes.toBytes(family), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); mutations.add(put2); // test add Mutations with List putBufferMutator.mutate(mutations); @@ -255,7 +258,7 @@ public void testBufferedMutatorWithFlush() throws Exception { Assert.assertThrows(FeatureNotSupportedException.class, () -> { noCfMut.mutate(del); }); - del.addFamily(Bytes.toBytes("family_group")); + del.addFamily(Bytes.toBytes(family)); // test reuse different type bufferedMutator final BufferedMutator difTypeMut = putBufferMutator; Assert.assertThrows(IllegalArgumentException.class, () -> { @@ -283,8 +286,8 @@ public void testBufferedMutatorWithFlush() throws Exception { if (delBufferedMutator != null) { delBufferedMutator.close(); // test add mutations after closed - Delete delete = new Delete(Bytes.toBytes("putKey")); - delete.addFamily(Bytes.toBytes("family_group")); + Delete delete = new Delete(Bytes.toBytes(key)); + delete.addFamily(Bytes.toBytes(family)); final BufferedMutator closedMutator = delBufferedMutator; Assert.assertThrows(IllegalStateException.class, () -> { closedMutator.mutate(delete); @@ -311,24 +314,29 @@ public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { conf.set(SOCKET_TIMEOUT_CONNECT, "15000"); BufferedMutator putBufferMutator = null; BufferedMutator delBufferedMutator = null; + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + String family = "family_group"; + long timestamp = System.currentTimeMillis(); try { // use n1 database TableName tableName = TableName.valueOf("n1","test"); connection = ConnectionFactory.createConnection(conf); hTable = connection.getTable(tableName); + + Delete delete= new Delete(toBytes(key)); + delete.addFamily(toBytes(family)); + hTable.delete(delete); + // use defualt params putBufferMutator = connection.getBufferedMutator(tableName); delBufferedMutator = connection.getBufferedMutator(tableName); - String key = "putKey"; - String column1 = "putColumn1"; - String value = "value333444"; - long timestamp = System.currentTimeMillis(); - // only support Put and Delete // for other type of operations, BufferedMutator will not set its type for them Append append = new Append(Bytes.toBytes(key)); - append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); + append.add(toBytes(family), toBytes(column1), toBytes("_suffix")); final BufferedMutator apMut = putBufferMutator; Assert.assertThrows(IllegalArgumentException.class, () -> { apMut.mutate(append); @@ -337,10 +345,10 @@ public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { List mutations = new ArrayList<>(); // test Put Put put1 = new Put(Bytes.toBytes(key)); - put1.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); + put1.addColumn(Bytes.toBytes(family), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); mutations.add(put1); Put put2 = new Put(Bytes.toBytes(key)); - put2.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); + put2.addColumn(Bytes.toBytes(family), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); mutations.add(put2); // test add Mutations with List putBufferMutator.mutate(mutations); @@ -356,7 +364,7 @@ public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { Assert.assertThrows(FeatureNotSupportedException.class, () -> { noCfMut.mutate(del); }); - del.addFamily(Bytes.toBytes("family_group")); + del.addFamily(Bytes.toBytes(family)); final BufferedMutator difTypeMut = putBufferMutator; // test reuse different type bufferedMutator Assert.assertThrows(IllegalArgumentException.class, () -> { @@ -412,30 +420,31 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { BufferedMutatorParams params = null; long bufferSize = 45000L; int count = 0; + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + String family = "family_group"; try { TableName tableName = TableName.valueOf("test"); connection = ConnectionFactory.createConnection(conf); hTable = connection.getTable(tableName); + + Delete delete= new Delete(toBytes(key)); + delete.addFamily(toBytes(family)); + hTable.delete(delete); + // set params params = new BufferedMutatorParams(tableName); params.writeBufferSize(bufferSize); putBufferMutator = connection.getBufferedMutator(params); - String key = "putKey"; - String column1 = "putColumn1"; - String value = "value333444"; - long timestamp = System.currentTimeMillis(); - - Delete delete= new Delete(toBytes(key)); - delete.addFamily(toBytes("family_group")); - hTable.delete(delete); - List mutations = new ArrayList<>(); for (int i = 0; i < 50; ++i) { mutations.clear(); for (int j = 0; j < 4; ++j) { Put put = new Put(Bytes.toBytes(key)); - put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "_" + i + "_" + j), + put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column1 + "_" + i + "_" + j), timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); mutations.add(put); } @@ -451,14 +460,14 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { } finally { if (putBufferMutator != null) { putBufferMutator.close(); - Get get = new Get(toBytes("putKey")); + Get get = new Get(toBytes(key)); Result r = hTable.get(get); for (Cell keyValue : r.rawCells()) { ++count; } Assert.assertEquals(200, count); - Delete delete = new Delete(toBytes("putKey")); - delete.addFamily(toBytes("family_group")); + Delete delete = new Delete(toBytes(key)); + delete.addFamily(toBytes(family)); hTable.delete(delete); r = hTable.get(get); Assert.assertEquals(0, r.rawCells().length); @@ -498,10 +507,20 @@ public void testBufferedMutatorWithUserPool() throws Exception { BufferedMutatorParams params = null; long bufferSize = 45000L; int count = 0; + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + String family = "family_group"; try { TableName tableName = TableName.valueOf("test"); connection = ConnectionFactory.createConnection(conf); hTable = connection.getTable(tableName); + + Delete delete= new Delete(toBytes(key)); + delete.addFamily(toBytes(family)); + hTable.delete(delete); + // set params params = new BufferedMutatorParams(tableName); params.writeBufferSize(bufferSize); @@ -514,21 +533,12 @@ public void testBufferedMutatorWithUserPool() throws Exception { ohBufferMutator = connection.getBufferedMutator(params); - String key = "putKey"; - String column1 = "putColumn1"; - String value = "value333444"; - long timestamp = System.currentTimeMillis(); - - Delete delete= new Delete(toBytes(key)); - delete.addFamily(toBytes("family_group")); - hTable.delete(delete); - List mutations = new ArrayList<>(); for (int i = 0; i < 50; ++i) { mutations.clear(); for (int j = 0; j < 4; ++j) { Put put = new Put(Bytes.toBytes(key)); - put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "_" + i + "_" + j), + put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column1 + "_" + i + "_" + j), timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); mutations.add(put); } @@ -544,14 +554,14 @@ public void testBufferedMutatorWithUserPool() throws Exception { } finally { if (ohBufferMutator != null) { ohBufferMutator.close(); - Get get = new Get(toBytes("putKey")); + Get get = new Get(toBytes(key)); Result r = hTable.get(get); for (Cell keyValue : r.rawCells()) { ++count; } Assert.assertEquals(200, count); - Delete delete = new Delete(toBytes("putKey")); - delete.addFamily(toBytes("family_group")); + Delete delete = new Delete(toBytes(key)); + delete.addFamily(toBytes(family)); hTable.delete(delete); r = hTable.get(get); @@ -591,10 +601,20 @@ public void testBufferedMutatorConcurrent() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(10); long bufferSize = 45000L; int count = 0; + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + String family = "family_group"; try { TableName tableName = TableName.valueOf("test"); connection = ConnectionFactory.createConnection(conf); hTable = connection.getTable(tableName); + + Delete delete= new Delete(toBytes(key)); + delete.addFamily(toBytes(family)); + hTable.delete(delete); + // set params params = new BufferedMutatorParams(tableName); params.writeBufferSize(bufferSize); @@ -607,15 +627,6 @@ public void testBufferedMutatorConcurrent() throws Exception { ohBufferMutator = connection.getBufferedMutator(params); - String key = "putKey"; - String column1 = "putColumn1"; - String value = "value333444"; - long timestamp = System.currentTimeMillis(); - - Delete delete= new Delete(toBytes(key)); - delete.addFamily(toBytes("family_group")); - hTable.delete(delete); - for (int i = 0; i < 50; ++i) { final int taskId = i; final BufferedMutator thrBufferMutator = ohBufferMutator; @@ -628,7 +639,7 @@ public void testBufferedMutatorConcurrent() throws Exception { long thrTimestamp = timestamp; Put put = new Put(Bytes.toBytes(thrKey)); - put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(thrColumn), + put.addColumn(Bytes.toBytes(family), Bytes.toBytes(thrColumn), thrTimestamp, Bytes.toBytes(thrValue)); mutations.add(put); } @@ -663,14 +674,14 @@ public void testBufferedMutatorConcurrent() throws Exception { } if (ohBufferMutator != null) { ohBufferMutator.close(); - Get get = new Get(toBytes("putKey")); + Get get = new Get(toBytes(key)); Result r = hTable.get(get); for (Cell keyValue : r.rawCells()) { ++count; } Assert.assertEquals(200, count); - Delete delete = new Delete(toBytes("putKey")); - delete.addFamily(toBytes("family_group")); + Delete delete = new Delete(toBytes(key)); + delete.addFamily(toBytes(family)); hTable.delete(delete); r = hTable.get(get); diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableDeleteFamilyVersionTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableDeleteFamilyVersionTest.java new file mode 100644 index 00000000..26ee365a --- /dev/null +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableDeleteFamilyVersionTest.java @@ -0,0 +1,350 @@ +package com.alipay.oceanbase.hbase; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.*; +import org.junit.rules.ExpectedException; + +import java.io.IOException; + +import static org.apache.hadoop.hbase.util.Bytes.toBytes; + +public class OHTableDeleteFamilyVersionTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + protected Table hTable; + private static final String key1 = "scanKey1x"; + private static final String key2 = "scanKey2x"; + private static final String key3 = "scanKey3x"; + private static final String column1 = "column1"; + private static final String column2 = "column2"; + private static final String column3 = "column3"; + private static final String value1 = "value1"; + private static final String value2 = "value2"; + private static final String value3 = "value3"; + private static final String family1 = "family_with_group1"; + private static final String family2 = "family_with_group2"; + + @Before + public void before() throws Exception { + hTable = ObHTableTestUtil.newOHTableClient("test_multi_cf"); + ((OHTableClient) hTable).init(); + } + + @After + public void finish() throws IOException { + hTable.close(); + } + + public void tryPut(Table hTable, Put put) throws Exception { + hTable.put(put); + Thread.sleep(1); + } + + @Test + public void testDeleteFamilyVerison() throws Exception { + // delete previous data + Delete deleteKey1Family = new Delete(toBytes(key1)); + deleteKey1Family.addFamily(toBytes(family1)); + deleteKey1Family.addFamily(toBytes(family2)); + Delete deleteKey2Family = new Delete(toBytes(key2)); + deleteKey2Family.addFamily(toBytes(family1)); + deleteKey2Family.addFamily(toBytes(family2)); + Delete deleteKey3Family = new Delete(toBytes(key3)); + deleteKey3Family.addFamily(toBytes(family1)); + deleteKey3Family.addFamily(toBytes(family2)); + + hTable.delete(deleteKey1Family); + hTable.delete(deleteKey2Family); + hTable.delete(deleteKey3Family); + + long minTimeStamp = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp1 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp2 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp3 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp4 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp5 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp6 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp7 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp8 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp9 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp10 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp11 = System.currentTimeMillis(); + Thread.sleep(5); + long maxTimeStamp = System.currentTimeMillis(); + + Put putKey1Fam1Column1MinTs = new Put(toBytes(key1)); + putKey1Fam1Column1MinTs.addColumn(toBytes(family1), toBytes(column1), minTimeStamp, + toBytes(value1)); + + Put putKey3Fam1Column1Ts1 = new Put(toBytes(key3)); + putKey3Fam1Column1Ts1.addColumn(toBytes(family1), toBytes(column1), timeStamp1, toBytes(value2)); + + Put putKey1Fam1Column2MinTs = new Put(toBytes(key1)); + putKey1Fam1Column2MinTs.addColumn(toBytes(family1), toBytes(column2), minTimeStamp, + toBytes(value1)); + + Put putKey1Fam1Column2Ts3 = new Put(toBytes(key1)); + putKey1Fam1Column2Ts3.addColumn(toBytes(family1), toBytes(column2), timeStamp3, toBytes(value2)); + + Put putKey2Fam1Column2Ts3 = new Put(toBytes(key2)); + putKey2Fam1Column2Ts3.addColumn(toBytes(family1), toBytes(column2), timeStamp3, toBytes(value2)); + + Put putKey2Fam1Column3Ts1 = new Put(toBytes(key2)); + putKey2Fam1Column3Ts1.addColumn(toBytes(family1), toBytes(column3), timeStamp1, toBytes(value2)); + + Put putKey3Fam1Column3Ts1 = new Put(toBytes(key3)); + putKey3Fam1Column3Ts1.addColumn(toBytes(family1), toBytes(column3), timeStamp1, toBytes(value2)); + + Put putKey3Fam1Column2Ts6 = new Put(toBytes(key3)); + putKey3Fam1Column2Ts6.addColumn(toBytes(family1), toBytes(column2), timeStamp6, toBytes(value1)); + + Put putKey2Fam1Column3Ts6 = new Put(toBytes(key2)); + putKey2Fam1Column3Ts6.addColumn(toBytes(family1), toBytes(column3), timeStamp3, toBytes(value1)); + + tryPut(hTable, putKey1Fam1Column1MinTs); + tryPut(hTable, putKey3Fam1Column1Ts1); + tryPut(hTable, putKey1Fam1Column2MinTs); + tryPut(hTable, putKey1Fam1Column2Ts3); + tryPut(hTable, putKey2Fam1Column2Ts3); + tryPut(hTable, putKey2Fam1Column3Ts1); + tryPut(hTable, putKey3Fam1Column3Ts1); + tryPut(hTable, putKey3Fam1Column2Ts6); + tryPut(hTable, putKey2Fam1Column3Ts6); + + // test DeleteFamilyVersion single cf + Get get = new Get(toBytes(key1)); + get.addFamily(toBytes(family1)); + get.setTimeStamp(minTimeStamp); + get.setMaxVersions(10); + Result r = hTable.get(get); + Assert.assertEquals(2, r.rawCells().length); + + get = new Get(toBytes(key3)); + get.addFamily(toBytes(family1)); + get.setTimeStamp(timeStamp1); + get.setMaxVersions(10); + r = hTable.get(get); + Assert.assertEquals(2, r.rawCells().length); + + get = new Get(toBytes(key2)); + get.addFamily(toBytes(family1)); + get.setTimeStamp(timeStamp3); + get.setMaxVersions(10); + r = hTable.get(get); + Assert.assertEquals(2, r.rawCells().length); + + Delete delKey1MinTs = new Delete(toBytes(key1)); + delKey1MinTs.addFamilyVersion(toBytes(family1), minTimeStamp); + hTable.delete(delKey1MinTs); + + get = new Get(toBytes(key1)); + get.addFamily(toBytes(family1)); + get.setTimeStamp(minTimeStamp); + get.setMaxVersions(10); + r = hTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + + Delete delKey3Ts1 = new Delete(toBytes(key3)); + delKey3Ts1.addFamilyVersion(toBytes(family1), timeStamp1); + hTable.delete(delKey3Ts1); + + get = new Get(toBytes(key3)); + get.addFamily(toBytes(family1)); + get.setTimeStamp(timeStamp1); + get.setMaxVersions(10); + r = hTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + + Delete delKey2Ts3 = new Delete(toBytes(key2)); + delKey2Ts3.addFamilyVersion(family1.getBytes(), timeStamp3); + hTable.delete(delKey2Ts3); + + get = new Get(toBytes(key2)); + get.addFamily(toBytes(family1)); + get.setTimeStamp(timeStamp3); + get.setMaxVersions(10); + r = hTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + + Scan scan = new Scan(); + scan.setStartRow(toBytes(key1)); + scan.setStopRow("scanKey4x".getBytes()); + scan.addFamily(toBytes(family1)); + scan.setMaxVersions(10); + ResultScanner scanner = hTable.getScanner(scan); + int key1Cnt = 0, key2Cnt = 0, key3Cnt = 0; + for (Result result : scanner) { + for (Cell kv : result.rawCells()) { + if (key1.equals(Bytes.toString(CellUtil.cloneRow(kv)))) { + ++key1Cnt; + } else if (key2.equals(Bytes.toString(CellUtil.cloneRow(kv)))) { + ++key2Cnt; + } else { + ++key3Cnt; + } + } + } + Assert.assertEquals(1, key1Cnt); + Assert.assertEquals(1, key2Cnt); + Assert.assertEquals(1, key3Cnt); + + hTable.delete(deleteKey1Family); + hTable.delete(deleteKey2Family); + hTable.delete(deleteKey3Family); + + // test DeleteFamilyVersion multiple cf + Put putKey1Fam1Column3Ts6 = new Put(toBytes(key1)); + putKey1Fam1Column3Ts6.addColumn(toBytes(family1), toBytes(column3), timeStamp6, toBytes(value3)); + + Put putKey1Fam2Column2Ts2 = new Put(toBytes(key1)); + putKey1Fam2Column2Ts2.addColumn(toBytes(family2), toBytes(column2), timeStamp2, toBytes(value1)); + + Put putKey1Fam2Column3Ts2 = new Put(toBytes(key1)); + putKey1Fam2Column3Ts2.addColumn(toBytes(family2), toBytes(column3), timeStamp2, toBytes(value1)); + + Put putKey1Fam1Column2Ts1 = new Put(toBytes(key1)); + putKey1Fam1Column2Ts1.addColumn(toBytes(family1), toBytes(column2), timeStamp1, toBytes(value2)); + + Put putKey2Fam1Column2Ts8 = new Put(toBytes(key2)); + putKey2Fam1Column2Ts8.addColumn(toBytes(family1), toBytes(column2), timeStamp8, toBytes(value2)); + + Put putKey2Fam2Column3Ts1 = new Put(toBytes(key2)); + putKey2Fam2Column3Ts1.addColumn(toBytes(family2), toBytes(column3), timeStamp3, toBytes(value3)); + + Put putKey2Fam1Column1Ts1 = new Put(toBytes(key2)); + putKey2Fam1Column1Ts1.addColumn(toBytes(family1), toBytes(column1), timeStamp8, toBytes(value1)); + + Put putKey2Fam2Column1Ts3 = new Put(toBytes(key2)); + putKey2Fam2Column1Ts3.addColumn(toBytes(family2), toBytes(column1), timeStamp3, toBytes(value2)); + + Put putKey3Fam1Column2Ts9 = new Put(toBytes(key3)); + putKey3Fam1Column2Ts9.addColumn(toBytes(family1), toBytes(column2), timeStamp9, toBytes(value2)); + + Put putKey3Fam2Column3Ts10 = new Put(toBytes(key3)); + putKey3Fam2Column3Ts10 + .addColumn(toBytes(family2), toBytes(column3), timeStamp10, toBytes(value1)); + + Put putKey3Fam2Column1Ts10 = new Put(toBytes(key3)); + putKey3Fam2Column1Ts10 + .addColumn(toBytes(family2), toBytes(column1), timeStamp10, toBytes(value2)); + + Put putKey3Fam1Column2Ts2 = new Put(toBytes(key3)); + putKey3Fam1Column2Ts2.addColumn(toBytes(family1), toBytes(column2), timeStamp2, toBytes(value1)); + + tryPut(hTable, putKey1Fam1Column3Ts6); + tryPut(hTable, putKey1Fam2Column2Ts2); + tryPut(hTable, putKey1Fam2Column3Ts2); + tryPut(hTable, putKey1Fam1Column2Ts1); + tryPut(hTable, putKey2Fam1Column2Ts8); + tryPut(hTable, putKey2Fam2Column3Ts1); + tryPut(hTable, putKey2Fam1Column1Ts1); + tryPut(hTable, putKey2Fam2Column1Ts3); + tryPut(hTable, putKey3Fam1Column2Ts9); + tryPut(hTable, putKey3Fam2Column3Ts10); + tryPut(hTable, putKey3Fam2Column1Ts10); + tryPut(hTable, putKey3Fam1Column2Ts2); + + Get getKey1 = new Get(toBytes(key1)); + getKey1.addFamily(toBytes(family1)); + getKey1.addFamily(toBytes(family2)); + getKey1.setMaxVersions(10); + r = hTable.get(getKey1); + Assert.assertEquals(4, r.rawCells().length); + + Get getKey2 = new Get(toBytes(key2)); + getKey2.addFamily(toBytes(family1)); + getKey2.addFamily(toBytes(family2)); + getKey2.setMaxVersions(10); + r = hTable.get(getKey2); + Assert.assertEquals(4, r.rawCells().length); + + Get getKey3 = new Get(toBytes(key3)); + getKey3.addFamily(toBytes(family1)); + getKey3.addFamily(toBytes(family2)); + getKey3.setMaxVersions(10); + r = hTable.get(getKey3); + Assert.assertEquals(4, r.rawCells().length); + + Delete delKey1Ts_6_2 = new Delete(toBytes(key1)); + delKey1Ts_6_2.addFamilyVersion(toBytes(family1), timeStamp6); + delKey1Ts_6_2.addFamilyVersion(toBytes(family2), timeStamp2); + hTable.delete(delKey1Ts_6_2); + + getKey1 = new Get(toBytes(key1)); + getKey1.addFamily(toBytes(family1)); + getKey1.addFamily(toBytes(family2)); + getKey1.setMaxVersions(10); + r = hTable.get(getKey1); + Assert.assertEquals(1, r.rawCells().length); + for (Cell kv : r.rawCells()) { + Assert.assertEquals(timeStamp1, kv.getTimestamp()); + } + + Delete delKey2Ts_8_3 = new Delete(toBytes(key2)); + delKey2Ts_8_3.addFamilyVersion(toBytes(family1), timeStamp8); + delKey2Ts_8_3.addFamilyVersion(toBytes(family2), timeStamp3); + hTable.delete(delKey2Ts_8_3); + + getKey2 = new Get(toBytes(key2)); + getKey2.addFamily(toBytes(family1)); + getKey2.addFamily(toBytes(family2)); + getKey2.setMaxVersions(10); + r = hTable.get(getKey2); + Assert.assertEquals(0, r.rawCells().length); + + Delete delKey3Ts_2_10 = new Delete(toBytes(key3)); + delKey3Ts_2_10.addFamilyVersion(toBytes(family1), timeStamp2); + delKey3Ts_2_10.addFamilyVersion(toBytes(family2), timeStamp10); + hTable.delete(delKey3Ts_2_10); + + getKey3 = new Get(toBytes(key3)); + getKey3.addFamily(toBytes(family1)); + getKey3.addFamily(toBytes(family2)); + getKey3.setMaxVersions(10); + r = hTable.get(getKey3); + Assert.assertEquals(1, r.rawCells().length); + for (Cell kv : r.rawCells()) { + Assert.assertEquals(timeStamp9, kv.getTimestamp()); + } + + scan = new Scan(); + scan.setStartRow(toBytes(key1)); + scan.setStopRow("scanKey4x".getBytes()); + scan.addFamily(toBytes(family1)); + scan.addFamily(toBytes(family2)); + scan.setMaxVersions(10); + scanner = hTable.getScanner(scan); + int ts1Cnt = 0, ts9Cnt = 0; + for (Result result : scanner) { + for (Cell kv : result.rawCells()) { + if (kv.getTimestamp() == timeStamp1) { + ++ts1Cnt; + } else if (kv.getTimestamp() == timeStamp9) { + ++ts9Cnt; + } + } + } + Assert.assertEquals(1, ts1Cnt); + Assert.assertEquals(1, ts9Cnt); + + hTable.delete(deleteKey1Family); + hTable.delete(deleteKey2Family); + hTable.delete(deleteKey3Family); + } +} diff --git a/src/test/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtilsTest.java b/src/test/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtilsTest.java index f0e66604..4fac8402 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtilsTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtilsTest.java @@ -138,6 +138,39 @@ public void testSingleColumnValueFilter() throws IOException { } } + @Test + public void testSingleColumnValueExcludeFilter() throws IOException { + for (int i = 0; i < ops.length; i++) { + String expect = String + .format( + "SingleColumnValueExcludeFilter('family','qualifier',%s,'binary:value',false,true)", + opFlags[i]); + SingleColumnValueExcludeFilter filter = new SingleColumnValueExcludeFilter( + "family".getBytes(), "qualifier".getBytes(), ops[i], "value".getBytes()); + Assert.assertArrayEquals(expect.getBytes(), + HBaseFilterUtils.toParseableByteArray(filter)); + } + } + + @Test + public void testDependentColumnFilter() throws IOException { + DependentColumnFilter filter = new DependentColumnFilter("family".getBytes(), + "qualifier".getBytes()); + String expect = "DependentColumnFilter('family','qualifier',false)"; + Assert.assertArrayEquals(expect.getBytes(), HBaseFilterUtils.toParseableByteArray(filter)); + filter = new DependentColumnFilter("family".getBytes(), "qualifier".getBytes(), true); + expect = "DependentColumnFilter('family','qualifier',true)"; + Assert.assertArrayEquals(expect.getBytes(), HBaseFilterUtils.toParseableByteArray(filter)); + for (int i = 0; i < ops.length; ++i) { + filter = new DependentColumnFilter("family".getBytes(), "qualifier".getBytes(), false, + ops[i], new BinaryComparator("value".getBytes())); + expect = String.format( + "DependentColumnFilter('family','qualifier',false,%s,'binary:value')", opFlags[i]); + Assert.assertArrayEquals(expect.getBytes(), + HBaseFilterUtils.toParseableByteArray(filter)); + } + } + @Test public void testPageFilter() throws IOException { PageFilter filter = new PageFilter(128); @@ -188,24 +221,24 @@ public void testMultiRowRangeFilter() throws IOException { @Test public void testInclusiveStopFilter() throws IOException { InclusiveStopFilter filter = new InclusiveStopFilter(Bytes.toBytes("aaa")); - Assert.assertArrayEquals("InclusiveStopFilter('aaa')".getBytes(), HBaseFilterUtils.toParseableByteArray(filter)); + Assert.assertArrayEquals("InclusiveStopFilter('aaa')".getBytes(), + HBaseFilterUtils.toParseableByteArray(filter)); } @Test public void testColumnRangeFilter() throws IOException { - ColumnRangeFilter filter = new ColumnRangeFilter(Bytes.toBytes("a"), true, Bytes.toBytes("b"), false); - Assert.assertArrayEquals("ColumnRangeFilter('a',true,'b',false)".getBytes(), HBaseFilterUtils.toParseableByteArray(filter)); + ColumnRangeFilter filter = new ColumnRangeFilter(Bytes.toBytes("a"), true, + Bytes.toBytes("b"), false); + Assert.assertArrayEquals("ColumnRangeFilter('a',true,'b',false)".getBytes(), + HBaseFilterUtils.toParseableByteArray(filter)); } @Test public void testMultipleColumnPrefixFilter() throws IOException { - byte[][] prefix = { - Bytes.toBytes("a"), - Bytes.toBytes("b"), - Bytes.toBytes("d"), - }; + byte[][] prefix = { Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("d"), }; MultipleColumnPrefixFilter filter = new MultipleColumnPrefixFilter(prefix); - Assert.assertArrayEquals("MultipleColumnPrefixFilter('a','b','d')".getBytes(), HBaseFilterUtils.toParseableByteArray(filter)); + Assert.assertArrayEquals("MultipleColumnPrefixFilter('a','b','d')".getBytes(), + HBaseFilterUtils.toParseableByteArray(filter)); } @Test From 5d43b53857c3e563b871ce790f20743309e34d62 Mon Sep 17 00:00:00 2001 From: stuBirdFly <84010733+stuBirdFly@users.noreply.github.com> Date: Wed, 23 Oct 2024 11:42:52 +0800 Subject: [PATCH 4/6] hbase support batchCallBack (#86) --- .../com/alipay/oceanbase/hbase/OHTable.java | 14 +++++++-- .../hbase/OHTableMultiColumnFamilyTest.java | 29 ++++++++++++++++++- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index e0df60b8..60117770 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -519,8 +519,18 @@ private String getTargetTableName(List actions) { @Override public void batchCallback(List actions, Object[] results, Batch.Callback callback) throws IOException, - InterruptedException { - throw new FeatureNotSupportedException("not supported yet'"); + InterruptedException { + try { + batch(actions, results); + } finally { + if (results != null) { + for (int i = 0; i < results.length; i++) { + if (!(results[i] instanceof ObTableException)) { + callback.update(null, actions.get(i).getRow(), (R) results[i]); + } + } + } + } } public static int compareByteArray(byte[] bt1, byte[] bt2) { diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java index 5a2e1c15..7e1cfe08 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java @@ -17,10 +17,11 @@ package com.alipay.oceanbase.hbase; +import com.alipay.oceanbase.rpc.mutation.result.MutationResult; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.junit.*; import org.junit.rules.ExpectedException; @@ -152,6 +153,32 @@ public void testMultiColumnFamilyBatch() throws Exception { result = hTable.get(get); keyValues = result.rawCells(); assertEquals(6, keyValues.length); + + batchLsit.clear(); + final long[] updateCounter = new long[] { 0L }; + delete = new Delete(toBytes("Key5")); + delete.deleteColumns(family1, family1_column2); + delete.deleteColumns(family2, family2_column1); + delete.deleteFamily(family3); + batchLsit.add(delete); + for (int i = 0; i < rows; ++i) { + Put put = new Put(toBytes("Key" + i)); + put.add(family1, family1_column1, family1_value); + put.add(family1, family1_column2, family1_value); + put.add(family1, family1_column3, family1_value); + put.add(family2, family2_column1, family2_value); + put.add(family2, family2_column2, family2_value); + put.add(family3, family3_column1, family3_value); + batchLsit.add(put); + } + hTable.batchCallback(batchLsit, new Batch.Callback() { + @Override + public void update(byte[] region, byte[] row, MutationResult result) { + updateCounter[0]++; + } + }); + assertEquals(11, updateCounter[0]); + } @Test From f801cfe0a899925791aa3b19dddd364bdab58700 Mon Sep 17 00:00:00 2001 From: miyuan-ljr Date: Thu, 24 Oct 2024 15:27:01 +0800 Subject: [PATCH 5/6] fix test --- .../hbase/OHTableMultiColumnFamilyTest.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java index 7e1cfe08..8f252c5f 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java @@ -157,21 +157,22 @@ public void testMultiColumnFamilyBatch() throws Exception { batchLsit.clear(); final long[] updateCounter = new long[] { 0L }; delete = new Delete(toBytes("Key5")); - delete.deleteColumns(family1, family1_column2); - delete.deleteColumns(family2, family2_column1); - delete.deleteFamily(family3); + delete.addColumns(family1, family1_column2); + delete.addColumns(family2, family2_column1); + delete.addFamily(family3); batchLsit.add(delete); for (int i = 0; i < rows; ++i) { Put put = new Put(toBytes("Key" + i)); - put.add(family1, family1_column1, family1_value); - put.add(family1, family1_column2, family1_value); - put.add(family1, family1_column3, family1_value); - put.add(family2, family2_column1, family2_value); - put.add(family2, family2_column2, family2_value); - put.add(family3, family3_column1, family3_value); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family1, family1_column2, family1_value); + put.addColumn(family1, family1_column3, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family2, family2_column2, family2_value); + put.addColumn(family3, family3_column1, family3_value); batchLsit.add(put); } - hTable.batchCallback(batchLsit, new Batch.Callback() { + results = new Object[batchLsit.size()]; + hTable.batchCallback(batchLsit, results, new Batch.Callback() { @Override public void update(byte[] region, byte[] row, MutationResult result) { updateCounter[0]++; From 4c96d00543fb9b211bb31209cdc685869ac1510e Mon Sep 17 00:00:00 2001 From: miyuan-ljr Date: Fri, 25 Oct 2024 11:27:10 +0800 Subject: [PATCH 6/6] fix test --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 60117770..44ea1df7 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -1811,6 +1811,8 @@ public static OHOpType getDeleteType(Cell.Type type) { return OHOpType.DeleteColumn; case DeleteFamily: return OHOpType.DeleteFamily; + case DeleteFamilyVersion: + return OHOpType.DeleteFamilyVersion; default: throw new IllegalArgumentException("illegal mutation type " + type); }