Skip to content
58 changes: 34 additions & 24 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,10 @@ public Result call() throws IOException {
.entrySet()) {
family = entry.getKey();
obTableQuery = buildObTableQuery(get, entry.getValue());
request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
request = buildObTableQueryRequest(
obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family),
configuration));
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
.execute(request);
getKeyValueFromResult(clientQueryStreamResult, keyValueList, false,
Expand Down Expand Up @@ -598,8 +600,10 @@ public ResultScanner call() throws IOException {
scan.getMaxVersions(), entry.getValue());
obTableQuery = buildObTableQuery(filter, scan);

request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
request = buildObTableQueryAsyncRequest(
obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family),
configuration));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
.execute(request);
return new ClientStreamScanner(clientQueryAsyncStreamResult,
Expand Down Expand Up @@ -749,7 +753,7 @@ private void innerDelete(Delete delete) throws IOException {
.next();

BatchOperation batch = buildBatchOperation(
getTargetTableName(tableNameString, Bytes.toString(entry.getKey())),
getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration),
entry.getValue(), false, null);
BatchOperationResult results = batch.execute();

Expand Down Expand Up @@ -851,7 +855,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co
ObTableBatchOperation batch = buildObTableBatchOperation(keyValueList, false, null);

ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery,
batch, getTargetTableName(tableNameString, Bytes.toString(family)));
batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration));
ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient
.execute(request);
return result.getAffectedRows() > 0;
Expand Down Expand Up @@ -889,7 +893,8 @@ public Result append(Append append) throws IOException {
queryAndMutate.setTableQuery(obTableQuery);
queryAndMutate.setMutations(batchOperation);
ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery,
batchOperation, getTargetTableName(tableNameString, Bytes.toString(f)));
batchOperation,
getTargetTableName(tableNameString, Bytes.toString(f), configuration));
request.setReturningAffectedEntity(true);
ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient
.execute(request);
Expand Down Expand Up @@ -949,7 +954,7 @@ public Result increment(Increment increment) throws IOException {
queryAndMutate.setTableQuery(obTableQuery);

ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery,
batch, getTargetTableName(tableNameString, Bytes.toString(f)));
batch, getTargetTableName(tableNameString, Bytes.toString(f), configuration));
request.setReturningAffectedEntity(true);
ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient
.execute(request);
Expand Down Expand Up @@ -999,7 +1004,7 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo
queryAndMutate.setTableQuery(obTableQuery);

ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery,
batch, getTargetTableName(tableNameString, Bytes.toString(family)));
batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration));
request.setReturningAffectedEntity(true);
ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient
.execute(request);
Expand Down Expand Up @@ -1063,7 +1068,7 @@ public void flushCommits() throws IOException {
.getSecond().size());
try {
String targetTableName = getTargetTableName(this.tableNameString,
entry.getKey());
entry.getKey(), configuration);

BatchOperation batch = buildBatchOperation(targetTableName, entry
.getValue().getSecond(), false, null);
Expand Down Expand Up @@ -1308,21 +1313,23 @@ <T> T executeServerCallable(final ServerCallable<T> serverCallable) throws IOExc
}
}

private String getTargetTableName(String tableNameString, String familyString) {
public static String getTargetTableName(String tableNameString, String familyString,
Configuration conf) {
checkArgument(tableNameString != null, "tableNameString is null");
checkArgument(familyString != null, "familyString is null");
if (configuration.getBoolean(HBASE_HTABLE_TEST_LOAD_ENABLE, false)) {
return getTestLoadTargetTableName(tableNameString, familyString);
if (conf.getBoolean(HBASE_HTABLE_TEST_LOAD_ENABLE, false)) {
return getTestLoadTargetTableName(tableNameString, familyString, conf);
}
return getNormalTargetTableName(tableNameString, familyString);
}

private String getNormalTargetTableName(String tableNameString, String familyString) {
private static String getNormalTargetTableName(String tableNameString, String familyString) {
return tableNameString + "$" + familyString;
}

private String getTestLoadTargetTableName(String tableNameString, String familyString) {
String suffix = configuration.get(HBASE_HTABLE_TEST_LOAD_SUFFIX,
private static String getTestLoadTargetTableName(String tableNameString, String familyString,
Configuration conf) {
String suffix = conf.get(HBASE_HTABLE_TEST_LOAD_SUFFIX,
DEFAULT_HBASE_HTABLE_TEST_LOAD_SUFFIX);
return tableNameString + suffix + "$" + familyString;
}
Expand Down Expand Up @@ -1477,9 +1484,9 @@ private ObTableQuery buildObTableQuery(final Get get, Collection<byte[]> columnQ
return obTableQuery;
}

private ObTableBatchOperation buildObTableBatchOperation(List<KeyValue> keyValueList,
boolean putToAppend,
List<byte[]> qualifiers) {
public static ObTableBatchOperation buildObTableBatchOperation(List<KeyValue> keyValueList,
boolean putToAppend,
List<byte[]> qualifiers) {
ObTableBatchOperation batch = new ObTableBatchOperation();
for (KeyValue kv : keyValueList) {
if (qualifiers != null) {
Expand Down Expand Up @@ -1536,7 +1543,7 @@ private BatchOperation buildBatchOperation(String tableName, List<KeyValue> keyV
return batch;
}

private ObTableOperation buildObTableOperation(KeyValue kv, boolean putToAppend) {
public static ObTableOperation buildObTableOperation(KeyValue kv, boolean putToAppend) {
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType());
switch (kvType) {
case Put:
Expand Down Expand Up @@ -1585,13 +1592,15 @@ private ObTableQueryAsyncRequest buildObTableQueryAsyncRequest(ObTableQuery obTa
return asyncRequest;
}

private ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBatchOperation obTableBatchOperation,
String targetTableName) {
public static ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBatchOperation obTableBatchOperation,
String targetTableName,
ExecutorService pool) {
ObTableBatchOperationRequest request = new ObTableBatchOperationRequest();
request.setTableName(targetTableName);
request.setReturningAffectedRows(true);
request.setEntityType(ObTableEntityType.HKV);
request.setBatchOperation(obTableBatchOperation);
request.setPool(pool);
return request;
}

Expand All @@ -1609,7 +1618,7 @@ private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQu
return request;
}

private void checkFamilyViolation(Collection<byte[]> families) {
public static void checkFamilyViolation(Collection<byte[]> families) {
if (families == null || families.size() == 0) {
throw new FeatureNotSupportedException("family is empty.");
}
Expand Down Expand Up @@ -1637,7 +1646,8 @@ public void refreshTableEntry(String familyString, boolean hasTestLoad) throws E
getNormalTargetTableName(tableNameString, familyString), true, true);
if (hasTestLoad) {
this.obTableClient.getOrRefreshTableEntry(
getTestLoadTargetTableName(tableNameString, familyString), true, true);
getTestLoadTargetTableName(tableNameString, familyString, configuration), true,
true);
}
}

Expand Down
Loading