Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 142 additions & 22 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
import com.alipay.oceanbase.hbase.filter.HBaseFilterUtils;
import com.alipay.oceanbase.hbase.result.ClientStreamScanner;
import com.alipay.oceanbase.hbase.util.*;
import com.alipay.oceanbase.rpc.ObGlobal;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
Expand All @@ -39,6 +43,7 @@
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
import com.alipay.oceanbase.rpc.table.ObKVParams;
import com.alipay.oceanbase.rpc.table.ObTableClientQueryImpl;
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;

import com.google.protobuf.Descriptors;
Expand Down Expand Up @@ -68,6 +73,8 @@
import static com.alipay.oceanbase.hbase.util.Preconditions.checkNotNull;
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.TABLE_HBASE_LOGGER_SPACE;
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.colVal;
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.row;
import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation.getInstance;
import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.*;
import static com.alipay.sofa.common.thread.SofaThreadPoolConstants.SOFA_THREAD_POOL_LOGGING_CAPABILITY;
Expand Down Expand Up @@ -523,8 +530,18 @@ public boolean exists(Get get) throws IOException {
@Override
public boolean[] existsAll(List<Get> gets) throws IOException {
boolean[] ret = new boolean[gets.size()];
for (int i = 0; i < gets.size(); ++i) {
ret[i] = exists(gets.get(i));
List<Get> newGets = new ArrayList<>();
// if just checkExistOnly, batch get will not return any result or row count
// therefore we have to set checkExistOnly as false and so the result can be returned
// TODO: adjust ExistOnly in server when using batch get
for (Get get : gets) {
Get newGet = new Get(get);
newGet.setCheckExistenceOnly(false);
newGets.add(newGet);
}
Result[] results = get(newGets);
for (int i = 0; i < results.length; ++i) {
ret[i] = !results[i].isEmpty();
}
return ret;
}
Expand Down Expand Up @@ -681,7 +698,7 @@ private void compatOldServerBatch(final List<? extends Row> actions, final Objec
}

@Override
public void batch(final List<? extends Row> actions, final Object[] results) throws IOException{
public void batch(final List<? extends Row> actions, final Object[] results) throws IOException {
if (actions == null) {
return;
}
Expand All @@ -693,7 +710,7 @@ public void batch(final List<? extends Row> actions, final Object[] results) thr
BatchError batchError = new BatchError();
obTableClient.setRuntimeBatchExecutor(executePool);
List<Integer> resultMapSingleOp = new LinkedList<>();
if (!CompatibilityUtil.isBatchSupport()) {
if (!ObGlobal.isHBaseBatchSupport()) {
try {
compatOldServerBatch(actions, results, batchError);
} catch (Exception e) {
Expand All @@ -716,6 +733,23 @@ public void batch(final List<? extends Row> actions, final Object[] results) thr
results[i] = tmpResults.getResults().get(index);
}
batchError.add((ObTableException) tmpResults.getResults().get(index), actions.get(i), null);
} else if (actions.get(i) instanceof Get) {
if (results != null) {
// get results have been wrapped in MutationResult, need to fetch it
if (tmpResults.getResults().get(index) instanceof MutationResult) {
MutationResult mutationResult = (MutationResult) tmpResults.getResults().get(index);
Comment thread
shenyunlong marked this conversation as resolved.
ObPayload innerResult = mutationResult.getResult();
if (innerResult instanceof ObTableSingleOpResult) {
ObTableSingleOpResult singleOpResult = (ObTableSingleOpResult) innerResult;
List<Cell> cells = generateGetResult(singleOpResult);
results[i] = Result.create(cells);
} else {
throw new ObTableUnexpectedException("Unexpected type of result in MutationResult");
}
} else {
throw new ObTableUnexpectedException("Unexpected type of result in batch");
}
}
} else {
if (results != null) {
results[i] = new Result();
Expand All @@ -729,17 +763,57 @@ public void batch(final List<? extends Row> actions, final Object[] results) thr
}
}

private List<Cell> generateGetResult(ObTableSingleOpResult getResult) throws IOException {
List<Cell> cells = new ArrayList<>();
ObTableSingleOpEntity singleOpEntity = getResult.getEntity();
// all values queried by this get are contained in properties
// qualifier in batch get result is always appended after family
List<ObObj> propertiesValues = singleOpEntity.getPropertiesValues();
int valueIdx = 0;
while (valueIdx < propertiesValues.size()) {
// values in propertiesValues like: [ K, Q, T, V, K, Q, T, V ... ]
// we need to retrieve K Q T V and construct them to cells: [ cell_0, cell_1, ... ]
byte[][] familyAndQualifier = new byte[2][];
try {
// split family and qualifier
familyAndQualifier = OHBaseFuncUtils
.extractFamilyFromQualifier((byte[]) propertiesValues.get(valueIdx + 1).getValue());
} catch (Exception e) {
throw new IOException(e);
}
KeyValue kv = new KeyValue((byte[]) propertiesValues.get(valueIdx).getValue(),//K
familyAndQualifier[0], // family
familyAndQualifier[1], // qualifiermat
(Long) propertiesValues.get(valueIdx + 2).getValue(), // T
(byte[]) propertiesValues.get(valueIdx + 3).getValue()// V
);
cells.add(kv);
valueIdx += 4;
}
return cells;
}

private String getTargetTableName(List<? extends Row> actions) {
byte[] family = null;
for (Row action : actions) {
if (action instanceof RowMutations || action instanceof RegionCoprocessorServiceExec) {
throw new FeatureNotSupportedException("not supported yet'");
} else {
Mutation mutation = (Mutation) action;
if (mutation.getFamilyCellMap().size() != 1) {
Set<byte[]> familySet = null;
if (action instanceof Get){
Get get = (Get) action;
familySet = get.familySet();
} else {
Mutation mutation = (Mutation) action;
familySet = mutation.getFamilyCellMap().keySet();
}
if (familySet == null) {
throw new ObTableUnexpectedException("Fail to get family set in action");
}
if (familySet.size() != 1) {
return getTargetTableName(tableNameString);
} else {
byte[] nextFamily = mutation.getFamilyCellMap().keySet().iterator().next();
byte[] nextFamily = familySet.iterator().next();
if (family != null && !Arrays.equals(family, nextFamily)) {
return getTargetTableName(tableNameString);
} else if (family == null) {
Expand Down Expand Up @@ -825,8 +899,8 @@ private String getTargetTableName(String tableNameString) {
return tableNameString;
}

// To enable the server to identify the column family to which a qualifier belongs,
// the client writes the column family name into the qualifier.
// To enable the server to identify the column family to which a qualifier belongs,
// the client writes the column family name into the qualifier.
// The server then parses this information to determine the table that needs to be operated on.
private void processColumnFilters(NavigableSet<byte[]> columnFilters,
Map<byte[], NavigableSet<byte[]>> familyMap) {
Expand Down Expand Up @@ -862,8 +936,8 @@ public Result call() throws IOException {
if (get.getFamilyMap().keySet() == null
|| get.getFamilyMap().keySet().isEmpty()
|| get.getFamilyMap().size() > 1) {
// In a Get operation where the family map is greater than 1 or equal to 0,
// we handle this by appending the column family to the qualifier on the client side.
// In a Get operation where the family map is greater than 1 or equal to 0,
// we handle this by appending the column family to the qualifier on the client side.
// The server can then use this information to filter the appropriate column families and qualifiers.
if (!get.getColumnFamilyTimeRange().isEmpty()) {
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
Expand Down Expand Up @@ -920,8 +994,12 @@ public Result call() throws IOException {
@Override
public Result[] get(List<Get> gets) throws IOException {
Result[] results = new Result[gets.size()];
for (int i = 0; i < gets.size(); i++) {
results[i] = get(gets.get(i));
if (ObGlobal.isHBaseBatchGetSupport()) { // get only supported in BatchSupport version
batch(gets, results);
} else {
for (int i = 0; i < gets.size(); i++) {
results[i] = get(gets.get(i));
}
}
return results;
}
Expand All @@ -948,8 +1026,8 @@ public ResultScanner call() throws IOException {
if (scan.getFamilyMap().keySet() == null
|| scan.getFamilyMap().keySet().isEmpty()
|| scan.getFamilyMap().size() > 1) {
// In a Scan operation where the family map is greater than 1 or equal to 0,
// we handle this by appending the column family to the qualifier on the client side.
// In a Scan operation where the family map is greater than 1 or equal to 0,
// we handle this by appending the column family to the qualifier on the client side.
// The server can then use this information to filter the appropriate column families and qualifiers.
if (!scan.getColumnFamilyTimeRange().isEmpty()) {
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
Expand Down Expand Up @@ -1464,7 +1542,6 @@ public void close() throws IOException {
if (cleanupPoolOnClose) {
executePool.shutdown();
}
ObTableClientManager.clear();
}

@Override
Expand Down Expand Up @@ -1514,6 +1591,8 @@ public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescri
@Override
public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout;
this.obTableClient.setRuntimeMaxWait(operationTimeout);
this.obTableClient.setRuntimeBatchMaxWait(operationTimeout);
this.operationExecuteInPool = this.configuration.getBoolean(
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
Expand Down Expand Up @@ -1834,21 +1913,62 @@ private KeyValue modifyQualifier(Cell original, byte[] newQualifier) {
byte[] value = CellUtil.cloneValue(original);
long timestamp = original.getTimestamp();
KeyValue.Type type = KeyValue.Type.codeToType(original.getType().getCode());
// Create a new KeyValue with the modified qualifier
// Create a new KeyValue with the modified qualifier
return new KeyValue(row, family, newQualifier, timestamp, type, value);
}

private BatchOperation buildBatchOperation(String tableName, List<? extends Row> actions,
boolean isTableGroup, List<Integer> resultMapSingleOp)
throws FeatureNotSupportedException,
IllegalArgumentException {
IllegalArgumentException,
IOException {
BatchOperation batch = obTableClient.batchOperation(tableName);
int posInList = -1;
int singleOpResultNum;
for (Row row : actions) {
singleOpResultNum = 0;
posInList++;
if (row instanceof Put) {
if (row instanceof Get) {
if (!ObGlobal.isHBaseBatchGetSupport()) {
throw new FeatureNotSupportedException("server does not support batch get");
}
++singleOpResultNum;
Get get = (Get) row;
ObTableQuery obTableQuery;
// In a Get operation in ls batch, we need to determine whether the get is a table-group operation or not,
// we handle this by appending the column family to the qualifier on the client side.
// The server can then use this information to filter the appropriate column families and qualifiers.
if ((get.getFamilyMap().keySet().isEmpty()
|| get.getFamilyMap().size() > 1) &&
!get.getColumnFamilyTimeRange().isEmpty()) {
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
} else if (get.getFamilyMap().size() == 1 && !get.getColumnFamilyTimeRange().isEmpty()) {
byte[] family = get.getFamilyMap().keySet().iterator().next();
Map<byte[], TimeRange> colFamTimeRangeMap = get.getColumnFamilyTimeRange();
if (colFamTimeRangeMap.size() > 1) {
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
} else if (colFamTimeRangeMap.get(family) == null) {
throw new IllegalArgumentException("Get family is not matched in ColumnFamilyTimeRange");
} else {
TimeRange tr = colFamTimeRangeMap.get(family);
get.setTimeRange(tr.getMin(), tr.getMax());
}
}
NavigableSet<byte[]> columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR);
// in batch get, we need to carry family in qualifier to server even this get is a single-cf operation
// because the entire batch may be a multi-cf batch so do not carry family
// family in qualifier helps us to know which table to query
processColumnFilters(columnFilters, get.getFamilyMap());
obTableQuery = buildObTableQuery(get, columnFilters);
ObTableClientQueryImpl query = new ObTableClientQueryImpl(tableName, obTableQuery, obTableClient);
try {
query.setRowKey(row(colVal("K", Bytes.toString(get.getRow())), colVal("Q", null), colVal("T", null)));
} catch (Exception e) {
logger.error("unexpected error occurs when set row key", e);
throw new IOException(e);
}
batch.addOperation(query);
} else if (row instanceof Put) {
Put put = (Put) row;
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert for #"
Expand Down Expand Up @@ -1882,7 +2002,7 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
}
} else {
throw new FeatureNotSupportedException(
"not supported other type in batch yet,only support put and delete");
"not supported other type in batch yet,only support get, put and delete");
}
resultMapSingleOp.add(singleOpResultNum);
}
Expand Down Expand Up @@ -1983,8 +2103,8 @@ public static void checkFamilyViolation(Collection<byte[]> families, boolean che
}
}

// This method is currently only used for append and increment operations.
// It restricts these two methods to use multi-column family operations.
// This method is currently only used for append and increment operations.
// It restricts these two methods to use multi-column family operations.
// Note: After completing operations on multiple column families, they are deleted using the method described above.
public static void checkFamilyViolationForOneFamily(Collection<byte[]> families) {
if (families == null || families.size() == 0) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,6 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli
return OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey);
}

public static void clear() throws IOException {
try {
for (Map.Entry<ObTableClientKey, ObTableClient> pair : OB_TABLE_CLIENT_INSTANCE.entrySet()) {
pair.getValue().close();
}
}
catch (Exception e) {
throw new IOException("fail to close tableClient" , e);
}
OB_TABLE_CLIENT_INSTANCE.clear();
}

public static class ObTableClientKey {
private String paramUrl;
private String fullUserName;
Expand Down
Loading