Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 187 additions & 44 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -489,9 +489,27 @@ private String getTargetTableName(String tableNameString) {
return tableNameString;
}

// To enable the server to identify the column family to which a qualifier belongs,
// the client writes the column family name into the qualifier.
// The server then parses this information to determine the table that needs to be operated on.
private void processColumnFilters(NavigableSet<byte[]> columnFilters, Map<byte[], NavigableSet<byte[]>> familyMap) {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
if (entry.getValue() != null) {
for (byte[] columnName : entry.getValue()) {
String columnNameStr = Bytes.toString(columnName);
columnNameStr = Bytes.toString(entry.getKey()) + "." + columnNameStr;
columnFilters.add(columnNameStr.getBytes());
}
} else {
String columnNameStr = Bytes.toString(entry.getKey()) + ".";
columnFilters.add(columnNameStr.getBytes());
}
}
}

@Override
public Result get(final Get get) throws IOException {
if (get.getFamilyMap().keySet() == null || get.getFamilyMap().keySet().size() == 0) {
if (get.getFamilyMap().keySet() == null || get.getFamilyMap().keySet().isEmpty()) {
// check nothing, use table group;
} else {
checkFamilyViolation(get.getFamilyMap().keySet());
Expand All @@ -502,29 +520,33 @@ public Result get(final Get get) throws IOException {
public Result call() throws IOException {
List<KeyValue> keyValueList = new ArrayList<KeyValue>();
byte[] family = new byte[] {};
ObTableClientQueryStreamResult clientQueryStreamResult;
ObTableQueryRequest request;
ObTableClientQueryAsyncStreamResult clientQueryStreamResult;
ObTableQueryAsyncRequest request;
ObTableQuery obTableQuery;
try {
if (get.getFamilyMap().keySet() == null
|| get.getFamilyMap().keySet().size() == 0) {
obTableQuery = buildObTableQuery(get, null);
request = buildObTableQueryRequest(obTableQuery,
|| get.getFamilyMap().keySet().isEmpty()
|| get.getFamilyMap().size() > 1) {
Comment thread
shenyunlong marked this conversation as resolved.
// In a Get operation where the family map is greater than 1 or equal to 0,
// we handle this by appending the column family to the qualifier on the client side.
// The server can then use this information to filter the appropriate column families and qualifiers.
NavigableSet<byte[]> columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR);
processColumnFilters(columnFilters, get.getFamilyMap());
obTableQuery = buildObTableQuery(get, columnFilters);
request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString));

clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
.execute(request);
getKeyValueFromResult(clientQueryStreamResult, keyValueList, true, family);
} else {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap()
.entrySet()) {
family = entry.getKey();
obTableQuery = buildObTableQuery(get, entry.getValue());
request = buildObTableQueryRequest(
obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family),
configuration));
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
.execute(request);
getKeyValueFromResult(clientQueryStreamResult, keyValueList, false,
family);
Expand Down Expand Up @@ -581,9 +603,16 @@ public ResultScanner call() throws IOException {
ObTableQuery obTableQuery;
ObHTableFilter filter;
try {
if (scan.getFamilyMap().keySet().isEmpty()) {
if (scan.getFamilyMap().keySet() == null
|| scan.getFamilyMap().keySet().isEmpty()
Comment thread
shenyunlong marked this conversation as resolved.
|| scan.getFamilyMap().size() > 1) {
// In a Scan operation where the family map is greater than 1 or equal to 0,
// we handle this by appending the column family to the qualifier on the client side.
// The server can then use this information to filter the appropriate column families and qualifiers.
NavigableSet<byte[]> columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR);
processColumnFilters(columnFilters, scan.getFamilyMap());
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
scan.getMaxVersions(), null);
scan.getMaxVersions(), columnFilters);
obTableQuery = buildObTableQuery(filter, scan);

request = buildObTableQueryAsyncRequest(obTableQuery,
Expand Down Expand Up @@ -744,18 +773,47 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,

private void innerDelete(Delete delete) throws IOException {
checkArgument(delete.getRow() != null, "row is null");
checkArgument(!delete.isEmpty(), "delete is empty");
List<Integer> errorCodeList = new ArrayList<Integer>();
BatchOperationResult results = null;

try {
checkFamilyViolation(delete.getFamilyMap().keySet());
if (delete.getFamilyMap().isEmpty()) {
// For a Delete operation without any qualifiers, we construct a DeleteFamily request.
// The server then performs the operation on all column families.
KeyValue kv = new KeyValue(delete.getRow(), delete.getTimeStamp(),
KeyValue.Type.DeleteFamily);

BatchOperation batch = buildBatchOperation(tableNameString, Arrays.asList(kv), false, null);
results = batch.execute();
} else if (delete.getFamilyMap().size() > 1) {
// Currently, the Delete Family operation type cannot transmit qualifiers to the server.
// As a result, the server cannot identify which families need to be deleted.
// Therefore, this process is handled sequentially.
boolean has_delete_family = delete.getFamilyMap().entrySet().stream()
.flatMap(entry -> entry.getValue().stream())
.anyMatch(kv -> KeyValue.Type.codeToType(kv.getType()) == KeyValue.Type.DeleteFamily);
if (!has_delete_family) {
BatchOperation batch = buildBatchOperation(tableNameString,
delete.getFamilyMap(), false, null);
results = batch.execute();
} else {
for (Map.Entry<byte[], List<KeyValue>> entry : delete.getFamilyMap().entrySet()) {
Comment thread
shenyunlong marked this conversation as resolved.
BatchOperation batch = buildBatchOperation(
getTargetTableName(tableNameString, Bytes.toString(entry.getKey())),
entry.getValue(), false, null);
results = batch.execute();
}
}
} else {
Map.Entry<byte[], List<KeyValue>> entry = delete.getFamilyMap().entrySet().iterator()
.next();

Map.Entry<byte[], List<KeyValue>> entry = delete.getFamilyMap().entrySet().iterator()
.next();

BatchOperation batch = buildBatchOperation(
getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration),
entry.getValue(), false, null);
BatchOperationResult results = batch.execute();
BatchOperation batch = buildBatchOperation(
getTargetTableName(tableNameString, Bytes.toString(entry.getKey())),
entry.getValue(), false, null);
results = batch.execute();
}

errorCodeList = results.getErrorCodeList();
boolean hasError = results.hasError();
Expand Down Expand Up @@ -842,7 +900,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co
List<KeyValue> keyValueList = new LinkedList<>();
// only one family operation is allowed
for (Mutation mutation : mutations) {
checkFamilyViolation(mutation.getFamilyMap().keySet());
checkFamilyViolationForOneFamily(mutation.getFamilyMap().keySet());
checkArgument(Arrays.equals(family, mutation.getFamilyMap().firstEntry().getKey()),
"mutation family is not equal check family");
// Support for multiple families in the future
Expand Down Expand Up @@ -876,7 +934,7 @@ public void mutateRow(RowMutations rm) {
@Override
public Result append(Append append) throws IOException {

checkFamilyViolation(append.getFamilyMap().keySet());
checkFamilyViolationForOneFamily(append.getFamilyMap().keySet());
checkArgument(!append.isEmpty(), "append is empty.");
try {
byte[] r = append.getRow();
Expand Down Expand Up @@ -926,7 +984,7 @@ public Result append(Append append) throws IOException {
@Override
public Result increment(Increment increment) throws IOException {

checkFamilyViolation(increment.getFamilyMap().keySet());
checkFamilyViolationForOneFamily(increment.getFamilyMap().keySet());

try {
List<byte[]> qualifiers = new ArrayList<byte[]>();
Expand Down Expand Up @@ -1048,18 +1106,39 @@ public void flushCommits() throws IOException {
for (int i = 0; i < writeBuffer.size(); i++) {
Put aPut = writeBuffer.get(i);
Map<byte[], List<KeyValue>> innerFamilyMap = aPut.getFamilyMap();
// multi family can not ensure automatic
for (Map.Entry<byte[], List<KeyValue>> entry : innerFamilyMap.entrySet()) {
String family = Bytes.toString(entry.getKey());
Pair<List<Integer>, List<KeyValue>> keyValueWithIndex = familyMap
.get(family);
if (keyValueWithIndex == null) {
keyValueWithIndex = new Pair<List<Integer>, List<KeyValue>>(
new ArrayList<Integer>(), new ArrayList<KeyValue>());
familyMap.put(family, keyValueWithIndex);
if (innerFamilyMap.size() > 1) {
// Bypass logic: directly construct BatchOperation for puts with family map size > 1
try {
BatchOperation batch = buildBatchOperation(this.tableNameString,
innerFamilyMap, false, null);
BatchOperationResult results = batch.execute();

boolean hasError = results.hasError();
resultSuccess[i] = !hasError;
if (hasError) {
throw results.getFirstException();
}
} catch (Exception e) {
logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush,
writeBuffer.size(), e);
throw new IOException("put table " + tableNameString + " error codes "
+ null + "auto flush " + autoFlush
+ " current buffer size " + writeBuffer.size(), e);
}
} else {
// Existing logic for puts with family map size = 1
for (Map.Entry<byte[], List<KeyValue>> entry : innerFamilyMap.entrySet()) {
String family = Bytes.toString(entry.getKey());
Pair<List<Integer>, List<KeyValue>> keyValueWithIndex = familyMap
.get(family);
if (keyValueWithIndex == null) {
keyValueWithIndex = new Pair<List<Integer>, List<KeyValue>>(
new ArrayList<Integer>(), new ArrayList<KeyValue>());
familyMap.put(family, keyValueWithIndex);
}
keyValueWithIndex.getFirst().add(i);
keyValueWithIndex.getSecond().addAll(entry.getValue());
}
keyValueWithIndex.getFirst().add(i);
keyValueWithIndex.getSecond().addAll(entry.getValue());
}
}
for (Map.Entry<String, Pair<List<Integer>, List<KeyValue>>> entry : familyMap
Expand Down Expand Up @@ -1339,7 +1418,7 @@ private ObHTableFilter buildObHTableFilter(Filter filter, TimeRange timeRange, i
ObHTableFilter obHTableFilter = new ObHTableFilter();

if (filter != null) {
obHTableFilter.setFilterString(HBaseFilterUtils.toParseableString(filter));
obHTableFilter.setFilterString(HBaseFilterUtils.toParseableString(filter).getBytes());
}

if (timeRange != null) {
Expand Down Expand Up @@ -1379,7 +1458,7 @@ private ObHTableFilter buildObHTableFilter(String filterString, TimeRange timeRa
ObHTableFilter obHTableFilter = new ObHTableFilter();

if (filterString != null) {
obHTableFilter.setFilterString(filterString);
obHTableFilter.setFilterString(filterString.getBytes());
}

if (timeRange != null) {
Expand Down Expand Up @@ -1499,6 +1578,29 @@ public static ObTableBatchOperation buildObTableBatchOperation(List<KeyValue> ke
return batch;
}

private ObTableBatchOperation buildObTableBatchOperation(Map<byte[], List<KeyValue>> familyMap,
boolean putToAppend,
List<byte[]> qualifiers) {
ObTableBatchOperation batch = new ObTableBatchOperation();
for (Map.Entry<byte[], List<KeyValue>> entry : familyMap.entrySet()) {
byte[] family = entry.getKey();
List<KeyValue> keyValueList = entry.getValue();
for (KeyValue kv : keyValueList) {
if (qualifiers != null) {
qualifiers
.add((Bytes.toString(family) + "." + Bytes.toString(kv.getQualifier()))
.getBytes());
}
KeyValue new_kv = modifyQualifier(kv,
(Bytes.toString(family) + "." + Bytes.toString(kv.getQualifier())).getBytes());
batch.addTableOperation(buildObTableOperation(new_kv, putToAppend));
}
}
batch.setSameType(true);
batch.setSamePropertiesNames(true);
return batch;
}

private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv,
boolean putToAppend) {
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType());
Expand Down Expand Up @@ -1529,6 +1631,40 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv,
throw new IllegalArgumentException("illegal mutation type " + kvType);
}
}
private KeyValue modifyQualifier(KeyValue original, byte[] newQualifier) {
// Extract existing components
byte[] row = original.getRow();
byte[] family = original.getFamily();
byte[] value = original.getValue();
long timestamp = original.getTimestamp();
byte type = original.getTypeByte();
// Create a new KeyValue with the modified qualifier
return new KeyValue(row, family, newQualifier, timestamp, KeyValue.Type.codeToType(type),
value);
}

private BatchOperation buildBatchOperation(String tableName,
Map<byte[], List<KeyValue>> familyMap,
boolean putToAppend, List<byte[]> qualifiers) {
BatchOperation batch = obTableClient.batchOperation(tableName);

for (Map.Entry<byte[], List<KeyValue>> entry : familyMap.entrySet()) {
byte[] family = entry.getKey();
List<KeyValue> keyValueList = entry.getValue();
for (KeyValue kv : keyValueList) {
if (qualifiers != null) {
qualifiers.add(kv.getQualifier());
}
KeyValue new_kv = modifyQualifier(kv,
(Bytes.toString(family) + "." + Bytes.toString(kv.getQualifier())).getBytes());
batch.addOperation(buildMutation(new_kv, putToAppend));
}
}

batch.setEntityType(ObTableEntityType.HKV);
return batch;
}


private BatchOperation buildBatchOperation(String tableName, List<KeyValue> keyValueList,
boolean putToAppend, List<byte[]> qualifiers) {
Expand Down Expand Up @@ -1618,24 +1754,31 @@ private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQu
return request;
}

public static void checkFamilyViolation(Collection<byte[]> families) {
private void checkFamilyViolation(Collection<byte[]> families) {
for (byte[] family : families) {
if (isBlank(Bytes.toString(family))) {
throw new IllegalArgumentException("family is blank");
}
}
}


// This method is currently only used for append and increment operations.
// It restricts these two methods to use multi-column family operations.
// Note: After completing operations on multiple column families, they are deleted using the method described above.
private void checkFamilyViolationForOneFamily(Collection<byte[]> families) {
if (families == null || families.size() == 0) {
throw new FeatureNotSupportedException("family is empty.");
}

if (families.size() > 1) {
throw new FeatureNotSupportedException("multi family is not supported yet.");
}

for (byte[] family : families) {
if (family == null || family.length == 0) {
throw new IllegalArgumentException("family is empty");
}
if (isBlank(Bytes.toString(family))) {
throw new IllegalArgumentException("family is blank");
}
}

}

public void refreshTableEntry(String familyString, boolean hasTestLoad) throws Exception {
Expand Down
Loading