diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 0a3bbbf7..298d0828 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -63,6 +63,7 @@ import static com.alipay.oceanbase.hbase.constants.OHConstants.*; import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument; +import static com.alipay.oceanbase.hbase.util.Preconditions.checkNotNull; import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD; import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.TABLE_HBASE_LOGGER_SPACE; import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation.getInstance; @@ -104,6 +105,16 @@ public class OHTable implements Table { */ private int rpcTimeout; + /** + * timeout for each read rpc request + */ + private int readRpcTimeout; + + /** + * timeout for each write rpc request + */ + private int writeRpcTimeout; + /** * if the Get executing pool is specified by user cleanupPoolOnClose will be false , * which means that user is responsible for the pool @@ -292,7 +303,7 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient, public OHTable(TableName tableName, Connection connection, OHConnectionConfiguration connectionConfig, ExecutorService executePool) throws IOException { - checkArgument(connection.getConfiguration() != null, "configuration is null."); + checkArgument(connection != null, "connection is null."); checkArgument(tableName != null, "tableName is null."); checkArgument(connection.getConfiguration() != null, "configuration is null."); checkArgument(tableName.getName() != null, "tableNameString is null."); @@ -311,6 +322,8 @@ public OHTable(TableName tableName, Connection connection, this.cleanupPoolOnClose = false; } this.rpcTimeout = connectionConfig.getRpcTimeout(); + this.readRpcTimeout = connectionConfig.getReadRpcTimeout(); + this.writeRpcTimeout = connectionConfig.getWriteRpcTimeout(); this.operationTimeout = connectionConfig.getOperationTimeout(); this.operationExecuteInPool = this.configuration.getBoolean( HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL, @@ -331,6 +344,50 @@ public OHTable(TableName tableName, Connection connection, finishSetUp(); } + public OHTable(Connection connection, ObTableBuilderBase builder, + OHConnectionConfiguration connectionConfig, ExecutorService executePool) + throws IOException { + checkArgument(connection != null, "connection is null."); + checkArgument(connection.getConfiguration() != null, "configuration is null."); + checkArgument(builder != null, "builder is null"); + checkArgument(connectionConfig != null, "connectionConfig is null."); + TableName builderTableName = builder.getTableName(); + this.tableName = builderTableName.getName(); + this.tableNameString = builderTableName.getNameAsString(); + this.configuration = connection.getConfiguration(); + this.executePool = executePool; + if (executePool == null) { + int maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX, + DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX); + long keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME, + DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME); + this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime); + this.cleanupPoolOnClose = true; + } else { + this.cleanupPoolOnClose = false; + } + this.rpcTimeout = builder.getRpcTimeout(); + this.readRpcTimeout = builder.getReadRpcTimeout(); + this.writeRpcTimeout = builder.getWriteRpcTimeout(); + this.operationTimeout = builder.getOperationTimeout(); + this.operationExecuteInPool = this.configuration.getBoolean( + HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL, + (this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); + this.maxKeyValueSize = connectionConfig.getMaxKeyValueSize(); + this.putWriteBufferCheck = this.configuration.getInt(HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK, + DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK); + this.writeBufferSize = connectionConfig.getWriteBufferSize(); + int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace( + this.tableNameString, connectionConfig)); + this.obTableClient.setRpcExecuteTimeout(rpcTimeout); + this.obTableClient.setRuntimeRetryTimes(numRetries); + setOperationTimeout(operationTimeout); + + finishSetUp(); + } + /** * 创建默认的线程池 * Using the "direct handoff" approach, new threads will only be created @@ -370,11 +427,15 @@ private void finishSetUp() { HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); - this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - this.operationTimeout = this.configuration.getInt( + this.rpcTimeout = this.rpcTimeout <= 0 ? configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT) : this.rpcTimeout; + this.readRpcTimeout = this.readRpcTimeout <= 0 ? configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT) : this.readRpcTimeout; + this.writeRpcTimeout = this.writeRpcTimeout <= 0 ? configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT) : this.writeRpcTimeout; + this.operationTimeout = this.operationTimeout <= 0 ? this.configuration.getInt( HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT) : this.operationTimeout; this.operationExecuteInPool = this.configuration.getBoolean( HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL, (this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); @@ -388,7 +449,7 @@ private void finishSetUp() { private OHConnectionConfiguration setUserDefinedNamespace(String tableNameString, OHConnectionConfiguration ohConnectionConf) - throws IOException { + throws IllegalArgumentException { if (tableNameString.indexOf(':') != -1) { String[] params = tableNameString.split(":"); if (params.length != 2) { @@ -903,7 +964,7 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, RowMutations rowMutations = new RowMutations(row); rowMutations.add(put); try { - return checkAndMutation(row, family, qualifier, compareOp, value, rowMutations); + return checkAndMutation(row, family, qualifier, compareOp, value, null, rowMutations); } catch (Exception e) { logger.error(LCD.convert("01-00005"), put, tableNameString, e); throw new IOException("checkAndPut type table:" + tableNameString + " e.msg:" @@ -959,7 +1020,7 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, RowMutations rowMutations = new RowMutations(row); rowMutations.add(delete); try { - return checkAndMutation(row, family, qualifier, compareOp, value, rowMutations); + return checkAndMutation(row, family, qualifier, compareOp, value, null, rowMutations); } catch (Exception e) { logger.error(LCD.convert("01-00005"), delete, tableNameString, e); throw new IOException("checkAndDelete type table:" + tableNameString + " e.msg:" @@ -972,7 +1033,7 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations) throws IOException { try { - return checkAndMutation(row, family, qualifier, compareOp, value, rowMutations); + return checkAndMutation(row, family, qualifier, compareOp, value, null, rowMutations); } catch (Exception e) { logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e); throw new IOException("checkAndMutate type table:" + tableNameString + " e.msg:" @@ -980,18 +1041,21 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, } } + @Override + public CheckAndMutateBuilder checkAndMutate(byte[]row , byte[] family) { + return new ObCheckAndMutateBuilderImpl(row, family); + } + private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, - RowMutations rowMutations) throws Exception { + TimeRange timeRange, RowMutations rowMutations) throws Exception { checkArgument(row != null, "row is null"); checkArgument(isNotBlank(Bytes.toString(family)), "family is blank"); checkArgument(Bytes.equals(row, rowMutations.getRow()), - "mutation row is not equal check row"); - + "mutation row is not equal check row"); checkArgument(!rowMutations.getMutations().isEmpty(), "mutation is empty"); - byte[] filterString = buildCheckAndMutateFilterString(family, qualifier, compareOp, value); - ObHTableFilter filter = buildObHTableFilter(filterString, null, 1, qualifier); + ObHTableFilter filter = buildObHTableFilter(filterString, timeRange, 1, qualifier); List mutations = rowMutations.getMutations(); List keyValueList = new LinkedList<>(); // only one family operation is allowed @@ -1320,6 +1384,16 @@ public int getRpcTimeout() { return this.rpcTimeout; } + @Override + public int getReadRpcTimeout() { + return this.readRpcTimeout; + } + + @Override + public int getWriteRpcTimeout() { + return this.writeRpcTimeout; + } + public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) { this.obTableClient.setRuntimeBatchExecutor(runtimeBatchExecutor); } @@ -1841,4 +1915,109 @@ public static OHOpType getDeleteType(Cell.Type type) { throw new IllegalArgumentException("illegal mutation type " + type); } } + + private class ObCheckAndMutateBuilderImpl implements CheckAndMutateBuilder { + private final byte[] row; + private final byte[] family; + private byte[] qualifier; + private byte[] value; + private TimeRange timeRange; + private CompareOperator cmpOp; + + ObCheckAndMutateBuilderImpl(byte[] row, byte[] family) { + this.row = checkNotNull(row, "The provided row is null."); + this.family = checkNotNull(family, "The provided family is null."); + } + + @Override + public CheckAndMutateBuilder qualifier(byte[] qualifier) { + this.qualifier = checkNotNull(qualifier, "The provided qualifier is null. You could" + + " use an empty byte array, or do not call this method if you want a null qualifier."); + return this; + } + + @Override + public CheckAndMutateBuilder timeRange(TimeRange timeRange) { + this.timeRange = timeRange; + return this; + } + + @Override + public CheckAndMutateBuilder ifNotExists() { + this.cmpOp = CompareOperator.EQUAL; + this.value = null; + return this; + } + + @Override + public CheckAndMutateBuilder ifMatches(CompareOperator cmpOp, byte[] value) { + this.cmpOp = checkNotNull(cmpOp, "The provided cmpOp is null."); + this.value = checkNotNull(value , "The provided value is null."); + return this; + } + + @Override + public boolean thenPut(Put put) throws IOException { + checkCmpOp(); + RowMutations rowMutations = new RowMutations(row); + rowMutations.add(put); + try { + return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value, timeRange, rowMutations); + } catch(Exception e) { + logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e); + throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: " + + e.getMessage() + " error.", e); + } + } + + @Override + public boolean thenDelete(Delete delete) throws IOException { + checkCmpOp(); + RowMutations rowMutations = new RowMutations(row); + rowMutations.add(delete); + try { + return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value, timeRange, rowMutations); + } catch(Exception e) { + logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e); + throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: " + + e.getMessage() + " error.", e); + } + } + + @Override + public boolean thenMutate(RowMutations mutation) throws IOException { + checkCmpOp(); + try { + return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value, timeRange, mutation); + } catch(Exception e) { + logger.error(LCD.convert("01-00005"), mutation, tableNameString, e); + throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: " + + e.getMessage() + " error.", e); + } + } + + private void checkCmpOp() { + checkNotNull(this.cmpOp, "The compare condition is null. Please use" + + " ifNotExists/ifEquals/ifMatches before executing the request"); + } + + private CompareFilter.CompareOp getCompareOp(CompareOperator cmpOp) { + switch (cmpOp) { + case LESS: + return CompareFilter.CompareOp.LESS; + case LESS_OR_EQUAL: + return CompareFilter.CompareOp.LESS_OR_EQUAL; + case EQUAL: + return CompareFilter.CompareOp.EQUAL; + case NOT_EQUAL: + return CompareFilter.CompareOp.NOT_EQUAL; + case GREATER_OR_EQUAL: + return CompareFilter.CompareOp.GREATER_OR_EQUAL; + case GREATER: + return CompareFilter.CompareOp.GREATER; + default: + return CompareFilter.CompareOp.NO_OP; + } + } + } } \ No newline at end of file diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java b/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java index 1894e8c5..eec11a60 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java @@ -173,6 +173,12 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, return ohTable.checkAndMutate(row, family, qualifier, compareOp, value, mutations); } + @Override + public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { + checkStatus(); + return ohTable.checkAndMutate(row, family); + } + @Override public void setOperationTimeout(int i) { checkStatus(); diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java index 03034fd9..c274b6a2 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java @@ -826,6 +826,11 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, return table.checkAndDelete(row, family, qualifier, compareOp, value, delete); } + @Override + public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { + return table.checkAndMutate(row, family); + } + @Override public Result increment(Increment increment) throws IOException { return table.increment(increment); diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java index fd032267..9b98dd0b 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java @@ -47,10 +47,13 @@ public class OHConnectionConfiguration { private final boolean odpMode; private final long writeBufferSize; private final int operationTimeout; + private final int metaOperationTimeout; private final int scannerCaching; private final long scannerMaxResultSize; private final int maxKeyValueSize; private final int rpcTimeout; + private final int readRpcTimeout; + private final int writeRpcTimeout; private final int rpcConnectTimeout; private final long writeBufferPeriodicFlushTimeoutMs; private final long writeBufferPeriodicFlushTimerTickMs; @@ -70,10 +73,16 @@ public OHConnectionConfiguration(Configuration conf) { } this.database = database; this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); + this.metaOperationTimeout = conf.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.writeBufferPeriodicFlushTimeoutMs = conf.getLong( WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT); this.writeBufferPeriodicFlushTimerTickMs = conf.getLong( @@ -117,6 +126,10 @@ public long getWriteBufferSize() { return this.writeBufferSize; } + public int getMetaOperationTimeout() { + return this.metaOperationTimeout; + } + public int getOperationTimeout() { return this.operationTimeout; } @@ -133,6 +146,14 @@ public int getRpcTimeout() { return this.rpcTimeout; } + public int getReadRpcTimeout() { + return this.readRpcTimeout; + } + + public int getWriteRpcTimeout() { + return this.writeRpcTimeout; + } + public int getRpcConnectTimeout() { return this.rpcConnectTimeout; } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java index 3964f52f..41a0690a 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java @@ -176,8 +176,19 @@ public boolean isClosed() { } @Override - public TableBuilder getTableBuilder(TableName tableName, ExecutorService executorService) { - throw new FeatureNotSupportedException("not supported yet'"); + public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { + return new ObTableBuilderBase(tableName, connectionConfig) { + @Override + public Table build() { + try { + return new OHTable(OHConnectionImpl.this, this, + OHConnectionImpl.this.connectionConfig, pool); + } catch (Exception e) { + LOGGER.error("Fail to build new OHTable", e); + throw new RuntimeException(e); + } + } + }; } @Override diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableBuilderBase.java b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableBuilderBase.java new file mode 100644 index 00000000..48fad23b --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableBuilderBase.java @@ -0,0 +1,74 @@ +package com.alipay.oceanbase.hbase.util; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.TableBuilder; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +abstract public class ObTableBuilderBase implements TableBuilder { + protected TableName tableName; + + protected int operationTimeout; + + protected int rpcTimeout; + + protected int readRpcTimeout; + + protected int writeRpcTimeout; + + ObTableBuilderBase(TableName tableName, OHConnectionConfiguration ohConnConf) { + if (tableName == null) { + throw new IllegalArgumentException("The provided tableName is null"); + } + this.tableName = tableName; + this.operationTimeout = tableName.isSystemTable() ? ohConnConf.getMetaOperationTimeout() + : ohConnConf.getOperationTimeout(); + this.rpcTimeout = ohConnConf.getRpcTimeout(); + this.readRpcTimeout = ohConnConf.getReadRpcTimeout(); + this.writeRpcTimeout = ohConnConf.getWriteRpcTimeout(); + } + + @Override + public ObTableBuilderBase setOperationTimeout(int timeout) { + this.operationTimeout = timeout; + return this; + } + + @Override + public ObTableBuilderBase setRpcTimeout(int timeout) { + this.rpcTimeout = timeout; + return this; + } + + @Override + public ObTableBuilderBase setReadRpcTimeout(int timeout) { + this.readRpcTimeout = timeout; + return this; + } + + @Override + public ObTableBuilderBase setWriteRpcTimeout(int timeout) { + this.writeRpcTimeout = timeout; + return this; + } + + public TableName getTableName() { + return this.tableName; + } + + public int getOperationTimeout() { + return this.operationTimeout; + } + + public int getRpcTimeout() { + return this.rpcTimeout; + } + + public int getReadRpcTimeout() { + return this.readRpcTimeout; + } + + public int getWriteRpcTimeout() { + return this.writeRpcTimeout; + } +} diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java index 036ce265..5db940aa 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java @@ -21,8 +21,6 @@ import com.alipay.oceanbase.rpc.constant.Constants; import com.google.common.base.Objects; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.ConnectionConfiguration; import java.io.IOException; import java.util.Map; diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java index 055aa1fb..4511da54 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java @@ -18,9 +18,9 @@ package com.alipay.oceanbase.hbase; import com.alipay.oceanbase.hbase.util.OHBufferedMutatorImpl; -import com.alipay.oceanbase.hbase.util.ObHTableTestUtil; import org.apache.hadoop.conf.Configuration; import com.alipay.oceanbase.rpc.mutation.result.MutationResult; +import com.alipay.oceanbase.hbase.util.ObHTableTestUtil; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; @@ -31,8 +31,10 @@ import org.junit.*; import org.junit.rules.ExpectedException; +import java.io.IOException; import java.util.*; +import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT; import static org.apache.hadoop.hbase.util.Bytes.toBytes; import static org.junit.Assert.*; @@ -363,6 +365,80 @@ public void testDeleteFamilyVerison() throws Exception { multiCfHTable.delete(deleteKey3Family); } + @Test + public void testMultiColumnFamilyTableBuilder() throws Exception { + byte[] family1 = "family_with_group1".getBytes(); + byte[] family2 = "family_with_group2".getBytes(); + byte[] family3 = "family_with_group3".getBytes(); + + byte[] family1_column1 = "family1_column1".getBytes(); + byte[] family1_column2 = "family1_column2".getBytes(); + byte[] family1_column3 = "family1_column3".getBytes(); + byte[] family2_column1 = "family2_column1".getBytes(); + byte[] family2_column2 = "family2_column2".getBytes(); + byte[] family3_column1 = "family3_column1".getBytes(); + byte[] family1_value = "VVV1".getBytes(); + byte[] family2_value = "VVV2".getBytes(); + byte[] family3_value = "VVV3".getBytes(); + + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + conf.set(SOCKET_TIMEOUT_CONNECT, "15000"); + Connection connection = ConnectionFactory.createConnection(conf); + TableName tableName = TableName.valueOf("test_multi_cf"); + TableBuilder builder = connection.getTableBuilder(tableName, null); + // build a OHTable with default params + Table tmpMultiCfHTable = builder.build(); + + Delete delete = new Delete(toBytes("Key0")); + delete.addFamily(family1); + delete.addFamily(family2); + delete.addFamily(family3); + tmpMultiCfHTable.delete(delete); + + Put put = new Put(toBytes("Key0")); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family1, family1_column2, family1_value); + put.addColumn(family1, family1_column3, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family2, family2_column2, family2_value); + put.addColumn(family3, family3_column1, family3_value); + tmpMultiCfHTable.put(put); + + int count = 0; + Get get = new Get(toBytes("Key0")); + get.setMaxVersions(); + Result r = tmpMultiCfHTable.get(get); + Assert.assertEquals(6, r.rawCells().length); + + delete = new Delete(toBytes("Key0")); + delete.addFamily(family1); + delete.addFamily(family2); + delete.addFamily(family3); + tmpMultiCfHTable.delete(delete); + r = tmpMultiCfHTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + + // set params for TableBuilder + builder.setOperationTimeout(1500000); + builder.setRpcTimeout(40000); + tmpMultiCfHTable = builder.build(); + + put = new Put(toBytes("Key0")); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family1, family1_column2, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family3, family3_column1, family3_value); + tmpMultiCfHTable.put(put); + + r = tmpMultiCfHTable.get(get); + Assert.assertEquals(4, r.rawCells().length); + + tmpMultiCfHTable.delete(delete); + r = tmpMultiCfHTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + } + @Test public void testMultiColumnFamilyBufferedMutator() throws Exception { byte[] family1 = "family_with_group1".getBytes(); diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index 43434d4e..0264f997 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.Assert; @@ -4705,6 +4706,71 @@ public void testCheckAndPut() throws IOException, InterruptedException { r = hTable.get(get); Assert.assertEquals(3, r.rawCells().length); Assert.assertEquals("value1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); + + // test CheckAndMutateBuilder + Delete delete = new Delete(key.getBytes()); + delete.addFamily(family.getBytes()); + hTable.delete(delete); + r = hTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + + put = new Put(key.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), value.getBytes()); + hTable.put(put); + get = new Get(key.getBytes()); + get.setMaxVersions(Integer.MAX_VALUE); + get.addColumn(family.getBytes(), column.getBytes()); + r = hTable.get(get); + Assert.assertEquals(1, r.rawCells().length); + + Table.CheckAndMutateBuilder builder = hTable.checkAndMutate(toBytes(key), toBytes(family)); + put = new Put(key.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), "value1".getBytes()); + ret = builder.qualifier(toBytes(column)).ifEquals(toBytes(value)).thenPut(put); + Assert.assertTrue(ret); + Put difFamPut = new Put(key.getBytes()); + difFamPut.addColumn("family_group".getBytes(), column.getBytes(), "value1".getBytes()); + Assert.assertThrows(IOException.class, () -> { + builder.qualifier(toBytes(column)).ifEquals(toBytes(value)).thenPut(difFamPut); + }); + + ret = builder.qualifier(toBytes(column)) + .ifMatches(CompareOperator.GREATER, toBytes("value1")).thenPut(put); + Assert.assertFalse(ret); + ret = builder.qualifier(toBytes(column)) + .ifMatches(CompareOperator.GREATER_OR_EQUAL, toBytes("value1")).thenPut(put); + Assert.assertTrue(ret); + ret = builder.qualifier(toBytes(column)).ifMatches(CompareOperator.LESS, toBytes("")) + .thenPut(put); + Assert.assertFalse(ret); + ret = builder.qualifier(toBytes(column)) + .ifMatches(CompareOperator.LESS_OR_EQUAL, toBytes("")).thenPut(put); + Assert.assertFalse(ret); + + get = new Get(key.getBytes()); + get.setMaxVersions(Integer.MAX_VALUE); + get.addColumn(family.getBytes(), column.getBytes()); + r = hTable.get(get); + Assert.assertEquals(3, r.rawCells().length); + Assert.assertEquals("value1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); + + // test TimeRange + long t = System.currentTimeMillis(); + put = new Put(key.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), t, value.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), t + 3, "value1".getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), t + 5, "value2".getBytes()); + hTable.put(put); + put = new Put(toBytes(key)); + put.addColumn(toBytes(family), toBytes(column), toBytes(value)); + TimeRange timeRange = new TimeRange(t + 1, t + 3); + ret = builder.qualifier(toBytes(column)).timeRange(timeRange).ifEquals(toBytes("value1")) + .thenPut(put); + Assert.assertFalse(ret); + timeRange = new TimeRange(t, t + 2); + ret = builder.qualifier(toBytes(column)).timeRange(timeRange).ifEquals(toBytes(value)) + .thenPut(put); + Assert.assertTrue(ret); } @Test @@ -4799,6 +4865,104 @@ public void testCheckAndDelete() throws IOException { r = hTable.get(get); Assert.assertEquals(0, r.rawCells().length); + // test CheckAndMutateBuilder + delete = new Delete(key.getBytes()); + delete.addFamily(family.getBytes()); + hTable.delete(delete); + put = new Put(key.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), value.getBytes()); + hTable.put(put); + + // check delete column + delete = new Delete(key.getBytes()); + delete.addColumn(family.getBytes(), column.getBytes()); + Table.CheckAndMutateBuilder builder = hTable.checkAndMutate(toBytes(key), toBytes(family)); + ret = builder.qualifier(toBytes(column)).ifEquals(toBytes(value)).thenDelete(delete); + Assert.assertTrue(ret); + put = new Put(key.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), "value6".getBytes()); + hTable.put(put); + ret = builder.qualifier(toBytes(column)) + .ifMatches(CompareOperator.GREATER, toBytes("value5")).thenDelete(delete); + Assert.assertTrue(ret); + put = new Put(key.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), "value5".getBytes()); + hTable.put(put); + ret = builder.qualifier(toBytes(column)) + .ifMatches(CompareOperator.GREATER_OR_EQUAL, toBytes("value5")).thenDelete(delete); + Assert.assertTrue(ret); + put = new Put(key.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), "value1".getBytes()); + hTable.put(put); + ret = builder.qualifier(toBytes(column)).ifMatches(CompareOperator.LESS, toBytes("value1")) + .thenDelete(delete); + Assert.assertFalse(ret); + ret = builder.qualifier(toBytes(column)) + .ifMatches(CompareOperator.LESS_OR_EQUAL, toBytes("value1")).thenDelete(delete); + Assert.assertTrue(ret); + + get = new Get(key.getBytes()); + get.setMaxVersions(Integer.MAX_VALUE); + get.addColumn(family.getBytes(), column.getBytes()); + r = hTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + + // check delete columns + t = System.currentTimeMillis(); + put = new Put(key.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), t, value.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), t + 1, value.getBytes()); + hTable.put(put); + get = new Get(key.getBytes()); + get.setMaxVersions(Integer.MAX_VALUE); + get.addColumn(family.getBytes(), column.getBytes()); + r = hTable.get(get); + Assert.assertEquals(2, r.rawCells().length); + delete = new Delete(key.getBytes()); + delete.addColumns(family.getBytes(), column.getBytes()); + ret = builder.qualifier(toBytes(column)).ifEquals(toBytes(value)).thenDelete(delete); + Assert.assertTrue(ret); + get = new Get(key.getBytes()); + get.setMaxVersions(Integer.MAX_VALUE); + get.addColumn(family.getBytes(), column.getBytes()); + r = hTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + + // check delete family + t = System.currentTimeMillis(); + put = new Put(key.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), t, value.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), t + 1, value.getBytes()); + put.addColumn(family.getBytes(), column2.getBytes(), t, value.getBytes()); + put.addColumn(family.getBytes(), column2.getBytes(), t + 1, value.getBytes()); + hTable.put(put); + get = new Get(key.getBytes()); + get.setMaxVersions(Integer.MAX_VALUE); + get.addFamily(family.getBytes()); + r = hTable.get(get); + Assert.assertEquals(4, r.rawCells().length); + + // test TimeRange + put = new Put(key.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), value.getBytes()); + TimeRange timeRange = new TimeRange(t + 2, t + 4); + ret = builder.qualifier(toBytes(column)).timeRange(timeRange).ifEquals(toBytes(value)) + .thenPut(put); + Assert.assertFalse(ret); + timeRange = new TimeRange(t, t + 2); + ret = builder.qualifier(toBytes(column)).timeRange(timeRange).ifEquals(toBytes(value)) + .thenPut(put); + Assert.assertTrue(ret); + + delete = new Delete(key.getBytes()); + delete.addFamily(family.getBytes()); + ret = builder.qualifier(toBytes(column)).ifEquals(toBytes(value)).thenDelete(delete); + Assert.assertTrue(ret); + get = new Get(key.getBytes()); + get.setMaxVersions(Integer.MAX_VALUE); + get.addFamily(family.getBytes()); + r = hTable.get(get); + Assert.assertEquals(0, r.rawCells().length); } @Test @@ -4943,6 +5107,163 @@ public void testCheckAndMutate() throws IOException { get.setMaxVersions(Integer.MAX_VALUE); r = hTable.get(get); Assert.assertEquals(10, r.rawCells().length); + + // test CheckAndMutateBuilder + Delete delete = new Delete(key.getBytes()); + delete.addFamily(family.getBytes()); + hTable.delete(delete); + + t = System.currentTimeMillis(); + // put + put1 = new Put(key.getBytes()); + put1.addColumn(family.getBytes(), column1.getBytes(), t, value1.getBytes()); + put1.addColumn(family.getBytes(), column2.getBytes(), t, value2.getBytes()); + + put2 = new Put(key.getBytes()); + put2.addColumn(family.getBytes(), column1.getBytes(), t + 3, value2.getBytes()); + put2.addColumn(family.getBytes(), column2.getBytes(), t + 3, value1.getBytes()); + + put3 = new Put(key.getBytes()); + put3.addColumn(family.getBytes(), column1.getBytes(), t + 5, value1.getBytes()); + put3.addColumn(family.getBytes(), column2.getBytes(), t + 5, value2.getBytes()); + + rowMutations = new RowMutations(key.getBytes()); + rowMutations.add(put1); + rowMutations.add(put2); + rowMutations.add(put3); + + //put data + Table.CheckAndMutateBuilder builder = hTable.checkAndMutate(toBytes(key), + family.getBytes(StandardCharsets.UTF_8)); + ret = builder.qualifier(toBytes(column1)).ifNotExists().thenMutate(rowMutations); + Assert.assertTrue(ret); + + get = new Get(key.getBytes()); + get.addFamily(family.getBytes()); + get.setMaxVersions(Integer.MAX_VALUE); + r = hTable.get(get); + Assert.assertEquals(6, r.rawCells().length); + + t = System.currentTimeMillis() + 7; + // put + put1 = new Put(key.getBytes()); + put1.addColumn(family.getBytes(), column1.getBytes(), t, value1.getBytes()); + put1.addColumn(family.getBytes(), column2.getBytes(), t, value2.getBytes()); + + put2 = new Put(key.getBytes()); + put2.addColumn(family.getBytes(), column1.getBytes(), t + 3, value2.getBytes()); + put2.addColumn(family.getBytes(), column2.getBytes(), t + 3, value1.getBytes()); + + put3 = new Put(key.getBytes()); + put3.addColumn(family.getBytes(), column1.getBytes(), t + 5, value1.getBytes()); + put3.addColumn(family.getBytes(), column2.getBytes(), t + 5, value2.getBytes()); + rowMutations = new RowMutations(key.getBytes()); + rowMutations.add(put1); + rowMutations.add(put2); + rowMutations.add(put3); + // test greater op + ret = builder.qualifier(toBytes(column1)).ifMatches(CompareOperator.LESS, toBytes(value1)) + .thenMutate(rowMutations); + Assert.assertFalse(ret); + get = new Get(key.getBytes()); + get.addFamily(family.getBytes()); + get.setMaxVersions(Integer.MAX_VALUE); + r = hTable.get(get); + Assert.assertEquals(6, r.rawCells().length); + + // test less op + ret = builder.qualifier(toBytes(column1)).ifMatches(CompareOperator.LESS, toBytes(value2)) + .thenMutate(rowMutations); + Assert.assertTrue(ret); + get = new Get(key.getBytes()); + get.addFamily(family.getBytes()); + get.setMaxVersions(Integer.MAX_VALUE); + r = hTable.get(get); + Assert.assertEquals(12, r.rawCells().length); + + t = System.currentTimeMillis() + 14; + // put + put1 = new Put(key.getBytes()); + put1.addColumn(family.getBytes(), column1.getBytes(), t, value1.getBytes()); + put1.addColumn(family.getBytes(), column2.getBytes(), t, value2.getBytes()); + + put2 = new Put(key.getBytes()); + put2.addColumn(family.getBytes(), column1.getBytes(), t + 3, value2.getBytes()); + put2.addColumn(family.getBytes(), column2.getBytes(), t + 3, value1.getBytes()); + + put3 = new Put(key.getBytes()); + put3.addColumn(family.getBytes(), column1.getBytes(), t + 5, value1.getBytes()); + put3.addColumn(family.getBytes(), column2.getBytes(), t + 5, value2.getBytes()); + rowMutations = new RowMutations(key.getBytes()); + rowMutations.add(put1); + rowMutations.add(put2); + rowMutations.add(put3); + // test NO_OP + try { + builder.qualifier(toBytes(column1)).ifMatches(CompareOperator.NO_OP, toBytes(value1)) + .thenMutate(rowMutations); + fail(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("checkAndMutate")); + } + + // test equal op + ret = builder.qualifier(toBytes(column1)).ifEquals(toBytes(value1)) + .thenMutate(rowMutations); + Assert.assertTrue(ret); + get = new Get(key.getBytes()); + get.addFamily(family.getBytes()); + get.setMaxVersions(Integer.MAX_VALUE); + r = hTable.get(get); + Assert.assertEquals(18, r.rawCells().length); + + t = System.currentTimeMillis() + 21; + // put + put1 = new Put(key.getBytes()); + put1.addColumn(family.getBytes(), column1.getBytes(), t, value1.getBytes()); + put1.addColumn(family.getBytes(), column2.getBytes(), t, value2.getBytes()); + + // delete + delete1 = new Delete(key.getBytes()); + delete1.addColumns(family.getBytes(), column1.getBytes()); + + // check delete and put + rowMutations = new RowMutations(key.getBytes()); + rowMutations.add(delete1); + rowMutations.add(put1); + ret = builder.qualifier(toBytes(column1)).ifEquals(toBytes(value1)) + .thenMutate(rowMutations); + Assert.assertTrue(ret); + get = new Get(key.getBytes()); + get.addColumn(family.getBytes(), column1.getBytes()); + get.setMaxVersions(Integer.MAX_VALUE); + r = hTable.get(get); + Assert.assertEquals(1, r.rawCells().length); + + get = new Get(key.getBytes()); + get.addColumn(family.getBytes(), column2.getBytes()); + get.setMaxVersions(Integer.MAX_VALUE); + r = hTable.get(get); + Assert.assertEquals(10, r.rawCells().length); + + // check TimeRange + put1 = new Put(toBytes(key)); + put1.addColumn(toBytes(family), toBytes(column1), toBytes(value1)); + rowMutations = new RowMutations(key.getBytes()); + rowMutations.add(put1); + rowMutations.add(delete1); + TimeRange timeRange = new TimeRange(t + 1, t + 3); + ret = builder.qualifier(toBytes(column1)).timeRange(timeRange).ifEquals(toBytes(value1)) + .thenMutate(rowMutations); + Assert.assertFalse(ret); + timeRange = new TimeRange(t, t + 2); + ret = builder.qualifier(toBytes(column1)).timeRange(timeRange).ifEquals(toBytes(value1)) + .thenMutate(rowMutations); + Assert.assertTrue(ret); + + delete = new Delete(key.getBytes()); + delete.addFamily(family.getBytes()); + hTable.delete(delete); } @Test diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java index e83981e8..805de10e 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java @@ -71,11 +71,6 @@ public void testConnectionByXml() throws Exception { hTable.close(); } - @AfterClass - public static void finish() throws IOException { - hTable.close(); - } - @Test public void testRefreshTableEntry() throws Exception { hTable = ObHTableTestUtil.newOHTableClient("n1:test"); @@ -84,6 +79,55 @@ public void testRefreshTableEntry() throws Exception { ((OHTableClient) hTable).refreshTableEntry("family1", true); } + @After + public void after() throws IOException { + hTable.close(); + } + + @Test + public void testGetTableByTableBuilder() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + conf.set(SOCKET_TIMEOUT_CONNECT, "15000"); + connection = ConnectionFactory.createConnection(conf); + TableName tableName = TableName.valueOf("test"); + TableBuilder builder = connection.getTableBuilder(tableName, null); + // build a OHTable with default params + hTable = builder.build(); + testBasic(); + + // set params for TableBuilder + long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); + ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, + new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); + pool.allowCoreThreadTimeOut(true); + builder = connection.getTableBuilder(tableName, pool); + builder.setRpcTimeout(40000); + builder.setReadRpcTimeout(30000); + builder.setWriteRpcTimeout(50000); + builder.setOperationTimeout(1500000); + hTable = builder.build(); + Assert.assertEquals(40000, hTable.getRpcTimeout()); + Assert.assertEquals(30000, hTable.getReadRpcTimeout()); + Assert.assertEquals(50000, hTable.getWriteRpcTimeout()); + Assert.assertEquals(1500000, hTable.getOperationTimeout()); + testBasic(); + + hTable.close(); + Assert.assertFalse(pool.isShutdown()); + pool.shutdown(); + try { + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + System.out + .println("close() failed to terminate pool after 10 seconds. Abandoning pool."); + } + } catch (InterruptedException e) { + System.out.println("waitForTermination interrupted"); + Thread.currentThread().interrupt(); + } + Assert.assertTrue(pool.isShutdown()); + } + private void testBasic() throws Exception { String key = "putKey"; String column1 = "putColumn1"; diff --git a/src/test/java/unit_test_db.sql b/src/test/java/unit_test_db.sql index 8fe8a9b3..9591c825 100644 --- a/src/test/java/unit_test_db.sql +++ b/src/test/java/unit_test_db.sql @@ -204,6 +204,8 @@ CREATE TABLE `test_multi_cf$family_with_group3` ( PRIMARY KEY (`K`, `Q`, `T`) ) TABLEGROUP = test_multi_cf PARTITION BY KEY(`K`) PARTITIONS 3; +---------------------------- n1 database ---------------------------- +CREATE DATABASE IF NOT EXISTS `n1`; USE `n1`; CREATE TABLE `n1:test$family1` ( `K` varbinary(1024) NOT NULL,