diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 0a3bbbf7..706954d5 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -104,6 +104,16 @@ public class OHTable implements Table { */ private int rpcTimeout; + /** + * timeout for each read rpc request + */ + private int readRpcTimeout; + + /** + * timeout for each write rpc request + */ + private int writeRpcTimeout; + /** * if the Get executing pool is specified by user cleanupPoolOnClose will be false , * which means that user is responsible for the pool @@ -292,7 +302,7 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient, public OHTable(TableName tableName, Connection connection, OHConnectionConfiguration connectionConfig, ExecutorService executePool) throws IOException { - checkArgument(connection.getConfiguration() != null, "configuration is null."); + checkArgument(connection != null, "connection is null."); checkArgument(tableName != null, "tableName is null."); checkArgument(connection.getConfiguration() != null, "configuration is null."); checkArgument(tableName.getName() != null, "tableNameString is null."); @@ -311,6 +321,8 @@ public OHTable(TableName tableName, Connection connection, this.cleanupPoolOnClose = false; } this.rpcTimeout = connectionConfig.getRpcTimeout(); + this.readRpcTimeout = connectionConfig.getReadRpcTimeout(); + this.writeRpcTimeout = connectionConfig.getWriteRpcTimeout(); this.operationTimeout = connectionConfig.getOperationTimeout(); this.operationExecuteInPool = this.configuration.getBoolean( HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL, @@ -331,6 +343,50 @@ public OHTable(TableName tableName, Connection connection, finishSetUp(); } + public OHTable(Connection connection, ObTableBuilderBase builder, + OHConnectionConfiguration connectionConfig, ExecutorService executePool) + throws IOException { + checkArgument(connection != null, "connection is null."); + checkArgument(connection.getConfiguration() != null, "configuration is null."); + checkArgument(builder != null, "builder is null"); + checkArgument(connectionConfig != null, "connectionConfig is null."); + TableName builderTableName = builder.getTableName(); + this.tableName = builderTableName.getName(); + this.tableNameString = builderTableName.getNameAsString(); + this.configuration = connection.getConfiguration(); + this.executePool = executePool; + if (executePool == null) { + int maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX, + DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX); + long keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME, + DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME); + this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime); + this.cleanupPoolOnClose = true; + } else { + this.cleanupPoolOnClose = false; + } + this.rpcTimeout = builder.getRpcTimeout(); + this.readRpcTimeout = builder.getReadRpcTimeout(); + this.writeRpcTimeout = builder.getWriteRpcTimeout(); + this.operationTimeout = builder.getOperationTimeout(); + this.operationExecuteInPool = this.configuration.getBoolean( + HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL, + (this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); + this.maxKeyValueSize = connectionConfig.getMaxKeyValueSize(); + this.putWriteBufferCheck = this.configuration.getInt(HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK, + DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK); + this.writeBufferSize = connectionConfig.getWriteBufferSize(); + int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace( + this.tableNameString, connectionConfig)); + this.obTableClient.setRpcExecuteTimeout(rpcTimeout); + this.obTableClient.setRuntimeRetryTimes(numRetries); + setOperationTimeout(operationTimeout); + + finishSetUp(); + } + /** * 创建默认的线程池 * Using the "direct handoff" approach, new threads will only be created @@ -370,11 +426,15 @@ private void finishSetUp() { HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); - this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - this.operationTimeout = this.configuration.getInt( + this.rpcTimeout = this.rpcTimeout <= 0 ? configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT) : this.rpcTimeout; + this.readRpcTimeout = this.readRpcTimeout <= 0 ? configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT) : this.readRpcTimeout; + this.writeRpcTimeout = this.writeRpcTimeout <= 0 ? configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT) : this.writeRpcTimeout; + this.operationTimeout = this.operationTimeout <= 0 ? this.configuration.getInt( HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT) : this.operationTimeout; this.operationExecuteInPool = this.configuration.getBoolean( HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL, (this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); @@ -388,7 +448,7 @@ private void finishSetUp() { private OHConnectionConfiguration setUserDefinedNamespace(String tableNameString, OHConnectionConfiguration ohConnectionConf) - throws IOException { + throws IllegalArgumentException { if (tableNameString.indexOf(':') != -1) { String[] params = tableNameString.split(":"); if (params.length != 2) { @@ -1320,6 +1380,16 @@ public int getRpcTimeout() { return this.rpcTimeout; } + @Override + public int getReadRpcTimeout() { + return this.readRpcTimeout; + } + + @Override + public int getWriteRpcTimeout() { + return this.writeRpcTimeout; + } + public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) { this.obTableClient.setRuntimeBatchExecutor(runtimeBatchExecutor); } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java index fd032267..9b98dd0b 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java @@ -47,10 +47,13 @@ public class OHConnectionConfiguration { private final boolean odpMode; private final long writeBufferSize; private final int operationTimeout; + private final int metaOperationTimeout; private final int scannerCaching; private final long scannerMaxResultSize; private final int maxKeyValueSize; private final int rpcTimeout; + private final int readRpcTimeout; + private final int writeRpcTimeout; private final int rpcConnectTimeout; private final long writeBufferPeriodicFlushTimeoutMs; private final long writeBufferPeriodicFlushTimerTickMs; @@ -70,10 +73,16 @@ public OHConnectionConfiguration(Configuration conf) { } this.database = database; this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); + this.metaOperationTimeout = conf.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.writeBufferPeriodicFlushTimeoutMs = conf.getLong( WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT); this.writeBufferPeriodicFlushTimerTickMs = conf.getLong( @@ -117,6 +126,10 @@ public long getWriteBufferSize() { return this.writeBufferSize; } + public int getMetaOperationTimeout() { + return this.metaOperationTimeout; + } + public int getOperationTimeout() { return this.operationTimeout; } @@ -133,6 +146,14 @@ public int getRpcTimeout() { return this.rpcTimeout; } + public int getReadRpcTimeout() { + return this.readRpcTimeout; + } + + public int getWriteRpcTimeout() { + return this.writeRpcTimeout; + } + public int getRpcConnectTimeout() { return this.rpcConnectTimeout; } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java index 3964f52f..41a0690a 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java @@ -176,8 +176,19 @@ public boolean isClosed() { } @Override - public TableBuilder getTableBuilder(TableName tableName, ExecutorService executorService) { - throw new FeatureNotSupportedException("not supported yet'"); + public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { + return new ObTableBuilderBase(tableName, connectionConfig) { + @Override + public Table build() { + try { + return new OHTable(OHConnectionImpl.this, this, + OHConnectionImpl.this.connectionConfig, pool); + } catch (Exception e) { + LOGGER.error("Fail to build new OHTable", e); + throw new RuntimeException(e); + } + } + }; } @Override diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableBuilderBase.java b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableBuilderBase.java new file mode 100644 index 00000000..48fad23b --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableBuilderBase.java @@ -0,0 +1,74 @@ +package com.alipay.oceanbase.hbase.util; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.TableBuilder; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +abstract public class ObTableBuilderBase implements TableBuilder { + protected TableName tableName; + + protected int operationTimeout; + + protected int rpcTimeout; + + protected int readRpcTimeout; + + protected int writeRpcTimeout; + + ObTableBuilderBase(TableName tableName, OHConnectionConfiguration ohConnConf) { + if (tableName == null) { + throw new IllegalArgumentException("The provided tableName is null"); + } + this.tableName = tableName; + this.operationTimeout = tableName.isSystemTable() ? ohConnConf.getMetaOperationTimeout() + : ohConnConf.getOperationTimeout(); + this.rpcTimeout = ohConnConf.getRpcTimeout(); + this.readRpcTimeout = ohConnConf.getReadRpcTimeout(); + this.writeRpcTimeout = ohConnConf.getWriteRpcTimeout(); + } + + @Override + public ObTableBuilderBase setOperationTimeout(int timeout) { + this.operationTimeout = timeout; + return this; + } + + @Override + public ObTableBuilderBase setRpcTimeout(int timeout) { + this.rpcTimeout = timeout; + return this; + } + + @Override + public ObTableBuilderBase setReadRpcTimeout(int timeout) { + this.readRpcTimeout = timeout; + return this; + } + + @Override + public ObTableBuilderBase setWriteRpcTimeout(int timeout) { + this.writeRpcTimeout = timeout; + return this; + } + + public TableName getTableName() { + return this.tableName; + } + + public int getOperationTimeout() { + return this.operationTimeout; + } + + public int getRpcTimeout() { + return this.rpcTimeout; + } + + public int getReadRpcTimeout() { + return this.readRpcTimeout; + } + + public int getWriteRpcTimeout() { + return this.writeRpcTimeout; + } +} diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java index 055aa1fb..b4f63a4d 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java @@ -31,8 +31,10 @@ import org.junit.*; import org.junit.rules.ExpectedException; +import java.io.IOException; import java.util.*; +import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT; import static org.apache.hadoop.hbase.util.Bytes.toBytes; import static org.junit.Assert.*; @@ -363,6 +365,80 @@ public void testDeleteFamilyVerison() throws Exception { multiCfHTable.delete(deleteKey3Family); } + @Test + public void testMultiColumnFamilyTableBuilder() throws Exception { + byte[] family1 = "family_with_group1".getBytes(); + byte[] family2 = "family_with_group2".getBytes(); + byte[] family3 = "family_with_group3".getBytes(); + + byte[] family1_column1 = "family1_column1".getBytes(); + byte[] family1_column2 = "family1_column2".getBytes(); + byte[] family1_column3 = "family1_column3".getBytes(); + byte[] family2_column1 = "family2_column1".getBytes(); + byte[] family2_column2 = "family2_column2".getBytes(); + byte[] family3_column1 = "family3_column1".getBytes(); + byte[] family1_value = "VVV1".getBytes(); + byte[] family2_value = "VVV2".getBytes(); + byte[] family3_value = "VVV3".getBytes(); + + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + conf.set(SOCKET_TIMEOUT_CONNECT, "15000"); + Connection connection = ConnectionFactory.createConnection(conf); + TableName tableName = TableName.valueOf("test_multi_cf"); + TableBuilder builder = connection.getTableBuilder(tableName, null); + // build a OHTable with default params + Table tmpMultiCfHTable = builder.build(); + + Delete delete = new Delete(toBytes("Key0")); + delete.addFamily(family1); + delete.addFamily(family2); + delete.addFamily(family3); + tmpMultiCfHTable.delete(delete); + + Put put = new Put(toBytes("Key0")); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family1, family1_column2, family1_value); + put.addColumn(family1, family1_column3, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family2, family2_column2, family2_value); + put.addColumn(family3, family3_column1, family3_value); + tmpMultiCfHTable.put(put); + + int count = 0; + Get get = new Get(toBytes("Key0")); + get.setMaxVersions(); + Result r = tmpMultiCfHTable.get(get); + Assert.assertEquals(6, r.rawCells().length); + + delete = new Delete(toBytes("Key0")); + delete.addFamily(family1); + delete.addFamily(family2); + delete.addFamily(family3); + tmpMultiCfHTable.delete(delete); + r = tmpMultiCfHTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + + // set params for TableBuilder + builder.setOperationTimeout(1500000); + builder.setRpcTimeout(40000); + tmpMultiCfHTable = builder.build(); + + put = new Put(toBytes("Key0")); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family1, family1_column2, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family3, family3_column1, family3_value); + tmpMultiCfHTable.put(put); + + r = tmpMultiCfHTable.get(get); + Assert.assertEquals(4, r.rawCells().length); + + tmpMultiCfHTable.delete(delete); + r = tmpMultiCfHTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + } + @Test public void testMultiColumnFamilyBufferedMutator() throws Exception { byte[] family1 = "family_with_group1".getBytes(); diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java index e83981e8..805de10e 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java @@ -71,11 +71,6 @@ public void testConnectionByXml() throws Exception { hTable.close(); } - @AfterClass - public static void finish() throws IOException { - hTable.close(); - } - @Test public void testRefreshTableEntry() throws Exception { hTable = ObHTableTestUtil.newOHTableClient("n1:test"); @@ -84,6 +79,55 @@ public void testRefreshTableEntry() throws Exception { ((OHTableClient) hTable).refreshTableEntry("family1", true); } + @After + public void after() throws IOException { + hTable.close(); + } + + @Test + public void testGetTableByTableBuilder() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + conf.set(SOCKET_TIMEOUT_CONNECT, "15000"); + connection = ConnectionFactory.createConnection(conf); + TableName tableName = TableName.valueOf("test"); + TableBuilder builder = connection.getTableBuilder(tableName, null); + // build a OHTable with default params + hTable = builder.build(); + testBasic(); + + // set params for TableBuilder + long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); + ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, + new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); + pool.allowCoreThreadTimeOut(true); + builder = connection.getTableBuilder(tableName, pool); + builder.setRpcTimeout(40000); + builder.setReadRpcTimeout(30000); + builder.setWriteRpcTimeout(50000); + builder.setOperationTimeout(1500000); + hTable = builder.build(); + Assert.assertEquals(40000, hTable.getRpcTimeout()); + Assert.assertEquals(30000, hTable.getReadRpcTimeout()); + Assert.assertEquals(50000, hTable.getWriteRpcTimeout()); + Assert.assertEquals(1500000, hTable.getOperationTimeout()); + testBasic(); + + hTable.close(); + Assert.assertFalse(pool.isShutdown()); + pool.shutdown(); + try { + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + System.out + .println("close() failed to terminate pool after 10 seconds. Abandoning pool."); + } + } catch (InterruptedException e) { + System.out.println("waitForTermination interrupted"); + Thread.currentThread().interrupt(); + } + Assert.assertTrue(pool.isShutdown()); + } + private void testBasic() throws Exception { String key = "putKey"; String column1 = "putColumn1";