diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 3defa3c7..48bb3ccb 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -35,6 +35,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.*; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest; import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult; +import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult; import com.alipay.oceanbase.rpc.table.ObHBaseParams; import com.alipay.oceanbase.rpc.table.ObKVParams; import com.alipay.sofa.common.thread.SofaThreadPoolExecutor; @@ -540,8 +541,6 @@ public Result get(final Get get) throws IOException { public Result call() throws IOException { List keyValueList = new ArrayList(); byte[] family = new byte[] {}; - ObTableClientQueryAsyncStreamResult clientQueryStreamResult; - ObTableQueryAsyncRequest request; ObTableQuery obTableQuery; try { if (get.getFamilyMap().keySet() == null @@ -553,10 +552,10 @@ public Result call() throws IOException { NavigableSet columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR); processColumnFilters(columnFilters, get.getFamilyMap()); obTableQuery = buildObTableQuery(get, columnFilters); - request = buildObTableQueryAsyncRequest(obTableQuery, + ObTableQueryAsyncRequest request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString)); - clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient + ObTableClientQueryAsyncStreamResult clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient .execute(request); getMaxRowFromResult(clientQueryStreamResult, keyValueList, true, family); } else { @@ -564,10 +563,10 @@ public Result call() throws IOException { .entrySet()) { family = entry.getKey(); obTableQuery = buildObTableQuery(get, entry.getValue()); - request = buildObTableQueryAsyncRequest(obTableQuery, + ObTableQueryRequest request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString, Bytes.toString(family), configuration)); - clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient + ObTableClientQueryStreamResult clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient .execute(request); getMaxRowFromResult(clientQueryStreamResult, keyValueList, false, family); @@ -799,43 +798,53 @@ private void innerDelete(Delete delete) throws IOException { checkArgument(delete.getRow() != null, "row is null"); List errorCodeList = new ArrayList(); BatchOperationResult results = null; - + try { checkFamilyViolation(delete.getFamilyMap().keySet(), false); if (delete.getFamilyMap().isEmpty()) { // For a Delete operation without any qualifiers, we construct a DeleteFamily request. // The server then performs the operation on all column families. KeyValue kv = new KeyValue(delete.getRow(), delete.getTimeStamp(), - KeyValue.Type.DeleteFamily); - - BatchOperation batch = buildBatchOperation(tableNameString, Arrays.asList(kv), false, null); + KeyValue.Type.DeleteFamily); + + BatchOperation batch = buildBatchOperation(tableNameString, Arrays.asList(kv), + false, null); results = batch.execute(); } else if (delete.getFamilyMap().size() > 1) { // Currently, the Delete Family operation type cannot transmit qualifiers to the server. // As a result, the server cannot identify which families need to be deleted. // Therefore, this process is handled sequentially. - boolean has_delete_family = delete.getFamilyMap().entrySet().stream() - .flatMap(entry -> entry.getValue().stream()) - .anyMatch(kv -> KeyValue.Type.codeToType(kv.getType()) == KeyValue.Type.DeleteFamily); - if (!has_delete_family) { + boolean hasDeleteFamily = false; + for (Map.Entry> entry : delete.getFamilyMap().entrySet()) { + for (KeyValue kv : entry.getValue()) { + if (KeyValue.Type.codeToType(kv.getType()) == KeyValue.Type.DeleteFamily) { + hasDeleteFamily = true; + break; + } + } + if (hasDeleteFamily) { + break; + } + } + if (!hasDeleteFamily) { BatchOperation batch = buildBatchOperation(tableNameString, - delete.getFamilyMap(), false, null); + delete.getFamilyMap(), false, null); results = batch.execute(); } else { for (Map.Entry> entry : delete.getFamilyMap().entrySet()) { BatchOperation batch = buildBatchOperation( - getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration), - entry.getValue(), false, null); + getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), + configuration), entry.getValue(), false, null); results = batch.execute(); } } } else { - Map.Entry> entry = delete.getFamilyMap().entrySet().iterator() - .next(); + Map.Entry> entry = delete.getFamilyMap().entrySet() + .iterator().next(); BatchOperation batch = buildBatchOperation( - getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration), - entry.getValue(), false, null); + getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), + configuration), entry.getValue(), false, null); results = batch.execute(); } @@ -1020,12 +1029,12 @@ public Result increment(Increment increment) throws IOException { byte[] f = entry.getKey(); ObTableBatchOperation batch = new ObTableBatchOperation(); - entry.getValue().forEach(cell -> { + for (Cell cell : entry.getValue()) { byte[] qualifier = cell.getQualifier(); qualifiers.add(qualifier); batch.addTableOperation(getInstance(INCREMENT, new Object[] { rowKey, qualifier, Long.MAX_VALUE }, V_COLUMNS, new Object[] { cell.getValue() })); - }); + } ObHTableFilter filter = buildObHTableFilter(null, increment.getTimeRange(), 1, qualifiers);