Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
03c8e71
add DepentdentFilter and SingleColumnValueExcludeFilter
JackShi148 Oct 11, 2024
2aaf7c4
add singleColumnValueExcludeFilter and DependentColumnFilter; add tes…
JackShi148 Oct 16, 2024
fe21f09
merge into branch-1.3
JackShi148 Oct 17, 2024
3830246
Merge remote-tracking branch 'obkv/branch-1.3' into JackShi148/branch…
JackShi148 Oct 22, 2024
be654df
hbase support batch (#84)
stuBirdFly Oct 22, 2024
a82b46d
Merge remote-tracking branch 'obkv/branch-1.3' into branch-1.3
JackShi148 Oct 23, 2024
697c5d0
Add DeleteFamilyVersion function and corresponding test cases (#85)
JackShi148 Oct 23, 2024
144a77b
Merge remote-tracking branch 'obkv/branch-1.3' into branch-1.3
JackShi148 Oct 23, 2024
7733e4a
remove DeleteFamilyVersion file and move all cases to MultiColumnFami…
JackShi148 Oct 23, 2024
3411491
hbase support batchCallBack (#86)
stuBirdFly Oct 23, 2024
9c6be3f
adjust bufferdMutatorImpl 1.x to new batch
JackShi148 Oct 23, 2024
5347033
bufferedMutator do not retry, batch retry in table client
JackShi148 Oct 23, 2024
df4f7a8
Merge branch 'branch-1.3' into buffermutator_batch
JackShi148 Oct 23, 2024
d1d5b52
hbase support batch (#84)
stuBirdFly Oct 22, 2024
6081400
add rpcTimeout and operationTimetout setting in bufferedMutator
JackShi148 Oct 24, 2024
b830a72
Merge remote-tracking branch 'myfork/buffermutator_batch' into buffer…
JackShi148 Oct 24, 2024
4968a46
initial merge into hbase_2.0
JackShi148 Oct 24, 2024
3884e08
fix test
miyuan-ljr Oct 24, 2024
676ec51
Add DeleteFamilyVersion function and corresponding test cases (#85)
JackShi148 Oct 23, 2024
5d43b53
hbase support batchCallBack (#86)
stuBirdFly Oct 23, 2024
f801cfe
fix test
miyuan-ljr Oct 24, 2024
4c96d00
fix test
miyuan-ljr Oct 25, 2024
f8568b6
init hbase_2.0 bufferedMutatorImpl
JackShi148 Oct 25, 2024
56f838f
pass single bufferedMutator test
JackShi148 Oct 25, 2024
24ae6d7
remove useless comments
JackShi148 Oct 25, 2024
17f045d
pass mulfi-cf bufferedMutator tests
JackShi148 Oct 25, 2024
20a689a
format code
JackShi148 Oct 25, 2024
6759d96
add inherited interface in bufferedMutator; fix concurrent bug in buf…
JackShi148 Oct 25, 2024
d83966a
develop tableBuilder
JackShi148 Oct 28, 2024
25cdbca
merge into hbase_2.0
JackShi148 Oct 28, 2024
3a44d73
fix typo
JackShi148 Oct 28, 2024
21c6a0f
fix exception erros message
JackShi148 Oct 28, 2024
6edc83e
merge in hbase_2.0
JackShi148 Oct 28, 2024
14fc146
update time to wait pool to shutdown
JackShi148 Oct 28, 2024
f31ae96
optimize rpcTimeout and operationTimeout setting in OHTable finishSetup
JackShi148 Oct 28, 2024
d932b90
develop ObCheckAndMutateBuilder
JackShi148 Oct 28, 2024
4667121
add test cases
JackShi148 Oct 29, 2024
cb52818
merge in hbase_2.0
JackShi148 Oct 29, 2024
3ffb260
merge into hbase_2.0
JackShi148 Oct 29, 2024
a34c287
merge into hbase_2.0
JackShi148 Oct 29, 2024
e51e6be
add import
JackShi148 Oct 29, 2024
9cde5ba
pass test cases after merge
JackShi148 Oct 29, 2024
107b251
Merge branch 'hbase_2.0' into table_builder
JackShi148 Oct 29, 2024
ce5b1e9
pass all test cases after merge
JackShi148 Oct 29, 2024
8dca13d
pass test cases after merge
JackShi148 Oct 29, 2024
9e0cb53
remove useless import
JackShi148 Oct 30, 2024
63b2fdd
resolve merge conflicts
JackShi148 Oct 30, 2024
a9e1f5b
Merge branch 'table_builder' into checkmutate_builder to resolve conf…
JackShi148 Oct 30, 2024
07ce904
Merge branch 'hbase_2.0' into table_builder
JackShi148 Oct 30, 2024
33d1b63
Merge branch 'hbase_2.0' into checkmutate_builder
JackShi148 Oct 30, 2024
f08cd04
use temporary multiCfHTable so that does not influence other cases
JackShi148 Oct 30, 2024
93f4243
Merge branch 'table_builder' into checkmutate_builder
JackShi148 Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 193 additions & 14 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument;
import static com.alipay.oceanbase.hbase.util.Preconditions.checkNotNull;
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.TABLE_HBASE_LOGGER_SPACE;
import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation.getInstance;
Expand Down Expand Up @@ -104,6 +105,16 @@ public class OHTable implements Table {
*/
private int rpcTimeout;

/**
* timeout for each read rpc request
*/
private int readRpcTimeout;

/**
* timeout for each write rpc request
*/
private int writeRpcTimeout;

/**
* if the <code>Get</code> executing pool is specified by user cleanupPoolOnClose will be false ,
* which means that user is responsible for the pool
Expand Down Expand Up @@ -292,7 +303,7 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient,
public OHTable(TableName tableName, Connection connection,
OHConnectionConfiguration connectionConfig, ExecutorService executePool)
throws IOException {
checkArgument(connection.getConfiguration() != null, "configuration is null.");
checkArgument(connection != null, "connection is null.");
checkArgument(tableName != null, "tableName is null.");
checkArgument(connection.getConfiguration() != null, "configuration is null.");
checkArgument(tableName.getName() != null, "tableNameString is null.");
Expand All @@ -311,6 +322,8 @@ public OHTable(TableName tableName, Connection connection,
this.cleanupPoolOnClose = false;
}
this.rpcTimeout = connectionConfig.getRpcTimeout();
this.readRpcTimeout = connectionConfig.getReadRpcTimeout();
this.writeRpcTimeout = connectionConfig.getWriteRpcTimeout();
this.operationTimeout = connectionConfig.getOperationTimeout();
this.operationExecuteInPool = this.configuration.getBoolean(
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
Expand All @@ -331,6 +344,50 @@ public OHTable(TableName tableName, Connection connection,
finishSetUp();
}

public OHTable(Connection connection, ObTableBuilderBase builder,
OHConnectionConfiguration connectionConfig, ExecutorService executePool)
throws IOException {
checkArgument(connection != null, "connection is null.");
checkArgument(connection.getConfiguration() != null, "configuration is null.");
checkArgument(builder != null, "builder is null");
checkArgument(connectionConfig != null, "connectionConfig is null.");
TableName builderTableName = builder.getTableName();
this.tableName = builderTableName.getName();
this.tableNameString = builderTableName.getNameAsString();
this.configuration = connection.getConfiguration();
this.executePool = executePool;
if (executePool == null) {
int maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX,
DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
long keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
this.cleanupPoolOnClose = true;
} else {
this.cleanupPoolOnClose = false;
}
this.rpcTimeout = builder.getRpcTimeout();
this.readRpcTimeout = builder.getReadRpcTimeout();
this.writeRpcTimeout = builder.getWriteRpcTimeout();
this.operationTimeout = builder.getOperationTimeout();
this.operationExecuteInPool = this.configuration.getBoolean(
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
this.maxKeyValueSize = connectionConfig.getMaxKeyValueSize();
this.putWriteBufferCheck = this.configuration.getInt(HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK,
DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK);
this.writeBufferSize = connectionConfig.getWriteBufferSize();
int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
this.tableNameString, connectionConfig));
this.obTableClient.setRpcExecuteTimeout(rpcTimeout);
this.obTableClient.setRuntimeRetryTimes(numRetries);
setOperationTimeout(operationTimeout);

finishSetUp();
}

/**
* 创建默认的线程池
* Using the "direct handoff" approach, new threads will only be created
Expand Down Expand Up @@ -370,11 +427,15 @@ private void finishSetUp() {
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.operationTimeout = this.configuration.getInt(
this.rpcTimeout = this.rpcTimeout <= 0 ? configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT) : this.rpcTimeout;
this.readRpcTimeout = this.readRpcTimeout <= 0 ? configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT) : this.readRpcTimeout;
this.writeRpcTimeout = this.writeRpcTimeout <= 0 ? configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT) : this.writeRpcTimeout;
this.operationTimeout = this.operationTimeout <= 0 ? this.configuration.getInt(
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT) : this.operationTimeout;
this.operationExecuteInPool = this.configuration.getBoolean(
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
Expand All @@ -388,7 +449,7 @@ private void finishSetUp() {

private OHConnectionConfiguration setUserDefinedNamespace(String tableNameString,
OHConnectionConfiguration ohConnectionConf)
throws IOException {
throws IllegalArgumentException {
if (tableNameString.indexOf(':') != -1) {
String[] params = tableNameString.split(":");
if (params.length != 2) {
Expand Down Expand Up @@ -903,7 +964,7 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
RowMutations rowMutations = new RowMutations(row);
rowMutations.add(put);
try {
return checkAndMutation(row, family, qualifier, compareOp, value, rowMutations);
return checkAndMutation(row, family, qualifier, compareOp, value, null, rowMutations);
} catch (Exception e) {
logger.error(LCD.convert("01-00005"), put, tableNameString, e);
throw new IOException("checkAndPut type table:" + tableNameString + " e.msg:"
Expand Down Expand Up @@ -959,7 +1020,7 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
RowMutations rowMutations = new RowMutations(row);
rowMutations.add(delete);
try {
return checkAndMutation(row, family, qualifier, compareOp, value, rowMutations);
return checkAndMutation(row, family, qualifier, compareOp, value, null, rowMutations);
} catch (Exception e) {
logger.error(LCD.convert("01-00005"), delete, tableNameString, e);
throw new IOException("checkAndDelete type table:" + tableNameString + " e.msg:"
Expand All @@ -972,26 +1033,29 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareFilter.CompareOp compareOp, byte[] value,
RowMutations rowMutations) throws IOException {
try {
return checkAndMutation(row, family, qualifier, compareOp, value, rowMutations);
return checkAndMutation(row, family, qualifier, compareOp, value, null, rowMutations);
} catch (Exception e) {
logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e);
throw new IOException("checkAndMutate type table:" + tableNameString + " e.msg:"
+ e.getMessage() + " error.", e);
}
}

@Override
public CheckAndMutateBuilder checkAndMutate(byte[]row , byte[] family) {
return new ObCheckAndMutateBuilderImpl(row, family);
}

private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value,
RowMutations rowMutations) throws Exception {
TimeRange timeRange, RowMutations rowMutations) throws Exception {
checkArgument(row != null, "row is null");
checkArgument(isNotBlank(Bytes.toString(family)), "family is blank");
checkArgument(Bytes.equals(row, rowMutations.getRow()),
"mutation row is not equal check row");

"mutation row is not equal check row");
checkArgument(!rowMutations.getMutations().isEmpty(), "mutation is empty");

byte[] filterString = buildCheckAndMutateFilterString(family, qualifier, compareOp, value);

ObHTableFilter filter = buildObHTableFilter(filterString, null, 1, qualifier);
ObHTableFilter filter = buildObHTableFilter(filterString, timeRange, 1, qualifier);
List<Mutation> mutations = rowMutations.getMutations();
List<Cell> keyValueList = new LinkedList<>();
// only one family operation is allowed
Expand Down Expand Up @@ -1320,6 +1384,16 @@ public int getRpcTimeout() {
return this.rpcTimeout;
}

@Override
public int getReadRpcTimeout() {
return this.readRpcTimeout;
}

@Override
public int getWriteRpcTimeout() {
return this.writeRpcTimeout;
}

public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) {
this.obTableClient.setRuntimeBatchExecutor(runtimeBatchExecutor);
}
Expand Down Expand Up @@ -1841,4 +1915,109 @@ public static OHOpType getDeleteType(Cell.Type type) {
throw new IllegalArgumentException("illegal mutation type " + type);
}
}

private class ObCheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
private final byte[] row;
private final byte[] family;
private byte[] qualifier;
private byte[] value;
private TimeRange timeRange;
private CompareOperator cmpOp;

ObCheckAndMutateBuilderImpl(byte[] row, byte[] family) {
this.row = checkNotNull(row, "The provided row is null.");
this.family = checkNotNull(family, "The provided family is null.");
}

@Override
public CheckAndMutateBuilder qualifier(byte[] qualifier) {
this.qualifier = checkNotNull(qualifier, "The provided qualifier is null. You could" +
" use an empty byte array, or do not call this method if you want a null qualifier.");
return this;
}

@Override
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
this.timeRange = timeRange;
return this;
}

@Override
public CheckAndMutateBuilder ifNotExists() {
this.cmpOp = CompareOperator.EQUAL;
this.value = null;
return this;
}

@Override
public CheckAndMutateBuilder ifMatches(CompareOperator cmpOp, byte[] value) {
this.cmpOp = checkNotNull(cmpOp, "The provided cmpOp is null.");
this.value = checkNotNull(value , "The provided value is null.");
return this;
}

@Override
public boolean thenPut(Put put) throws IOException {
checkCmpOp();
RowMutations rowMutations = new RowMutations(row);
rowMutations.add(put);
try {
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value, timeRange, rowMutations);
} catch(Exception e) {
logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e);
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
+ e.getMessage() + " error.", e);
}
}

@Override
public boolean thenDelete(Delete delete) throws IOException {
checkCmpOp();
RowMutations rowMutations = new RowMutations(row);
rowMutations.add(delete);
try {
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value, timeRange, rowMutations);
} catch(Exception e) {
logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e);
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
+ e.getMessage() + " error.", e);
}
}

@Override
public boolean thenMutate(RowMutations mutation) throws IOException {
checkCmpOp();
try {
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value, timeRange, mutation);
} catch(Exception e) {
logger.error(LCD.convert("01-00005"), mutation, tableNameString, e);
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
+ e.getMessage() + " error.", e);
}
}

private void checkCmpOp() {
checkNotNull(this.cmpOp, "The compare condition is null. Please use"
+ " ifNotExists/ifEquals/ifMatches before executing the request");
}

private CompareFilter.CompareOp getCompareOp(CompareOperator cmpOp) {
switch (cmpOp) {
case LESS:
return CompareFilter.CompareOp.LESS;
case LESS_OR_EQUAL:
return CompareFilter.CompareOp.LESS_OR_EQUAL;
case EQUAL:
return CompareFilter.CompareOp.EQUAL;
case NOT_EQUAL:
return CompareFilter.CompareOp.NOT_EQUAL;
case GREATER_OR_EQUAL:
return CompareFilter.CompareOp.GREATER_OR_EQUAL;
case GREATER:
return CompareFilter.CompareOp.GREATER;
default:
return CompareFilter.CompareOp.NO_OP;
}
}
}
}
6 changes: 6 additions & 0 deletions src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
return ohTable.checkAndMutate(row, family, qualifier, compareOp, value, mutations);
}

@Override
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
checkStatus();
return ohTable.checkAndMutate(row, family);
}

@Override
public void setOperationTimeout(int i) {
checkStatus();
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,11 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
}

@Override
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
return table.checkAndMutate(row, family);
}

@Override
public Result increment(Increment increment) throws IOException {
return table.increment(increment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ public class OHConnectionConfiguration {
private final boolean odpMode;
private final long writeBufferSize;
private final int operationTimeout;
private final int metaOperationTimeout;
private final int scannerCaching;
private final long scannerMaxResultSize;
private final int maxKeyValueSize;
private final int rpcTimeout;
private final int readRpcTimeout;
private final int writeRpcTimeout;
private final int rpcConnectTimeout;
private final long writeBufferPeriodicFlushTimeoutMs;
private final long writeBufferPeriodicFlushTimerTickMs;
Expand All @@ -70,10 +73,16 @@ public OHConnectionConfiguration(Configuration conf) {
}
this.database = database;
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
this.metaOperationTimeout = conf.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.writeBufferPeriodicFlushTimeoutMs = conf.getLong(
WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT);
this.writeBufferPeriodicFlushTimerTickMs = conf.getLong(
Expand Down Expand Up @@ -117,6 +126,10 @@ public long getWriteBufferSize() {
return this.writeBufferSize;
}

public int getMetaOperationTimeout() {
return this.metaOperationTimeout;
}

public int getOperationTimeout() {
return this.operationTimeout;
}
Expand All @@ -133,6 +146,14 @@ public int getRpcTimeout() {
return this.rpcTimeout;
}

public int getReadRpcTimeout() {
return this.readRpcTimeout;
}

public int getWriteRpcTimeout() {
return this.writeRpcTimeout;
}

public int getRpcConnectTimeout() {
return this.rpcConnectTimeout;
}
Expand Down
Loading