diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
index e1ab500b..10728ee2 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
@@ -81,102 +81,105 @@
public class OHTable implements HTableInterface {
- private static final Logger logger = TableHBaseLoggerFactory
- .getLogger(OHTable.class);
+ private static final Logger logger = TableHBaseLoggerFactory
+ .getLogger(OHTable.class);
/**
* the table client for oceanbase
*/
- private final ObTableClient obTableClient;
+ private final ObTableClient obTableClient;
/**
* the ohTable name in byte array
*/
- private final byte[] tableName;
+ private final byte[] tableName;
/**
* the ohTable name in string
*/
- private final String tableNameString;
+ private final String tableNameString;
/**
* operation timeout whose default value is Integer.MaxValue decide the timeout of executing in pool.
*
* if operation timeout is not equal to the default value mean the Get execute in the pool
*/
- private int operationTimeout;
+ private int operationTimeout;
/**
* timeout for each rpc request
*/
- private int rpcTimeout;
+ private int rpcTimeout;
/**
* if the Get executing pool is specified by user cleanupPoolOnClose will be false ,
* which means that user is responsible for the pool
*/
- private boolean cleanupPoolOnClose = true;
+ private boolean cleanupPoolOnClose = true;
/**
* if the obTableClient is specified by user closeClientOnClose will be false ,
* which means that user is responsible for obTableClient
*/
- private boolean closeClientOnClose = true;
+ private boolean closeClientOnClose = true;
+
+ /**
+ * If the connection this ObTable obtains is created by the ObTable itself,
+ * should set true and close the connection when this ObTable closes;
+ * otherwise set false
+ */
+ private final boolean cleanupConnectionOnClose;
/**
* when the operationExecuteInPool is true the Get
* will be executed in the pool.
*/
- private ExecutorService executePool;
+ private ExecutorService executePool;
/**
* decide whether the Get request will be executed
* in the pool.
*/
- private boolean operationExecuteInPool = false;
+ private boolean operationExecuteInPool = false;
- /**
- * the buffer of put request
- */
- private final ArrayList writeBuffer = new ArrayList<>();
/**
* when the put request reach the write buffer size the do put will
* flush commits automatically
*/
- private long writeBufferSize;
+ private long writeBufferSize;
+
/**
- * the do put check write buffer every putWriteBufferCheck puts
+ * whether flush the put automatically
*/
- private int putWriteBufferCheck;
+ private boolean autoFlush = true;
/**
- * decide whether clear the buffer when meet exception.the default
- * value is true. Be careful about the correctness when set it false
+ * the max size of put key value
*/
- private boolean clearBufferOnFail = true;
+ private int maxKeyValueSize;
+
+ // i.e., doPut checks the writebuffer every X Puts.
/**
- * whether flush the put automatically
+ * Configuration extends from hbase configuration
*/
- private boolean autoFlush = true;
+ private final Configuration configuration;
+
+ private int scannerTimeout;
/**
- * current buffer size
+ * the connection to obtain bufferedMutator for Put operations
*/
- private long currentWriteBufferSize;
+ private OHConnectionImpl connection;
/**
- * the max size of put key value
+ * the bufferedMutator to execute Puts
*/
- private int maxKeyValueSize;
-
- // i.e., doPut checks the writebuffer every X Puts.
+ private OHBufferedMutatorImpl mutator;
/**
- * Configuration extends from hbase configuration
+ * flag for whether closed
*/
- private final Configuration configuration;
-
- private int scannerTimeout;
+ private boolean isClosed = false;
/**
* Creates an object to access a HBase table.
@@ -196,6 +199,8 @@ public OHTable(Configuration configuration, String tableName) throws IOException
this.configuration = configuration;
this.tableName = tableName.getBytes();
this.tableNameString = tableName;
+ this.connection = (OHConnectionImpl) ConnectionFactory.createConnection(configuration);
+ this.cleanupConnectionOnClose = true;
int maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX,
DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
@@ -253,6 +258,8 @@ public OHTable(Configuration configuration, final byte[] tableName,
this.configuration = configuration;
this.tableName = tableName;
this.tableNameString = Bytes.toString(tableName);
+ this.connection = (OHConnectionImpl) ConnectionFactory.createConnection(configuration);
+ this.cleanupConnectionOnClose = true;
this.executePool = executePool;
this.cleanupPoolOnClose = false;
OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration);
@@ -289,9 +296,10 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient,
this.tableNameString = Bytes.toString(tableName);
this.cleanupPoolOnClose = false;
this.closeClientOnClose = false;
+ this.cleanupConnectionOnClose = false;
this.executePool = executePool;
this.obTableClient = obTableClient;
- this.configuration = new Configuration();
+ this.configuration = HBaseConfiguration.create();
finishSetUp();
}
@@ -306,6 +314,8 @@ public OHTable(TableName tableName, Connection connection,
this.tableNameString = Bytes.toString(tableName.getName());
this.configuration = connection.getConfiguration();
this.executePool = executePool;
+ this.connection = (OHConnectionImpl) connection;
+ this.cleanupConnectionOnClose = false;
if (executePool == null) {
int maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX,
DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
@@ -323,8 +333,6 @@ public OHTable(TableName tableName, Connection connection,
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
this.maxKeyValueSize = connectionConfig.getMaxKeyValueSize();
- this.putWriteBufferCheck = this.configuration.getInt(HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK,
- DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK);
this.writeBufferSize = connectionConfig.getWriteBufferSize();
this.tableName = tableName.getName();
int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
@@ -386,8 +394,6 @@ private void finishSetUp() {
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
this.maxKeyValueSize = this.configuration.getInt(MAX_KEYVALUE_SIZE_KEY,
MAX_KEYVALUE_SIZE_DEFAULT);
- this.putWriteBufferCheck = this.configuration.getInt(HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK,
- DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK);
this.writeBufferSize = this.configuration.getLong(WRITE_BUFFER_SIZE_KEY,
WRITE_BUFFER_SIZE_DEFAULT);
}
@@ -428,7 +434,7 @@ public byte[] getTableName() {
@Override
public TableName getName() {
- throw new FeatureNotSupportedException("not supported yet.");
+ return TableName.valueOf(this.tableNameString);
}
@Override
@@ -1155,29 +1161,16 @@ public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOExcept
@Override
public void put(Put put) throws IOException {
- doPut(Collections.singletonList(put));
+ getBufferedMutator().mutate(put);
+ if (autoFlush) {
+ flushCommits();
+ }
}
@Override
public void put(List puts) throws IOException {
- doPut(puts);
- }
-
- private void doPut(List puts) throws IOException {
- int n = 0;
- for (Put put : puts) {
- validatePut(put);
- checkFamilyViolation(put.getFamilyMap().keySet(), true);
- writeBuffer.add(put);
- currentWriteBufferSize += put.heapSize();
-
- // we need to periodically see if the writebuffer is full instead of waiting until the end of the List
- n++;
- if (n % putWriteBufferCheck == 0 && currentWriteBufferSize > writeBufferSize) {
- flushCommits();
- }
- }
- if (autoFlush || currentWriteBufferSize > writeBufferSize) {
+ getBufferedMutator().mutate(puts);
+ if (autoFlush) {
flushCommits();
}
}
@@ -1186,11 +1179,10 @@ private void doPut(List puts) throws IOException {
* 校验 put 里的参数是否合法,需要传入 family ,并且 keyvalue 的 size 不能太大
* @param put the put
*/
- private void validatePut(Put put) {
+ public static void validatePut(Put put, int maxKeyValueSize) {
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert");
}
-
if (maxKeyValueSize > 0) {
for (Map.Entry> entry : put.getFamilyMap().entrySet()) {
if (entry.getKey() == null || entry.getKey().length == 0) {
@@ -1526,73 +1518,27 @@ public boolean isAutoFlush() {
@Override
public void flushCommits() throws IOException {
-
- try {
- if (writeBuffer.isEmpty()){
- return;
- }
- Map exceptionRowMap = new LinkedHashMap();
- boolean[] resultSuccess = new boolean[writeBuffer.size()];
- try {
- String realTableName = getTargetTableName(writeBuffer);
- List resultMapSingleOp = new LinkedList<>();
- BatchOperation batch = buildBatchOperation(realTableName, writeBuffer, tableNameString.equals(realTableName), resultMapSingleOp);
- BatchOperationResult results = batch.execute();
- if (results != null) {
- int index = 0;
- for (int i = 0; i != resultSuccess.length; ++i) {
- if (results.getResults().get(index) instanceof ObTableException) {
- resultSuccess[i] = false;
- exceptionRowMap.put((ObTableException)results.getResults().get(index), writeBuffer.get(i));
- } else {
- resultSuccess[i] = true;
- }
- index += resultMapSingleOp.get(i);
- }
- }
- } catch (Exception e) {
- logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush,
- writeBuffer.size(), e);
- if (e instanceof IOException) {
- throw (IOException) e;
- }
- } finally {
- // mutate list so that it is empty for complete success, or contains
- // only failed records results are returned in the same order as the
- // requests in list walk the list backwards, so we can remove from list
- // without impacting the indexes of earlier members
- for (int i = resultSuccess.length - 1; i >= 0; i--) {
- if (resultSuccess[i]) {
- // successful Puts are removed from the list here.
- writeBuffer.remove(i);
- }
- }
- if (!exceptionRowMap.isEmpty()) {
- exceptionRowMap.forEach((e, row)->{
- logger.error(LCD.convert("01-00008"), row, tableNameString, autoFlush,
- writeBuffer.size(), e);
- });
- }
- }
- } finally {
- if (clearBufferOnFail) {
- writeBuffer.clear();
- currentWriteBufferSize = 0;
- } else {
- // the write buffer was adjusted by processBatchOfPuts
- currentWriteBufferSize = 0;
- for (Put aPut : writeBuffer) {
- currentWriteBufferSize += aPut.heapSize();
- }
- }
+ if (mutator == null) {
+ return;
}
+ mutator.flush();
}
@Override
public void close() throws IOException {
+ if (isClosed) {
+ return;
+ }
+ flushCommits();
if (cleanupPoolOnClose) {
executePool.shutdown();
}
+ if (cleanupConnectionOnClose) {
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ }
+ this.isClosed = true;
}
@Override
@@ -1632,7 +1578,7 @@ public void setAutoFlush(boolean autoFlush) {
* Turns 'auto-flush' on or off.
*
* When enabled (default), {@link Put} operations don't get buffered/delayed
- * and are immediately executed. Failed operations are not retried. This is
+ * and are immediately executed. Failed operations will be retried in batch. This is
* slower but safer.
*
* Turning off {@link #autoFlush} means that multiple {@link Put}s will be
@@ -1640,28 +1586,24 @@ public void setAutoFlush(boolean autoFlush) {
* application dies before pending writes get flushed to HBase, data will be
* lost.
*
- * When you turn {@link #autoFlush} off, you should also consider the
- * {@link #clearBufferOnFail} option. By default, asynchronous {@link Put}
+ * By default, asynchronous {@link Put}
* requests will be retried on failure until successful. However, this can
* pollute the writeBuffer and slow down batching performance. Additionally,
* you may want to issue a number of Put requests and call
- * {@link #flushCommits()} as a barrier. In both use cases, consider setting
- * clearBufferOnFail to true to erase the buffer after {@link #flushCommits()}
- * has been called, regardless of success.
+ * {@link #flushCommits()} as a barrier.
*
* @param autoFlush Whether or not to enable 'auto-flush'.
- * @param clearBufferOnFail Whether to keep Put failures in the writeBuffer
+ * @param clearBufferOnFail Whether to keep Put failures in the writeBuffer (UNUSED for this version)
* @see #flushCommits
*/
@Override
public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
this.autoFlush = autoFlush;
- this.clearBufferOnFail = autoFlush || clearBufferOnFail;
}
@Override
public void setAutoFlushTo(boolean autoFlush) {
- throw new FeatureNotSupportedException("not supported yet'");
+ this.autoFlush = autoFlush;
}
/**
@@ -1674,7 +1616,11 @@ public void setAutoFlushTo(boolean autoFlush) {
*/
@Override
public long getWriteBufferSize() {
- return writeBufferSize;
+ if (mutator == null) {
+ return writeBufferSize;
+ } else {
+ return mutator.getWriteBufferSize();
+ }
}
/**
@@ -1689,9 +1635,10 @@ public long getWriteBufferSize() {
@Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
this.writeBufferSize = writeBufferSize;
- if (currentWriteBufferSize > writeBufferSize) {
- flushCommits();
+ if (this.mutator == null) {
+ getBufferedMutator();
}
+ this.mutator.setWriteBufferSize(writeBufferSize);
}
@Override
@@ -2314,4 +2261,14 @@ public byte[][] getEndKeys() throws IOException {
public Pair getStartEndKeys() throws IOException {
return new Pair<>(getStartKeys(), getEndKeys());
}
+
+ private BufferedMutator getBufferedMutator() throws IOException {
+ if (this.mutator == null) {
+ this.mutator = (OHBufferedMutatorImpl) this.connection.getBufferedMutator(
+ new BufferedMutatorParams(TableName.valueOf(this.tableNameString))
+ .pool(this.executePool).writeBufferSize(this.writeBufferSize)
+ .maxKeyValueSize(this.maxKeyValueSize), this);
+ }
+ return this.mutator;
+ }
}
\ No newline at end of file
diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java
index f0d4a447..1a414ca5 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java
@@ -26,6 +26,7 @@
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
@@ -67,7 +68,7 @@ public class OHTablePool implements Closeable {
* Default Constructor. Default HBaseConfiguration and no limit on pool size.
*/
public OHTablePool() {
- this(new Configuration(), Integer.MAX_VALUE);
+ this(HBaseConfiguration.create(), Integer.MAX_VALUE);
}
/**
diff --git a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java
index e3a22b2a..5d3dbbdd 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java
@@ -143,10 +143,6 @@ public final class OHConstants {
public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
- public static final String HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK = "hbase.htable.put.write.buffer.check";
-
- public static final int DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK = 10;
-
public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152L;
public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1;
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java b/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java
index 49fe17e9..2bdae45a 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java
@@ -1,3 +1,20 @@
+/*-
+ * #%L
+ * com.oceanbase:obkv-hbase-client
+ * %%
+ * Copyright (C) 2022 - 2025 OceanBase Group
+ * %%
+ * OBKV HBase Client Framework is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * #L%
+ */
+
package com.alipay.oceanbase.hbase.util;
import org.apache.hadoop.hbase.ServerName;
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java
index 42e44737..3d3dd196 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java
@@ -18,9 +18,10 @@
package com.alipay.oceanbase.hbase.util;
import com.alipay.oceanbase.hbase.OHTable;
-import com.google.common.annotations.VisibleForTesting;
+import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperation;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.slf4j.Logger;
@@ -31,7 +32,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.LCD;
+
+import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
+import static com.alipay.oceanbase.rpc.ObGlobal.*;
@InterfaceAudience.Private
public class OHBufferedMutatorImpl implements BufferedMutator {
@@ -40,13 +43,11 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
private final ExceptionListener listener;
- private final OHTable ohTable;
private final TableName tableName;
private volatile Configuration conf;
- @VisibleForTesting
+ private OHTable ohTable;
final ConcurrentLinkedQueue asyncWriteBuffer = new ConcurrentLinkedQueue();
- @VisibleForTesting
AtomicLong currentAsyncBufferSize = new AtomicLong(0);
private long writeBufferSize;
@@ -55,9 +56,11 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
private final ExecutorService pool;
private int rpcTimeout;
private int operationTimeout;
+ private static final long OB_VERSION_4_2_5_1 = calcVersion(4, (short) 2,
+ (byte) 5, (byte) 1);
- public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params)
- throws IOException {
+ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params,
+ OHTable ohTable) throws IOException {
if (ohConnection == null || ohConnection.isClosed()) {
throw new IllegalArgumentException("Connection is null or closed.");
}
@@ -77,7 +80,11 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam
.getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize();
// create an OHTable object to do batch work
- this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool);
+ if (ohTable != null) {
+ this.ohTable = ohTable;
+ } else {
+ this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool);
+ }
}
@Override
@@ -119,14 +126,12 @@ public void mutate(List extends Mutation> mutations) throws IOException {
validateOperation(m);
toAddSize += m.heapSize();
}
-
currentAsyncBufferSize.addAndGet(toAddSize);
asyncWriteBuffer.addAll(mutations);
if (currentAsyncBufferSize.get() > writeBufferSize) {
- execute(false);
+ batchExecute(false);
}
-
}
/**
@@ -142,10 +147,18 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException {
}
if (mt instanceof Put) {
// family empty check is in validatePut
- HTable.validatePut((Put) mt, maxKeyValueSize);
- OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), true);
+ OHTable.validatePut((Put) mt, maxKeyValueSize);
+ if (isMultiFamilySupport()) {
+ OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), true);
+ } else {
+ OHTable.checkFamilyViolationForOneFamily(mt.getFamilyMap().keySet());
+ }
} else {
- OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), false);
+ if (isMultiFamilySupport()) {
+ OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), false);
+ } else {
+ OHTable.checkFamilyViolationForOneFamily(mt.getFamilyMap().keySet());
+ }
}
}
@@ -156,7 +169,7 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException {
* @param flushAll - if true, sends all the writes and wait for all of them to finish before
* returning.
*/
- private void execute(boolean flushAll) throws IOException {
+ private void batchExecute(boolean flushAll) throws IOException {
LinkedList execBuffer = new LinkedList<>();
long dequeuedSize = 0L;
try {
@@ -172,19 +185,16 @@ private void execute(boolean flushAll) throws IOException {
if (execBuffer.isEmpty()) {
return;
}
- ohTable.batch(execBuffer);
+ Object[] results = new Object[execBuffer.size()];
+ ohTable.batch(execBuffer, results);
// if commit all successfully, clean execBuffer
execBuffer.clear();
} catch (Exception ex) {
- LOGGER.error(LCD.convert("01-00026"), ex);
+ // do not recollect error operations, notify outside
+ LOGGER.error("error happens: table name = ", tableName.getNameAsString(), ex);
if (ex.getCause() instanceof RetriesExhaustedWithDetailsException) {
- LOGGER.error(tableName + ": One or more of the operations have failed after retries.");
+ LOGGER.error(tableName.getNameAsString() + ": One or more of the operations have failed after retries.", ex);
RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException) ex.getCause();
- // recollect mutations
- execBuffer.clear();
- for (int i = 0; i < retryException.getNumExceptions(); ++i) {
- execBuffer.add((Mutation) retryException.getRow(i));
- }
if (listener != null) {
listener.onException(retryException, this);
} else {
@@ -194,12 +204,6 @@ private void execute(boolean flushAll) throws IOException {
LOGGER.error("Errors unrelated to operations occur during mutation operation", ex);
throw ex;
}
- } finally {
- for (Mutation mutation : execBuffer) {
- long size = mutation.heapSize();
- currentAsyncBufferSize.addAndGet(size);
- asyncWriteBuffer.add(mutation);
- }
}
}
@@ -209,7 +213,7 @@ public void close() throws IOException {
return;
}
try {
- execute(true);
+ batchExecute(true);
} finally {
// the pool in ObTableClient will be shut down too
this.pool.shutdown();
@@ -234,13 +238,21 @@ public void setWriteBufferSize(long writeBufferSize) throws IOException {
}
}
+ /**
+ * Only 4_2_5 BP1 - 4_3_0 and after 4_3_4 support multi-cf
+ * */
+ boolean isMultiFamilySupport() {
+ return (OB_VERSION >= OB_VERSION_4_2_5_1 && OB_VERSION < OB_VERSION_4_3_0_0)
+ || (OB_VERSION >= OB_VERSION_4_3_4_0);
+ }
+
/**
* Force to commit all operations
* do not care whether the pool is shut down or this BufferedMutator is closed
*/
@Override
public void flush() throws IOException {
- execute(true);
+ batchExecute(true);
}
@Override
@@ -258,6 +270,10 @@ public void setOperationTimeout(int operationTimeout) {
this.ohTable.setOperationTimeout(operationTimeout);
}
+ public long getCurrentBufferSize() {
+ return currentAsyncBufferSize.get();
+ }
+
@Deprecated
public List getWriteBuffer() {
return Arrays.asList(asyncWriteBuffer.toArray(new Row[0]));
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java
index 4256f316..28b39b2c 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java
@@ -126,6 +126,11 @@ public BufferedMutator getBufferedMutator(TableName tableName) throws IOExceptio
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
+ return getBufferedMutator(params, null);
+ }
+
+ public BufferedMutator getBufferedMutator(BufferedMutatorParams params, OHTable ohTable)
+ throws IOException {
if (params.getTableName() == null) {
throw new IllegalArgumentException("TableName cannot be null.");
}
@@ -138,7 +143,7 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I
if (params.getMaxKeyValueSize() == BUFFERED_PARAM_UNSET) {
params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
}
- return new OHBufferedMutatorImpl(this, params);
+ return new OHBufferedMutatorImpl(this, params, ohTable);
}
@Override
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java
index 8d42daa4..f74d184f 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java
@@ -21,6 +21,7 @@
import com.alipay.oceanbase.hbase.OHTablePool;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableFactory;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.util.Bytes;
@@ -170,7 +171,7 @@ private Configuration adjustConfiguration(Configuration configuration, String ta
}
private Configuration copyConfiguration(Configuration origin) {
- Configuration copy = new Configuration();
+ Configuration copy = HBaseConfiguration.create();
for (Map.Entry entry : origin) {
copy.set(entry.getKey(), entry.getValue());
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java
index 036ce265..5db940aa 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java
@@ -21,8 +21,6 @@
import com.alipay.oceanbase.rpc.constant.Constants;
import com.google.common.base.Objects;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.ConnectionConfiguration;
import java.io.IOException;
import java.util.Map;
diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
index ec197fce..0f88af8b 100644
--- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
+++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
@@ -5189,7 +5189,7 @@ public void testCellTTL() throws Exception {
try {
tryPut(hTable, errorPut);
} catch (Exception e) {
- assertTrue(e.getCause().toString().contains("Unknown column 'TTL'"));
+ assertTrue(e.getCause().getCause().toString().contains("Unknown column 'TTL'"));
}
// test put and get
tryPut(hTable, put1);
diff --git a/src/test/java/com/alipay/oceanbase/hbase/NativeHBaseTest.java b/src/test/java/com/alipay/oceanbase/hbase/NativeHBaseTest.java
index 545d95b9..06e565a1 100644
--- a/src/test/java/com/alipay/oceanbase/hbase/NativeHBaseTest.java
+++ b/src/test/java/com/alipay/oceanbase/hbase/NativeHBaseTest.java
@@ -4,6 +4,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -39,8 +40,13 @@ public void cleanData() throws IOException {
@AfterClass
public static void finish() throws IOException {
- hTable.close();
- multiCfHTable.close();
+ try {
+ hTable.close();
+ multiCfHTable.close();
+ } catch (Exception e) {
+ Assert.assertSame(e.getClass(), IOException.class);
+ Assert.assertTrue(e.getMessage().contains("put table"));
+ }
}
}
diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTest.java
index 01c4057f..76ddce12 100644
--- a/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTest.java
+++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTest.java
@@ -20,6 +20,7 @@
import com.alipay.oceanbase.hbase.util.ObHTableTestUtil;
import org.junit.*;
+import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
@@ -37,7 +38,7 @@ public static void before() throws Exception {
((OHTableClient) multiCfHTable).init();
List tableGroups = new LinkedList<>();
tableGroups.add("test");
- tableGroups.add("test_multi_cf");
+// tableGroups.add("test_multi_cf");
ObHTableTestUtil.prepareClean(tableGroups);
}
@@ -77,8 +78,13 @@ public void testNew() throws Exception {
@AfterClass
public static void finish() throws Exception {
- hTable.close();
- multiCfHTable.close();
- ObHTableTestUtil.closeConn();
+ try {
+ hTable.close();
+ multiCfHTable.close();
+ ObHTableTestUtil.closeConn();
+ } catch (Exception e) {
+ Assert.assertSame(e.getClass(), IOException.class);
+ Assert.assertTrue(e.getMessage().contains("put table"));
+ }
}
}
diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTestLoadTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTestLoadTest.java
index ed9df2fa..dfa96848 100644
--- a/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTestLoadTest.java
+++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTestLoadTest.java
@@ -67,9 +67,14 @@ public void test_refresh_table_entry() throws Exception {
@AfterClass
public static void after() throws Exception {
- hTable.close();
- multiCfHTable.close();
- ObHTableTestUtil.closeConn();
+ try {
+ hTable.close();
+ multiCfHTable.close();
+ ObHTableTestUtil.closeConn();
+ } catch (Exception e) {
+ Assert.assertSame(e.getClass(), IOException.class);
+ Assert.assertTrue(e.getMessage().contains("put table"));
+ }
}
@Test
diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolLoadTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolLoadTest.java
index d639d4a4..afe2c084 100644
--- a/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolLoadTest.java
+++ b/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolLoadTest.java
@@ -167,8 +167,13 @@ public void testNew() throws IOException {
@AfterClass
public static void finish() throws IOException, SQLException {
- hTable.close();
- multiCfHTable.close();
- ObHTableTestUtil.closeConn();
+ try {
+ hTable.close();
+ multiCfHTable.close();
+ ObHTableTestUtil.closeConn();
+ } catch (Exception e) {
+ Assert.assertSame(e.getClass(), IOException.class);
+ Assert.assertTrue(e.getMessage().contains("put table"));
+ }
}
}
diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolTest.java
index d915afdc..5977f26f 100644
--- a/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolTest.java
+++ b/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolTest.java
@@ -20,6 +20,7 @@
import com.alipay.oceanbase.hbase.util.ObHTableTestUtil;
import com.alipay.remoting.util.ConcurrentHashSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.util.PoolMap;
import org.junit.*;
@@ -37,7 +38,7 @@ public class OHTablePoolTest extends HTableTestBase {
protected static OHTablePool ohTablePool;
private static OHTablePool newOHTablePool(final int maxSize, final PoolMap.PoolType poolType) {
- OHTablePool pool = new OHTablePool(new Configuration(), maxSize, poolType);
+ OHTablePool pool = new OHTablePool(HBaseConfiguration.create(), maxSize, poolType);
pool.setFullUserName("test", ObHTableTestUtil.FULL_USER_NAME);
pool.setPassword("test", ObHTableTestUtil.PASSWORD);
if (ObHTableTestUtil.ODP_MODE) {
@@ -55,7 +56,7 @@ private static OHTablePool newOHTablePool(final int maxSize, final PoolMap.PoolT
@BeforeClass
public static void setup() throws Exception {
- Configuration c = new Configuration();
+ Configuration c = HBaseConfiguration.create();
ohTablePool = newOHTablePool(10, null);
ohTablePool.setRuntimeBatchExecutor("test", Executors.newFixedThreadPool(3));
hTable = ohTablePool.getTable("test");
@@ -73,9 +74,14 @@ public void prepareCase() {
@AfterClass
public static void finish() throws IOException, SQLException {
- hTable.close();
- multiCfHTable.close();
- ObHTableTestUtil.closeConn();
+ try {
+ hTable.close();
+ multiCfHTable.close();
+ ObHTableTestUtil.closeConn();
+ } catch (Exception e) {
+ Assert.assertSame(e.getClass(), IOException.class);
+ Assert.assertTrue(e.getMessage().contains("put table"));
+ }
}
public void test_current_get_close(final OHTablePool ohTablePool, int concurrency, int maxSize) {
diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java
index b4df2595..d4fd6331 100644
--- a/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java
+++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java
@@ -108,9 +108,14 @@ public void testNew() throws Exception {
@AfterClass
public static void finish() throws IOException, SQLException {
- hTable.close();
- multiCfHTable.close();
- ObHTableTestUtil.closeConn();
+ try {
+ hTable.close();
+ multiCfHTable.close();
+ ObHTableTestUtil.closeConn();
+ } catch (Exception e) {
+ Assert.assertSame(e.getClass(), IOException.class);
+ Assert.assertTrue(e.getMessage().contains("put table"));
+ }
}
}