org.slf4j
slf4j-api
@@ -403,5 +407,5 @@
-
+
diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
index 780957e8..6d94fd06 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
@@ -22,9 +22,7 @@
import com.alipay.oceanbase.hbase.execute.ServerCallable;
import com.alipay.oceanbase.hbase.filter.HBaseFilterUtils;
import com.alipay.oceanbase.hbase.result.ClientStreamScanner;
-import com.alipay.oceanbase.hbase.util.OHBaseFuncUtils;
-import com.alipay.oceanbase.hbase.util.ObTableClientManager;
-import com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory;
+import com.alipay.oceanbase.hbase.util.*;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
@@ -39,12 +37,12 @@
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
-import com.alipay.oceanbase.hbase.exception.OperationTimeoutException;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.HConstants;
@@ -77,109 +75,95 @@
public class OHTable implements HTableInterface {
- private static final Logger logger = TableHBaseLoggerFactory
- .getLogger(OHTable.class);
+ private static final Logger logger = TableHBaseLoggerFactory
+ .getLogger(OHTable.class);
/**
* the table client for oceanbase
*/
- private final ObTableClient obTableClient;
+ private final ObTableClient obTableClient;
/**
* the ohTable name in byte array
*/
- private final byte[] tableName;
+ private final byte[] tableName;
/**
* the ohTable name in string
*/
- private final String tableNameString;
+ private final String tableNameString;
/**
* operation timeout whose default value is Integer.MaxValue decide the timeout of executing in pool.
*
* if operation timeout is not equal to the default value mean the Get execute in the pool
*/
- private int operationTimeout;
+ private int operationTimeout;
/**
* if the Get executing pool is specified by user cleanupPoolOnClose will be false ,
* which means that user is responsible for the pool
*/
- private boolean cleanupPoolOnClose = true;
+ private boolean cleanupPoolOnClose = true;
/**
* if the obTableClient is specified by user closeClientOnClose will be false ,
* which means that user is responsible for obTableClient
*/
- private boolean closeClientOnClose = true;
+ private boolean closeClientOnClose = true;
/**
* when the operationExecuteInPool is true the Get
* will be executed in the pool.
*/
- private final ExecutorService executePool;
-
- /**
- * the maximum number of threads to allow in the
- * Get executing pool
- */
- private final int maxThreads;
-
- /**
- * the Get executing pool thread keep alive time.
- * when the number of threads is greater than the core, this
- * is the maximum time that excess idle threads will wait for
- * new tasks before terminating.d
- */
- private final long keepAliveTime;
+ private ExecutorService executePool;
/**
* decide whether the Get request will be executed
* in the pool.
*/
- private boolean operationExecuteInPool = false;
+ private boolean operationExecuteInPool = false;
/**
* the buffer of put request
*/
- private final ArrayList writeBuffer = new ArrayList();
+ private final ArrayList writeBuffer = new ArrayList();
/**
* when the put request reach the write buffer size the do put will
* flush commits automatically
*/
- private long writeBufferSize;
+ private long writeBufferSize;
/**
* the do put check write buffer every putWriteBufferCheck puts
*/
- private int putWriteBufferCheck;
+ private int putWriteBufferCheck;
/**
* decide whether clear the buffer when meet exception.the default
* value is true. Be careful about the correctness when set it false
*/
- private boolean clearBufferOnFail = true;
+ private boolean clearBufferOnFail = true;
/**
* whether flush the put automatically
*/
- private boolean autoFlush = true;
+ private boolean autoFlush = true;
/**
* current buffer size
*/
- private long currentWriteBufferSize;
+ private long currentWriteBufferSize;
/**
* the max size of put key value
*/
- private int maxKeyValueSize;
+ private int maxKeyValueSize;
// i.e., doPut checks the writebuffer every X Puts.
/**
* Configuration extends from hbase configuration
*/
- private final Configuration configuration;
+ private final Configuration configuration;
/**
* Creates an object to access a HBase table.
@@ -199,12 +183,14 @@ public OHTable(Configuration configuration, String tableName) throws IOException
this.configuration = configuration;
this.tableName = tableName.getBytes();
this.tableNameString = tableName;
- this.maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX,
+
+ int maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX,
DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
- this.keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
+ long keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
- this.obTableClient = ObTableClientManager.getOrCreateObTableClient(configuration);
+ this.obTableClient = ObTableClientManager
+ .getOrCreateObTableClient(new OHConnectionConfiguration(configuration));
finishSetUp();
}
@@ -246,15 +232,12 @@ public OHTable(Configuration configuration, final byte[] tableName,
checkArgument(executePool != null && !executePool.isShutdown(),
"executePool is null or executePool is shutdown");
this.configuration = configuration;
- this.executePool = executePool;
this.tableName = tableName;
this.tableNameString = Bytes.toString(tableName);
+ this.executePool = executePool;
this.cleanupPoolOnClose = false;
- this.maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX,
- DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
- this.keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
- DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
- this.obTableClient = ObTableClientManager.getOrCreateObTableClient(configuration);
+ this.obTableClient = ObTableClientManager
+ .getOrCreateObTableClient(new OHConnectionConfiguration(configuration));
finishSetUp();
}
@@ -280,14 +263,46 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient,
this.tableNameString = Bytes.toString(tableName);
this.cleanupPoolOnClose = false;
this.closeClientOnClose = false;
- this.maxThreads = DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX;
- this.keepAliveTime = DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME;
this.executePool = executePool;
this.obTableClient = obTableClient;
this.configuration = new Configuration();
finishSetUp();
}
+ public OHTable(TableName tableName, Connection connection,
+ OHConnectionConfiguration connectionConfig, ExecutorService executePool)
+ throws IOException {
+ checkArgument(connection.getConfiguration() != null, "configuration is null.");
+ checkArgument(tableName != null, "tableName is null.");
+ checkArgument(connection.getConfiguration() != null, "configuration is null.");
+ checkArgument(tableName.getName() != null, "tableNameString is null.");
+ checkArgument(connectionConfig != null, "connectionConfig is null.");
+ this.tableNameString = Bytes.toString(tableName.getName());
+ this.configuration = connection.getConfiguration();
+ this.executePool = executePool;
+ if (executePool == null) {
+ int maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX,
+ DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
+ long keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
+ DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
+ this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
+ this.cleanupPoolOnClose = true;
+ } else {
+ this.cleanupPoolOnClose = false;
+ }
+
+ this.operationTimeout = connectionConfig.getOperationTimeout();
+ this.operationExecuteInPool = this.configuration.getBoolean(
+ HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
+ (this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
+ this.maxKeyValueSize = connectionConfig.getMaxKeyValueSize();
+ this.putWriteBufferCheck = this.configuration.getInt(HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK,
+ DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK);
+ this.writeBufferSize = connectionConfig.getWriteBufferSize();
+ this.tableName = tableName.getName();
+ this.obTableClient = ObTableClientManager.getOrCreateObTableClient(connectionConfig);
+ }
+
/**
* 创建默认的线程池
* Using the "direct handoff" approach, new threads will only be created
@@ -299,6 +314,7 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient,
* @param keepAliveTime keep alive time
* @return ThreadPoolExecutor
*/
+ @InterfaceAudience.Private
public static ThreadPoolExecutor createDefaultThreadPoolExecutor(int coreSize, int maxThreads,
long keepAliveTime) {
// NOTE: when SOFA_THREAD_POOL_LOGGING_CAPABILITY is set to true or not set,
@@ -328,26 +344,30 @@ private void finishSetUp() {
this.operationExecuteInPool = this.configuration.getBoolean(
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
- this.maxKeyValueSize = this.configuration.getInt(HBASE_CLIENT_KEYVALUE_MAXSIZE,
- DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE);
+ this.maxKeyValueSize = this.configuration.getInt(MAX_KEYVALUE_SIZE_KEY,
+ MAX_KEYVALUE_SIZE_DEFAULT);
this.putWriteBufferCheck = this.configuration.getInt(HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK,
DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK);
- this.writeBufferSize = this.configuration.getLong(HBASE_HTABLE_CLIENT_WRITE_BUFFER,
- DEFAULT_HBASE_HTABLE_CLIENT_WRITE_BUFFER);
+ this.writeBufferSize = this.configuration.getLong(WRITE_BUFFER_SIZE_KEY,
+ WRITE_BUFFER_SIZE_DEFAULT);
}
+ @Override
public byte[] getTableName() {
return tableName;
}
+ @Override
public TableName getName() {
- return null;
+ throw new FeatureNotSupportedException("not supported yet.");
}
+ @Override
public Configuration getConfiguration() {
return configuration;
}
+ @Override
public HTableDescriptor getTableDescriptor() {
throw new FeatureNotSupportedException("not supported yet.");
}
@@ -363,38 +383,49 @@ public HTableDescriptor getTableDescriptor() {
* @return true if the specified Get matches one or more keys, false if not
* @throws IOException e
*/
+ @Override
public boolean exists(Get get) throws IOException {
Result r = get(get);
return !r.isEmpty();
}
+ @Override
+ public boolean[] existsAll(List list) throws IOException {
+ throw new FeatureNotSupportedException("not supported yet.");
+ }
+
+ @Override
public Boolean[] exists(List gets) throws IOException {
throw new FeatureNotSupportedException("not supported yet'");
}
+ @Override
public void batch(List extends Row> actions, Object[] results) {
throw new FeatureNotSupportedException("not supported yet.");
}
+ @Override
public Object[] batch(List extends Row> actions) {
throw new FeatureNotSupportedException("not supported yet.");
}
+ @Override
public void batchCallback(List extends Row> actions, Object[] results,
Batch.Callback callback) throws IOException,
InterruptedException {
throw new FeatureNotSupportedException("not supported yet'");
}
+ @Override
public Object[] batchCallback(List extends Row> actions, Batch.Callback callback)
throws IOException,
InterruptedException {
throw new FeatureNotSupportedException("not supported yet'");
}
- public void getKeyValueFromResult(AbstractQueryStreamResult clientQueryStreamResult,
- List keyValueList, boolean isTableGroup,
- byte[] family) throws Exception {
+ private void getKeyValueFromResult(AbstractQueryStreamResult clientQueryStreamResult,
+ List keyValueList, boolean isTableGroup,
+ byte[] family) throws Exception {
byte[][] familyAndQualifier = new byte[2][];
while (clientQueryStreamResult.next()) {
List row = clientQueryStreamResult.getRow();
@@ -416,14 +447,16 @@ public void getKeyValueFromResult(AbstractQueryStreamResult clientQueryStreamRes
}
}
- public String getTargetTableName(String tableNameString) {
+ private String getTargetTableName(String tableNameString) {
if (configuration.getBoolean(HBASE_HTABLE_TEST_LOAD_ENABLE, false)) {
- return tableNameString + configuration.get(HBASE_HTABLE_TEST_LOAD_SUFFIX,
- DEFAULT_HBASE_HTABLE_TEST_LOAD_SUFFIX);
+ return tableNameString
+ + configuration.get(HBASE_HTABLE_TEST_LOAD_SUFFIX,
+ DEFAULT_HBASE_HTABLE_TEST_LOAD_SUFFIX);
}
return tableNameString;
}
+ @Override
public Result get(final Get get) throws IOException {
if (get.getFamilyMap().keySet() == null || get.getFamilyMap().keySet().size() == 0) {
// check nothing, use table group;
@@ -447,7 +480,8 @@ public Result call() throws IOException {
get.getMaxVersions(), null);
obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(),
true, -1);
- request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString));
+ request = buildObTableQueryRequest(obTableQuery,
+ getTargetTableName(tableNameString));
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
.execute(request);
@@ -484,6 +518,7 @@ public Result call() throws IOException {
return executeServerCallable(serverCallable);
}
+ @Override
public Result[] get(List gets) throws IOException {
Result[] results = new Result[gets.size()];
for (int i = 0; i < gets.size(); i++) {
@@ -498,10 +533,12 @@ public Result[] get(List gets) throws IOException {
* @param family family
* @return Result
*/
+ @Override
public Result getRowOrBefore(byte[] row, byte[] family) {
throw new FeatureNotSupportedException("not supported yet.");
}
+ @Override
public ResultScanner getScanner(final Scan scan) throws IOException {
if (scan.getFamilyMap().keySet() == null || scan.getFamilyMap().keySet().size() == 0) {
@@ -535,7 +572,8 @@ public ResultScanner call() throws IOException {
if (scan.isReversed()) { // reverse scan 时设置为逆序
obTableQuery.setScanOrder(ObScanOrder.Reverse);
}
- request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString));
+ request = buildObTableQueryAsyncRequest(obTableQuery,
+ getTargetTableName(tableNameString));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
.execute(request);
return new ClientStreamScanner(clientQueryAsyncStreamResult,
@@ -581,22 +619,26 @@ public ResultScanner call() throws IOException {
return executeServerCallable(serverCallable);
}
+ @Override
public ResultScanner getScanner(final byte[] family) throws IOException {
Scan scan = new Scan();
scan.addFamily(family);
return getScanner(scan);
}
+ @Override
public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
Scan scan = new Scan();
scan.addColumn(family, qualifier);
return getScanner(scan);
}
+ @Override
public void put(Put put) throws IOException {
doPut(Collections.singletonList(put));
}
+ @Override
public void put(List puts) throws IOException {
doPut(puts);
}
@@ -653,9 +695,25 @@ private void validatePut(Put put) {
* @return boolean
* @throws IOException if failed
*/
+ @Override
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put)
throws IOException {
- return checkAndMutation(row, family, qualifier, value, put);
+ return checkAndPut(row, family, qualifier, CompareFilter.CompareOp.EQUAL, value, put);
+ }
+
+ @Override
+ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+ CompareFilter.CompareOp compareOp, byte[] value, Put put)
+ throws IOException {
+ RowMutations rowMutations = new RowMutations(row);
+ rowMutations.add(put);
+ try {
+ return checkAndMutation(row, family, qualifier, compareOp, value, rowMutations);
+ } catch (Exception e) {
+ logger.error(LCD.convert("01-00005"), put, tableNameString, e);
+ throw new IOException("checkAndPut type table:" + tableNameString + " e.msg:"
+ + e.getMessage() + " error.", e);
+ }
}
private void innerDelete(Delete delete) throws IOException {
@@ -685,11 +743,13 @@ private void innerDelete(Delete delete) throws IOException {
}
}
+ @Override
public void delete(Delete delete) throws IOException {
checkFamilyViolation(delete.getFamilyMap().keySet());
innerDelete(delete);
}
+ @Override
public void delete(List deletes) throws IOException {
for (Delete delete : deletes) {
innerDelete(delete);
@@ -706,50 +766,76 @@ public void delete(List deletes) throws IOException {
* @return boolean
* @throws IOException if failed
*/
+ @Override
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value,
Delete delete) throws IOException {
- return checkAndMutation(row, family, qualifier, value, delete);
+ return checkAndDelete(row, family, qualifier, CompareFilter.CompareOp.EQUAL, value, delete);
}
- private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, byte[] value,
- Mutation mutation) throws IOException {
+ @Override
+ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+ CompareFilter.CompareOp compareOp, byte[] value, Delete delete)
+ throws IOException {
+ RowMutations rowMutations = new RowMutations(row);
+ rowMutations.add(delete);
try {
+ return checkAndMutation(row, family, qualifier, compareOp, value, rowMutations);
+ } catch (Exception e) {
+ logger.error(LCD.convert("01-00005"), delete, tableNameString, e);
+ throw new IOException("checkAndDelete type table:" + tableNameString + " e.msg:"
+ + e.getMessage() + " error.", e);
+ }
+ }
+
+ @Override
+ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+ CompareFilter.CompareOp compareOp, byte[] value,
+ RowMutations rowMutations) throws IOException {
+ try {
+ return checkAndMutation(row, family, qualifier, compareOp, value, rowMutations);
+ } catch (Exception e) {
+ logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e);
+ throw new IOException("checkAndMutate type table:" + tableNameString + " e.msg:"
+ + e.getMessage() + " error.", e);
+ }
+ }
+
+ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value,
+ RowMutations rowMutations) throws Exception {
checkArgument(row != null, "row is null");
checkArgument(isNotBlank(Bytes.toString(family)), "family is blank");
- checkArgument(Bytes.equals(row, mutation.getRow()),
+ checkArgument(Bytes.equals(row, rowMutations.getRow()),
"mutation row is not equal check row");
- checkArgument(!mutation.isEmpty(), "mutation is empty");
+ checkArgument(!rowMutations.getMutations().isEmpty(), "mutation is empty");
- String filterString = buildCheckAndMutateFilterString(family, qualifier, value);
+ String filterString = buildCheckAndMutateFilterString(family, qualifier, compareOp, value);
ObHTableFilter filter = buildObHTableFilter(filterString, null, 1, qualifier);
-
- checkFamilyViolation(mutation.getFamilyMap().keySet());
+ List mutations = rowMutations.getMutations();
+ List keyValueList = new LinkedList<>();
// only one family operation is allowed
- Map.Entry> entry = mutation.getFamilyMap().entrySet().iterator()
- .next();
-
- checkArgument(Arrays.equals(family, entry.getKey()),
- "mutation family is not equal check family");
-
+ for (Mutation mutation : mutations) {
+ checkFamilyViolation(mutation.getFamilyMap().keySet());
+ checkArgument(Arrays.equals(family, mutation.getFamilyMap().firstEntry().getKey()),
+ "mutation family is not equal check family");
+ // Support for multiple families in the future
+ for (Map.Entry> entry : mutation.getFamilyMap().entrySet()) {
+ keyValueList.addAll(entry.getValue());
+ }
+ }
ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true, -1);
- ObTableBatchOperation batch = buildObTableBatchOperation(entry.getValue(), false, null);
+ ObTableBatchOperation batch = buildObTableBatchOperation(keyValueList, false, null);
ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery,
- batch, getTargetTableName(tableNameString, Bytes.toString(entry.getKey())));
+ batch, getTargetTableName(tableNameString, Bytes.toString(family)));
ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient
.execute(request);
return result.getAffectedRows() > 0;
- } catch (Exception e) {
- logger.error(LCD.convert("01-00005"), mutation.getClass().getSimpleName(),
- tableNameString, e);
- throw new IOException("checkAndMutation type " + mutation.getClass().getSimpleName()
- + " table " + tableNameString + " error.", e);
- }
}
+ @Override
public void mutateRow(RowMutations rm) {
throw new FeatureNotSupportedException("not supported yet.");
}
@@ -761,6 +847,7 @@ public void mutateRow(RowMutations rm) {
* @return Result
* @throws IOException if failed
*/
+ @Override
public Result append(Append append) throws IOException {
checkFamilyViolation(append.getFamilyMap().keySet());
@@ -809,6 +896,7 @@ public Result append(Append append) throws IOException {
* @return Result
* @throws IOException if failed
*/
+ @Override
public Result increment(Increment increment) throws IOException {
checkFamilyViolation(increment.getFamilyMap().keySet());
@@ -870,6 +958,7 @@ public Result increment(Increment increment) throws IOException {
* @return long
* @throws IOException if failed
*/
+ @Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
throws IOException {
try {
@@ -904,21 +993,25 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo
}
}
+ @Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
Durability durability) throws IOException {
- throw new FeatureNotSupportedException("not supported yet'");
+ return incrementColumnValue(row, family, qualifier, amount);
}
+ @Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
boolean writeToWAL) throws IOException {
// WAL ignored
return incrementColumnValue(row, family, qualifier, amount);
}
+ @Override
public boolean isAutoFlush() {
return autoFlush;
}
+ @Override
public void flushCommits() throws IOException {
try {
@@ -998,16 +1091,19 @@ public void flushCommits() throws IOException {
}
}
+ @Override
public void close() throws IOException {
if (cleanupPoolOnClose) {
executePool.shutdown();
}
}
+ @Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
throw new FeatureNotSupportedException("not supported yet'");
}
+ @Override
public Map coprocessorService(Class service,
byte[] startKey, byte[] endKey,
Batch.Call callable)
@@ -1016,6 +1112,7 @@ public Map coprocessorService(Class service
throw new FeatureNotSupportedException("not supported yet'");
}
+ @Override
public void coprocessorService(Class service, byte[] startKey,
byte[] endKey, Batch.Call callable,
Batch.Callback callback)
@@ -1029,6 +1126,7 @@ public void coprocessorService(Class service, byte[] s
*
* @param autoFlush Whether or not to enable 'auto-flush'.
*/
+ @Override
public void setAutoFlush(boolean autoFlush) {
setAutoFlush(autoFlush, autoFlush);
}
@@ -1058,11 +1156,13 @@ public void setAutoFlush(boolean autoFlush) {
* @param clearBufferOnFail Whether to keep Put failures in the writeBuffer
* @see #flushCommits
*/
+ @Override
public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
this.autoFlush = autoFlush;
this.clearBufferOnFail = autoFlush || clearBufferOnFail;
}
+ @Override
public void setAutoFlushTo(boolean autoFlush) {
throw new FeatureNotSupportedException("not supported yet'");
}
@@ -1075,6 +1175,7 @@ public void setAutoFlushTo(boolean autoFlush) {
*
* @return The size of the write buffer in bytes.
*/
+ @Override
public long getWriteBufferSize() {
return writeBufferSize;
}
@@ -1088,6 +1189,7 @@ public long getWriteBufferSize() {
* @param writeBufferSize The new write buffer size, in bytes.
* @throws IOException if a remote or network exception occurs.
*/
+ @Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
this.writeBufferSize = writeBufferSize;
if (currentWriteBufferSize > writeBufferSize) {
@@ -1095,6 +1197,7 @@ public void setWriteBufferSize(long writeBufferSize) throws IOException {
}
}
+ @Override
public Map batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor,
Message request,
byte[] startKey,
@@ -1105,6 +1208,7 @@ public Map batchCoprocessorService(Descriptors.Me
throw new FeatureNotSupportedException("not supported yet'");
}
+ @Override
public void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor,
Message request, byte[] startKey,
byte[] endKey, R responsePrototype,
@@ -1114,12 +1218,7 @@ public void batchCoprocessorService(Descriptors.MethodDescri
throw new FeatureNotSupportedException("not supported yet'");
}
- public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
- CompareFilter.CompareOp compareOp, byte[] value,
- RowMutations mutation) throws IOException {
- throw new FeatureNotSupportedException("not supported yet'");
- }
-
+ @Override
public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout;
this.operationExecuteInPool = this.configuration.getBoolean(
@@ -1127,6 +1226,24 @@ public void setOperationTimeout(int operationTimeout) {
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
}
+ // todo
+ @Override
+ public int getOperationTimeout() {
+ throw new FeatureNotSupportedException("not supported yet.");
+ }
+
+ //todo
+ @Override
+ public void setRpcTimeout(int i) {
+ throw new FeatureNotSupportedException("not supported yet.");
+ }
+
+ // todo
+ @Override
+ public int getRpcTimeout() {
+ throw new FeatureNotSupportedException("not supported yet.");
+ }
+
public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) {
this.obTableClient.setRuntimeBatchExecutor(runtimeBatchExecutor);
}
@@ -1215,13 +1332,15 @@ private ObHTableFilter buildObHTableFilter(Filter filter, TimeRange timeRange, i
return obHTableFilter;
}
- private String buildCheckAndMutateFilterString(byte[] family, byte[] qualifier, byte[] value) {
+ private String buildCheckAndMutateFilterString(byte[] family, byte[] qualifier,
+ CompareFilter.CompareOp compareOp, byte[] value) {
if (value != null) {
- return ("CheckAndMutateFilter(=, 'binary:" + Bytes.toString(value) + "', '"
- + Bytes.toString(family) + "', '"
- + (qualifier == null ? "" : Bytes.toString(qualifier)) + "', false)");
+ return ("CheckAndMutateFilter(" + HBaseFilterUtils.toParseableString(compareOp)
+ + ", 'binary:" + Bytes.toString(value) + "', '" + Bytes.toString(family)
+ + "', '" + (qualifier == null ? "" : Bytes.toString(qualifier)) + "', false)");
} else {
- return ("CheckAndMutateFilter(=, 'binary:', '" + Bytes.toString(family) + "', '"
+ return ("CheckAndMutateFilter(" + HBaseFilterUtils.toParseableString(compareOp)
+ + ", 'binary:', '" + Bytes.toString(family) + "', '"
+ (qualifier == null ? "" : Bytes.toString(qualifier)) + "', true)");
}
}
@@ -1438,6 +1557,9 @@ private void checkFamilyViolation(Collection families) {
}
for (byte[] family : families) {
+ if (family == null || family.length == 0) {
+ throw new IllegalArgumentException("family is empty");
+ }
if (isBlank(Bytes.toString(family))) {
throw new IllegalArgumentException("family is blank");
}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java b/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java
index 993e7916..a1d84329 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java
@@ -111,7 +111,8 @@ public void close() throws IOException {
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
- throw new FeatureNotSupportedException("not supported yet'");
+ checkStatus();
+ return ohTable.coprocessorService(row);
}
@Override
@@ -120,7 +121,8 @@ public Map coprocessorService(Class service
Batch.Call callable)
throws ServiceException,
Throwable {
- throw new FeatureNotSupportedException("not supported yet'");
+ checkStatus();
+ return ohTable.coprocessorService(service, startKey, endKey, callable);
}
@Override
@@ -129,7 +131,8 @@ public void coprocessorService(Class service, byte[] s
Batch.Callback callback)
throws ServiceException,
Throwable {
- throw new FeatureNotSupportedException("not supported yet'");
+ checkStatus();
+ ohTable.coprocessorService(service, startKey, endKey, callable, callback);
}
private void checkStatus() throws IllegalStateException {
@@ -152,7 +155,8 @@ public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
@Override
public void setAutoFlushTo(boolean autoFlush) {
- throw new FeatureNotSupportedException("not supported yet'");
+ checkStatus();
+ ohTable.setAutoFlushTo(autoFlush);
}
@Override
@@ -175,7 +179,9 @@ public Map batchCoprocessorService(Descriptors.Me
R responsePrototype)
throws ServiceException,
Throwable {
- throw new FeatureNotSupportedException("not supported yet'");
+ checkStatus();
+ return ohTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
+ responsePrototype);
}
@Override
@@ -185,14 +191,41 @@ public void batchCoprocessorService(Descriptors.MethodDescri
Batch.Callback callback)
throws ServiceException,
Throwable {
- throw new FeatureNotSupportedException("not supported yet'");
+ checkStatus();
+ ohTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
+ responsePrototype, callback);
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareFilter.CompareOp compareOp, byte[] value,
- RowMutations mutation) throws IOException {
- throw new FeatureNotSupportedException("not supported yet'");
+ RowMutations mutations) throws IOException {
+ checkStatus();
+ return ohTable.checkAndMutate(row, family, qualifier, compareOp, value, mutations);
+ }
+
+ @Override
+ public void setOperationTimeout(int i) {
+ checkStatus();
+ ohTable.setOperationTimeout(i);
+ }
+
+ @Override
+ public int getOperationTimeout() {
+ checkStatus();
+ return ohTable.getOperationTimeout();
+ }
+
+ @Override
+ public void setRpcTimeout(int i) {
+ checkStatus();
+ ohTable.setRpcTimeout(i);
+ }
+
+ @Override
+ public int getRpcTimeout() {
+ checkStatus();
+ return ohTable.getRpcTimeout();
}
@Override
@@ -202,7 +235,7 @@ public byte[] getTableName() {
@Override
public TableName getName() {
- throw new FeatureNotSupportedException("not supported yet'");
+ return ohTable.getName();
}
@Override
@@ -224,9 +257,16 @@ public boolean exists(Get get) throws IOException {
return ohTable.exists(get);
}
+ @Override
+ public boolean[] existsAll(List list) throws IOException {
+ checkStatus();
+ return ohTable.existsAll(list);
+ }
+
@Override
public Boolean[] exists(List gets) throws IOException {
- throw new FeatureNotSupportedException("not supported yet'");
+ checkStatus();
+ return ohTable.exists(gets);
}
// Not support.
@@ -248,14 +288,16 @@ public Object[] batch(List extends Row> actions) throws IOException, Interrupt
public void batchCallback(List extends Row> actions, Object[] results,
Batch.Callback callback) throws IOException,
InterruptedException {
- throw new FeatureNotSupportedException("not supported yet'");
+ checkStatus();
+ ohTable.batchCallback(actions, results, callback);
}
@Override
public Object[] batchCallback(List extends Row> actions, Batch.Callback callback)
throws IOException,
InterruptedException {
- throw new FeatureNotSupportedException("not supported yet'");
+ checkStatus();
+ return ohTable.batchCallback(actions, callback);
}
@Override
@@ -314,6 +356,14 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] v
return ohTable.checkAndPut(row, family, qualifier, value, put);
}
+ @Override
+ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+ CompareFilter.CompareOp compareOp, byte[] value, Put put)
+ throws IOException {
+ checkStatus();
+ return ohTable.checkAndPut(row, family, qualifier, compareOp, value, put);
+ }
+
@Override
public void delete(Delete delete) throws IOException {
checkStatus();
@@ -333,6 +383,14 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[
return ohTable.checkAndDelete(row, family, qualifier, value, delete);
}
+ @Override
+ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+ CompareFilter.CompareOp compareOp, byte[] value, Delete delete)
+ throws IOException {
+ checkStatus();
+ return ohTable.checkAndDelete(row, family, qualifier, compareOp, value, delete);
+ }
+
// Not support.
@Override
public void mutateRow(RowMutations rm) throws IOException {
@@ -362,7 +420,8 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo
@Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
Durability durability) throws IOException {
- throw new FeatureNotSupportedException("not supported yet'");
+ checkStatus();
+ return ohTable.incrementColumnValue(row, family, qualifier, amount, durability);
}
@Override
@@ -394,18 +453,22 @@ public void setTableNameString(String tableNameString) {
}
public void refreshTableEntry(String familyString, boolean hasTestLoad) throws Exception {
+ checkStatus();
this.ohTable.refreshTableEntry(familyString, hasTestLoad);
}
public byte[][] getStartKeys() throws IOException {
+ checkStatus();
return this.ohTable.getStartKeys();
}
public byte[][] getEndKeys() throws IOException {
+ checkStatus();
return this.ohTable.getEndKeys();
}
public Pair getStartEndKeys() throws IOException {
+ checkStatus();
return this.ohTable.getStartEndKeys();
}
}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java
index 324be570..d4b92d08 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java
@@ -477,8 +477,8 @@ public void setWriteBufferSize(final String tableName, long writeBufferSize) thr
*/
public long getWriteBufferSize(String tableName) {
byte[] attr = getTableAttribute(tableName, HBASE_HTABLE_POOL_WRITE_BUFFER_SIZE);
- return attr == null ? this.config.getLong(HBASE_HTABLE_CLIENT_WRITE_BUFFER, 2097152)
- : Bytes.toLong(attr);
+ return attr == null ? this.config.getLong(WRITE_BUFFER_SIZE_KEY, 2097152) : Bytes
+ .toLong(attr);
}
/**
@@ -697,9 +697,14 @@ public boolean exists(Get get) throws IOException {
return table.exists(get);
}
+ @Override
+ public boolean[] existsAll(List list) throws IOException {
+ return table.existsAll(list);
+ }
+
@Override
public Boolean[] exists(List gets) throws IOException {
- throw new FeatureNotSupportedException("not supported yet'");
+ return table.exists(gets);
}
@Override
@@ -774,6 +779,13 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] v
return table.checkAndPut(row, family, qualifier, value, put);
}
+ @Override
+ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+ CompareFilter.CompareOp compareOp, byte[] value, Put put)
+ throws IOException {
+ return table.checkAndPut(row, family, qualifier, compareOp, value, put);
+ }
+
@Override
public void delete(Delete delete) throws IOException {
table.delete(delete);
@@ -790,6 +802,13 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[
return table.checkAndDelete(row, family, qualifier, value, delete);
}
+ @Override
+ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+ CompareFilter.CompareOp compareOp, byte[] value, Delete delete)
+ throws IOException {
+ return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
+ }
+
@Override
public Result increment(Increment increment) throws IOException {
return table.increment(increment);
@@ -804,7 +823,7 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo
@Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
Durability durability) throws IOException {
- throw new FeatureNotSupportedException("not supported yet'");
+ return table.incrementColumnValue(row, family, qualifier, amount, durability);
}
@Override
@@ -834,7 +853,7 @@ public void close() throws IOException {
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
- throw new FeatureNotSupportedException("not supported yet'");
+ return table.coprocessorService(row);
}
@Override
@@ -844,7 +863,7 @@ public Map coprocessorService(Class service
Batch.Call callable)
throws ServiceException,
Throwable {
- throw new FeatureNotSupportedException("not supported yet'");
+ return table.coprocessorService(service, startKey, endKey, callable);
}
@Override
@@ -854,7 +873,7 @@ public void coprocessorService(Class service, byte[] s
Batch.Callback callback)
throws ServiceException,
Throwable {
- throw new FeatureNotSupportedException("not supported yet'");
+ table.coprocessorService(service, startKey, endKey, callable, callback);
}
@Override
@@ -893,7 +912,7 @@ public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
@Override
public void setAutoFlushTo(boolean autoFlush) {
- throw new FeatureNotSupportedException("not supported yet'");
+ table.setAutoFlushTo(autoFlush);
}
@Override
@@ -914,7 +933,8 @@ public Map batchCoprocessorService(Descriptors.Me
R responsePrototype)
throws ServiceException,
Throwable {
- throw new FeatureNotSupportedException("not supported yet'");
+ return table.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
+ responsePrototype);
}
@Override
@@ -924,14 +944,35 @@ public void batchCoprocessorService(Descriptors.MethodDescri
Batch.Callback callback)
throws ServiceException,
Throwable {
- throw new FeatureNotSupportedException("not supported yet'");
+ table.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
+ responsePrototype, callback);
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareFilter.CompareOp compareOp, byte[] value,
- RowMutations mutation) throws IOException {
- throw new FeatureNotSupportedException("not supported yet'");
+ RowMutations mutations) throws IOException {
+ return table.checkAndMutate(row, family, qualifier, compareOp, value, mutations);
+ }
+
+ @Override
+ public void setOperationTimeout(int i) {
+ table.setOperationTimeout(i);
+ }
+
+ @Override
+ public int getOperationTimeout() {
+ return table.getOperationTimeout();
+ }
+
+ @Override
+ public void setRpcTimeout(int i) {
+ table.setRpcTimeout(i);
+ }
+
+ @Override
+ public int getRpcTimeout() {
+ return table.getRpcTimeout();
}
public HTableInterface getTable() {
diff --git a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java
index 6e231c1f..bda64927 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java
@@ -147,16 +147,16 @@ public final class OHConstants {
public static final int DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX = Integer.MAX_VALUE;
- public static final String HBASE_CLIENT_KEYVALUE_MAXSIZE = "hbase.client.keyvalue.maxsize";
+ public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
- public static final int DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE = -1;
-
- public static final String HBASE_HTABLE_CLIENT_WRITE_BUFFER = "hbase.client.write.buffer";
-
- public static final long DEFAULT_HBASE_HTABLE_CLIENT_WRITE_BUFFER = 2097152;
+ public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
public static final String HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK = "hbase.htable.put.write.buffer.check";
public static final int DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK = 10;
+ public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152L;
+
+ public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1;
+
}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java b/src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java
index 8210e051..6be151a4 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java
@@ -17,11 +17,13 @@
package com.alipay.oceanbase.hbase.filter;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.List;
+@InterfaceAudience.Private
public class HBaseFilterUtils {
public static String toParseableString(Filter filter) {
@@ -51,7 +53,7 @@ public static String toParseableString(Filter filter) {
}
}
- private static String toParseableString(CompareFilter.CompareOp op) {
+ public static String toParseableString(CompareFilter.CompareOp op) {
if (op == null) {
throw new IllegalArgumentException("Compare operator is null");
}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/KeyDefiner.java b/src/main/java/com/alipay/oceanbase/hbase/util/KeyDefiner.java
index facd352d..058d05e7 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/KeyDefiner.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/KeyDefiner.java
@@ -18,7 +18,9 @@
package com.alipay.oceanbase.hbase.util;
import com.alipay.oceanbase.hbase.constants.OHConstants;
+import org.apache.hadoop.classification.InterfaceAudience;
+@InterfaceAudience.Private
public class KeyDefiner {
public static String genPooledOHTableAttributeName(String tableName, String key) {
return tableName + OHConstants.HBASE_HTABLE_POOL_SEPERATOR + key;
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java
index f87f0baa..9932f554 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java
@@ -17,8 +17,11 @@
package com.alipay.oceanbase.hbase.util;
+import org.apache.hadoop.classification.InterfaceAudience;
+
import java.util.Arrays;
+@InterfaceAudience.Private
public class OHBaseFuncUtils {
public static byte[][] extractFamilyFromQualifier(byte[] qualifier) throws Exception {
int familyLen = -1;
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java
new file mode 100644
index 00000000..83c63c7a
--- /dev/null
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java
@@ -0,0 +1,69 @@
+/*-
+ * #%L
+ * OBKV HBase Client Framework
+ * %%
+ * Copyright (C) 2024 OceanBase Group
+ * %%
+ * OBKV HBase Client Framework is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * #L%
+ */
+
+package com.alipay.oceanbase.hbase.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Mutation;
+
+import java.io.IOException;
+import java.util.List;
+
+@InterfaceAudience.Private
+public class OHBufferedMutatorImpl implements BufferedMutator {
+ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params) {
+ }
+
+ @Override
+ public TableName getName() {
+ return null;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void mutate(Mutation mutation) throws IOException {
+
+ }
+
+ @Override
+ public void mutate(List extends Mutation> list) throws IOException {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void flush() throws IOException {
+
+ }
+
+ @Override
+ public long getWriteBufferSize() {
+ return 0;
+ }
+}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java
new file mode 100644
index 00000000..e4789df3
--- /dev/null
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java
@@ -0,0 +1,138 @@
+/*-
+ * #%L
+ * OBKV HBase Client Framework
+ * %%
+ * Copyright (C) 2024 OceanBase Group
+ * %%
+ * OBKV HBase Client Framework is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * #L%
+ */
+
+package com.alipay.oceanbase.hbase.util;
+
+import com.alipay.oceanbase.rpc.property.Property;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.util.Properties;
+
+import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
+
+@InterfaceAudience.Private
+public class OHConnectionConfiguration {
+ private final Properties properties;
+ private final String paramUrl;
+ private final String fullUsername;
+ private final String password;
+ private final String sysUsername;
+ private final String sysPassword;
+ private final String odpAddr;
+ private final int odpPort;
+ private final boolean odpMode;
+ private final String database;
+ private final long writeBufferSize;
+ private final int operationTimeout;
+ private final int scannerCaching;
+ private final long scannerMaxResultSize;
+ private final int maxKeyValueSize;
+ private final int rpcTimeout;
+
+ public OHConnectionConfiguration(Configuration conf) {
+ this.paramUrl = conf.get(HBASE_OCEANBASE_PARAM_URL);
+ this.fullUsername = conf.get(HBASE_OCEANBASE_FULL_USER_NAME);
+ this.password = conf.get(HBASE_OCEANBASE_PASSWORD);
+ this.sysUsername = conf.get(HBASE_OCEANBASE_SYS_USER_NAME);
+ this.sysPassword = conf.get(HBASE_OCEANBASE_SYS_PASSWORD);
+ this.odpAddr = conf.get(HBASE_OCEANBASE_ODP_ADDR);
+ this.odpPort = conf.getInt(HBASE_OCEANBASE_ODP_PORT, -1);
+ this.odpMode = conf.getBoolean(HBASE_OCEANBASE_ODP_MODE, false);
+ this.database = conf.get(HBASE_OCEANBASE_DATABASE);
+ this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
+ this.operationTimeout = conf.getInt("hbase.client.operation.timeout", 1200000);
+ this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ this.scannerCaching = conf.getInt("hbase.client.scanner.caching", Integer.MAX_VALUE);
+ this.scannerMaxResultSize = conf.getLong("hbase.client.scanner.max.result.size",
+ WRITE_BUFFER_SIZE_DEFAULT);
+ this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
+ properties = new Properties();
+ for (Property property : Property.values()) {
+ String value = conf.get(property.getKey());
+ if (value != null) {
+ properties.put(property.getKey(), value);
+ }
+ }
+ }
+
+ public long getWriteBufferSize() {
+ return this.writeBufferSize;
+ }
+
+ public int getOperationTimeout() {
+ return this.operationTimeout;
+ }
+
+ public int getScannerCaching() {
+ return this.scannerCaching;
+ }
+
+ public int getMaxKeyValueSize() {
+ return this.maxKeyValueSize;
+ }
+
+ public int getRpcTimeout() {
+ return this.rpcTimeout;
+ }
+
+ public long getScannerMaxResultSize() {
+ return this.scannerMaxResultSize;
+ }
+
+ public Properties getProperties() {
+ return this.properties;
+ }
+
+ public int getOdpPort() {
+ return this.odpPort;
+ }
+
+ public String getSysPassword() {
+ return this.sysPassword;
+ }
+
+ public String getSysUsername() {
+ return this.sysUsername;
+ }
+
+ public String getPassword() {
+ return this.password;
+ }
+
+ public String getFullUsername() {
+ return this.fullUsername;
+ }
+
+ public String getParamUrl() {
+ return this.paramUrl;
+ }
+
+ public String getOdpAddr() {
+ return this.odpAddr;
+ }
+
+ public boolean isOdpMode() {
+ return this.odpMode;
+ }
+
+ public String getDatabase() {
+ return this.database;
+ }
+}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java
new file mode 100644
index 00000000..b8fb5411
--- /dev/null
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java
@@ -0,0 +1,192 @@
+/*-
+ * #%L
+ * OBKV HBase Client Framework
+ * %%
+ * Copyright (C) 2024 OceanBase Group
+ * %%
+ * OBKV HBase Client Framework is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * #L%
+ */
+
+package com.alipay.oceanbase.hbase.util;
+
+import com.alipay.oceanbase.hbase.OHTable;
+import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Threads;
+import org.slf4j.Logger;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.*;
+
+@InterfaceAudience.Private
+public class OHConnectionImpl implements Connection {
+
+ private static final Logger LOGGER = TableHBaseLoggerFactory
+ .getLogger(OHConnectionImpl.class);
+
+ private static final Marker FATAL = MarkerFactory.getMarker("FATAL");
+
+ static final int BUFFERED_PARAM_UNSET = -1;
+
+ private volatile boolean closed;
+ private volatile boolean aborted;
+
+ // thread executor shared by all HTableInterface instances created
+ // by this connection
+ private volatile ExecutorService batchPool = null;
+
+ // If the pool is internally generated, it needs to be released when closing.
+ private volatile boolean cleanupPool = false;
+
+ private final Configuration conf;
+
+ private final OHConnectionConfiguration connectionConfig;
+
+ OHConnectionImpl(Configuration conf, final boolean managed, ExecutorService pool,
+ final User user) throws IOException {
+ this.conf = conf;
+ this.batchPool = pool;
+ this.connectionConfig = new OHConnectionConfiguration(conf);
+ this.closed = false;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return this.conf;
+ }
+
+ private ExecutorService getBatchPool() {
+ if (batchPool == null) {
+ synchronized (this) {
+ if (batchPool == null) {
+ this.batchPool = getThreadPool(
+ conf.getInt("hbase.hconnection.threads.max", 256),
+ conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
+ }
+ }
+ }
+ return this.batchPool;
+ }
+
+ private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
+ BlockingQueue passedWorkQueue) {
+ // shared HTable thread executor not yet initialized
+ if (maxThreads == 0) {
+ maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+ }
+ if (coreThreads == 0) {
+ coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+ }
+ long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
+ BlockingQueue workQueue = passedWorkQueue;
+ if (workQueue == null) {
+ workQueue = new LinkedBlockingQueue(
+ maxThreads
+ * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+ HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
+ }
+ ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
+ TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(toString() + nameHint));
+ tpe.allowCoreThreadTimeOut(true);
+ return tpe;
+ }
+
+ @Override
+ public HTableInterface getTable(TableName tableName) throws IOException {
+ return getTable(tableName, getBatchPool());
+ }
+
+ @Override
+ public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
+ return new OHTable(tableName, this, connectionConfig, pool);
+ }
+
+ @Override
+ public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
+ return getBufferedMutator(new BufferedMutatorParams(tableName));
+ }
+
+ @Override
+ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
+ if (params.getTableName() == null) {
+ throw new IllegalArgumentException("TableName cannot be null.");
+ }
+ if (params.getPool() == null) {
+ params.pool(HTable.getDefaultExecutor(getConfiguration()));
+ }
+ if (params.getWriteBufferSize() == BUFFERED_PARAM_UNSET) {
+ params.writeBufferSize(connectionConfig.getWriteBufferSize());
+ }
+ if (params.getMaxKeyValueSize() == BUFFERED_PARAM_UNSET) {
+ params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
+ }
+ return new OHBufferedMutatorImpl(this, params);
+ }
+
+ @Override
+ public RegionLocator getRegionLocator(TableName tableName) throws IOException {
+ throw new FeatureNotSupportedException("not supported yet'");
+ }
+
+ @Override
+ public Admin getAdmin() throws IOException {
+ throw new FeatureNotSupportedException("not supported yet'");
+ }
+
+ private void shutdownBatchPool(ExecutorService pool) {
+ pool.shutdown();
+ try {
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ pool.shutdownNow();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (this.closed) {
+ return;
+ }
+ if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
+ shutdownBatchPool(this.batchPool);
+ }
+ this.closed = true;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return this.closed;
+ }
+
+ @Override
+ public void abort(final String msg, Throwable t) {
+ if (t != null) {
+ LOGGER.error(FATAL, msg, t);
+ } else {
+ LOGGER.error(FATAL, msg);
+ }
+ this.aborted = true;
+ close();
+ }
+
+ @Override
+ public boolean isAborted() {
+ return this.aborted;
+ }
+}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java
index 71a715f5..06c88892 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java
@@ -19,6 +19,7 @@
import com.alipay.oceanbase.hbase.OHTable;
import com.alipay.oceanbase.hbase.OHTablePool;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableFactory;
import org.apache.hadoop.hbase.client.HTableInterface;
@@ -42,6 +43,7 @@
* Setting the table attributes through the method of {@link OHTablePool}.
* For example, see {@link OHTablePool#setAutoFlush(String, boolean)}
*/
+@InterfaceAudience.Private
public class OHTableFactory extends HTableFactory {
private final ExecutorService threadPool;
private final OHTablePool tablePool;
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java
index 744845fc..cfa9bb9b 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java
@@ -21,7 +21,9 @@
import com.alipay.oceanbase.rpc.constant.Constants;
import com.alipay.oceanbase.rpc.property.Property;
import com.google.common.base.Objects;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.ConnectionConfiguration;
import java.io.IOException;
import java.util.Map;
@@ -33,54 +35,52 @@
import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument;
import static org.apache.commons.lang.StringUtils.isNotBlank;
+@InterfaceAudience.Private
public class ObTableClientManager {
public static final ConcurrentHashMap OB_TABLE_CLIENT_LOCK = new ConcurrentHashMap();
public static final Map OB_TABLE_CLIENT_INSTANCE = new ConcurrentHashMap();
- public static ObTableClient getOrCreateObTableClient(Configuration conf)
- throws IllegalArgumentException,
- IOException {
+ public static ObTableClient getOrCreateObTableClient(OHConnectionConfiguration connectionConfig)
+ throws IllegalArgumentException,
+ IOException {
ObTableClientKey obTableClientKey = null;
- if (conf.getBoolean(HBASE_OCEANBASE_ODP_MODE, false)) {
- checkArgument(isNotBlank(conf.get(HBASE_OCEANBASE_ODP_ADDR)), HBASE_OCEANBASE_ODP_ADDR
- + " is blank");
- checkArgument(conf.getInt(HBASE_OCEANBASE_ODP_PORT, -1) >= 0, HBASE_OCEANBASE_ODP_PORT
- + " is invalid");
- checkArgument(isNotBlank(conf.get(HBASE_OCEANBASE_DATABASE)), HBASE_OCEANBASE_DATABASE
- + " is blank");
+ if (connectionConfig.isOdpMode()) {
+ checkArgument(isNotBlank(connectionConfig.getOdpAddr()), HBASE_OCEANBASE_ODP_ADDR
+ + " is blank");
+ checkArgument(connectionConfig.getOdpPort() >= 0, HBASE_OCEANBASE_ODP_PORT
+ + " is invalid");
+ checkArgument(isNotBlank(connectionConfig.getDatabase()), HBASE_OCEANBASE_DATABASE
+ + " is blank");
obTableClientKey = new ObTableClientKey();
- obTableClientKey.setOdpAddr(conf.get(HBASE_OCEANBASE_ODP_ADDR));
- obTableClientKey.setOdpPort(conf.getInt(HBASE_OCEANBASE_ODP_PORT, -1));
+ obTableClientKey.setOdpAddr(connectionConfig.getOdpAddr());
+ obTableClientKey.setOdpPort(connectionConfig.getOdpPort());
obTableClientKey.setOdpMode(true);
- obTableClientKey.setDatabase(conf.get(HBASE_OCEANBASE_DATABASE));
+ obTableClientKey.setDatabase(connectionConfig.getDatabase());
} else {
- checkArgument(isNotBlank(conf.get(HBASE_OCEANBASE_PARAM_URL)),
- HBASE_OCEANBASE_PARAM_URL + " is blank");
+ checkArgument(isNotBlank(connectionConfig.getParamUrl()), HBASE_OCEANBASE_PARAM_URL
+ + " is blank");
obTableClientKey = new ObTableClientKey();
- obTableClientKey.setParamUrl(conf.get(HBASE_OCEANBASE_PARAM_URL));
- obTableClientKey.setSysUserName(conf.get(HBASE_OCEANBASE_SYS_USER_NAME));
- if (conf.get(HBASE_OCEANBASE_SYS_PASSWORD) == null) {
+ obTableClientKey.setParamUrl(connectionConfig.getParamUrl());
+ obTableClientKey.setSysUserName(connectionConfig.getSysUsername());
+ if (connectionConfig.getSysPassword() == null) {
obTableClientKey.setSysPassword(Constants.EMPTY_STRING);
} else {
- obTableClientKey.setSysPassword(conf.get(HBASE_OCEANBASE_SYS_PASSWORD));
+ obTableClientKey.setSysPassword(connectionConfig.getSysPassword());
}
}
- checkArgument(isNotBlank(conf.get(HBASE_OCEANBASE_FULL_USER_NAME)),
+ checkArgument(isNotBlank(connectionConfig.getFullUsername()),
HBASE_OCEANBASE_FULL_USER_NAME + " is blank");
- obTableClientKey.setFullUserName(conf.get(HBASE_OCEANBASE_FULL_USER_NAME));
+ obTableClientKey.setFullUserName(connectionConfig.getFullUsername());
- if (conf.get(HBASE_OCEANBASE_PASSWORD) == null) {
+ if (connectionConfig.getPassword() == null) {
obTableClientKey.setPassword(Constants.EMPTY_STRING);
} else {
- obTableClientKey.setPassword(conf.get(HBASE_OCEANBASE_PASSWORD));
+ obTableClientKey.setPassword(connectionConfig.getPassword());
}
- for (Property property : Property.values()) {
- String value = conf.get(property.getKey());
- if (value != null) {
- obTableClientKey.getProperties().put(property.getKey(), value);
- }
+ for (Map.Entry