Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public OHTable(Configuration configuration, String tableName) throws IOException
this.tableNameString, ohConnectionConf));
this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout());
this.obTableClient.setRuntimeRetryTimes(numRetries);
setOperationTimeout(ohConnectionConf.getOperationTimeout());
setOperationTimeout(ohConnectionConf.getClientOperationTimeout());

finishSetUp();
}
Expand Down Expand Up @@ -262,7 +262,7 @@ public OHTable(Configuration configuration, final byte[] tableName,
this.tableNameString, ohConnectionConf));
this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout());
this.obTableClient.setRuntimeRetryTimes(numRetries);
setOperationTimeout(ohConnectionConf.getOperationTimeout());
setOperationTimeout(ohConnectionConf.getClientOperationTimeout());

finishSetUp();
}
Expand Down Expand Up @@ -318,7 +318,7 @@ public OHTable(TableName tableName, Connection connection,
}

this.rpcTimeout = connectionConfig.getRpcTimeout();
this.operationTimeout = connectionConfig.getOperationTimeout();
this.operationTimeout = connectionConfig.getClientOperationTimeout();
this.operationExecuteInPool = this.configuration.getBoolean(
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
Expand Down Expand Up @@ -2286,6 +2286,7 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
resultMapSingleOp.add(singleOpResultNum);
}
batch.setEntityType(ObTableEntityType.HKV);
batch.setServerCanRetry(OHBaseFuncUtils.serverCanRetry(obTableClient));
return batch;
}

Expand Down Expand Up @@ -2346,6 +2347,7 @@ private ObHbaseRequest buildHbaseRequest(List<? extends Row> actions)
request.setKeys(keys);
request.setOpType(opType);
request.setCfRows(cfRowsArray);
request.setServerCanRetry(OHBaseFuncUtils.serverCanRetry(obTableClient));
return request;
}

Expand Down Expand Up @@ -2389,6 +2391,7 @@ private ObTableQueryRequest buildObTableQueryRequest(ObTableQuery obTableQuery,
request.setEntityType(ObTableEntityType.HKV);
request.setTableQuery(obTableQuery);
request.setTableName(targetTableName);
request.setServerCanRetry(OHBaseFuncUtils.serverCanRetry(obTableClient));
return request;
}

Expand All @@ -2402,6 +2405,7 @@ private ObTableQueryAsyncRequest buildObTableQueryAsyncRequest(ObTableQuery obTa
asyncRequest.setEntityType(ObTableEntityType.HKV);
asyncRequest.setTableName(targetTableName);
asyncRequest.setObTableQueryRequest(request);
asyncRequest.setServerCanRetry(OHBaseFuncUtils.serverCanRetry(obTableClient));
return asyncRequest;
}

Expand All @@ -2416,6 +2420,7 @@ private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQu
request.setTableQueryAndMutate(queryAndMutate);
request.setEntityType(ObTableEntityType.HKV);
request.setReturningAffectedEntity(true);
request.setServerCanRetry(OHBaseFuncUtils.serverCanRetry(obTableClient));
return request;
}

Expand Down
4 changes: 1 addition & 3 deletions src/main/java/com/alipay/oceanbase/hbase/util/OHAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.alipay.oceanbase.rpc.protocol.payload.ResultCodes.*;

Expand Down Expand Up @@ -92,7 +90,7 @@ private void handleObTableException(Exception e, TableName tableName, ExceptionH

@Override
public int getOperationTimeout() {
return connection.getOHConnectionConfiguration().getOperationTimeout();
return connection.getOHConnectionConfiguration().getClientOperationTimeout();
}

@Override
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,14 @@ public int compare(KeyValue kv1, KeyValue kv2) {
}
});
}

public static boolean serverCanRetry(ObTableClient tableClient) {
if (tableClient.isOdpMode()) {
// ODP mode needs to check proxy version
return ObGlobal.OB_PROXY_VERSION >= ObGlobal.OB_PROXY_VERSION_4_3_6_0;
} else {
// OCP mode directly return true, server will do the check
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam
OHConnectionConfiguration connectionConfig = ohConnection.getOHConnectionConfiguration();
this.pool = params.getPool();
this.rpcTimeout = connectionConfig.getRpcTimeout();
this.operationTimeout = connectionConfig.getOperationTimeout();
this.operationTimeout = connectionConfig.getClientOperationTimeout();

this.writeBufferSize = params.getWriteBufferSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
.getWriteBufferSize() : connectionConfig.getWriteBufferSize();
Expand Down Expand Up @@ -103,7 +103,7 @@ public OHBufferedMutatorImpl(Configuration conf, BufferedMutatorParams params, O
OHConnectionConfiguration connectionConfig = new OHConnectionConfiguration(conf);
this.pool = params.getPool();
this.rpcTimeout = connectionConfig.getRpcTimeout();
this.operationTimeout = connectionConfig.getOperationTimeout();
this.operationTimeout = connectionConfig.getClientOperationTimeout();

this.writeBufferSize = params.getWriteBufferSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
.getWriteBufferSize() : connectionConfig.getWriteBufferSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class OHConnectionConfiguration {
private final int odpPort;
private final boolean odpMode;
private final long writeBufferSize;
private final int operationTimeout;
private final int clientOperationTimeout;
private final int serverOperationTimeout;
private final int scannerCaching;
private final long scannerMaxResultSize;
private final int maxKeyValueSize;
Expand All @@ -65,7 +66,9 @@ public OHConnectionConfiguration(Configuration conf) {
}
this.database = database;
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
this.operationTimeout = conf.getInt("hbase.client.operation.timeout", 1200000);
this.clientOperationTimeout = conf.getInt("hbase.client.operation.timeout", 1200000);
this.serverOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
int rpcConnectTimeout = -1;
Expand Down Expand Up @@ -107,8 +110,12 @@ public long getWriteBufferSize() {
return this.writeBufferSize;
}

public int getOperationTimeout() {
return this.operationTimeout;
public int getClientOperationTimeout() {
return this.clientOperationTimeout;
}

public int getServerOperationTimeout() {
return this.serverOperationTimeout;
}

public int getScannerCaching() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import com.alipay.oceanbase.rpc.constant.Constants;
import com.google.common.base.Objects;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;

import java.io.IOException;
Expand All @@ -34,6 +32,7 @@

import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument;
import static com.alipay.oceanbase.rpc.property.Property.RPC_OPERATION_TIMEOUT;
import static org.apache.commons.lang.StringUtils.isNotBlank;

@InterfaceAudience.Private
Expand Down Expand Up @@ -88,11 +87,11 @@ public static ObTableClient getOrCreateObTableClient(OHConnectionConfiguration c
obTableClientKey.getProperties().put(property.getKey(), property.getValue());
}

return getOrCreateObTableClient(obTableClientKey, connectionConfig.getRpcConnectTimeout());
return getOrCreateObTableClient(obTableClientKey, connectionConfig);
}

public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableClientKey,
int rpcConnectTimeout) throws IOException {
OHConnectionConfiguration connectionConfig) throws IOException {
if (OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey) == null) {
ReentrantLock tmp = new ReentrantLock();
ReentrantLock lock = OB_TABLE_CLIENT_LOCK.putIfAbsent(obTableClientKey, tmp);
Expand All @@ -117,7 +116,8 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli
}
obTableClient.setFullUserName(obTableClientKey.getFullUserName());
obTableClient.setPassword(obTableClientKey.getPassword());
obTableClient.setRpcConnectTimeout(rpcConnectTimeout);
obTableClient.setRpcConnectTimeout(connectionConfig.getRpcConnectTimeout());
obTableClient.addProperty(RPC_OPERATION_TIMEOUT.getKey(), Integer.toString(connectionConfig.getServerOperationTimeout()));
obTableClient.init();
OB_TABLE_CLIENT_INSTANCE.put(obTableClientKey, obTableClient);
}
Expand All @@ -142,8 +142,8 @@ public static ObTableClient getOrCreateObTableClientByTableName(TableName tableN
private static void initTimeoutAndRetryTimes(ObTableClient obTableClient, OHConnectionConfiguration ohConnectionConf) {
obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout());
obTableClient.setRuntimeRetryTimes(ohConnectionConf.getNumRetries());
obTableClient.setRuntimeMaxWait(ohConnectionConf.getOperationTimeout());
obTableClient.setRuntimeBatchMaxWait(ohConnectionConf.getOperationTimeout());
obTableClient.setRuntimeMaxWait(ohConnectionConf.getClientOperationTimeout());
obTableClient.setRuntimeBatchMaxWait(ohConnectionConf.getClientOperationTimeout());
}

public static class ObTableClientKey {
Expand Down