Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2127,11 +2127,13 @@ private ObTableQuery buildObTableQuery(final Get get, Collection<byte[]> columnQ

/**
* Check if the Get or Scan operation is configured for weak read.
* If the query does not have HBASE_HTABLE_READ_CONSISTENCY attribute set,
* it will fall back to the global configuration.
*
* @param query the Get or Scan object to check
* @return true if weak read is enabled, false otherwise
*/
public static boolean isWeakRead(Object query) {
public boolean isWeakRead(Object query) {
if (query == null) {
return false;
}
Expand All @@ -2143,11 +2145,16 @@ public static boolean isWeakRead(Object query) {
} else {
return false;
}
String consistencyStr;
if (consistency == null) {
return false;
// fall back to global configuration
consistencyStr = configuration.get(HBASE_HTABLE_READ_CONSISTENCY);
if (consistencyStr == null) {
return false;
}
} else {
consistencyStr = Bytes.toString(consistency);
}
String consistencyStr = Bytes.toString(consistency);
System.out.println("consistencyStr: " + consistencyStr);
return "weak".equalsIgnoreCase(consistencyStr);
}

Expand Down Expand Up @@ -2360,10 +2367,14 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
BatchOperation batch = obTableClient.batchOperation(tableName);
int posInList = -1;
int singleOpResultNum;
int getOperationNum = 0;
// allGetIsWeakRead is initialized to true, and set to false if any Get is not weak read
boolean allGetIsWeakRead = true;
for (Row row : actions) {
singleOpResultNum = 0;
posInList++;
if (row instanceof Get) {
getOperationNum++;
if (!ObGlobal.isHBaseBatchGetSupport()) {
throw new FeatureNotSupportedException("server does not support batch get");
}
Expand Down Expand Up @@ -2398,8 +2409,9 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
ObTableClientQueryImpl query = new ObTableClientQueryImpl(tableName, obTableQuery, obTableClient);
try {
query.setRowKey(row(colVal("K", Bytes.toString(get.getRow())), colVal("Q", null), colVal("T", Integer.MAX_VALUE)));
if (isWeakRead(get)) {
batch.setReadConsistency(ObReadConsistency.WEAK);
// if any Get is not weak read, set allGetIsWeakRead to false
if (!isWeakRead(get)) {
allGetIsWeakRead = false;
}
} catch (Exception e) {
throw new IOException(e);
Expand Down Expand Up @@ -2495,6 +2507,10 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
}
resultMapSingleOp.add(singleOpResultNum);
}
// only set weak read consistency when all operations are Get and all Get operations are weak read
if (getOperationNum == actions.size() && allGetIsWeakRead) {
batch.setReadConsistency(ObReadConsistency.WEAK);
}
batch.setEntityType(ObTableEntityType.HKV);
batch.setServerCanRetry(OHBaseFuncUtils.serverCanRetry(obTableClient));
batch.setNeedTabletId(OHBaseFuncUtils.needTabletId(obTableClient));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli
if (connectionConfig.getRoutePolicy() != null) {
obTableClient.setRoutePolicy(ObRoutePolicy.getByName(connectionConfig.getRoutePolicy()));
}
if (connectionConfig.getGlobalWeakRead() != null) {
obTableClient.setReadConsistency(ObReadConsistency.getByName(connectionConfig.getGlobalWeakRead()));
}
obTableClient.init();
OB_TABLE_CLIENT_INSTANCE.put(obTableClientKey, obTableClient);
}
Expand Down
Loading