Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 32 additions & 23 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.*;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest;
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
import com.alipay.oceanbase.rpc.table.ObKVParams;
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
Expand Down Expand Up @@ -540,8 +541,6 @@ public Result get(final Get get) throws IOException {
public Result call() throws IOException {
List<KeyValue> keyValueList = new ArrayList<KeyValue>();
byte[] family = new byte[] {};
ObTableClientQueryAsyncStreamResult clientQueryStreamResult;
ObTableQueryAsyncRequest request;
ObTableQuery obTableQuery;
try {
if (get.getFamilyMap().keySet() == null
Expand All @@ -553,21 +552,21 @@ public Result call() throws IOException {
NavigableSet<byte[]> columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR);
processColumnFilters(columnFilters, get.getFamilyMap());
obTableQuery = buildObTableQuery(get, columnFilters);
request = buildObTableQueryAsyncRequest(obTableQuery,
ObTableQueryAsyncRequest request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString));

clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
ObTableClientQueryAsyncStreamResult clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
.execute(request);
getMaxRowFromResult(clientQueryStreamResult, keyValueList, true, family);
} else {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap()
.entrySet()) {
family = entry.getKey();
obTableQuery = buildObTableQuery(get, entry.getValue());
request = buildObTableQueryAsyncRequest(obTableQuery,
ObTableQueryRequest request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family),
configuration));
clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
ObTableClientQueryStreamResult clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
.execute(request);
getMaxRowFromResult(clientQueryStreamResult, keyValueList, false,
family);
Expand Down Expand Up @@ -799,43 +798,53 @@ private void innerDelete(Delete delete) throws IOException {
checkArgument(delete.getRow() != null, "row is null");
List<Integer> errorCodeList = new ArrayList<Integer>();
BatchOperationResult results = null;

try {
checkFamilyViolation(delete.getFamilyMap().keySet(), false);
if (delete.getFamilyMap().isEmpty()) {
// For a Delete operation without any qualifiers, we construct a DeleteFamily request.
// The server then performs the operation on all column families.
KeyValue kv = new KeyValue(delete.getRow(), delete.getTimeStamp(),
KeyValue.Type.DeleteFamily);

BatchOperation batch = buildBatchOperation(tableNameString, Arrays.asList(kv), false, null);
KeyValue.Type.DeleteFamily);

BatchOperation batch = buildBatchOperation(tableNameString, Arrays.asList(kv),
false, null);
results = batch.execute();
} else if (delete.getFamilyMap().size() > 1) {
// Currently, the Delete Family operation type cannot transmit qualifiers to the server.
// As a result, the server cannot identify which families need to be deleted.
// Therefore, this process is handled sequentially.
boolean has_delete_family = delete.getFamilyMap().entrySet().stream()
.flatMap(entry -> entry.getValue().stream())
.anyMatch(kv -> KeyValue.Type.codeToType(kv.getType()) == KeyValue.Type.DeleteFamily);
if (!has_delete_family) {
boolean hasDeleteFamily = false;
for (Map.Entry<byte[], List<KeyValue>> entry : delete.getFamilyMap().entrySet()) {
for (KeyValue kv : entry.getValue()) {
if (KeyValue.Type.codeToType(kv.getType()) == KeyValue.Type.DeleteFamily) {
hasDeleteFamily = true;
break;
}
}
if (hasDeleteFamily) {
break;
}
}
if (!hasDeleteFamily) {
BatchOperation batch = buildBatchOperation(tableNameString,
delete.getFamilyMap(), false, null);
delete.getFamilyMap(), false, null);
results = batch.execute();
} else {
for (Map.Entry<byte[], List<KeyValue>> entry : delete.getFamilyMap().entrySet()) {
BatchOperation batch = buildBatchOperation(
getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration),
entry.getValue(), false, null);
getTargetTableName(tableNameString, Bytes.toString(entry.getKey()),
configuration), entry.getValue(), false, null);
results = batch.execute();
}
}
} else {
Map.Entry<byte[], List<KeyValue>> entry = delete.getFamilyMap().entrySet().iterator()
.next();
Map.Entry<byte[], List<KeyValue>> entry = delete.getFamilyMap().entrySet()
.iterator().next();

BatchOperation batch = buildBatchOperation(
getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration),
entry.getValue(), false, null);
getTargetTableName(tableNameString, Bytes.toString(entry.getKey()),
configuration), entry.getValue(), false, null);
results = batch.execute();
}

Expand Down Expand Up @@ -1020,12 +1029,12 @@ public Result increment(Increment increment) throws IOException {
byte[] f = entry.getKey();

ObTableBatchOperation batch = new ObTableBatchOperation();
entry.getValue().forEach(cell -> {
for (Cell cell : entry.getValue()) {
byte[] qualifier = cell.getQualifier();
qualifiers.add(qualifier);
batch.addTableOperation(getInstance(INCREMENT, new Object[] { rowKey, qualifier,
Long.MAX_VALUE }, V_COLUMNS, new Object[] { cell.getValue() }));
});
}

ObHTableFilter filter = buildObHTableFilter(null, increment.getTimeRange(), 1,
qualifiers);
Expand Down