From 9b4e87c5d881cbabafef6c20f5a91a24581a0ddb Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Wed, 20 Nov 2024 21:58:24 +0800 Subject: [PATCH 01/16] init get query building in batch; fix mulfi-cf test case bug --- .../com/alipay/oceanbase/hbase/OHTable.java | 43 ++++++++++++++++++- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 676831ba..4df091c6 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -39,6 +39,7 @@ import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult; import com.alipay.oceanbase.rpc.table.ObHBaseParams; import com.alipay.oceanbase.rpc.table.ObKVParams; +import com.alipay.oceanbase.rpc.table.ObTableClientQueryImpl; import com.alipay.sofa.common.thread.SofaThreadPoolExecutor; import com.google.protobuf.Descriptors; @@ -1841,14 +1842,52 @@ private KeyValue modifyQualifier(Cell original, byte[] newQualifier) { private BatchOperation buildBatchOperation(String tableName, List actions, boolean isTableGroup, List resultMapSingleOp) throws FeatureNotSupportedException, - IllegalArgumentException { + IllegalArgumentException, + IOException { BatchOperation batch = obTableClient.batchOperation(tableName); int posInList = -1; int singleOpResultNum; for (Row row : actions) { singleOpResultNum = 0; posInList++; - if (row instanceof Put) { + if (row instanceof Get) { + Get get = (Get) row; + ObTableQuery obTableQuery; + if (get.getFamilyMap().keySet() == null + || get.getFamilyMap().keySet().isEmpty() + || get.getFamilyMap().size() > 1) { + // In a Get operation where the family map is greater than 1 or equal to 0, + // we handle this by appending the column family to the qualifier on the client side. + // The server can then use this information to filter the appropriate column families and qualifiers. + if (!get.getColumnFamilyTimeRange().isEmpty()) { + throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now"); + } + NavigableSet columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR); + processColumnFilters(columnFilters, get.getFamilyMap()); + obTableQuery = buildObTableQuery(get, columnFilters); + ObTableClientQueryImpl query = new ObTableClientQueryImpl(tableName, obTableQuery, obTableClient); + batch.addOperation(query); + } else { + for (Map.Entry> entry : get.getFamilyMap() + .entrySet()) { + byte[] family = entry.getKey(); + if (!get.getColumnFamilyTimeRange().isEmpty()) { + Map colFamTimeRangeMap = get.getColumnFamilyTimeRange(); + if (colFamTimeRangeMap.size() > 1) { + throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now"); + } else if (colFamTimeRangeMap.get(family) == null) { + throw new IllegalArgumentException("Get family is not matched in ColumnFamilyTimeRange"); + } else { + TimeRange tr = colFamTimeRangeMap.get(family); + get.setTimeRange(tr.getMin(), tr.getMax()); + } + } + obTableQuery = buildObTableQuery(get, entry.getValue()); + ObTableClientQueryImpl query = new ObTableClientQueryImpl(tableName, obTableQuery, obTableClient); + batch.addOperation(query); + } + } + } else if (row instanceof Put) { Put put = (Put) row; if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert for #" From a3a5c4f650f7fcb7474c27a83a7bd0f4cd587597 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Thu, 21 Nov 2024 22:08:23 +0800 Subject: [PATCH 02/16] deal with batch get --- .../com/alipay/oceanbase/hbase/OHTable.java | 184 ++++++++++++------ 1 file changed, 120 insertions(+), 64 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 4df091c6..caa4ed7e 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -25,8 +25,11 @@ import com.alipay.oceanbase.hbase.util.*; import com.alipay.oceanbase.rpc.ObTableClient; import com.alipay.oceanbase.rpc.exception.ObTableException; +import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException; import com.alipay.oceanbase.rpc.mutation.BatchOperation; import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult; +import com.alipay.oceanbase.rpc.mutation.result.MutationResult; +import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; @@ -69,6 +72,8 @@ import static com.alipay.oceanbase.hbase.util.Preconditions.checkNotNull; import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD; import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.TABLE_HBASE_LOGGER_SPACE; +import static com.alipay.oceanbase.rpc.mutation.MutationFactory.colVal; +import static com.alipay.oceanbase.rpc.mutation.MutationFactory.row; import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation.getInstance; import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.*; import static com.alipay.sofa.common.thread.SofaThreadPoolConstants.SOFA_THREAD_POOL_LOGGING_CAPABILITY; @@ -694,39 +699,83 @@ public void batch(final List actions, final Object[] results) thr BatchError batchError = new BatchError(); obTableClient.setRuntimeBatchExecutor(executePool); List resultMapSingleOp = new LinkedList<>(); - if (!CompatibilityUtil.isBatchSupport()) { - try { - compatOldServerBatch(actions, results, batchError); - } catch (Exception e) { - throw new IOException(e); - } - } else { - String realTableName = getTargetTableName(actions); - BatchOperation batch = buildBatchOperation(realTableName, actions, - tableNameString.equals(realTableName), resultMapSingleOp); - BatchOperationResult tmpResults; - try { - tmpResults = batch.execute(); - } catch (Exception e) { - throw new IOException(e); - } - int index = 0; - for (int i = 0; i != actions.size(); ++i) { - if (tmpResults.getResults().get(index) instanceof ObTableException) { - if (results != null) { - results[i] = tmpResults.getResults().get(index); + try { + if (!CompatibilityUtil.isBatchSupport()) { + compatOldServerBatch(actions, results); + } else { + String realTableName = getTargetTableName(actions); + BatchOperation batch = buildBatchOperation(realTableName, actions, + tableNameString.equals(realTableName), resultMapSingleOp); + BatchOperationResult tmpResults = batch.execute(); + if (results != null) { + int index = 0; + for (int i = 0; i != results.length; ++i) { + if (tmpResults.getResults().get(index) instanceof ObTableException) { + results[i] = tmpResults.getResults().get(index); + batchError.add((ObTableException) results[i], actions.get(i), null); + } else if (actions.get(i) instanceof Get) { + Get get = (Get) actions.get(i); + boolean isTableGroup = (get.getFamilyMap().keySet().size() != 1); + if (tmpResults.getResults().get(index) instanceof MutationResult) { + MutationResult mutationResult = (MutationResult) tmpResults.getResults().get(index); + ObPayload innerResult = mutationResult.getResult(); + if (innerResult instanceof ObTableSingleOpResult) { + ObTableSingleOpResult singleOpResult = (ObTableSingleOpResult) innerResult; + ObTableSingleOpEntity singleOpEntity = singleOpResult.getEntity(); + List rowkeyNames = singleOpEntity.getRowKeyNames(); + List rowKey = singleOpEntity.getRowkey(); + List propertiesNames = singleOpEntity.getPropertiesNames(); + List propertiesValues = singleOpEntity.getPropertiesValues(); + if (!get.isCheckExistenceOnly()) { + if (rowKey.size() != (propertiesValues.size() * 3)) { + throw new IllegalArgumentException("the length of rowKey and properties is not matched"); + } else if (rowkeyNames.size() != 3 || propertiesNames.size() != 1) { + throw new IllegalArgumentException("the length of rowKeyNames and propertyNames is not matched"); + } + } + List cells = new ArrayList<>(); + int rowKeyIdx = 0, valueIdx = 0; + while (rowKeyIdx < rowKey.size() && valueIdx < propertiesValues.size()) { + byte[][] familyAndQualifier = new byte[2][]; + if (isTableGroup) { + // split family and qualifier + familyAndQualifier = OHBaseFuncUtils + .extractFamilyFromQualifier((byte[]) rowKey.get(rowKeyIdx + 1).getValue()); + } else { + byte[] family = get.getFamilyMap().keySet().iterator().next(); + familyAndQualifier[0] = family; + familyAndQualifier[1] = (byte[]) rowKey.get(rowKeyIdx + 1).getValue(); + } + KeyValue kv = new KeyValue((byte[]) rowKey.get(rowKeyIdx).getValue(),//K + familyAndQualifier[0], // family + familyAndQualifier[1], // qualifiermat + (Long) rowKey.get(rowKeyIdx + 2).getValue(), // T + (byte[]) propertiesValues.get(valueIdx).getValue()// V + ); + cells.add(kv); + rowKeyIdx += 3; + valueIdx++; + } + results[i] = Result.create(cells, get.isCheckExistenceOnly()); + } else { + throw new ObTableUnexpectedException("Unexpected type of result in MutationResult"); + } + } else { + throw new ObTableUnexpectedException("Unexpected type of result in batch"); + } + } else { + results[i] = new Result(); + } + index += resultMapSingleOp.get(i); } - batchError.add((ObTableException) tmpResults.getResults().get(index), actions.get(i), null); - } else { - if (results != null) { - results[i] = new Result(); + if (batchError.hasErrors()) { + throw batchError.makeException(); } } - index += resultMapSingleOp.get(i); } - } - if (batchError.hasErrors()) { - throw batchError.makeException(); + } catch (Exception e) { + logger.error(LCD.convert("01-000010"), tableNameString, actions, e); + throw new IOException("batch table " + tableNameString + " error", e); } } @@ -736,11 +785,21 @@ private String getTargetTableName(List actions) { if (action instanceof RowMutations || action instanceof RegionCoprocessorServiceExec) { throw new FeatureNotSupportedException("not supported yet'"); } else { - Mutation mutation = (Mutation) action; - if (mutation.getFamilyCellMap().size() != 1) { + Set familySet = null; + if (action instanceof Get){ + Get get = (Get) action; + familySet = get.familySet(); + } else { + Mutation mutation = (Mutation) action; + familySet = mutation.getFamilyCellMap().keySet(); + } + if (familySet == null) { + throw new ObTableUnexpectedException("Fail to get family set in action"); + } + if (familySet.size() != 1) { return getTargetTableName(tableNameString); } else { - byte[] nextFamily = mutation.getFamilyCellMap().keySet().iterator().next(); + byte[] nextFamily = familySet.iterator().next(); if (family != null && !Arrays.equals(family, nextFamily)) { return getTargetTableName(tableNameString); } else if (family == null) { @@ -1851,42 +1910,39 @@ private BatchOperation buildBatchOperation(String tableName, List singleOpResultNum = 0; posInList++; if (row instanceof Get) { + ++singleOpResultNum; Get get = (Get) row; ObTableQuery obTableQuery; - if (get.getFamilyMap().keySet() == null - || get.getFamilyMap().keySet().isEmpty() - || get.getFamilyMap().size() > 1) { - // In a Get operation where the family map is greater than 1 or equal to 0, - // we handle this by appending the column family to the qualifier on the client side. - // The server can then use this information to filter the appropriate column families and qualifiers. - if (!get.getColumnFamilyTimeRange().isEmpty()) { + // In a Get operation in ls batch, we need to determine whether the get is a table-group operation or not, + // we handle this by appending the column family to the qualifier on the client side. + // The server can then use this information to filter the appropriate column families and qualifiers. + if ((get.getFamilyMap().keySet().isEmpty() + || get.getFamilyMap().size() > 1) && + !get.getColumnFamilyTimeRange().isEmpty()) { + throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now"); + } else if (get.getFamilyMap().size() == 1 && !get.getColumnFamilyTimeRange().isEmpty()) { + byte[] family = get.getFamilyMap().keySet().iterator().next(); + Map colFamTimeRangeMap = get.getColumnFamilyTimeRange(); + if (colFamTimeRangeMap.size() > 1) { throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now"); + } else if (colFamTimeRangeMap.get(family) == null) { + throw new IllegalArgumentException("Get family is not matched in ColumnFamilyTimeRange"); + } else { + TimeRange tr = colFamTimeRangeMap.get(family); + get.setTimeRange(tr.getMin(), tr.getMax()); } - NavigableSet columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR); - processColumnFilters(columnFilters, get.getFamilyMap()); - obTableQuery = buildObTableQuery(get, columnFilters); - ObTableClientQueryImpl query = new ObTableClientQueryImpl(tableName, obTableQuery, obTableClient); - batch.addOperation(query); - } else { - for (Map.Entry> entry : get.getFamilyMap() - .entrySet()) { - byte[] family = entry.getKey(); - if (!get.getColumnFamilyTimeRange().isEmpty()) { - Map colFamTimeRangeMap = get.getColumnFamilyTimeRange(); - if (colFamTimeRangeMap.size() > 1) { - throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now"); - } else if (colFamTimeRangeMap.get(family) == null) { - throw new IllegalArgumentException("Get family is not matched in ColumnFamilyTimeRange"); - } else { - TimeRange tr = colFamTimeRangeMap.get(family); - get.setTimeRange(tr.getMin(), tr.getMax()); - } - } - obTableQuery = buildObTableQuery(get, entry.getValue()); - ObTableClientQueryImpl query = new ObTableClientQueryImpl(tableName, obTableQuery, obTableClient); - batch.addOperation(query); - } } + NavigableSet columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR); + processColumnFilters(columnFilters, get.getFamilyMap()); + obTableQuery = buildObTableQuery(get, columnFilters); + ObTableClientQueryImpl query = new ObTableClientQueryImpl(tableName, obTableQuery, obTableClient); + try { + query.setRowKey(row(colVal("K", Bytes.toString(get.getRow())), colVal("Q", null), colVal("T", null))); + } catch (Exception e) { + logger.error("unexpected error occurs when set row key", e); + throw new IOException(e); + } + batch.addOperation(query); } else if (row instanceof Put) { Put put = (Put) row; if (put.isEmpty()) { @@ -1921,7 +1977,7 @@ private BatchOperation buildBatchOperation(String tableName, List } } else { throw new FeatureNotSupportedException( - "not supported other type in batch yet,only support put and delete"); + "not supported other type in batch yet,only support get, put and delete"); } resultMapSingleOp.add(singleOpResultNum); } From b518fadefb98a1d42a51e6f3fff35cc32fd81268 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Mon, 23 Dec 2024 16:23:45 +0800 Subject: [PATCH 03/16] compatible hbase 2.0 --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index caa4ed7e..d5a9d606 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -701,7 +701,7 @@ public void batch(final List actions, final Object[] results) thr List resultMapSingleOp = new LinkedList<>(); try { if (!CompatibilityUtil.isBatchSupport()) { - compatOldServerBatch(actions, results); + compatOldServerBatch(actions, results, batchError); } else { String realTableName = getTargetTableName(actions); BatchOperation batch = buildBatchOperation(realTableName, actions, From b9923f95d21a028d061df0f2f4bfd470dd60cf02 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Mon, 25 Nov 2024 19:40:56 +0800 Subject: [PATCH 04/16] add test case for batch get and processing --- .../com/alipay/oceanbase/hbase/OHTable.java | 19 +-- .../hbase/HTableMultiCFTestBase.java | 117 ++++++++++++++++++ 2 files changed, 121 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index d5a9d606..2f002213 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -715,37 +715,26 @@ public void batch(final List actions, final Object[] results) thr batchError.add((ObTableException) results[i], actions.get(i), null); } else if (actions.get(i) instanceof Get) { Get get = (Get) actions.get(i); - boolean isTableGroup = (get.getFamilyMap().keySet().size() != 1); if (tmpResults.getResults().get(index) instanceof MutationResult) { MutationResult mutationResult = (MutationResult) tmpResults.getResults().get(index); ObPayload innerResult = mutationResult.getResult(); if (innerResult instanceof ObTableSingleOpResult) { ObTableSingleOpResult singleOpResult = (ObTableSingleOpResult) innerResult; ObTableSingleOpEntity singleOpEntity = singleOpResult.getEntity(); - List rowkeyNames = singleOpEntity.getRowKeyNames(); List rowKey = singleOpEntity.getRowkey(); - List propertiesNames = singleOpEntity.getPropertiesNames(); List propertiesValues = singleOpEntity.getPropertiesValues(); if (!get.isCheckExistenceOnly()) { if (rowKey.size() != (propertiesValues.size() * 3)) { throw new IllegalArgumentException("the length of rowKey and properties is not matched"); - } else if (rowkeyNames.size() != 3 || propertiesNames.size() != 1) { - throw new IllegalArgumentException("the length of rowKeyNames and propertyNames is not matched"); } } List cells = new ArrayList<>(); int rowKeyIdx = 0, valueIdx = 0; while (rowKeyIdx < rowKey.size() && valueIdx < propertiesValues.size()) { byte[][] familyAndQualifier = new byte[2][]; - if (isTableGroup) { - // split family and qualifier - familyAndQualifier = OHBaseFuncUtils - .extractFamilyFromQualifier((byte[]) rowKey.get(rowKeyIdx + 1).getValue()); - } else { - byte[] family = get.getFamilyMap().keySet().iterator().next(); - familyAndQualifier[0] = family; - familyAndQualifier[1] = (byte[]) rowKey.get(rowKeyIdx + 1).getValue(); - } + // split family and qualifier + familyAndQualifier = OHBaseFuncUtils + .extractFamilyFromQualifier((byte[]) rowKey.get(rowKeyIdx + 1).getValue()); KeyValue kv = new KeyValue((byte[]) rowKey.get(rowKeyIdx).getValue(),//K familyAndQualifier[0], // family familyAndQualifier[1], // qualifiermat @@ -756,7 +745,7 @@ public void batch(final List actions, final Object[] results) thr rowKeyIdx += 3; valueIdx++; } - results[i] = Result.create(cells, get.isCheckExistenceOnly()); + results[i] = Result.create(cells); } else { throw new ObTableUnexpectedException("Unexpected type of result in MutationResult"); } diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java index 83236172..754a0350 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java @@ -695,6 +695,123 @@ public void testMultiColumnFamilyBufferedMutator() throws Exception { Assert.assertEquals(0, count); } + @Test + public void testMultiColumnFamilyBatchGet() throws Exception { + byte[] family1 = "family_with_group1".getBytes(); + byte[] family2 = "family_with_group2".getBytes(); + byte[] family3 = "family_with_group3".getBytes(); + + byte[] family1_column1 = "family1_column1".getBytes(); + byte[] family1_column2 = "family1_column2".getBytes(); + byte[] family1_column3 = "family1_column3".getBytes(); + byte[] family2_column1 = "family2_column1".getBytes(); + byte[] family2_column2 = "family2_column2".getBytes(); + byte[] family3_column1 = "family3_column1".getBytes(); + byte[] family1_value = "VVV1".getBytes(); + byte[] family2_value = "VVV2".getBytes(); + byte[] family3_value = "VVV3".getBytes(); + + int rows = 10; + List batchLsit = new LinkedList<>(); + for (int i = 0; i < rows; ++i) { + Put put = new Put(toBytes("Key" + i)); + Delete delete = new Delete(toBytes("Key" + i)); + batchLsit.add(delete); + put.add(family1, family1_column1, family1_value); + put.add(family1, family1_column2, family1_value); + put.add(family1, family1_column3, family1_value); + put.add(family2, family2_column1, family2_value); + put.add(family2, family2_column2, family2_value); + put.add(family3, family3_column1, family3_value); + batchLsit.add(put); + } + multiCfHTable.batch(batchLsit); + batchLsit.clear(); + + Get get1 = new Get("Key1".getBytes()); + get1.addFamily(family1); + get1.addFamily(family2); + batchLsit.add(get1); + Object[] results = multiCfHTable.batch(batchLsit); + Result getResult = (Result) results[0]; + for (KeyValue keyValue : getResult.raw()) { + System.out.printf("1. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(getResult.getRow()), + Bytes.toString(keyValue.getFamily()), + Bytes.toString(keyValue.getQualifier()), + keyValue.getTimestamp(), + Bytes.toString(keyValue.getValue()) + ); + } + batchLsit.clear(); + Delete delete = new Delete(toBytes("Key1")); + batchLsit.add(delete); + batchLsit.add(get1); + results = multiCfHTable.batch(batchLsit); + Assert.assertEquals(2, results.length); + Assert.assertEquals(0, ((Result) results[1]).raw().length); + + batchLsit.clear(); + Put put = new Put(toBytes("Key1")); + put.add(family1, family1_column1, family1_value); + put.add(family2, family2_column1, family2_value); + put.add(family3, family3_column1, family3_value); + batchLsit.add(put); + get1.addFamily(family3); + batchLsit.add(get1); + results = multiCfHTable.batch(batchLsit); + Assert.assertEquals(2, results.length); + getResult = (Result) results[1]; + for (KeyValue keyValue : getResult.raw()) { + System.out.printf("2. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(getResult.getRow()), + Bytes.toString(keyValue.getFamily()), + Bytes.toString(keyValue.getQualifier()), + keyValue.getTimestamp(), + Bytes.toString(keyValue.getValue()) + ); + } + + Get get2 = new Get("Key2".getBytes()); + get1.addFamily(family1); + get1.addFamily(family2); + get1.addFamily(family3); + batchLsit.clear(); + batchLsit.add(delete); + batchLsit.add(put); + batchLsit.add(get1); + batchLsit.add(delete); + batchLsit.add(get1); + batchLsit.add(get2); + results = multiCfHTable.batch(batchLsit); + Assert.assertEquals(6, results.length); + Result key1Result = (Result) results[2]; + Assert.assertEquals(3, key1Result.raw().length); + for (KeyValue keyValue : key1Result.raw()) { + System.out.printf("3. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(key1Result.getRow()), + Bytes.toString(keyValue.getFamily()), + Bytes.toString(keyValue.getQualifier()), + keyValue.getTimestamp(), + Bytes.toString(keyValue.getValue()) + ); + } + Result empResult = (Result) results[4]; + Assert.assertEquals(0, empResult.raw().length); + + Result key2Result = (Result) results[5]; + Assert.assertEquals(6, key2Result.raw().length); + for (KeyValue keyValue : key2Result.raw()) { + System.out.printf("4. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(key2Result.getRow()), + Bytes.toString(keyValue.getFamily()), + Bytes.toString(keyValue.getQualifier()), + keyValue.getTimestamp(), + Bytes.toString(keyValue.getValue()) + ); + } + } + @Test public void testMultiColumnFamilyBatch() throws Exception { byte[] family1 = "family_with_group1".getBytes(); From f9f8cdac1177205c600fc675e7da2030c675dbde Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 26 Nov 2024 12:01:41 +0800 Subject: [PATCH 05/16] add batch get test cases and compatible batch case to hbase 2.0 --- .../hbase/HTableMultiCFTestBase.java | 517 ++++++++++++++---- 1 file changed, 400 insertions(+), 117 deletions(-) diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java index 754a0350..b3c133ab 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java @@ -695,123 +695,6 @@ public void testMultiColumnFamilyBufferedMutator() throws Exception { Assert.assertEquals(0, count); } - @Test - public void testMultiColumnFamilyBatchGet() throws Exception { - byte[] family1 = "family_with_group1".getBytes(); - byte[] family2 = "family_with_group2".getBytes(); - byte[] family3 = "family_with_group3".getBytes(); - - byte[] family1_column1 = "family1_column1".getBytes(); - byte[] family1_column2 = "family1_column2".getBytes(); - byte[] family1_column3 = "family1_column3".getBytes(); - byte[] family2_column1 = "family2_column1".getBytes(); - byte[] family2_column2 = "family2_column2".getBytes(); - byte[] family3_column1 = "family3_column1".getBytes(); - byte[] family1_value = "VVV1".getBytes(); - byte[] family2_value = "VVV2".getBytes(); - byte[] family3_value = "VVV3".getBytes(); - - int rows = 10; - List batchLsit = new LinkedList<>(); - for (int i = 0; i < rows; ++i) { - Put put = new Put(toBytes("Key" + i)); - Delete delete = new Delete(toBytes("Key" + i)); - batchLsit.add(delete); - put.add(family1, family1_column1, family1_value); - put.add(family1, family1_column2, family1_value); - put.add(family1, family1_column3, family1_value); - put.add(family2, family2_column1, family2_value); - put.add(family2, family2_column2, family2_value); - put.add(family3, family3_column1, family3_value); - batchLsit.add(put); - } - multiCfHTable.batch(batchLsit); - batchLsit.clear(); - - Get get1 = new Get("Key1".getBytes()); - get1.addFamily(family1); - get1.addFamily(family2); - batchLsit.add(get1); - Object[] results = multiCfHTable.batch(batchLsit); - Result getResult = (Result) results[0]; - for (KeyValue keyValue : getResult.raw()) { - System.out.printf("1. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", - Bytes.toString(getResult.getRow()), - Bytes.toString(keyValue.getFamily()), - Bytes.toString(keyValue.getQualifier()), - keyValue.getTimestamp(), - Bytes.toString(keyValue.getValue()) - ); - } - batchLsit.clear(); - Delete delete = new Delete(toBytes("Key1")); - batchLsit.add(delete); - batchLsit.add(get1); - results = multiCfHTable.batch(batchLsit); - Assert.assertEquals(2, results.length); - Assert.assertEquals(0, ((Result) results[1]).raw().length); - - batchLsit.clear(); - Put put = new Put(toBytes("Key1")); - put.add(family1, family1_column1, family1_value); - put.add(family2, family2_column1, family2_value); - put.add(family3, family3_column1, family3_value); - batchLsit.add(put); - get1.addFamily(family3); - batchLsit.add(get1); - results = multiCfHTable.batch(batchLsit); - Assert.assertEquals(2, results.length); - getResult = (Result) results[1]; - for (KeyValue keyValue : getResult.raw()) { - System.out.printf("2. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", - Bytes.toString(getResult.getRow()), - Bytes.toString(keyValue.getFamily()), - Bytes.toString(keyValue.getQualifier()), - keyValue.getTimestamp(), - Bytes.toString(keyValue.getValue()) - ); - } - - Get get2 = new Get("Key2".getBytes()); - get1.addFamily(family1); - get1.addFamily(family2); - get1.addFamily(family3); - batchLsit.clear(); - batchLsit.add(delete); - batchLsit.add(put); - batchLsit.add(get1); - batchLsit.add(delete); - batchLsit.add(get1); - batchLsit.add(get2); - results = multiCfHTable.batch(batchLsit); - Assert.assertEquals(6, results.length); - Result key1Result = (Result) results[2]; - Assert.assertEquals(3, key1Result.raw().length); - for (KeyValue keyValue : key1Result.raw()) { - System.out.printf("3. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", - Bytes.toString(key1Result.getRow()), - Bytes.toString(keyValue.getFamily()), - Bytes.toString(keyValue.getQualifier()), - keyValue.getTimestamp(), - Bytes.toString(keyValue.getValue()) - ); - } - Result empResult = (Result) results[4]; - Assert.assertEquals(0, empResult.raw().length); - - Result key2Result = (Result) results[5]; - Assert.assertEquals(6, key2Result.raw().length); - for (KeyValue keyValue : key2Result.raw()) { - System.out.printf("4. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", - Bytes.toString(key2Result.getRow()), - Bytes.toString(keyValue.getFamily()), - Bytes.toString(keyValue.getQualifier()), - keyValue.getTimestamp(), - Bytes.toString(keyValue.getValue()) - ); - } - } - @Test public void testMultiColumnFamilyBatch() throws Exception { byte[] family1 = "family_with_group1".getBytes(); @@ -943,6 +826,406 @@ public void update(byte[] region, byte[] row, Result result) { }); assertEquals(11, updateCounter[0]); + /*--------------------------------------test batch get------------------------------------------*/ + // single-cf test + batchLsit.clear(); + for (int i = 0; i < rows; ++i) { + Put put = new Put(toBytes("Key" + i)); + delete = new Delete(toBytes("Key" + i)); + batchLsit.add(delete); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family1, family1_column2, family1_value); + put.addColumn(family1, family1_column3, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family2, family2_column2, family2_value); + put.addColumn(family3, family3_column1, family3_value); + batchLsit.add(put); + } + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + batchLsit.clear(); + // get + Get get1 = new Get("Key1".getBytes()); + get1.addFamily(family1); + batchLsit.add(get1); + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + Assert.assertEquals(1, results.length); + Result getResult = (Result) results[0]; + Assert.assertEquals(3, getResult.rawCells().length); + for (Cell keyValue : getResult.rawCells()) { + System.out.printf("1. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(CellUtil.cloneRow(keyValue)), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), + keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue)) + ); + Assert.assertTrue(Bytes.toString(CellUtil.cloneFamily(keyValue)).equals("family_with_group1")); + } + // delete + get + delete = new Delete(toBytes("Key1")); + batchLsit.clear(); + batchLsit.add(delete); + batchLsit.add(get1); + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + Assert.assertEquals(2, results.length); + Assert.assertEquals(0, ((Result) results[1]).rawCells().length); + + // put + get + Put put = new Put(toBytes("Key1")); + put.addColumn(family1, family1_column1, family1_value); + batchLsit.clear(); + batchLsit.add(put); + batchLsit.add(get1); + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + Assert.assertEquals(2, results.length); + getResult = (Result) results[1]; + Assert.assertEquals(3, getResult.rawCells().length); + for (Cell keyValue : getResult.rawCells()) { + System.out.printf("2. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(CellUtil.cloneRow(keyValue)), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), + keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue)) + ); + } + + // put + delete + get + Get get2 = new Get("Key2".getBytes()); + get1.setMaxVersions(10); + get2.addColumn(family1, family1_column1); + batchLsit.clear(); + batchLsit.add(delete); + batchLsit.add(put); + batchLsit.add(get1); + batchLsit.add(delete); + batchLsit.add(get1); + batchLsit.add(get2); + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + Assert.assertEquals(6, results.length); + Result key1Result = (Result) results[2]; + Assert.assertEquals(5, key1Result.rawCells().length); + for (Cell keyValue : key1Result.rawCells()) { + System.out.printf("3. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(CellUtil.cloneRow(keyValue)), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), + keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue)) + ); + } + Result empResult = (Result) results[4]; + Assert.assertEquals(0, empResult.rawCells().length); + + Result key2Result = (Result) results[5]; + Assert.assertEquals(1, key2Result.rawCells().length); + for (Cell keyValue : key2Result.rawCells()) { + System.out.printf("4. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(CellUtil.cloneRow(keyValue)), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), + keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue)) + ); + } + + // set maxVersion + put = new Put(toBytes("Key2")); + put.addColumn(family1, family1_column1, family1_value); + get2.setMaxVersions(10); + batchLsit.clear(); + batchLsit.add(put); + batchLsit.add(get2); + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + Assert.assertEquals(2, results.length); + key2Result = (Result) results[1]; + Assert.assertEquals(2, key2Result.rawCells().length); + for (Cell keyValue : key2Result.rawCells()) { + System.out.printf("5. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(CellUtil.cloneRow(keyValue)), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), + keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue)) + ); + } + get2.setMaxVersions(2); + Get get2All = new Get(toBytes("Key2")); + get2All.addColumn(family1, family1_column1); + get2All.setMaxVersions(); + batchLsit.clear(); + batchLsit.add(put); + batchLsit.add(get2); + batchLsit.add(get2All); + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + Assert.assertEquals(3, results.length); + key2Result = (Result) results[1]; + Result key2AllResult = (Result) results[2]; + Assert.assertEquals(3, key2AllResult.getColumnCells(family1, family1_column1).size()); + Assert.assertEquals(2, key2Result.getColumnCells(family1, family1_column1).size()); + for (Cell keyValue : key2Result.rawCells()) { + System.out.printf("6. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(CellUtil.cloneRow(keyValue)), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), + keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue)) + ); + } + + // compare with get interface + Result key2GetResult = multiCfHTable.get(get2); + Assert.assertEquals(key2GetResult.rawCells().length, key2Result.rawCells().length); + for (int i = 0; i < key2GetResult.size(); ++i) { + Assert.assertEquals(Bytes.toString(key2Result.getRow()), + Bytes.toString(key2GetResult.getRow())); + Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(key2Result.rawCells()[i])), + Bytes.toString(CellUtil.cloneFamily(key2GetResult.rawCells()[i]))); + Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(key2Result.rawCells()[i])), + Bytes.toString(CellUtil.cloneQualifier(key2GetResult.rawCells()[i]))); + Assert.assertEquals(key2Result.rawCells()[i].getTimestamp(), + key2GetResult.rawCells()[i].getTimestamp()); + Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(key2Result.rawCells()[i])), + Bytes.toString(CellUtil.cloneValue(key2GetResult.rawCells()[i]))); + } + // test batch get + batchLsit.clear(); + for (int i = 0; i < rows; ++i) { + get = new Get(toBytes("Key" + i)); + get.addFamily(family1); + get.setMaxVersions(10); + batchLsit.add(get); + } + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + Assert.assertEquals(rows, results.length); + for (int i = 0; i < rows; ++i) { + get = new Get(toBytes("Key" + i)); + get.addFamily(family1); + get.setMaxVersions(10); + getResult = multiCfHTable.get(get); + Result batchGetResult = (Result) results[i]; + if (Bytes.toString(get.getRow()).equals("Key1")) { + Assert.assertEquals(0, batchGetResult.rawCells().length); + } else if (Bytes.toString(get.getRow()).equals("Key2")) { + // 3 + 1 + 1 + Assert.assertEquals(5, batchGetResult.rawCells().length); + } else { + Assert.assertEquals(3, batchGetResult.rawCells().length); + } + Assert.assertEquals(getResult.rawCells().length, batchGetResult.rawCells().length); + for (int j = 0; j < getResult.size(); ++j) { + Assert.assertEquals(Bytes.toString(key2Result.getRow()), + Bytes.toString(key2GetResult.getRow())); + Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(key2Result.rawCells()[i])), + Bytes.toString(CellUtil.cloneFamily(key2GetResult.rawCells()[i]))); + Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(key2Result.rawCells()[i])), + Bytes.toString(CellUtil.cloneQualifier(key2GetResult.rawCells()[i]))); + Assert.assertEquals(key2Result.rawCells()[i].getTimestamp(), + key2GetResult.rawCells()[i].getTimestamp()); + Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(key2Result.rawCells()[i])), + Bytes.toString(CellUtil.cloneValue(key2GetResult.rawCells()[i]))); + } + } + +// // multi-cf test +// batchLsit.clear(); +// for (int i = 0; i < rows; ++i) { +// put = new Put(toBytes("Key" + i)); +// delete = new Delete(toBytes("Key" + i)); +// batchLsit.add(delete); +// put.add(family1, family1_column1, family1_value); +// put.add(family1, family1_column2, family1_value); +// put.add(family1, family1_column3, family1_value); +// put.add(family2, family2_column1, family2_value); +// put.add(family2, family2_column2, family2_value); +// put.add(family3, family3_column1, family3_value); +// batchLsit.add(put); +// } +// multiCfHTable.batch(batchLsit); +// batchLsit.clear(); +// +// // get +// get1 = new Get("Key1".getBytes()); +// get1.addFamily(family1); +// get1.addFamily(family2); +// batchLsit.add(get1); +// results = multiCfHTable.batch(batchLsit); +// getResult = (Result) results[0]; +// for (KeyValue keyValue : getResult.raw()) { +// System.out.printf("1. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", +// Bytes.toString(getResult.getRow()), +// Bytes.toString(keyValue.getFamily()), +// Bytes.toString(keyValue.getQualifier()), +// keyValue.getTimestamp(), +// Bytes.toString(keyValue.getValue()) +// ); +// Assert.assertFalse(Bytes.toString(keyValue.getFamily()).equals("family_with_group3")); +// } +// // delete + get +// batchLsit.clear(); +// delete = new Delete(toBytes("Key1")); +// batchLsit.add(delete); +// batchLsit.add(get1); +// results = multiCfHTable.batch(batchLsit); +// Assert.assertEquals(2, results.length); +// Assert.assertEquals(0, ((Result) results[1]).raw().length); +// +// // put + get +// batchLsit.clear(); +// put = new Put(toBytes("Key1")); +// put.add(family1, family1_column1, family1_value); +// put.add(family2, family2_column1, family2_value); +// put.add(family3, family3_column1, family3_value); +// batchLsit.add(put); +// get1.addFamily(family3); +// batchLsit.add(get1); +// results = multiCfHTable.batch(batchLsit); +// Assert.assertEquals(2, results.length); +// getResult = (Result) results[1]; +// for (KeyValue keyValue : getResult.raw()) { +// System.out.printf("2. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", +// Bytes.toString(getResult.getRow()), +// Bytes.toString(keyValue.getFamily()), +// Bytes.toString(keyValue.getQualifier()), +// keyValue.getTimestamp(), +// Bytes.toString(keyValue.getValue()) +// ); +// } +// +// // put + delete + get +// // get2 query all family +// get2 = new Get("Key2".getBytes()); +// batchLsit.clear(); +// batchLsit.add(delete); +// batchLsit.add(put); +// batchLsit.add(get1); +// batchLsit.add(delete); +// batchLsit.add(get1); +// batchLsit.add(get2); +// results = multiCfHTable.batch(batchLsit); +// Assert.assertEquals(6, results.length); +// key1Result = (Result) results[2]; +// Assert.assertEquals(3, key1Result.raw().length); +// for (KeyValue keyValue : key1Result.raw()) { +// System.out.printf("3. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", +// Bytes.toString(key1Result.getRow()), +// Bytes.toString(keyValue.getFamily()), +// Bytes.toString(keyValue.getQualifier()), +// keyValue.getTimestamp(), +// Bytes.toString(keyValue.getValue()) +// ); +// } +// empResult = (Result) results[4]; +// Assert.assertEquals(0, empResult.raw().length); +// +// key2Result = (Result) results[5]; +// Assert.assertEquals(6, key2Result.raw().length); +// for (KeyValue keyValue : key2Result.raw()) { +// System.out.printf("4. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", +// Bytes.toString(key2Result.getRow()), +// Bytes.toString(keyValue.getFamily()), +// Bytes.toString(keyValue.getQualifier()), +// keyValue.getTimestamp(), +// Bytes.toString(keyValue.getValue()) +// ); +// } +// +// // set maxVersion +// batchLsit.clear(); +// put = new Put(toBytes("Key2")); +// put.add(family1, family1_column1, family1_value); +// put.add(family2, family2_column1, family2_value); +// put.add(family3, family3_column1, family3_value); +// batchLsit.add(put); +// get2.setMaxVersions(10); +// batchLsit.add(get2); +// results = multiCfHTable.batch(batchLsit); +// Assert.assertEquals(2, results.length); +// key2Result = (Result) results[1]; +// Assert.assertEquals(9, key2Result.raw().length); +// for (KeyValue keyValue : key2Result.raw()) { +// System.out.printf("5. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", +// Bytes.toString(key2Result.getRow()), +// Bytes.toString(keyValue.getFamily()), +// Bytes.toString(keyValue.getQualifier()), +// keyValue.getTimestamp(), +// Bytes.toString(keyValue.getValue()) +// ); +// } +// put = new Put(toBytes("Key2")); +// put.add(family1, family1_column1, family1_value); +// get2.setMaxVersions(2); +// get2All = new Get(toBytes("Key2")); +// get2All.setMaxVersions(); +// batchLsit.clear(); +// batchLsit.add(put); +// batchLsit.add(get2); +// batchLsit.add(get2All); +// results = multiCfHTable.batch(batchLsit); +// Assert.assertEquals(3, results.length); +// key2Result = (Result) results[1]; +// key2AllResult = (Result) results[2]; +// Assert.assertEquals(3, key2AllResult.getColumn(family1, family1_column1).size()); +// Assert.assertEquals(2, key2Result.getColumn(family1, family1_column1).size()); +// for (KeyValue keyValue : key2Result.raw()) { +// System.out.printf("6. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", +// Bytes.toString(key2Result.getRow()), +// Bytes.toString(keyValue.getFamily()), +// Bytes.toString(keyValue.getQualifier()), +// keyValue.getTimestamp(), +// Bytes.toString(keyValue.getValue()) +// ); +// } +// +// // compare with get interface +// key2GetResult = multiCfHTable.get(get2); +// Assert.assertEquals(key2GetResult.raw().length, key2Result.raw().length); +// for (int i = 0; i < key2GetResult.size(); ++i) { +// Assert.assertEquals(Bytes.toString(key2Result.getRow()), Bytes.toString(key2GetResult.getRow())); +// Assert.assertEquals(Bytes.toString(key2Result.raw()[i].getFamily()), Bytes.toString(key2GetResult.raw()[i].getFamily())); +// Assert.assertEquals(Bytes.toString(key2Result.raw()[i].getQualifier()), Bytes.toString(key2GetResult.raw()[i].getQualifier())); +// Assert.assertEquals(key2Result.raw()[i].getTimestamp(), key2GetResult.raw()[i].getTimestamp()); +// Assert.assertEquals(Bytes.toString(key2Result.raw()[i].getValue()), Bytes.toString(key2GetResult.raw()[i].getValue())); +// } +// // test batch get +// batchLsit.clear(); +// for (int i = 0; i < rows; ++i) { +// get = new Get(toBytes("Key" + i)); +// get.setMaxVersions(10); +// batchLsit.add(get); +// } +// results = multiCfHTable.batch(batchLsit); +// Assert.assertEquals(rows, results.length); +// for (int i = 0; i < rows; ++i) { +// get = new Get(toBytes("Key" + i)); +// get.setMaxVersions(10); +// getResult = multiCfHTable.get(get); +// Result batchGetResult = (Result) results[i]; +// if (Bytes.toString(get.getRow()).equals("Key1")) { +// Assert.assertEquals(0, batchGetResult.raw().length); +// } else if (Bytes.toString(get.getRow()).equals("Key2")) { +// // 6 + 3 + 1 +// Assert.assertEquals(10, batchGetResult.raw().length); +// } else { +// Assert.assertEquals(6, batchGetResult.raw().length); +// } +// Assert.assertEquals(getResult.raw().length, batchGetResult.raw().length); +// for (int j = 0; j < getResult.size(); ++j) { +// Assert.assertEquals(Bytes.toString(getResult.getRow()), Bytes.toString(batchGetResult.getRow())); +// Assert.assertEquals(Bytes.toString(getResult.raw()[j].getFamily()), Bytes.toString(batchGetResult.raw()[j].getFamily())); +// Assert.assertEquals(Bytes.toString(getResult.raw()[j].getQualifier()), Bytes.toString(batchGetResult.raw()[j].getQualifier())); +// Assert.assertEquals(getResult.raw()[j].getTimestamp(), batchGetResult.raw()[j].getTimestamp()); +// Assert.assertEquals(Bytes.toString(getResult.raw()[j].getValue()), Bytes.toString(batchGetResult.raw()[j].getValue())); +// } +// } } @Test From adc0feaccb5ff7ac2558ba1bc13754e0f4b05335 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 3 Dec 2024 14:53:03 +0800 Subject: [PATCH 06/16] change to use propertities --- .../com/alipay/oceanbase/hbase/OHTable.java | 22 ++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 2f002213..c1f00bfe 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -714,36 +714,28 @@ public void batch(final List actions, final Object[] results) thr results[i] = tmpResults.getResults().get(index); batchError.add((ObTableException) results[i], actions.get(i), null); } else if (actions.get(i) instanceof Get) { - Get get = (Get) actions.get(i); if (tmpResults.getResults().get(index) instanceof MutationResult) { MutationResult mutationResult = (MutationResult) tmpResults.getResults().get(index); ObPayload innerResult = mutationResult.getResult(); if (innerResult instanceof ObTableSingleOpResult) { ObTableSingleOpResult singleOpResult = (ObTableSingleOpResult) innerResult; ObTableSingleOpEntity singleOpEntity = singleOpResult.getEntity(); - List rowKey = singleOpEntity.getRowkey(); List propertiesValues = singleOpEntity.getPropertiesValues(); - if (!get.isCheckExistenceOnly()) { - if (rowKey.size() != (propertiesValues.size() * 3)) { - throw new IllegalArgumentException("the length of rowKey and properties is not matched"); - } - } List cells = new ArrayList<>(); - int rowKeyIdx = 0, valueIdx = 0; - while (rowKeyIdx < rowKey.size() && valueIdx < propertiesValues.size()) { + int valueIdx = 0; + while (valueIdx < propertiesValues.size()) { byte[][] familyAndQualifier = new byte[2][]; // split family and qualifier familyAndQualifier = OHBaseFuncUtils - .extractFamilyFromQualifier((byte[]) rowKey.get(rowKeyIdx + 1).getValue()); - KeyValue kv = new KeyValue((byte[]) rowKey.get(rowKeyIdx).getValue(),//K + .extractFamilyFromQualifier((byte[]) propertiesValues.get(valueIdx + 1).getValue()); + KeyValue kv = new KeyValue((byte[]) propertiesValues.get(valueIdx).getValue(),//K familyAndQualifier[0], // family familyAndQualifier[1], // qualifiermat - (Long) rowKey.get(rowKeyIdx + 2).getValue(), // T - (byte[]) propertiesValues.get(valueIdx).getValue()// V + (Long) propertiesValues.get(valueIdx + 2).getValue(), // T + (byte[]) propertiesValues.get(valueIdx + 3).getValue()// V ); cells.add(kv); - rowKeyIdx += 3; - valueIdx++; + valueIdx += 4; } results[i] = Result.create(cells); } else { From abe0b7f578f59d8e60e44d7ec1a8c0c2fd87e82b Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 3 Dec 2024 16:32:20 +0800 Subject: [PATCH 07/16] add version control for batch get --- .../com/alipay/oceanbase/hbase/util/CompatibilityUtil.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java b/src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java index 39db637c..d407088c 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java @@ -4,6 +4,7 @@ public class CompatibilityUtil { public static boolean isBatchSupport() { - return ObGlobal.OB_VERSION > ObGlobal.OB_VERSION_4_3_4_0; + return (ObGlobal.OB_VERSION >= ObGlobal.OB_VERSION_4_2_5_1 && ObGlobal.OB_VERSION < ObGlobal.OB_VERSION_4_3_0_0) + || ObGlobal.OB_VERSION >= ObGlobal.OB_VERSION_4_3_5_0; } } From 6b79bfdac59b7ce3510e3a36a70d511f881a24ed Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Mon, 23 Dec 2024 15:17:23 +0800 Subject: [PATCH 08/16] compatible new client to old server --- .../com/alipay/oceanbase/hbase/OHTable.java | 139 +++++++++++------- 1 file changed, 82 insertions(+), 57 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index c1f00bfe..01ad2765 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; import org.slf4j.Logger; @@ -529,8 +530,16 @@ public boolean exists(Get get) throws IOException { @Override public boolean[] existsAll(List gets) throws IOException { boolean[] ret = new boolean[gets.size()]; - for (int i = 0; i < gets.size(); ++i) { - ret[i] = exists(gets.get(i)); + if (CompatibilityUtil.isBatchSupport()) { + Result[] results = new Result[gets.size()]; + batch(gets, results); + for (int i = 0; i < gets.size(); ++i) { + ret[i] = !results[i].isEmpty(); + } + } else { + for (int i = 0; i < gets.size(); ++i) { + ret[i] = exists(gets.get(i)); + } } return ret; } @@ -699,64 +708,76 @@ public void batch(final List actions, final Object[] results) thr BatchError batchError = new BatchError(); obTableClient.setRuntimeBatchExecutor(executePool); List resultMapSingleOp = new LinkedList<>(); - try { - if (!CompatibilityUtil.isBatchSupport()) { + if (!CompatibilityUtil.isBatchSupport()) { + try { compatOldServerBatch(actions, results, batchError); - } else { - String realTableName = getTargetTableName(actions); - BatchOperation batch = buildBatchOperation(realTableName, actions, - tableNameString.equals(realTableName), resultMapSingleOp); - BatchOperationResult tmpResults = batch.execute(); - if (results != null) { - int index = 0; - for (int i = 0; i != results.length; ++i) { - if (tmpResults.getResults().get(index) instanceof ObTableException) { - results[i] = tmpResults.getResults().get(index); - batchError.add((ObTableException) results[i], actions.get(i), null); - } else if (actions.get(i) instanceof Get) { - if (tmpResults.getResults().get(index) instanceof MutationResult) { - MutationResult mutationResult = (MutationResult) tmpResults.getResults().get(index); - ObPayload innerResult = mutationResult.getResult(); - if (innerResult instanceof ObTableSingleOpResult) { - ObTableSingleOpResult singleOpResult = (ObTableSingleOpResult) innerResult; - ObTableSingleOpEntity singleOpEntity = singleOpResult.getEntity(); - List propertiesValues = singleOpEntity.getPropertiesValues(); - List cells = new ArrayList<>(); - int valueIdx = 0; - while (valueIdx < propertiesValues.size()) { - byte[][] familyAndQualifier = new byte[2][]; + } catch (Exception e) { + throw new IOException(e); + } + } else { + String realTableName = getTargetTableName(actions); + BatchOperation batch = buildBatchOperation(realTableName, actions, + tableNameString.equals(realTableName), resultMapSingleOp); + BatchOperationResult tmpResults; + try { + tmpResults = batch.execute(); + } catch (Exception e) { + throw new IOException(e); + } + int index = 0; + for (int i = 0; i != actions.size(); ++i) { + if (tmpResults.getResults().get(index) instanceof ObTableException) { + if (results != null) { + results[i] = tmpResults.getResults().get(index); + } + batchError.add((ObTableException) tmpResults.getResults().get(index), actions.get(i), null); + } else if (actions.get(i) instanceof Get) { + if (results != null) { + if (tmpResults.getResults().get(index) instanceof MutationResult) { + MutationResult mutationResult = (MutationResult) tmpResults.getResults().get(index); + ObPayload innerResult = mutationResult.getResult(); + if (innerResult instanceof ObTableSingleOpResult) { + ObTableSingleOpResult singleOpResult = (ObTableSingleOpResult) innerResult; + ObTableSingleOpEntity singleOpEntity = singleOpResult.getEntity(); + List propertiesValues = singleOpEntity.getPropertiesValues(); + List cells = new ArrayList<>(); + int valueIdx = 0; + while (valueIdx < propertiesValues.size()) { + byte[][] familyAndQualifier = new byte[2][]; + try { // split family and qualifier familyAndQualifier = OHBaseFuncUtils .extractFamilyFromQualifier((byte[]) propertiesValues.get(valueIdx + 1).getValue()); - KeyValue kv = new KeyValue((byte[]) propertiesValues.get(valueIdx).getValue(),//K - familyAndQualifier[0], // family - familyAndQualifier[1], // qualifiermat - (Long) propertiesValues.get(valueIdx + 2).getValue(), // T - (byte[]) propertiesValues.get(valueIdx + 3).getValue()// V - ); - cells.add(kv); - valueIdx += 4; + } catch (Exception e) { + throw new IOException(e); } - results[i] = Result.create(cells); - } else { - throw new ObTableUnexpectedException("Unexpected type of result in MutationResult"); + KeyValue kv = new KeyValue((byte[]) propertiesValues.get(valueIdx).getValue(),//K + familyAndQualifier[0], // family + familyAndQualifier[1], // qualifiermat + (Long) propertiesValues.get(valueIdx + 2).getValue(), // T + (byte[]) propertiesValues.get(valueIdx + 3).getValue()// V + ); + cells.add(kv); + valueIdx += 4; } + results[i] = Result.create(cells); } else { - throw new ObTableUnexpectedException("Unexpected type of result in batch"); + throw new ObTableUnexpectedException("Unexpected type of result in MutationResult"); } } else { - results[i] = new Result(); + throw new ObTableUnexpectedException("Unexpected type of result in batch"); } - index += resultMapSingleOp.get(i); } - if (batchError.hasErrors()) { - throw batchError.makeException(); + } else { + if (results != null) { + results[i] = new Result(); } } + index += resultMapSingleOp.get(i); } - } catch (Exception e) { - logger.error(LCD.convert("01-000010"), tableNameString, actions, e); - throw new IOException("batch table " + tableNameString + " error", e); + } + if (batchError.hasErrors()) { + throw batchError.makeException(); } } @@ -866,8 +887,8 @@ private String getTargetTableName(String tableNameString) { return tableNameString; } - // To enable the server to identify the column family to which a qualifier belongs, - // the client writes the column family name into the qualifier. + // To enable the server to identify the column family to which a qualifier belongs, + // the client writes the column family name into the qualifier. // The server then parses this information to determine the table that needs to be operated on. private void processColumnFilters(NavigableSet columnFilters, Map> familyMap) { @@ -903,8 +924,8 @@ public Result call() throws IOException { if (get.getFamilyMap().keySet() == null || get.getFamilyMap().keySet().isEmpty() || get.getFamilyMap().size() > 1) { - // In a Get operation where the family map is greater than 1 or equal to 0, - // we handle this by appending the column family to the qualifier on the client side. + // In a Get operation where the family map is greater than 1 or equal to 0, + // we handle this by appending the column family to the qualifier on the client side. // The server can then use this information to filter the appropriate column families and qualifiers. if (!get.getColumnFamilyTimeRange().isEmpty()) { throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now"); @@ -961,8 +982,12 @@ public Result call() throws IOException { @Override public Result[] get(List gets) throws IOException { Result[] results = new Result[gets.size()]; - for (int i = 0; i < gets.size(); i++) { - results[i] = get(gets.get(i)); + if (CompatibilityUtil.isBatchSupport()) { // get only supported in BatchSupport version + batch(gets, results); + } else { + for (int i = 0; i < gets.size(); i++) { + results[i] = get(gets.get(i)); + } } return results; } @@ -989,8 +1014,8 @@ public ResultScanner call() throws IOException { if (scan.getFamilyMap().keySet() == null || scan.getFamilyMap().keySet().isEmpty() || scan.getFamilyMap().size() > 1) { - // In a Scan operation where the family map is greater than 1 or equal to 0, - // we handle this by appending the column family to the qualifier on the client side. + // In a Scan operation where the family map is greater than 1 or equal to 0, + // we handle this by appending the column family to the qualifier on the client side. // The server can then use this information to filter the appropriate column families and qualifiers. if (!scan.getColumnFamilyTimeRange().isEmpty()) { throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now"); @@ -1875,7 +1900,7 @@ private KeyValue modifyQualifier(Cell original, byte[] newQualifier) { byte[] value = CellUtil.cloneValue(original); long timestamp = original.getTimestamp(); KeyValue.Type type = KeyValue.Type.codeToType(original.getType().getCode()); - // Create a new KeyValue with the modified qualifier + // Create a new KeyValue with the modified qualifier return new KeyValue(row, family, newQualifier, timestamp, type, value); } @@ -2059,8 +2084,8 @@ public static void checkFamilyViolation(Collection families, boolean che } } - // This method is currently only used for append and increment operations. - // It restricts these two methods to use multi-column family operations. + // This method is currently only used for append and increment operations. + // It restricts these two methods to use multi-column family operations. // Note: After completing operations on multiple column families, they are deleted using the method described above. public static void checkFamilyViolationForOneFamily(Collection families) { if (families == null || families.size() == 0) { From d790f248a2a2e6f4151dede137c1f4324fc7c04e Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 24 Dec 2024 12:06:09 +0800 Subject: [PATCH 09/16] batch get compatible to hbase 2.x --- .../com/alipay/oceanbase/hbase/OHTable.java | 7 +- .../hbase/util/CompatibilityUtil.java | 10 - .../hbase/HTableMultiCFTestBase.java | 425 +++++++++--------- 3 files changed, 227 insertions(+), 215 deletions(-) delete mode 100644 src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 01ad2765..9113952f 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -23,6 +23,7 @@ import com.alipay.oceanbase.hbase.filter.HBaseFilterUtils; import com.alipay.oceanbase.hbase.result.ClientStreamScanner; import com.alipay.oceanbase.hbase.util.*; +import com.alipay.oceanbase.rpc.ObGlobal; import com.alipay.oceanbase.rpc.ObTableClient; import com.alipay.oceanbase.rpc.exception.ObTableException; import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException; @@ -530,7 +531,7 @@ public boolean exists(Get get) throws IOException { @Override public boolean[] existsAll(List gets) throws IOException { boolean[] ret = new boolean[gets.size()]; - if (CompatibilityUtil.isBatchSupport()) { + if (ObGlobal.isHBaseBatchGetSupport()) { Result[] results = new Result[gets.size()]; batch(gets, results); for (int i = 0; i < gets.size(); ++i) { @@ -708,7 +709,7 @@ public void batch(final List actions, final Object[] results) thr BatchError batchError = new BatchError(); obTableClient.setRuntimeBatchExecutor(executePool); List resultMapSingleOp = new LinkedList<>(); - if (!CompatibilityUtil.isBatchSupport()) { + if (!ObGlobal.isHBaseBatchGetSupport()) { try { compatOldServerBatch(actions, results, batchError); } catch (Exception e) { @@ -982,7 +983,7 @@ public Result call() throws IOException { @Override public Result[] get(List gets) throws IOException { Result[] results = new Result[gets.size()]; - if (CompatibilityUtil.isBatchSupport()) { // get only supported in BatchSupport version + if (ObGlobal.isHBaseBatchGetSupport()) { // get only supported in BatchSupport version batch(gets, results); } else { for (int i = 0; i < gets.size(); i++) { diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java b/src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java deleted file mode 100644 index d407088c..00000000 --- a/src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.alipay.oceanbase.hbase.util; - -import com.alipay.oceanbase.rpc.ObGlobal; - -public class CompatibilityUtil { - public static boolean isBatchSupport() { - return (ObGlobal.OB_VERSION >= ObGlobal.OB_VERSION_4_2_5_1 && ObGlobal.OB_VERSION < ObGlobal.OB_VERSION_4_3_0_0) - || ObGlobal.OB_VERSION >= ObGlobal.OB_VERSION_4_3_5_0; - } -} diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java index b3c133ab..fb692c20 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.util.Bytes; +import org.checkerframework.checker.units.qual.C; import org.junit.*; import org.junit.rules.ExpectedException; @@ -876,6 +877,8 @@ public void update(byte[] region, byte[] row, Result result) { // put + get Put put = new Put(toBytes("Key1")); put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family1, family1_column2, family2_value); + put.addColumn(family1, family1_column3, family3_value); batchLsit.clear(); batchLsit.add(put); batchLsit.add(get1); @@ -909,7 +912,7 @@ public void update(byte[] region, byte[] row, Result result) { multiCfHTable.batch(batchLsit, results); Assert.assertEquals(6, results.length); Result key1Result = (Result) results[2]; - Assert.assertEquals(5, key1Result.rawCells().length); + Assert.assertEquals(3, key1Result.rawCells().length); for (Cell keyValue : key1Result.rawCells()) { System.out.printf("3. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", Bytes.toString(CellUtil.cloneRow(keyValue)), @@ -1022,210 +1025,228 @@ public void update(byte[] region, byte[] row, Result result) { } Assert.assertEquals(getResult.rawCells().length, batchGetResult.rawCells().length); for (int j = 0; j < getResult.size(); ++j) { - Assert.assertEquals(Bytes.toString(key2Result.getRow()), - Bytes.toString(key2GetResult.getRow())); - Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(key2Result.rawCells()[i])), - Bytes.toString(CellUtil.cloneFamily(key2GetResult.rawCells()[i]))); - Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(key2Result.rawCells()[i])), - Bytes.toString(CellUtil.cloneQualifier(key2GetResult.rawCells()[i]))); - Assert.assertEquals(key2Result.rawCells()[i].getTimestamp(), - key2GetResult.rawCells()[i].getTimestamp()); - Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(key2Result.rawCells()[i])), - Bytes.toString(CellUtil.cloneValue(key2GetResult.rawCells()[i]))); + Assert.assertEquals(Bytes.toString(getResult.getRow()), + Bytes.toString(batchGetResult.getRow())); + Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(getResult.rawCells()[j])), + Bytes.toString(CellUtil.cloneFamily(batchGetResult.rawCells()[j]))); + Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(getResult.rawCells()[j])), + Bytes.toString(CellUtil.cloneQualifier(batchGetResult.rawCells()[j]))); + Assert.assertEquals(getResult.rawCells()[j].getTimestamp(), + batchGetResult.rawCells()[j].getTimestamp()); + Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(getResult.rawCells()[j])), + Bytes.toString(CellUtil.cloneValue(batchGetResult.rawCells()[j]))); } } -// // multi-cf test -// batchLsit.clear(); -// for (int i = 0; i < rows; ++i) { -// put = new Put(toBytes("Key" + i)); -// delete = new Delete(toBytes("Key" + i)); -// batchLsit.add(delete); -// put.add(family1, family1_column1, family1_value); -// put.add(family1, family1_column2, family1_value); -// put.add(family1, family1_column3, family1_value); -// put.add(family2, family2_column1, family2_value); -// put.add(family2, family2_column2, family2_value); -// put.add(family3, family3_column1, family3_value); -// batchLsit.add(put); -// } -// multiCfHTable.batch(batchLsit); -// batchLsit.clear(); -// -// // get -// get1 = new Get("Key1".getBytes()); -// get1.addFamily(family1); -// get1.addFamily(family2); -// batchLsit.add(get1); -// results = multiCfHTable.batch(batchLsit); -// getResult = (Result) results[0]; -// for (KeyValue keyValue : getResult.raw()) { -// System.out.printf("1. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", -// Bytes.toString(getResult.getRow()), -// Bytes.toString(keyValue.getFamily()), -// Bytes.toString(keyValue.getQualifier()), -// keyValue.getTimestamp(), -// Bytes.toString(keyValue.getValue()) -// ); -// Assert.assertFalse(Bytes.toString(keyValue.getFamily()).equals("family_with_group3")); -// } -// // delete + get -// batchLsit.clear(); -// delete = new Delete(toBytes("Key1")); -// batchLsit.add(delete); -// batchLsit.add(get1); -// results = multiCfHTable.batch(batchLsit); -// Assert.assertEquals(2, results.length); -// Assert.assertEquals(0, ((Result) results[1]).raw().length); -// -// // put + get -// batchLsit.clear(); -// put = new Put(toBytes("Key1")); -// put.add(family1, family1_column1, family1_value); -// put.add(family2, family2_column1, family2_value); -// put.add(family3, family3_column1, family3_value); -// batchLsit.add(put); -// get1.addFamily(family3); -// batchLsit.add(get1); -// results = multiCfHTable.batch(batchLsit); -// Assert.assertEquals(2, results.length); -// getResult = (Result) results[1]; -// for (KeyValue keyValue : getResult.raw()) { -// System.out.printf("2. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", -// Bytes.toString(getResult.getRow()), -// Bytes.toString(keyValue.getFamily()), -// Bytes.toString(keyValue.getQualifier()), -// keyValue.getTimestamp(), -// Bytes.toString(keyValue.getValue()) -// ); -// } -// -// // put + delete + get -// // get2 query all family -// get2 = new Get("Key2".getBytes()); -// batchLsit.clear(); -// batchLsit.add(delete); -// batchLsit.add(put); -// batchLsit.add(get1); -// batchLsit.add(delete); -// batchLsit.add(get1); -// batchLsit.add(get2); -// results = multiCfHTable.batch(batchLsit); -// Assert.assertEquals(6, results.length); -// key1Result = (Result) results[2]; -// Assert.assertEquals(3, key1Result.raw().length); -// for (KeyValue keyValue : key1Result.raw()) { -// System.out.printf("3. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", -// Bytes.toString(key1Result.getRow()), -// Bytes.toString(keyValue.getFamily()), -// Bytes.toString(keyValue.getQualifier()), -// keyValue.getTimestamp(), -// Bytes.toString(keyValue.getValue()) -// ); -// } -// empResult = (Result) results[4]; -// Assert.assertEquals(0, empResult.raw().length); -// -// key2Result = (Result) results[5]; -// Assert.assertEquals(6, key2Result.raw().length); -// for (KeyValue keyValue : key2Result.raw()) { -// System.out.printf("4. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", -// Bytes.toString(key2Result.getRow()), -// Bytes.toString(keyValue.getFamily()), -// Bytes.toString(keyValue.getQualifier()), -// keyValue.getTimestamp(), -// Bytes.toString(keyValue.getValue()) -// ); -// } -// -// // set maxVersion -// batchLsit.clear(); -// put = new Put(toBytes("Key2")); -// put.add(family1, family1_column1, family1_value); -// put.add(family2, family2_column1, family2_value); -// put.add(family3, family3_column1, family3_value); -// batchLsit.add(put); -// get2.setMaxVersions(10); -// batchLsit.add(get2); -// results = multiCfHTable.batch(batchLsit); -// Assert.assertEquals(2, results.length); -// key2Result = (Result) results[1]; -// Assert.assertEquals(9, key2Result.raw().length); -// for (KeyValue keyValue : key2Result.raw()) { -// System.out.printf("5. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", -// Bytes.toString(key2Result.getRow()), -// Bytes.toString(keyValue.getFamily()), -// Bytes.toString(keyValue.getQualifier()), -// keyValue.getTimestamp(), -// Bytes.toString(keyValue.getValue()) -// ); -// } -// put = new Put(toBytes("Key2")); -// put.add(family1, family1_column1, family1_value); -// get2.setMaxVersions(2); -// get2All = new Get(toBytes("Key2")); -// get2All.setMaxVersions(); -// batchLsit.clear(); -// batchLsit.add(put); -// batchLsit.add(get2); -// batchLsit.add(get2All); -// results = multiCfHTable.batch(batchLsit); -// Assert.assertEquals(3, results.length); -// key2Result = (Result) results[1]; -// key2AllResult = (Result) results[2]; -// Assert.assertEquals(3, key2AllResult.getColumn(family1, family1_column1).size()); -// Assert.assertEquals(2, key2Result.getColumn(family1, family1_column1).size()); -// for (KeyValue keyValue : key2Result.raw()) { -// System.out.printf("6. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", -// Bytes.toString(key2Result.getRow()), -// Bytes.toString(keyValue.getFamily()), -// Bytes.toString(keyValue.getQualifier()), -// keyValue.getTimestamp(), -// Bytes.toString(keyValue.getValue()) -// ); -// } -// -// // compare with get interface -// key2GetResult = multiCfHTable.get(get2); -// Assert.assertEquals(key2GetResult.raw().length, key2Result.raw().length); -// for (int i = 0; i < key2GetResult.size(); ++i) { -// Assert.assertEquals(Bytes.toString(key2Result.getRow()), Bytes.toString(key2GetResult.getRow())); -// Assert.assertEquals(Bytes.toString(key2Result.raw()[i].getFamily()), Bytes.toString(key2GetResult.raw()[i].getFamily())); -// Assert.assertEquals(Bytes.toString(key2Result.raw()[i].getQualifier()), Bytes.toString(key2GetResult.raw()[i].getQualifier())); -// Assert.assertEquals(key2Result.raw()[i].getTimestamp(), key2GetResult.raw()[i].getTimestamp()); -// Assert.assertEquals(Bytes.toString(key2Result.raw()[i].getValue()), Bytes.toString(key2GetResult.raw()[i].getValue())); -// } -// // test batch get -// batchLsit.clear(); -// for (int i = 0; i < rows; ++i) { -// get = new Get(toBytes("Key" + i)); -// get.setMaxVersions(10); -// batchLsit.add(get); -// } -// results = multiCfHTable.batch(batchLsit); -// Assert.assertEquals(rows, results.length); -// for (int i = 0; i < rows; ++i) { -// get = new Get(toBytes("Key" + i)); -// get.setMaxVersions(10); -// getResult = multiCfHTable.get(get); -// Result batchGetResult = (Result) results[i]; -// if (Bytes.toString(get.getRow()).equals("Key1")) { -// Assert.assertEquals(0, batchGetResult.raw().length); -// } else if (Bytes.toString(get.getRow()).equals("Key2")) { -// // 6 + 3 + 1 -// Assert.assertEquals(10, batchGetResult.raw().length); -// } else { -// Assert.assertEquals(6, batchGetResult.raw().length); -// } -// Assert.assertEquals(getResult.raw().length, batchGetResult.raw().length); -// for (int j = 0; j < getResult.size(); ++j) { -// Assert.assertEquals(Bytes.toString(getResult.getRow()), Bytes.toString(batchGetResult.getRow())); -// Assert.assertEquals(Bytes.toString(getResult.raw()[j].getFamily()), Bytes.toString(batchGetResult.raw()[j].getFamily())); -// Assert.assertEquals(Bytes.toString(getResult.raw()[j].getQualifier()), Bytes.toString(batchGetResult.raw()[j].getQualifier())); -// Assert.assertEquals(getResult.raw()[j].getTimestamp(), batchGetResult.raw()[j].getTimestamp()); -// Assert.assertEquals(Bytes.toString(getResult.raw()[j].getValue()), Bytes.toString(batchGetResult.raw()[j].getValue())); -// } -// } + // multi-cf test + batchLsit.clear(); + for (int i = 0; i < rows; ++i) { + put = new Put(toBytes("Key" + i)); + delete = new Delete(toBytes("Key" + i)); + batchLsit.add(delete); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family1, family1_column2, family1_value); + put.addColumn(family1, family1_column3, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family2, family2_column2, family2_value); + put.addColumn(family3, family3_column1, family3_value); + batchLsit.add(put); + } + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + batchLsit.clear(); + + // get + get1 = new Get("Key1".getBytes()); + get1.addFamily(family1); + get1.addFamily(family2); + batchLsit.add(get1); + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + getResult = (Result) results[0]; + for (Cell keyValue : getResult.rawCells()) { + System.out.printf("1. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(CellUtil.cloneRow(keyValue)), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), + keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue)) + ); + Assert.assertFalse(Bytes.toString(CellUtil.cloneFamily(keyValue)).equals("family_with_group3")); + } + // delete + get + batchLsit.clear(); + delete = new Delete(toBytes("Key1")); + batchLsit.add(delete); + batchLsit.add(get1); + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + Assert.assertEquals(2, results.length); + Assert.assertEquals(0, ((Result) results[1]).rawCells().length); + + // put + get + batchLsit.clear(); + put = new Put(toBytes("Key1")); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family3, family3_column1, family3_value); + batchLsit.add(put); + get1.addFamily(family3); + batchLsit.add(get1); + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + Assert.assertEquals(2, results.length); + getResult = (Result) results[1]; + for (Cell keyValue : getResult.rawCells()) { + System.out.printf("2. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(CellUtil.cloneRow(keyValue)), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), + keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue)) + ); + } + + // put + delete + get + // get2 query all family + get2 = new Get("Key2".getBytes()); + batchLsit.clear(); + batchLsit.add(delete); + batchLsit.add(put); + batchLsit.add(get1); + batchLsit.add(delete); + batchLsit.add(get1); + batchLsit.add(get2); + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + Assert.assertEquals(6, results.length); + key1Result = (Result) results[2]; + Assert.assertEquals(3, key1Result.rawCells().length); + for (Cell keyValue : key1Result.rawCells()) { + System.out.printf("3. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(CellUtil.cloneRow(keyValue)), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), + keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue)) + ); + } + empResult = (Result) results[4]; + Assert.assertEquals(0, empResult.rawCells().length); + + key2Result = (Result) results[5]; + Assert.assertEquals(6, key2Result.rawCells().length); + for (Cell keyValue : key2Result.rawCells()) { + System.out.printf("4. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(CellUtil.cloneRow(keyValue)), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), + keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue)) + ); + } + + // set maxVersion + batchLsit.clear(); + put = new Put(toBytes("Key2")); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family3, family3_column1, family3_value); + batchLsit.add(put); + get2.setMaxVersions(10); + batchLsit.add(get2); + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + Assert.assertEquals(2, results.length); + key2Result = (Result) results[1]; + Assert.assertEquals(9, key2Result.rawCells().length); + for (Cell keyValue : key2Result.rawCells()) { + System.out.printf("5. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(CellUtil.cloneRow(keyValue)), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), + keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue)) + ); + } + put = new Put(toBytes("Key2")); + put.addColumn(family1, family1_column1, family1_value); + get2.setMaxVersions(2); + get2All = new Get(toBytes("Key2")); + get2All.setMaxVersions(); + batchLsit.clear(); + batchLsit.add(put); + batchLsit.add(get2); + batchLsit.add(get2All); + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + Assert.assertEquals(3, results.length); + key2Result = (Result) results[1]; + key2AllResult = (Result) results[2]; + Assert.assertEquals(3, key2AllResult.getColumnCells(family1, family1_column1).size()); + Assert.assertEquals(2, key2Result.getColumnCells(family1, family1_column1).size()); + for (Cell keyValue : key2Result.rawCells()) { + System.out.printf("6. Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(CellUtil.cloneRow(keyValue)), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), + keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue)) + ); + } + + // compare with get interface + key2GetResult = multiCfHTable.get(get2); + Assert.assertEquals(key2GetResult.rawCells().length, key2Result.rawCells().length); + for (int i = 0; i < key2GetResult.size(); ++i) { + Assert.assertEquals(Bytes.toString(key2Result.getRow()), + Bytes.toString(key2GetResult.getRow())); + Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(key2Result.rawCells()[i])), + Bytes.toString(CellUtil.cloneFamily(key2GetResult.rawCells()[i]))); + Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(key2Result.rawCells()[i])), + Bytes.toString(CellUtil.cloneQualifier(key2GetResult.rawCells()[i]))); + Assert.assertEquals(key2Result.rawCells()[i].getTimestamp(), + key2GetResult.rawCells()[i].getTimestamp()); + Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(key2Result.rawCells()[i])), + Bytes.toString(CellUtil.cloneValue(key2GetResult.rawCells()[i]))); + } + // test batch get + batchLsit.clear(); + for (int i = 0; i < rows; ++i) { + get = new Get(toBytes("Key" + i)); + get.setMaxVersions(10); + batchLsit.add(get); + } + results = new Object[batchLsit.size()]; + multiCfHTable.batch(batchLsit, results); + Assert.assertEquals(rows, results.length); + for (int i = 0; i < rows; ++i) { + get = new Get(toBytes("Key" + i)); + get.setMaxVersions(10); + getResult = multiCfHTable.get(get); + Result batchGetResult = (Result) results[i]; + if (Bytes.toString(get.getRow()).equals("Key1")) { + Assert.assertEquals(0, batchGetResult.rawCells().length); + } else if (Bytes.toString(get.getRow()).equals("Key2")) { + // 6 + 3 + 1 + Assert.assertEquals(10, batchGetResult.rawCells().length); + } else { + Assert.assertEquals(6, batchGetResult.rawCells().length); + } + Assert.assertEquals(getResult.rawCells().length, batchGetResult.rawCells().length); + for (int j = 0; j < getResult.size(); ++j) { + Assert.assertEquals(Bytes.toString(getResult.getRow()), + Bytes.toString(batchGetResult.getRow())); + Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(getResult.rawCells()[j])), + Bytes.toString(CellUtil.cloneFamily(batchGetResult.rawCells()[j]))); + Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(getResult.rawCells()[j])), + Bytes.toString(CellUtil.cloneQualifier(batchGetResult.rawCells()[j]))); + Assert.assertEquals(getResult.rawCells()[j].getTimestamp(), + batchGetResult.rawCells()[j].getTimestamp()); + Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(getResult.rawCells()[j])), + Bytes.toString(CellUtil.cloneValue(batchGetResult.rawCells()[j]))); + } + } } @Test From 304263d22d67b37df3636e011be56722d6aedf74 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 24 Dec 2024 12:08:58 +0800 Subject: [PATCH 10/16] revert clear tableClient cache when OHTable close --- .../java/com/alipay/oceanbase/hbase/OHTable.java | 2 -- .../oceanbase/hbase/util/ObTableClientManager.java | 12 ------------ .../oceanbase/hbase/HTableMultiCFTestBase.java | 1 - 3 files changed, 15 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 9113952f..19a8bc7a 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; import org.slf4j.Logger; @@ -1531,7 +1530,6 @@ public void close() throws IOException { if (cleanupPoolOnClose) { executePool.shutdown(); } - ObTableClientManager.clear(); } @Override diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java index 34d3d3b3..5db940aa 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java @@ -126,18 +126,6 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli return OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey); } - public static void clear() throws IOException { - try { - for (Map.Entry pair : OB_TABLE_CLIENT_INSTANCE.entrySet()) { - pair.getValue().close(); - } - } - catch (Exception e) { - throw new IOException("fail to close tableClient" , e); - } - OB_TABLE_CLIENT_INSTANCE.clear(); - } - public static class ObTableClientKey { private String paramUrl; private String fullUserName; diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java index fb692c20..a22bc769 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.util.Bytes; -import org.checkerframework.checker.units.qual.C; import org.junit.*; import org.junit.rules.ExpectedException; From f471ce85dc60bd3ec8d1625049177423ca2fe566 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 24 Dec 2024 12:23:48 +0800 Subject: [PATCH 11/16] do compatibility for batch get and batch put + delete --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 19a8bc7a..d7bd0b43 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -708,7 +708,7 @@ public void batch(final List actions, final Object[] results) thr BatchError batchError = new BatchError(); obTableClient.setRuntimeBatchExecutor(executePool); List resultMapSingleOp = new LinkedList<>(); - if (!ObGlobal.isHBaseBatchGetSupport()) { + if (!ObGlobal.isHBaseBatchSupport()) { try { compatOldServerBatch(actions, results, batchError); } catch (Exception e) { @@ -1915,6 +1915,9 @@ private BatchOperation buildBatchOperation(String tableName, List singleOpResultNum = 0; posInList++; if (row instanceof Get) { + if (!ObGlobal.isHBaseBatchGetSupport()) { + throw new FeatureNotSupportedException("server does not support batch get"); + } ++singleOpResultNum; Get get = (Get) row; ObTableQuery obTableQuery; From 70617094dd05f7d62ad6c354d90512c7ff4fcb47 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Thu, 2 Jan 2025 10:41:59 +0800 Subject: [PATCH 12/16] exists and get use batch get --- .../com/alipay/oceanbase/hbase/OHTable.java | 83 +++++++++++-------- 1 file changed, 50 insertions(+), 33 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index d7bd0b43..cd67634f 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -530,16 +530,15 @@ public boolean exists(Get get) throws IOException { @Override public boolean[] existsAll(List gets) throws IOException { boolean[] ret = new boolean[gets.size()]; - if (ObGlobal.isHBaseBatchGetSupport()) { - Result[] results = new Result[gets.size()]; - batch(gets, results); - for (int i = 0; i < gets.size(); ++i) { - ret[i] = !results[i].isEmpty(); - } - } else { - for (int i = 0; i < gets.size(); ++i) { - ret[i] = exists(gets.get(i)); - } + List newGets = new ArrayList<>(); + for (Get get : gets) { + Get newGet = new Get(get); + newGet.setCheckExistenceOnly(true); + newGets.add(newGet); + } + Result[] results = get(newGets); + for (int i = 0; i < results.length; ++i) { + ret[i] = results[i].getExists(); } return ret; } @@ -696,7 +695,7 @@ private void compatOldServerBatch(final List actions, final Objec } @Override - public void batch(final List actions, final Object[] results) throws IOException{ + public void batch(final List actions, final Object[] results) throws IOException { if (actions == null) { return; } @@ -733,34 +732,19 @@ public void batch(final List actions, final Object[] results) thr batchError.add((ObTableException) tmpResults.getResults().get(index), actions.get(i), null); } else if (actions.get(i) instanceof Get) { if (results != null) { + Get get = (Get) actions.get(i); + // get results have been wrapped in MutationResult, need to fetch it if (tmpResults.getResults().get(index) instanceof MutationResult) { MutationResult mutationResult = (MutationResult) tmpResults.getResults().get(index); ObPayload innerResult = mutationResult.getResult(); if (innerResult instanceof ObTableSingleOpResult) { ObTableSingleOpResult singleOpResult = (ObTableSingleOpResult) innerResult; - ObTableSingleOpEntity singleOpEntity = singleOpResult.getEntity(); - List propertiesValues = singleOpEntity.getPropertiesValues(); - List cells = new ArrayList<>(); - int valueIdx = 0; - while (valueIdx < propertiesValues.size()) { - byte[][] familyAndQualifier = new byte[2][]; - try { - // split family and qualifier - familyAndQualifier = OHBaseFuncUtils - .extractFamilyFromQualifier((byte[]) propertiesValues.get(valueIdx + 1).getValue()); - } catch (Exception e) { - throw new IOException(e); - } - KeyValue kv = new KeyValue((byte[]) propertiesValues.get(valueIdx).getValue(),//K - familyAndQualifier[0], // family - familyAndQualifier[1], // qualifiermat - (Long) propertiesValues.get(valueIdx + 2).getValue(), // T - (byte[]) propertiesValues.get(valueIdx + 3).getValue()// V - ); - cells.add(kv); - valueIdx += 4; + List cells = generateGetResult(singleOpResult); + if (get.isCheckExistenceOnly()) { + results[i] = Result.create(null, !cells.isEmpty()); + } else { + results[i] = Result.create(cells); } - results[i] = Result.create(cells); } else { throw new ObTableUnexpectedException("Unexpected type of result in MutationResult"); } @@ -781,6 +765,36 @@ public void batch(final List actions, final Object[] results) thr } } + private List generateGetResult(ObTableSingleOpResult getResult) throws IOException { + List cells = new ArrayList<>(); + ObTableSingleOpEntity singleOpEntity = getResult.getEntity(); + // all values queried by this get are contained in properties + // qualifier in batch get result is always appended after family + List propertiesValues = singleOpEntity.getPropertiesValues(); + int valueIdx = 0; + while (valueIdx < propertiesValues.size()) { + // values in propertiesValues like: [ K, Q, T, V, K, Q, T, V ... ] + // we need to retrieve K Q T V and construct them to cells: [ cell_0, cell_1, ... ] + byte[][] familyAndQualifier = new byte[2][]; + try { + // split family and qualifier + familyAndQualifier = OHBaseFuncUtils + .extractFamilyFromQualifier((byte[]) propertiesValues.get(valueIdx + 1).getValue()); + } catch (Exception e) { + throw new IOException(e); + } + KeyValue kv = new KeyValue((byte[]) propertiesValues.get(valueIdx).getValue(),//K + familyAndQualifier[0], // family + familyAndQualifier[1], // qualifiermat + (Long) propertiesValues.get(valueIdx + 2).getValue(), // T + (byte[]) propertiesValues.get(valueIdx + 3).getValue()// V + ); + cells.add(kv); + valueIdx += 4; + } + return cells; + } + private String getTargetTableName(List actions) { byte[] family = null; for (Row action : actions) { @@ -1941,6 +1955,9 @@ private BatchOperation buildBatchOperation(String tableName, List } } NavigableSet columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR); + // in batch get, we need to carry family in qualifier to server even this get is a single-cf operation + // because the entire batch may be a multi-cf batch so do not carry family + // family in qualifier helps us to know which table to query processColumnFilters(columnFilters, get.getFamilyMap()); obTableQuery = buildObTableQuery(get, columnFilters); ObTableClientQueryImpl query = new ObTableClientQueryImpl(tableName, obTableQuery, obTableClient); From 92b9e204ab261d6de15b04e4e3c188a8cb3f23ba Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Thu, 2 Jan 2025 11:41:39 +0800 Subject: [PATCH 13/16] fix existsAll using batch get --- .../com/alipay/oceanbase/hbase/OHTable.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index cd67634f..9cb9a3c8 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -530,15 +530,15 @@ public boolean exists(Get get) throws IOException { @Override public boolean[] existsAll(List gets) throws IOException { boolean[] ret = new boolean[gets.size()]; - List newGets = new ArrayList<>(); + // if just checkExistOnly, batch get will not return any result or row count + // therefore we have to set checkExistOnly as false and so the result can be returned + // TODO: adjust ExistOnly in server when using batch get for (Get get : gets) { - Get newGet = new Get(get); - newGet.setCheckExistenceOnly(true); - newGets.add(newGet); + get.setCheckExistenceOnly(false); } - Result[] results = get(newGets); + Result[] results = get(gets); for (int i = 0; i < results.length; ++i) { - ret[i] = results[i].getExists(); + ret[i] = !results[i].isEmpty(); } return ret; } @@ -740,11 +740,7 @@ public void batch(final List actions, final Object[] results) thr if (innerResult instanceof ObTableSingleOpResult) { ObTableSingleOpResult singleOpResult = (ObTableSingleOpResult) innerResult; List cells = generateGetResult(singleOpResult); - if (get.isCheckExistenceOnly()) { - results[i] = Result.create(null, !cells.isEmpty()); - } else { - results[i] = Result.create(cells); - } + results[i] = Result.create(cells); } else { throw new ObTableUnexpectedException("Unexpected type of result in MutationResult"); } From 6c52b1f09c5d3d256b00b55cd86bbeacd339824d Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Thu, 2 Jan 2025 11:51:36 +0800 Subject: [PATCH 14/16] keep the original gets --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 9cb9a3c8..4d3652a8 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -530,13 +530,16 @@ public boolean exists(Get get) throws IOException { @Override public boolean[] existsAll(List gets) throws IOException { boolean[] ret = new boolean[gets.size()]; + List newGets = new ArrayList<>(); // if just checkExistOnly, batch get will not return any result or row count // therefore we have to set checkExistOnly as false and so the result can be returned // TODO: adjust ExistOnly in server when using batch get for (Get get : gets) { - get.setCheckExistenceOnly(false); + Get newGet = new Get(get); + newGet.setCheckExistenceOnly(false); + newGets.add(newGet); } - Result[] results = get(gets); + Result[] results = get(newGets); for (int i = 0; i < results.length; ++i) { ret[i] = !results[i].isEmpty(); } From ebe9f76294d327837fb8ac615382bd540f55a508 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Thu, 2 Jan 2025 14:23:30 +0800 Subject: [PATCH 15/16] remove useless variable --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 4d3652a8..baa8459e 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -735,7 +735,6 @@ public void batch(final List actions, final Object[] results) thr batchError.add((ObTableException) tmpResults.getResults().get(index), actions.get(i), null); } else if (actions.get(i) instanceof Get) { if (results != null) { - Get get = (Get) actions.get(i); // get results have been wrapped in MutationResult, need to fetch it if (tmpResults.getResults().get(index) instanceof MutationResult) { MutationResult mutationResult = (MutationResult) tmpResults.getResults().get(index); From 2326b41f7602f6ce192451331398a0be091f6884 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Thu, 2 Jan 2025 17:33:59 +0800 Subject: [PATCH 16/16] add setRuntimeMaxWait and setRuntimeBatchMaxWait in OHTable setOperationTimeout --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index baa8459e..34221073 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -1591,6 +1591,8 @@ public void batchCoprocessorService(Descriptors.MethodDescri @Override public void setOperationTimeout(int operationTimeout) { this.operationTimeout = operationTimeout; + this.obTableClient.setRuntimeMaxWait(operationTimeout); + this.obTableClient.setRuntimeBatchMaxWait(operationTimeout); this.operationExecuteInPool = this.configuration.getBoolean( HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL, (this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));