diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 3defa3c7..d9305ec8 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -24,6 +24,7 @@ import com.alipay.oceanbase.hbase.result.ClientStreamScanner; import com.alipay.oceanbase.hbase.util.*; import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.exception.ObTableException; import com.alipay.oceanbase.rpc.mutation.BatchOperation; import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; @@ -130,7 +131,7 @@ public class OHTable implements HTableInterface { /** * the buffer of put request */ - private final ArrayList writeBuffer = new ArrayList(); + private final ArrayList writeBuffer = new ArrayList<>(); /** * when the put request reach the write buffer size the do put will * flush commits automatically @@ -331,7 +332,7 @@ public static ThreadPoolExecutor createDefaultThreadPoolExecutor(int coreSize, i System.setProperty(SOFA_THREAD_POOL_LOGGING_CAPABILITY, "false"); } SofaThreadPoolExecutor executor = new SofaThreadPoolExecutor(coreSize, maxThreads, - keepAliveTime, SECONDS, new SynchronousQueue(), "OHTableDefaultExecutePool", + keepAliveTime, SECONDS, new SynchronousQueue<>(), "OHTableDefaultExecutePool", TABLE_HBASE_LOGGER_SPACE); executor.allowCoreThreadTimeOut(true); return executor; @@ -428,13 +429,63 @@ public Boolean[] exists(List gets) throws IOException { } @Override - public void batch(List actions, Object[] results) { - throw new FeatureNotSupportedException("not supported yet."); + public void batch(final List actions, final Object[] results) throws IOException { + BatchError batchError = new BatchError(); + try { + List resultMapSingleOp = new LinkedList<>(); + String realTableName = getTargetTableName(actions); + obTableClient.setRuntimeBatchExecutor(executePool); + BatchOperation batch = buildBatchOperation(realTableName, actions, tableNameString.equals(realTableName), resultMapSingleOp); + BatchOperationResult tmpResults = batch.execute(); + if (results != null) { + if (results.length != actions.size()) { + throw new AssertionError("results.length"); + } + int index = 0; + for (int i = 0; i != results.length; ++i) { + results[i] = tmpResults.getResults().get(index); + index += resultMapSingleOp.get(i); + if (results[i] instanceof ObTableException) { + batchError.add((ObTableException) results[i], actions.get(i), null); + } + } + if (batchError.hasErrors()) { + throw batchError.makeException(); + } + } + } catch (Exception e) { + logger.error(LCD.convert("01-000010"), tableNameString, actions, e); + throw new IOException("batch table " + tableNameString + " error", e); + } + } + + private String getTargetTableName(List actions) { + byte[] family = null; + for (Row action : actions) { + if (action instanceof RowMutations || action instanceof RegionCoprocessorServiceExec) { + throw new FeatureNotSupportedException("not supported yet'"); + } else { + Mutation mutation = (Mutation) action; + if (mutation.getFamilyCellMap().size() != 1) { + return getTargetTableName(tableNameString); + } else { + byte[] nextFamily = mutation.getFamilyCellMap().keySet().iterator().next(); + if (family != null && !Arrays.equals(family, nextFamily)) { + return getTargetTableName(tableNameString); + } else if (family == null) { + family = nextFamily; + } + } + } + } + return getTargetTableName(tableNameString, Bytes.toString(family), configuration); } @Override - public Object[] batch(List actions) { - throw new FeatureNotSupportedException("not supported yet."); + public Object[] batch(List actions) throws IOException { + Object[] results = new Object[actions.size()]; + batch(actions, results); + return results; } @Override @@ -538,7 +589,7 @@ public Result get(final Get get) throws IOException { ServerCallable serverCallable = new ServerCallable(configuration, obTableClient, tableNameString, get.getRow(), get.getRow(), operationTimeout) { public Result call() throws IOException { - List keyValueList = new ArrayList(); + List keyValueList = new ArrayList<>(); byte[] family = new byte[] {}; ObTableClientQueryAsyncStreamResult clientQueryStreamResult; ObTableQueryAsyncRequest request; @@ -797,57 +848,11 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, private void innerDelete(Delete delete) throws IOException { checkArgument(delete.getRow() != null, "row is null"); - List errorCodeList = new ArrayList(); - BatchOperationResult results = null; - try { - checkFamilyViolation(delete.getFamilyMap().keySet(), false); - if (delete.getFamilyMap().isEmpty()) { - // For a Delete operation without any qualifiers, we construct a DeleteFamily request. - // The server then performs the operation on all column families. - KeyValue kv = new KeyValue(delete.getRow(), delete.getTimeStamp(), - KeyValue.Type.DeleteFamily); - - BatchOperation batch = buildBatchOperation(tableNameString, Arrays.asList(kv), false, null); - results = batch.execute(); - } else if (delete.getFamilyMap().size() > 1) { - // Currently, the Delete Family operation type cannot transmit qualifiers to the server. - // As a result, the server cannot identify which families need to be deleted. - // Therefore, this process is handled sequentially. - boolean has_delete_family = delete.getFamilyMap().entrySet().stream() - .flatMap(entry -> entry.getValue().stream()) - .anyMatch(kv -> KeyValue.Type.codeToType(kv.getType()) == KeyValue.Type.DeleteFamily); - if (!has_delete_family) { - BatchOperation batch = buildBatchOperation(tableNameString, - delete.getFamilyMap(), false, null); - results = batch.execute(); - } else { - for (Map.Entry> entry : delete.getFamilyMap().entrySet()) { - BatchOperation batch = buildBatchOperation( - getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration), - entry.getValue(), false, null); - results = batch.execute(); - } - } - } else { - Map.Entry> entry = delete.getFamilyMap().entrySet().iterator() - .next(); - - BatchOperation batch = buildBatchOperation( - getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration), - entry.getValue(), false, null); - results = batch.execute(); - } - - errorCodeList = results.getErrorCodeList(); - boolean hasError = results.hasError(); - if (hasError) { - throw results.getFirstException(); - } + batch(Collections.singletonList(delete)); } catch (Exception e) { - logger.error(LCD.convert("01-00004"), tableNameString, errorCodeList, e); - throw new IOException("delete table " + tableNameString + " error codes " - + errorCodeList, e); + logger.error(LCD.convert("01-00004"), tableNameString, e); + throw new IOException("delete table " + tableNameString + "error" , e); } } @@ -1124,77 +1129,34 @@ public boolean isAutoFlush() { public void flushCommits() throws IOException { try { + if (writeBuffer.isEmpty()){ + return; + } + Map exceptionRowMap = new LinkedHashMap(); boolean[] resultSuccess = new boolean[writeBuffer.size()]; try { - Map, List>> familyMap = new HashMap, List>>(); - for (int i = 0; i < writeBuffer.size(); i++) { - Put aPut = writeBuffer.get(i); - Map> innerFamilyMap = aPut.getFamilyMap(); - if (innerFamilyMap.size() > 1) { - // Bypass logic: directly construct BatchOperation for puts with family map size > 1 - try { - BatchOperation batch = buildBatchOperation(this.tableNameString, - innerFamilyMap, false, null); - BatchOperationResult results = batch.execute(); - - boolean hasError = results.hasError(); - resultSuccess[i] = !hasError; - if (hasError) { - throw results.getFirstException(); - } - } catch (Exception e) { - logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush, - writeBuffer.size(), e); - throw new IOException("put table " + tableNameString + " error codes " - + null + "auto flush " + autoFlush - + " current buffer size " + writeBuffer.size(), e); - } - } else { - // Existing logic for puts with family map size = 1 - for (Map.Entry> entry : innerFamilyMap.entrySet()) { - String family = Bytes.toString(entry.getKey()); - Pair, List> keyValueWithIndex = familyMap - .get(family); - if (keyValueWithIndex == null) { - keyValueWithIndex = new Pair, List>( - new ArrayList(), new ArrayList()); - familyMap.put(family, keyValueWithIndex); - } - keyValueWithIndex.getFirst().add(i); - keyValueWithIndex.getSecond().addAll(entry.getValue()); + String realTableName = getTargetTableName(writeBuffer); + List resultMapSingleOp = new LinkedList<>(); + BatchOperation batch = buildBatchOperation(realTableName, writeBuffer, tableNameString.equals(realTableName), resultMapSingleOp); + BatchOperationResult results = batch.execute(); + if (results != null) { + int index = 0; + for (int i = 0; i != resultSuccess.length; ++i) { + if (results.getResults().get(index) instanceof ObTableException) { + resultSuccess[i] = false; + exceptionRowMap.put((ObTableException)results.getResults().get(index), writeBuffer.get(i)); + } else { + resultSuccess[i] = true; } + index += resultMapSingleOp.get(i); } } - for (Map.Entry, List>> entry : familyMap - .entrySet()) { - List errorCodeList = new ArrayList(entry.getValue() - .getSecond().size()); - try { - String targetTableName = getTargetTableName(this.tableNameString, - entry.getKey(), configuration); - - BatchOperation batch = buildBatchOperation(targetTableName, entry - .getValue().getSecond(), false, null); - BatchOperationResult results = batch.execute(); - - errorCodeList = results.getErrorCodeList(); - boolean hasError = results.hasError(); - for (Integer index : entry.getValue().getFirst()) { - resultSuccess[index] = !hasError; - } - if (hasError) { - throw results.getFirstException(); - } - } catch (Exception e) { - logger.error(LCD.convert("01-00008"), tableNameString, errorCodeList, - autoFlush, writeBuffer.size(), e); - throw new IOException("put table " + tableNameString + " error codes " - + errorCodeList + "auto flush " + autoFlush - + " current buffer size " + writeBuffer.size(), e); - } - - } - + } catch (Exception e) { + logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush, + writeBuffer.size(), e); + throw new IOException("put table " + tableNameString + " error codes " + null + + "auto flush " + autoFlush + " current buffer size " + + writeBuffer.size(), e); } finally { // mutate list so that it is empty for complete success, or contains // only failed records results are returned in the same order as the @@ -1206,6 +1168,12 @@ public void flushCommits() throws IOException { writeBuffer.remove(i); } } + if (!exceptionRowMap.isEmpty()) { + exceptionRowMap.forEach((e, row)->{ + logger.error(LCD.convert("01-00008"), row, tableNameString, autoFlush, + writeBuffer.size(), e); + }); + } } } finally { if (clearBufferOnFail) { @@ -1611,62 +1579,48 @@ public static ObTableBatchOperation buildObTableBatchOperation(List ke return batch; } - private ObTableBatchOperation buildObTableBatchOperation(Map> familyMap, - boolean putToAppend, - List qualifiers) { - ObTableBatchOperation batch = new ObTableBatchOperation(); - for (Map.Entry> entry : familyMap.entrySet()) { - byte[] family = entry.getKey(); - List keyValueList = entry.getValue(); - for (KeyValue kv : keyValueList) { - if (qualifiers != null) { - qualifiers - .add((Bytes.toString(family) + "." + Bytes.toString(kv.getQualifier())) - .getBytes()); - } - KeyValue new_kv = modifyQualifier(kv, - (Bytes.toString(family) + "." + Bytes.toString(kv.getQualifier())).getBytes()); - batch.addTableOperation(buildObTableOperation(new_kv, putToAppend)); - } - } - batch.setSameType(true); - batch.setSamePropertiesNames(true); - return batch; - } - private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv, - boolean putToAppend) { + ObTableOperationType operationType, boolean isTableGroup) { KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType()); - switch (kvType) { - case Put: - ObTableOperationType operationType; - if (putToAppend) { - operationType = APPEND; - } else { - operationType = INSERT_OR_UPDATE; - } + switch (operationType) { + case INSERT_OR_UPDATE: + case APPEND: return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(operationType, ROW_KEY_COLUMNS, new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, V_COLUMNS, new Object[] { kv.getValue() }); - case Delete: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, ROW_KEY_COLUMNS, - new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, null, null); - case DeleteColumn: - return com.alipay.oceanbase.rpc.mutation.Mutation - .getInstance(DEL, ROW_KEY_COLUMNS, - new Object[] { kv.getRow(), kv.getQualifier(), -kv.getTimestamp() }, null, - null); - case DeleteFamily: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, ROW_KEY_COLUMNS, - new Object[] { kv.getRow(), null, -kv.getTimestamp() }, null, null); + case DEL: + switch (kvType) { + case Delete: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, + ROW_KEY_COLUMNS, + new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, + null, null); + case Maximum: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, + ROW_KEY_COLUMNS, + new Object[] { kv.getRow(), null, -kv.getTimestamp() }, + null, null); + case DeleteColumn: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, + ROW_KEY_COLUMNS, + new Object[] { kv.getRow(), kv.getQualifier(), -kv.getTimestamp() }, + null, null); + case DeleteFamily: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, + ROW_KEY_COLUMNS, + new Object[] { kv.getRow(), isTableGroup?kv.getQualifier():null, -kv.getTimestamp() }, + null, null); + default: + throw new IllegalArgumentException("illegal mutation type " + kvType); + } default: - throw new IllegalArgumentException("illegal mutation type " + kvType); + throw new IllegalArgumentException("illegal mutation type " + operationType); } } private KeyValue modifyQualifier(KeyValue original, byte[] newQualifier) { - // Extract existing components + // Extract existing components byte[] row = original.getRow(); byte[] family = original.getFamily(); byte[] value = original.getValue(); @@ -1677,37 +1631,66 @@ private KeyValue modifyQualifier(KeyValue original, byte[] newQualifier) { value); } - private BatchOperation buildBatchOperation(String tableName, - Map> familyMap, - boolean putToAppend, List qualifiers) { + private BatchOperation buildBatchOperation(String tableName, List actions, boolean isTableGroup, List resultMapSingleOp) { BatchOperation batch = obTableClient.batchOperation(tableName); - - for (Map.Entry> entry : familyMap.entrySet()) { - byte[] family = entry.getKey(); - List keyValueList = entry.getValue(); - for (KeyValue kv : keyValueList) { - if (qualifiers != null) { - qualifiers.add(kv.getQualifier()); + if (actions != null) { + int posInList = -1; + int singleOpResultNum; + for (Row row : actions) { + singleOpResultNum = 0; + posInList++; + if (!(row instanceof Put) && !(row instanceof Delete)) { + throw new FeatureNotSupportedException( + "not supported other type in batch yet,only support put and delete"); + } else if (row instanceof Put) { + Put put = (Put) row; + if (put.isEmpty()) { + throw new IllegalArgumentException("No columns to insert for #" + + (posInList + 1) + " item"); + } + for (Map.Entry> entry : put.getFamilyMap().entrySet()) { + byte[] family = entry.getKey(); + List keyValueList = entry.getValue(); + for (KeyValue kv : keyValueList) { + singleOpResultNum++; + if(isTableGroup){ + KeyValue new_kv = modifyQualifier(kv,(Bytes.toString(family) + "." + Bytes.toString(kv.getQualifier())) + .getBytes()); + batch.addOperation(buildMutation(new_kv, INSERT_OR_UPDATE, isTableGroup)); + } else { + batch.addOperation(buildMutation(kv, INSERT_OR_UPDATE, isTableGroup)); + } + } + } + } else { + Delete delete = (Delete) row; + if (delete.isEmpty()) { + singleOpResultNum++; + KeyValue kv = new KeyValue(delete.getRow(), delete.getTimeStamp(), + KeyValue.Type.Maximum); + batch.addOperation(buildMutation(kv, DEL, isTableGroup)); + } else { + for (Map.Entry> entry : delete.getFamilyMap() + .entrySet()) { + byte[] family = entry.getKey(); + List keyValueList = entry.getValue(); + List mutations = new LinkedList<>(); + for (KeyValue kv : keyValueList) { + singleOpResultNum++; + if(isTableGroup){ + KeyValue new_kv = modifyQualifier(kv,(Bytes.toString(family) + "." + Bytes.toString(kv.getQualifier())) + .getBytes()); + batch.addOperation(buildMutation(new_kv, DEL, true)); + } else { + batch.addOperation(buildMutation(kv, DEL, false)); + } + } + } + } } - KeyValue new_kv = modifyQualifier(kv, - (Bytes.toString(family) + "." + Bytes.toString(kv.getQualifier())).getBytes()); - batch.addOperation(buildMutation(new_kv, putToAppend)); + resultMapSingleOp.add(singleOpResultNum); } } - - batch.setEntityType(ObTableEntityType.HKV); - return batch; - } - - private BatchOperation buildBatchOperation(String tableName, List keyValueList, - boolean putToAppend, List qualifiers) { - BatchOperation batch = obTableClient.batchOperation(tableName); - for (KeyValue kv : keyValueList) { - if (qualifiers != null) { - qualifiers.add(kv.getQualifier()); - } - batch.addOperation(buildMutation(kv, putToAppend)); - } batch.setEntityType(ObTableEntityType.HKV); return batch; } @@ -1739,15 +1722,6 @@ public static ObTableOperation buildObTableOperation(KeyValue kv, boolean putToA } } - private ObTableQueryRequest buildObTableQueryRequest(ObTableQuery obTableQuery, - String targetTableName) { - ObTableQueryRequest request = new ObTableQueryRequest(); - request.setEntityType(ObTableEntityType.HKV); - request.setTableQuery(obTableQuery); - request.setTableName(targetTableName); - return request; - } - private ObTableQueryAsyncRequest buildObTableQueryAsyncRequest(ObTableQuery obTableQuery, String targetTableName) { ObTableQueryRequest request = new ObTableQueryRequest(); diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java b/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java new file mode 100644 index 00000000..640e432d --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java @@ -0,0 +1,40 @@ +package com.alipay.oceanbase.hbase.util; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.client.Row; + +import java.util.ArrayList; +import java.util.List; + +public class BatchError { + private final List throwables = new ArrayList(); + private final List actions = new ArrayList(); + private final List addresses = new ArrayList(); + + public synchronized void add(Throwable ex, Row row, ServerName serverName) { + if (row == null){ + throw new IllegalArgumentException("row cannot be null. location=" + serverName); + } + + throwables.add(ex); + actions.add(row); + addresses.add(serverName != null ? serverName.toString() : "null"); + } + + public boolean hasErrors() { + return !throwables.isEmpty(); + } + + public synchronized RetriesExhaustedWithDetailsException makeException() { + return new RetriesExhaustedWithDetailsException( + new ArrayList(throwables), + new ArrayList(actions), new ArrayList(addresses)); + } + + public synchronized void clear() { + throwables.clear(); + actions.clear(); + addresses.clear(); + } +} diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index 4c93efde..324204c2 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -103,7 +103,7 @@ PRIMARY KEY (`K`, `Q`, `T`) } // test scan with empty family - Scan scan = new Scan(); + Scan scan = new Scan(toBytes(key)); ResultScanner scanner = hTable.getScanner(scan); for (Result result : scanner) { for (KeyValue keyValue : result.raw()) { @@ -2451,7 +2451,7 @@ public void testReversedScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 6); + Assert.assertEquals(6, res_count); scanner.close(); // reverse scan with MaxVersion @@ -2469,7 +2469,7 @@ public void testReversedScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 3); + Assert.assertEquals(3, res_count); scanner.close(); // reverse scan with pageFilter @@ -2489,7 +2489,7 @@ public void testReversedScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 6); + Assert.assertEquals(6, res_count); scanner.close(); // reverse scan with not_exist_start_row @@ -2507,7 +2507,7 @@ public void testReversedScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 6); + Assert.assertEquals(6, res_count); scanner.close(); // reverse scan with abnormal range @@ -2525,7 +2525,7 @@ public void testReversedScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 0); + Assert.assertEquals(0, res_count); scanner.close(); // reverse scan with abnormal range @@ -2543,7 +2543,7 @@ public void testReversedScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 13); + Assert.assertEquals(13, res_count); scanner.close(); hTable.delete(deleteKey1Family); @@ -2700,7 +2700,7 @@ public void testPartitionScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 7); + Assert.assertEquals(7, res_count); scanner.close(); // scan with prefixFilter @@ -2720,7 +2720,7 @@ public void testPartitionScan() throws Exception { res_count += 1; } } - Assert.assertEquals(res_count, 2); + Assert.assertEquals(2, res_count); scanner.close(); // scan with singleColumnValueFilter diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java index 1793bb7f..f8d4eed3 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java @@ -24,9 +24,7 @@ import org.junit.rules.ExpectedException; import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import static org.apache.hadoop.hbase.util.Bytes.toBytes; import static org.junit.Assert.*; @@ -48,6 +46,108 @@ public void finish() throws IOException { hTable.close(); } + @Test + public void testMultiColumnFamilyBatch() throws Exception { + byte[] family1 = "family_with_group1".getBytes(); + byte[] family2 = "family_with_group2".getBytes(); + byte[] family3 = "family_with_group3".getBytes(); + + byte[] family1_column1 = "family1_column1".getBytes(); + byte[] family1_column2 = "family1_column2".getBytes(); + byte[] family1_column3 = "family1_column3".getBytes(); + byte[] family2_column1 = "family2_column1".getBytes(); + byte[] family2_column2 = "family2_column2".getBytes(); + byte[] family3_column1 = "family3_column1".getBytes(); + byte[] family1_value = "VVV1".getBytes(); + byte[] family2_value = "VVV2".getBytes(); + byte[] family3_value = "VVV3".getBytes(); + + int rows = 10; + List batchLsit = new LinkedList<>(); + for (int i = 0; i < rows; ++i) { + Put put = new Put(toBytes("Key" + i)); + Delete delete = new Delete(toBytes("Key" + i)); + batchLsit.add(delete); + put.add(family1, family1_column1, family1_value); + put.add(family1, family1_column2, family1_value); + put.add(family1, family1_column3, family1_value); + put.add(family2, family2_column1, family2_value); + put.add(family2, family2_column2, family2_value); + put.add(family3, family3_column1, family3_value); + batchLsit.add(put); + } + + // f1c1 f1c2 f1c3 f2c1 f2c2 f3c1 + Delete delete = new Delete(toBytes("Key1")); + delete.deleteColumns(family1, family1_column1); + delete.deleteColumns(family2, family2_column1); + batchLsit.add(delete); + hTable.batch(batchLsit); + // f1c2 f1c3 f2c2 f3c1 + Get get = new Get(toBytes("Key1")); + Result result = hTable.get(get); + KeyValue[] keyValues = result.raw(); + assertEquals(4, keyValues.length); + assertFalse(result.containsColumn(family1, family1_column1)); + assertFalse(result.containsColumn(family2, family2_column1)); + + assertTrue(result.containsColumn(family1, family1_column2)); + assertArrayEquals(result.getValue(family1, family1_column2), family1_value); + assertTrue(result.containsColumn(family1, family1_column3)); + assertArrayEquals(result.getValue(family1, family1_column3), family1_value); + assertTrue(result.containsColumn(family2, family2_column2)); + assertArrayEquals(result.getValue(family2, family2_column2), family2_value); + assertTrue(result.containsColumn(family3, family3_column1)); + assertArrayEquals(result.getValue(family3, family3_column1), family3_value); + + // f1c1 f2c1 f2c2 + delete = new Delete(toBytes("Key2")); + delete.deleteColumns(family1, family1_column2); + delete.deleteColumns(family1, family1_column3); + delete.deleteColumns(family3, family3_column1); + batchLsit.add(delete); + // null + hTable.batch(batchLsit); + get = new Get(toBytes("Key2")); + result = hTable.get(get); + keyValues = result.raw(); + assertEquals(3, keyValues.length); + batchLsit.clear(); + for (int i = 0; i < rows; ++i) { + Put put = new Put(toBytes("Key" + i)); + put.add(family1, family1_column1, family1_value); + put.add(family1, family1_column2, family1_value); + put.add(family1, family1_column3, family1_value); + put.add(family2, family2_column1, family2_value); + put.add(family2, family2_column2, family2_value); + put.add(family3, family3_column1, family3_value); + batchLsit.add(put); + } + + delete = new Delete(toBytes("Key3")); + delete.deleteColumn(family1, family1_column2); + delete.deleteColumn(family2, family2_column1); + batchLsit.add(delete); + hTable.batch(batchLsit); + get = new Get(toBytes("Key3")); + result = hTable.get(get); + keyValues = result.raw(); + assertEquals(6, keyValues.length); + + batchLsit.clear(); + delete = new Delete(toBytes("Key4")); + delete.deleteColumns(family1, family1_column2); + delete.deleteColumns(family2, family2_column1); + delete.deleteFamily(family3); + batchLsit.add(delete); + hTable.batch(batchLsit); + get = new Get(toBytes("Key4")); + get.setMaxVersions(10); + result = hTable.get(get); + keyValues = result.raw(); + assertEquals(6, keyValues.length); + } + @Test public void testMultiColumnFamilyPut() throws Exception { byte[] family1 = "family_with_group1".getBytes(); @@ -487,7 +587,6 @@ public void testMultiColumnFamilyGet() throws Exception { assertEquals(expectedValue, keyValues[i].getValue()); } } - System.out.println(Arrays.toString(result2.raw())); assertEquals(3, keyValues.length); //f2c1 f2c2 @@ -529,6 +628,8 @@ public void testMultiColumnFamilyDelete() throws Exception { for (int i = 0; i < rows; ++i) { Put put = new Put(toBytes("Key" + i)); + Delete delete = new Delete(toBytes("Key" + i)); + hTable.delete(delete); put.add(family1, family1_column1, family1_value); put.add(family1, family1_column2, family1_value); put.add(family1, family1_column3, family1_value);