diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index e5a1f42d..48ee7470 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -210,7 +210,7 @@ public OHTable(Configuration configuration, String tableName) throws IOException this.tableNameString, ohConnectionConf)); this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout()); this.obTableClient.setRuntimeRetryTimes(numRetries); - setOperationTimeout(ohConnectionConf.getOperationTimeout()); + setOperationTimeout(ohConnectionConf.getClientOperationTimeout()); finishSetUp(); } @@ -262,7 +262,7 @@ public OHTable(Configuration configuration, final byte[] tableName, this.tableNameString, ohConnectionConf)); this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout()); this.obTableClient.setRuntimeRetryTimes(numRetries); - setOperationTimeout(ohConnectionConf.getOperationTimeout()); + setOperationTimeout(ohConnectionConf.getClientOperationTimeout()); finishSetUp(); } @@ -318,7 +318,7 @@ public OHTable(TableName tableName, Connection connection, } this.rpcTimeout = connectionConfig.getRpcTimeout(); - this.operationTimeout = connectionConfig.getOperationTimeout(); + this.operationTimeout = connectionConfig.getClientOperationTimeout(); this.operationExecuteInPool = this.configuration.getBoolean( HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL, (this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); @@ -2286,6 +2286,7 @@ private BatchOperation buildBatchOperation(String tableName, List resultMapSingleOp.add(singleOpResultNum); } batch.setEntityType(ObTableEntityType.HKV); + batch.setServerCanRetry(OHBaseFuncUtils.serverCanRetry(obTableClient)); return batch; } @@ -2346,6 +2347,7 @@ private ObHbaseRequest buildHbaseRequest(List actions) request.setKeys(keys); request.setOpType(opType); request.setCfRows(cfRowsArray); + request.setServerCanRetry(OHBaseFuncUtils.serverCanRetry(obTableClient)); return request; } @@ -2389,6 +2391,7 @@ private ObTableQueryRequest buildObTableQueryRequest(ObTableQuery obTableQuery, request.setEntityType(ObTableEntityType.HKV); request.setTableQuery(obTableQuery); request.setTableName(targetTableName); + request.setServerCanRetry(OHBaseFuncUtils.serverCanRetry(obTableClient)); return request; } @@ -2402,6 +2405,7 @@ private ObTableQueryAsyncRequest buildObTableQueryAsyncRequest(ObTableQuery obTa asyncRequest.setEntityType(ObTableEntityType.HKV); asyncRequest.setTableName(targetTableName); asyncRequest.setObTableQueryRequest(request); + asyncRequest.setServerCanRetry(OHBaseFuncUtils.serverCanRetry(obTableClient)); return asyncRequest; } @@ -2416,6 +2420,7 @@ private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQu request.setTableQueryAndMutate(queryAndMutate); request.setEntityType(ObTableEntityType.HKV); request.setReturningAffectedEntity(true); + request.setServerCanRetry(OHBaseFuncUtils.serverCanRetry(obTableClient)); return request; } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHAdmin.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHAdmin.java index 6b4909a6..4a13e3a2 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHAdmin.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHAdmin.java @@ -29,12 +29,10 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Future; import java.util.regex.Pattern; -import java.util.stream.Collectors; import static com.alipay.oceanbase.rpc.protocol.payload.ResultCodes.*; @@ -92,7 +90,7 @@ private void handleObTableException(Exception e, TableName tableName, ExceptionH @Override public int getOperationTimeout() { - return connection.getOHConnectionConfiguration().getOperationTimeout(); + return connection.getOHConnectionConfiguration().getClientOperationTimeout(); } @Override diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java index c57b2cc2..1fac2b2c 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java @@ -97,4 +97,14 @@ public int compare(KeyValue kv1, KeyValue kv2) { } }); } + + public static boolean serverCanRetry(ObTableClient tableClient) { + if (tableClient.isOdpMode()) { + // ODP mode needs to check proxy version + return ObGlobal.OB_PROXY_VERSION >= ObGlobal.OB_PROXY_VERSION_4_3_6_0; + } else { + // OCP mode directly return true, server will do the check + return true; + } + } } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 69818e84..fb4cbc9c 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -70,7 +70,7 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam OHConnectionConfiguration connectionConfig = ohConnection.getOHConnectionConfiguration(); this.pool = params.getPool(); this.rpcTimeout = connectionConfig.getRpcTimeout(); - this.operationTimeout = connectionConfig.getOperationTimeout(); + this.operationTimeout = connectionConfig.getClientOperationTimeout(); this.writeBufferSize = params.getWriteBufferSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params .getWriteBufferSize() : connectionConfig.getWriteBufferSize(); @@ -103,7 +103,7 @@ public OHBufferedMutatorImpl(Configuration conf, BufferedMutatorParams params, O OHConnectionConfiguration connectionConfig = new OHConnectionConfiguration(conf); this.pool = params.getPool(); this.rpcTimeout = connectionConfig.getRpcTimeout(); - this.operationTimeout = connectionConfig.getOperationTimeout(); + this.operationTimeout = connectionConfig.getClientOperationTimeout(); this.writeBufferSize = params.getWriteBufferSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params .getWriteBufferSize() : connectionConfig.getWriteBufferSize(); diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java index 0685cccf..361bacce 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java @@ -42,7 +42,8 @@ public class OHConnectionConfiguration { private final int odpPort; private final boolean odpMode; private final long writeBufferSize; - private final int operationTimeout; + private final int clientOperationTimeout; + private final int serverOperationTimeout; private final int scannerCaching; private final long scannerMaxResultSize; private final int maxKeyValueSize; @@ -65,7 +66,9 @@ public OHConnectionConfiguration(Configuration conf) { } this.database = database; this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); - this.operationTimeout = conf.getInt("hbase.client.operation.timeout", 1200000); + this.clientOperationTimeout = conf.getInt("hbase.client.operation.timeout", 1200000); + this.serverOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); int rpcConnectTimeout = -1; @@ -107,8 +110,12 @@ public long getWriteBufferSize() { return this.writeBufferSize; } - public int getOperationTimeout() { - return this.operationTimeout; + public int getClientOperationTimeout() { + return this.clientOperationTimeout; + } + + public int getServerOperationTimeout() { + return this.serverOperationTimeout; } public int getScannerCaching() { diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java index f02a07a4..14a8f332 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java @@ -22,8 +22,6 @@ import com.alipay.oceanbase.rpc.constant.Constants; import com.google.common.base.Objects; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import java.io.IOException; @@ -34,6 +32,7 @@ import static com.alipay.oceanbase.hbase.constants.OHConstants.*; import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument; +import static com.alipay.oceanbase.rpc.property.Property.RPC_OPERATION_TIMEOUT; import static org.apache.commons.lang.StringUtils.isNotBlank; @InterfaceAudience.Private @@ -88,11 +87,11 @@ public static ObTableClient getOrCreateObTableClient(OHConnectionConfiguration c obTableClientKey.getProperties().put(property.getKey(), property.getValue()); } - return getOrCreateObTableClient(obTableClientKey, connectionConfig.getRpcConnectTimeout()); + return getOrCreateObTableClient(obTableClientKey, connectionConfig); } public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableClientKey, - int rpcConnectTimeout) throws IOException { + OHConnectionConfiguration connectionConfig) throws IOException { if (OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey) == null) { ReentrantLock tmp = new ReentrantLock(); ReentrantLock lock = OB_TABLE_CLIENT_LOCK.putIfAbsent(obTableClientKey, tmp); @@ -117,7 +116,8 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli } obTableClient.setFullUserName(obTableClientKey.getFullUserName()); obTableClient.setPassword(obTableClientKey.getPassword()); - obTableClient.setRpcConnectTimeout(rpcConnectTimeout); + obTableClient.setRpcConnectTimeout(connectionConfig.getRpcConnectTimeout()); + obTableClient.addProperty(RPC_OPERATION_TIMEOUT.getKey(), Integer.toString(connectionConfig.getServerOperationTimeout())); obTableClient.init(); OB_TABLE_CLIENT_INSTANCE.put(obTableClientKey, obTableClient); } @@ -142,8 +142,8 @@ public static ObTableClient getOrCreateObTableClientByTableName(TableName tableN private static void initTimeoutAndRetryTimes(ObTableClient obTableClient, OHConnectionConfiguration ohConnectionConf) { obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout()); obTableClient.setRuntimeRetryTimes(ohConnectionConf.getNumRetries()); - obTableClient.setRuntimeMaxWait(ohConnectionConf.getOperationTimeout()); - obTableClient.setRuntimeBatchMaxWait(ohConnectionConf.getOperationTimeout()); + obTableClient.setRuntimeMaxWait(ohConnectionConf.getClientOperationTimeout()); + obTableClient.setRuntimeBatchMaxWait(ohConnectionConf.getClientOperationTimeout()); } public static class ObTableClientKey {