From 28619e1dc8637e556d300585baf555e61e02541f Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 5 Nov 2024 19:38:53 +0800 Subject: [PATCH 01/15] adjust Put to be compatible to hbase 1.x --- .../com/alipay/oceanbase/hbase/OHTable.java | 222 ++++++++---------- .../hbase/constants/OHConstants.java | 4 - .../hbase/util/OHBufferedMutatorImpl.java | 33 ++- .../hbase/util/OHConnectionImpl.java | 7 +- 4 files changed, 125 insertions(+), 141 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index e1ab500b..93b1584c 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -81,102 +81,110 @@ public class OHTable implements HTableInterface { - private static final Logger logger = TableHBaseLoggerFactory - .getLogger(OHTable.class); + private static final Logger logger = TableHBaseLoggerFactory + .getLogger(OHTable.class); /** * the table client for oceanbase */ - private final ObTableClient obTableClient; + private final ObTableClient obTableClient; /** * the ohTable name in byte array */ - private final byte[] tableName; + private final byte[] tableName; /** * the ohTable name in string */ - private final String tableNameString; + private final String tableNameString; /** * operation timeout whose default value is Integer.MaxValue decide the timeout of executing in pool. *

* if operation timeout is not equal to the default value mean the Get execute in the pool */ - private int operationTimeout; + private int operationTimeout; /** * timeout for each rpc request */ - private int rpcTimeout; + private int rpcTimeout; /** * if the Get executing pool is specified by user cleanupPoolOnClose will be false , * which means that user is responsible for the pool */ - private boolean cleanupPoolOnClose = true; + private boolean cleanupPoolOnClose = true; /** * if the obTableClient is specified by user closeClientOnClose will be false , * which means that user is responsible for obTableClient */ - private boolean closeClientOnClose = true; + private boolean closeClientOnClose = true; + + /** + * If the connection this ObTable obtains is created by the ObTable itself, + * should set true and close the connection when this ObTable closes; + * otherwise set false + */ + private final boolean cleanupConnectionOnClose; /** * when the operationExecuteInPool is true the Get * will be executed in the pool. */ - private ExecutorService executePool; + private ExecutorService executePool; /** * decide whether the Get request will be executed * in the pool. */ - private boolean operationExecuteInPool = false; + private boolean operationExecuteInPool = false; - /** - * the buffer of put request - */ - private final ArrayList writeBuffer = new ArrayList<>(); /** * when the put request reach the write buffer size the do put will * flush commits automatically */ - private long writeBufferSize; - /** - * the do put check write buffer every putWriteBufferCheck puts - */ - private int putWriteBufferCheck; + private long writeBufferSize; /** * decide whether clear the buffer when meet exception.the default * value is true. Be careful about the correctness when set it false */ - private boolean clearBufferOnFail = true; /** * whether flush the put automatically */ - private boolean autoFlush = true; - - /** - * current buffer size - */ - private long currentWriteBufferSize; + private boolean autoFlush = true; /** * the max size of put key value */ - private int maxKeyValueSize; + private int maxKeyValueSize; // i.e., doPut checks the writebuffer every X Puts. /** * Configuration extends from hbase configuration */ - private final Configuration configuration; + private final Configuration configuration; + + private int scannerTimeout; + + /** + * the connection to obtain bufferedMutator for Put operations + */ + private OHConnectionImpl connection; - private int scannerTimeout; + /** + * the bufferedMutator to execute Puts + */ + private OHBufferedMutatorImpl mutator; + + /** + * flag for whether closed + */ + private boolean isClosed = false; /** * Creates an object to access a HBase table. @@ -196,6 +204,8 @@ public OHTable(Configuration configuration, String tableName) throws IOException this.configuration = configuration; this.tableName = tableName.getBytes(); this.tableNameString = tableName; + this.connection = (OHConnectionImpl) ConnectionFactory.createConnection(configuration); + this.cleanupConnectionOnClose = true; int maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX, DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX); @@ -253,6 +263,8 @@ public OHTable(Configuration configuration, final byte[] tableName, this.configuration = configuration; this.tableName = tableName; this.tableNameString = Bytes.toString(tableName); + this.connection = (OHConnectionImpl) ConnectionFactory.createConnection(configuration); + this.cleanupConnectionOnClose = true; this.executePool = executePool; this.cleanupPoolOnClose = false; OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration); @@ -289,6 +301,7 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient, this.tableNameString = Bytes.toString(tableName); this.cleanupPoolOnClose = false; this.closeClientOnClose = false; + this.cleanupConnectionOnClose = false; this.executePool = executePool; this.obTableClient = obTableClient; this.configuration = new Configuration(); @@ -306,6 +319,8 @@ public OHTable(TableName tableName, Connection connection, this.tableNameString = Bytes.toString(tableName.getName()); this.configuration = connection.getConfiguration(); this.executePool = executePool; + this.connection = (OHConnectionImpl) connection; + this.cleanupConnectionOnClose = false; if (executePool == null) { int maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX, DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX); @@ -323,8 +338,6 @@ public OHTable(TableName tableName, Connection connection, HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL, (this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); this.maxKeyValueSize = connectionConfig.getMaxKeyValueSize(); - this.putWriteBufferCheck = this.configuration.getInt(HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK, - DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK); this.writeBufferSize = connectionConfig.getWriteBufferSize(); this.tableName = tableName.getName(); int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, @@ -386,8 +399,6 @@ private void finishSetUp() { (this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); this.maxKeyValueSize = this.configuration.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); - this.putWriteBufferCheck = this.configuration.getInt(HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK, - DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK); this.writeBufferSize = this.configuration.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); } @@ -1155,29 +1166,16 @@ public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOExcept @Override public void put(Put put) throws IOException { - doPut(Collections.singletonList(put)); + getBufferedMutator().mutate(put); + if (autoFlush) { + flushCommits(); + } } @Override public void put(List puts) throws IOException { - doPut(puts); - } - - private void doPut(List puts) throws IOException { - int n = 0; - for (Put put : puts) { - validatePut(put); - checkFamilyViolation(put.getFamilyMap().keySet(), true); - writeBuffer.add(put); - currentWriteBufferSize += put.heapSize(); - - // we need to periodically see if the writebuffer is full instead of waiting until the end of the List - n++; - if (n % putWriteBufferCheck == 0 && currentWriteBufferSize > writeBufferSize) { - flushCommits(); - } - } - if (autoFlush || currentWriteBufferSize > writeBufferSize) { + getBufferedMutator().mutate(puts); + if (autoFlush) { flushCommits(); } } @@ -1186,11 +1184,10 @@ private void doPut(List puts) throws IOException { * 校验 put 里的参数是否合法,需要传入 family ,并且 keyvalue 的 size 不能太大 * @param put the put */ - private void validatePut(Put put) { + public static void validatePut(Put put, int maxKeyValueSize) { if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert"); } - if (maxKeyValueSize > 0) { for (Map.Entry> entry : put.getFamilyMap().entrySet()) { if (entry.getKey() == null || entry.getKey().length == 0) { @@ -1526,73 +1523,33 @@ public boolean isAutoFlush() { @Override public void flushCommits() throws IOException { - + if (mutator == null) { + return; + } try { - if (writeBuffer.isEmpty()){ - return; - } - Map exceptionRowMap = new LinkedHashMap(); - boolean[] resultSuccess = new boolean[writeBuffer.size()]; - try { - String realTableName = getTargetTableName(writeBuffer); - List resultMapSingleOp = new LinkedList<>(); - BatchOperation batch = buildBatchOperation(realTableName, writeBuffer, tableNameString.equals(realTableName), resultMapSingleOp); - BatchOperationResult results = batch.execute(); - if (results != null) { - int index = 0; - for (int i = 0; i != resultSuccess.length; ++i) { - if (results.getResults().get(index) instanceof ObTableException) { - resultSuccess[i] = false; - exceptionRowMap.put((ObTableException)results.getResults().get(index), writeBuffer.get(i)); - } else { - resultSuccess[i] = true; - } - index += resultMapSingleOp.get(i); - } - } - } catch (Exception e) { - logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush, - writeBuffer.size(), e); - if (e instanceof IOException) { - throw (IOException) e; - } - } finally { - // mutate list so that it is empty for complete success, or contains - // only failed records results are returned in the same order as the - // requests in list walk the list backwards, so we can remove from list - // without impacting the indexes of earlier members - for (int i = resultSuccess.length - 1; i >= 0; i--) { - if (resultSuccess[i]) { - // successful Puts are removed from the list here. - writeBuffer.remove(i); - } - } - if (!exceptionRowMap.isEmpty()) { - exceptionRowMap.forEach((e, row)->{ - logger.error(LCD.convert("01-00008"), row, tableNameString, autoFlush, - writeBuffer.size(), e); - }); - } - } - } finally { - if (clearBufferOnFail) { - writeBuffer.clear(); - currentWriteBufferSize = 0; - } else { - // the write buffer was adjusted by processBatchOfPuts - currentWriteBufferSize = 0; - for (Put aPut : writeBuffer) { - currentWriteBufferSize += aPut.heapSize(); - } - } + mutator.flush(); + } catch (Exception e) { + throw new IOException("put table " + tableNameString + " error codes " + null + + "auto flush " + autoFlush + " current buffer size " + + mutator.getCurrentBufferSize(), e); } } @Override public void close() throws IOException { + if (isClosed) { + return; + } + flushCommits(); if (cleanupPoolOnClose) { executePool.shutdown(); } + if (cleanupConnectionOnClose) { + if (this.connection != null) { + this.connection.close(); + } + } + this.isClosed = true; } @Override @@ -1632,7 +1589,7 @@ public void setAutoFlush(boolean autoFlush) { * Turns 'auto-flush' on or off. *

* When enabled (default), {@link Put} operations don't get buffered/delayed - * and are immediately executed. Failed operations are not retried. This is + * and are immediately executed. Failed operations will be retried in batch. This is * slower but safer. *

* Turning off {@link #autoFlush} means that multiple {@link Put}s will be @@ -1640,28 +1597,24 @@ public void setAutoFlush(boolean autoFlush) { * application dies before pending writes get flushed to HBase, data will be * lost. *

- * When you turn {@link #autoFlush} off, you should also consider the - * {@link #clearBufferOnFail} option. By default, asynchronous {@link Put} + * By default, asynchronous {@link Put} * requests will be retried on failure until successful. However, this can * pollute the writeBuffer and slow down batching performance. Additionally, * you may want to issue a number of Put requests and call - * {@link #flushCommits()} as a barrier. In both use cases, consider setting - * clearBufferOnFail to true to erase the buffer after {@link #flushCommits()} - * has been called, regardless of success. + * {@link #flushCommits()} as a barrier. * * @param autoFlush Whether or not to enable 'auto-flush'. - * @param clearBufferOnFail Whether to keep Put failures in the writeBuffer + * @param clearBufferOnFail Whether to keep Put failures in the writeBuffer (UNUSED for this version) * @see #flushCommits */ @Override public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { this.autoFlush = autoFlush; - this.clearBufferOnFail = autoFlush || clearBufferOnFail; } @Override public void setAutoFlushTo(boolean autoFlush) { - throw new FeatureNotSupportedException("not supported yet'"); + this.autoFlush = autoFlush; } /** @@ -1674,7 +1627,11 @@ public void setAutoFlushTo(boolean autoFlush) { */ @Override public long getWriteBufferSize() { - return writeBufferSize; + if (mutator == null) { + return writeBufferSize; + } else { + return mutator.getWriteBufferSize(); + } } /** @@ -1689,9 +1646,10 @@ public long getWriteBufferSize() { @Override public void setWriteBufferSize(long writeBufferSize) throws IOException { this.writeBufferSize = writeBufferSize; - if (currentWriteBufferSize > writeBufferSize) { - flushCommits(); + if (this.mutator == null) { + getBufferedMutator(); } + this.mutator.setWriteBufferSize(writeBufferSize); } @Override @@ -2314,4 +2272,18 @@ public byte[][] getEndKeys() throws IOException { public Pair getStartEndKeys() throws IOException { return new Pair<>(getStartKeys(), getEndKeys()); } + + public enum OHOpType { + Put, Append, Delete, Increment + } + + private BufferedMutator getBufferedMutator() throws IOException { + if (this.mutator == null) { + this.mutator = (OHBufferedMutatorImpl) this.connection.getBufferedMutator( + new BufferedMutatorParams(TableName.valueOf(this.tableNameString)) + .pool(this.executePool).writeBufferSize(this.writeBufferSize) + .maxKeyValueSize(this.maxKeyValueSize), this); + } + return this.mutator; + } } \ No newline at end of file diff --git a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java index e3a22b2a..5d3dbbdd 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java +++ b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java @@ -143,10 +143,6 @@ public final class OHConstants { public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"; - public static final String HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK = "hbase.htable.put.write.buffer.check"; - - public static final int DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK = 10; - public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152L; public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1; diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 42e44737..ab50af5a 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.LCD; @InterfaceAudience.Private public class OHBufferedMutatorImpl implements BufferedMutator { @@ -56,8 +55,8 @@ public class OHBufferedMutatorImpl implements BufferedMutator { private int rpcTimeout; private int operationTimeout; - public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params) - throws IOException { + public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params, + OHTable ohTable) throws IOException { if (ohConnection == null || ohConnection.isClosed()) { throw new IllegalArgumentException("Connection is null or closed."); } @@ -77,7 +76,11 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam .getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize(); // create an OHTable object to do batch work - this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool); + if (ohTable != null) { + this.ohTable = ohTable; + } else { + this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool); + } } @Override @@ -142,7 +145,7 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException { } if (mt instanceof Put) { // family empty check is in validatePut - HTable.validatePut((Put) mt, maxKeyValueSize); + OHTable.validatePut((Put) mt, maxKeyValueSize); OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), true); } else { OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), false); @@ -176,14 +179,17 @@ private void execute(boolean flushAll) throws IOException { // if commit all successfully, clean execBuffer execBuffer.clear(); } catch (Exception ex) { - LOGGER.error(LCD.convert("01-00026"), ex); if (ex.getCause() instanceof RetriesExhaustedWithDetailsException) { - LOGGER.error(tableName + ": One or more of the operations have failed after retries."); + LOGGER.error(TableHBaseLoggerFactory.LCD.convert("01-00011"), tableName.getNameAsString() + + ": One or more of the operations have failed after retries.", ex.getCause()); RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException) ex.getCause(); - // recollect mutations + // recollect mutations and log error information execBuffer.clear(); - for (int i = 0; i < retryException.getNumExceptions(); ++i) { - execBuffer.add((Mutation) retryException.getRow(i)); + for (int i = 0; i < retryException.getNumExceptions(); ++i) { + Row failedOp = retryException.getRow(i); + execBuffer.add((Mutation) failedOp); + LOGGER.error(TableHBaseLoggerFactory.LCD.convert("01-00011"), failedOp, tableName.getNameAsString(), + currentAsyncBufferSize.get(), retryException.getCause(i)); } if (listener != null) { listener.onException(retryException, this); @@ -191,7 +197,8 @@ private void execute(boolean flushAll) throws IOException { throw retryException; } } else { - LOGGER.error("Errors unrelated to operations occur during mutation operation", ex); + LOGGER.error(TableHBaseLoggerFactory.LCD.convert("01-00011"), tableName.getNameAsString() + + ": Errors unrelated to operations occur during mutation operation", ex); throw ex; } } finally { @@ -258,6 +265,10 @@ public void setOperationTimeout(int operationTimeout) { this.ohTable.setOperationTimeout(operationTimeout); } + public long getCurrentBufferSize() { + return currentAsyncBufferSize.get(); + } + @Deprecated public List getWriteBuffer() { return Arrays.asList(asyncWriteBuffer.toArray(new Row[0])); diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java index 4256f316..28b39b2c 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java @@ -126,6 +126,11 @@ public BufferedMutator getBufferedMutator(TableName tableName) throws IOExceptio @Override public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { + return getBufferedMutator(params, null); + } + + public BufferedMutator getBufferedMutator(BufferedMutatorParams params, OHTable ohTable) + throws IOException { if (params.getTableName() == null) { throw new IllegalArgumentException("TableName cannot be null."); } @@ -138,7 +143,7 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I if (params.getMaxKeyValueSize() == BUFFERED_PARAM_UNSET) { params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize()); } - return new OHBufferedMutatorImpl(this, params); + return new OHBufferedMutatorImpl(this, params, ohTable); } @Override From df11904f1d3afca3cb9536cc27b11ccb312ac0f1 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Wed, 6 Nov 2024 10:09:05 +0800 Subject: [PATCH 02/15] init old server bufferedMutator compatible --- .../hbase/util/OHBufferedMutatorImpl.java | 197 ++++++++++++++++-- 1 file changed, 177 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index ab50af5a..3eb8ca02 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -18,11 +18,18 @@ package com.alipay.oceanbase.hbase.util; import com.alipay.oceanbase.hbase.OHTable; +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperation; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationRequest; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import java.io.IOException; @@ -31,6 +38,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD; +import static com.alipay.oceanbase.rpc.ObGlobal.*; @InterfaceAudience.Private public class OHBufferedMutatorImpl implements BufferedMutator { @@ -39,14 +50,16 @@ public class OHBufferedMutatorImpl implements BufferedMutator { private final ExceptionListener listener; - private final OHTable ohTable; private final TableName tableName; private volatile Configuration conf; + private OHTable ohTable; + private ObTableClient obTableClient; @VisibleForTesting final ConcurrentLinkedQueue asyncWriteBuffer = new ConcurrentLinkedQueue(); @VisibleForTesting AtomicLong currentAsyncBufferSize = new AtomicLong(0); + private AtomicReference> type = new AtomicReference<>(null); private long writeBufferSize; private final int maxKeyValueSize; @@ -54,6 +67,7 @@ public class OHBufferedMutatorImpl implements BufferedMutator { private final ExecutorService pool; private int rpcTimeout; private int operationTimeout; + private static final long OB_VERSION_4_3_5_0 = calcVersion(4, (short) 3, (byte) 5, (byte) 0); public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params, OHTable ohTable) throws IOException { @@ -75,11 +89,18 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam this.maxKeyValueSize = params.getMaxKeyValueSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params .getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize(); - // create an OHTable object to do batch work - if (ohTable != null) { - this.ohTable = ohTable; + if (isBatchSupport()) { + // create an OHTable object to do batch work + if (ohTable != null) { + this.ohTable = ohTable; + } else { + this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool); + } } else { - this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool); + // create an ObTableClient object to execute batch operation request + this.obTableClient = ObTableClientManager.getOrCreateObTableClient(connectionConfig); + this.obTableClient.setRuntimeBatchExecutor(pool); + this.obTableClient.setRpcExecuteTimeout(rpcTimeout); } } @@ -118,18 +139,39 @@ public void mutate(List mutations) throws IOException { } long toAddSize = 0; - for (Mutation m : mutations) { - validateOperation(m); - toAddSize += m.heapSize(); - } + if (isBatchSupport()) { + for (Mutation m : mutations) { + validateOperation(m); + toAddSize += m.heapSize(); + } - currentAsyncBufferSize.addAndGet(toAddSize); - asyncWriteBuffer.addAll(mutations); + currentAsyncBufferSize.addAndGet(toAddSize); + asyncWriteBuffer.addAll(mutations); - if (currentAsyncBufferSize.get() > writeBufferSize) { - execute(false); - } + if (currentAsyncBufferSize.get() > writeBufferSize) { + batchExecute(false); + } + } else { + // check if every mutation's family is the same + // check if mutations are the same type + for (Mutation m : mutations) { + validateOperation(m); + Class curType = m.getClass(); + // set the type of this BufferedMutator + type.compareAndSet(null, curType); + if (!type.get().equals(curType)) { + throw new IllegalArgumentException("Not support different type in one batch."); + } + toAddSize += m.heapSize(); + } + currentAsyncBufferSize.addAndGet(toAddSize); + asyncWriteBuffer.addAll(mutations); + + if (currentAsyncBufferSize.get() > writeBufferSize) { + normalExecute(false); + } + } } /** @@ -153,13 +195,14 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException { } /** + * This execute only supports for server version of 4_3_5. * Send the operations in the buffer to the servers. Does not wait for the server's answer. If * there is an error, either throw the error, or use the listener to deal with the error. * * @param flushAll - if true, sends all the writes and wait for all of them to finish before * returning. */ - private void execute(boolean flushAll) throws IOException { + private void batchExecute(boolean flushAll) throws IOException { LinkedList execBuffer = new LinkedList<>(); long dequeuedSize = 0L; try { @@ -180,7 +223,7 @@ private void execute(boolean flushAll) throws IOException { execBuffer.clear(); } catch (Exception ex) { if (ex.getCause() instanceof RetriesExhaustedWithDetailsException) { - LOGGER.error(TableHBaseLoggerFactory.LCD.convert("01-00011"), tableName.getNameAsString() + LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString() + ": One or more of the operations have failed after retries.", ex.getCause()); RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException) ex.getCause(); // recollect mutations and log error information @@ -188,7 +231,7 @@ private void execute(boolean flushAll) throws IOException { for (int i = 0; i < retryException.getNumExceptions(); ++i) { Row failedOp = retryException.getRow(i); execBuffer.add((Mutation) failedOp); - LOGGER.error(TableHBaseLoggerFactory.LCD.convert("01-00011"), failedOp, tableName.getNameAsString(), + LOGGER.error(LCD.convert("01-00011"), failedOp, tableName.getNameAsString(), currentAsyncBufferSize.get(), retryException.getCause(i)); } if (listener != null) { @@ -197,7 +240,7 @@ private void execute(boolean flushAll) throws IOException { throw retryException; } } else { - LOGGER.error(TableHBaseLoggerFactory.LCD.convert("01-00011"), tableName.getNameAsString() + LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString() + ": Errors unrelated to operations occur during mutation operation", ex); throw ex; } @@ -210,13 +253,109 @@ private void execute(boolean flushAll) throws IOException { } } + /** + * This execute supports for server version below 4_3_5. + * Send the operations in the buffer to the servers. Does not wait for the server's answer. If + * there is an error, either throw the error, or use the listener to deal with the error. + * + * @param flushAll - if true, sends all the writes and wait for all of them to finish before + * returning. + */ + private void normalExecute(boolean flushAll) throws IOException { + LinkedList execBuffer = new LinkedList<>(); + ObTableBatchOperationRequest request = null; + // namespace n1, n1:table_name + // namespace default, table_name + String tableNameString = tableName.getNameAsString(); + try { + long dequeuedSize = 0L; + Mutation m; + while ((writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || flushAll) + && (m = asyncWriteBuffer.poll()) != null) { + execBuffer.add(m); + long size = m.heapSize(); + currentAsyncBufferSize.addAndGet(-size); + dequeuedSize += size; + } + // in concurrent situation, asyncWriteBuffer may be empty here + // for other threads flush all buffer + if (execBuffer.isEmpty()) { + return; + } + try{ + // for now, operations' family is the same + byte[] family = execBuffer.getFirst().getFamilyMap().firstKey(); + ObTableBatchOperation batch = buildObTableBatchOperation(execBuffer); + // table_name$cf_name + String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf); + request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName); + } catch (Exception ex) { + LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString() + + ": Errors unrelated to operations occur before mutation operation", ex); + throw new ObTableUnexpectedException(tableName.getNameAsString() + ": Errors occur before mutation operation", ex); + } + try { + ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); + } catch (Exception ex) { + LOGGER.debug(LCD.convert("01-00011"), tableName.getNameAsString() + + ": Errors occur during mutation operation", ex); + m = null; + try { + // retry every single operation + while (!execBuffer.isEmpty()) { + // poll elements from execBuffer to recollect remaining operations + m = execBuffer.poll(); + byte[] family = m.getFamilyMap().firstKey(); + ObTableBatchOperation batch = buildObTableBatchOperation(Collections.singletonList(m)); + String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf); + request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName); + ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); + } + } catch (Exception newEx) { + if (m != null) { + execBuffer.addFirst(m); + } + // if retry fails, only recollect remaining operations + while(!execBuffer.isEmpty()) { + m = execBuffer.poll(); + long size = m.heapSize(); + asyncWriteBuffer.add(m); + currentAsyncBufferSize.addAndGet(size); + } + throw newEx; + } + } + } catch (Exception ex) { + LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString() + + ": Errors occur during mutation operation", ex); + // if the cause is illegal argument, directly throw to user + if (ex instanceof ObTableUnexpectedException) { + throw (ObTableUnexpectedException) ex; + } + // TODO: need to collect error information and actions in old version + // TODO: maybe keep in ObTableBatchOperationResult + List throwables = new ArrayList(); + List actions = new ArrayList(); + List addresses = new ArrayList(); + throwables.add(ex); + RetriesExhaustedWithDetailsException error = new RetriesExhaustedWithDetailsException( + new ArrayList(throwables), + new ArrayList(actions), new ArrayList(addresses)); + if (listener == null) { + throw error; + } else { + listener.onException(error, this); + } + } + } + @Override public void close() throws IOException { if (closed) { return; } try { - execute(true); + flush(); } finally { // the pool in ObTableClient will be shut down too this.pool.shutdown(); @@ -241,13 +380,31 @@ public void setWriteBufferSize(long writeBufferSize) throws IOException { } } + private ObTableBatchOperation buildObTableBatchOperation(List execBuffer) { + List keyValueList = new LinkedList<>(); + for (Mutation mutation : execBuffer) { + for (Map.Entry> entry : mutation.getFamilyMap().entrySet()) { + keyValueList.addAll(entry.getValue()); + } + } + return OHTable.buildObTableBatchOperation(keyValueList, false, null); + } + + boolean isBatchSupport() { + return OB_VERSION >= OB_VERSION_4_3_5_0; + } + /** * Force to commit all operations * do not care whether the pool is shut down or this BufferedMutator is closed */ @Override public void flush() throws IOException { - execute(true); + if (isBatchSupport()) { + batchExecute(true); + } else { + normalExecute(true); + } } @Override From 1743301820356a3b3dc0e943711ffed154375d33 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Wed, 6 Nov 2024 14:30:31 +0800 Subject: [PATCH 03/15] simple support single cf in bufferedmutator; wait for batch compatible --- .../com/alipay/oceanbase/hbase/OHTable.java | 2 +- .../hbase/util/OHBufferedMutatorImpl.java | 44 ++++++++++++------- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 93b1584c..85c8dc43 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -1530,7 +1530,7 @@ public void flushCommits() throws IOException { mutator.flush(); } catch (Exception e) { throw new IOException("put table " + tableNameString + " error codes " + null - + "auto flush " + autoFlush + " current buffer size " + + " auto flush " + autoFlush + " current buffer size " + mutator.getCurrentBufferSize(), e); } } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 3eb8ca02..6c1dde26 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -68,6 +68,9 @@ public class OHBufferedMutatorImpl implements BufferedMutator { private int rpcTimeout; private int operationTimeout; private static final long OB_VERSION_4_3_5_0 = calcVersion(4, (short) 3, (byte) 5, (byte) 0); + private static final long OB_VERSION_4_3_0_0 = calcVersion(4, (short) 3, (byte) 0, (byte) 0); + private static final long OB_VERSION_4_2_5_1 = calcVersion(4, (short) 2, (byte) 5, (byte) 1); + private static final long OB_VERSION_4_3_4_0 = calcVersion(4, (short) 3, (byte) 4, (byte) 0); public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params, OHTable ohTable) throws IOException { @@ -144,16 +147,8 @@ public void mutate(List mutations) throws IOException { validateOperation(m); toAddSize += m.heapSize(); } - - currentAsyncBufferSize.addAndGet(toAddSize); - asyncWriteBuffer.addAll(mutations); - - if (currentAsyncBufferSize.get() > writeBufferSize) { - batchExecute(false); - } } else { - // check if every mutation's family is the same - // check if mutations are the same type + // version below 4_3_5 need the same type in one bufferedMutator for (Mutation m : mutations) { validateOperation(m); Class curType = m.getClass(); @@ -164,11 +159,14 @@ public void mutate(List mutations) throws IOException { } toAddSize += m.heapSize(); } + } + currentAsyncBufferSize.addAndGet(toAddSize); + asyncWriteBuffer.addAll(mutations); - currentAsyncBufferSize.addAndGet(toAddSize); - asyncWriteBuffer.addAll(mutations); - - if (currentAsyncBufferSize.get() > writeBufferSize) { + if (currentAsyncBufferSize.get() > writeBufferSize) { + if (isBatchSupport()) { + batchExecute(false); + } else { normalExecute(false); } } @@ -188,9 +186,17 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException { if (mt instanceof Put) { // family empty check is in validatePut OHTable.validatePut((Put) mt, maxKeyValueSize); - OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), true); + if (isMultiFamilySupport()) { + OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), true); + } else { + OHTable.checkFamilyViolationForOneFamily(mt.getFamilyMap().keySet()); + } } else { - OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), false); + if (isMultiFamilySupport()) { + OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), false); + } else { + OHTable.checkFamilyViolationForOneFamily(mt.getFamilyMap().keySet()); + } } } @@ -394,6 +400,14 @@ boolean isBatchSupport() { return OB_VERSION >= OB_VERSION_4_3_5_0; } + /** + * Only 4_2_5 BP1 - 4_3_0 and after 4_3_4 support multi-cf + * */ + boolean isMultiFamilySupport() { + return (OB_VERSION >= OB_VERSION_4_2_5_1 && OB_VERSION < OB_VERSION_4_3_0_0) + || (OB_VERSION >= OB_VERSION_4_3_4_0); + } + /** * Force to commit all operations * do not care whether the pool is shut down or this BufferedMutator is closed From 65cba8dd02bc55552d81bd364de6a118b2e5a7ad Mon Sep 17 00:00:00 2001 From: stuBirdFly <1065492934@qq.com> Date: Wed, 6 Nov 2024 21:26:16 +0800 Subject: [PATCH 04/15] batch compat old server --- .../alipay/oceanbase/hbase/util/CompatibilityUtil.java | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java b/src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java new file mode 100644 index 00000000..39db637c --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java @@ -0,0 +1,9 @@ +package com.alipay.oceanbase.hbase.util; + +import com.alipay.oceanbase.rpc.ObGlobal; + +public class CompatibilityUtil { + public static boolean isBatchSupport() { + return ObGlobal.OB_VERSION > ObGlobal.OB_VERSION_4_3_4_0; + } +} From 7b79c08a9624a77b214973b56995ff8beadefd2e Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Thu, 7 Nov 2024 11:59:05 +0800 Subject: [PATCH 05/15] bufferedMutator compatible to old server --- .../hbase/util/OHBufferedMutatorImpl.java | 163 ++---------------- 1 file changed, 12 insertions(+), 151 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 6c1dde26..329521e9 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -18,18 +18,13 @@ package com.alipay.oceanbase.hbase.util; import com.alipay.oceanbase.hbase.OHTable; -import com.alipay.oceanbase.rpc.ObTableClient; -import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperation; -import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationRequest; -import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import java.io.IOException; @@ -38,7 +33,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD; import static com.alipay.oceanbase.rpc.ObGlobal.*; @@ -54,12 +48,10 @@ public class OHBufferedMutatorImpl implements BufferedMutator { private volatile Configuration conf; private OHTable ohTable; - private ObTableClient obTableClient; @VisibleForTesting final ConcurrentLinkedQueue asyncWriteBuffer = new ConcurrentLinkedQueue(); @VisibleForTesting AtomicLong currentAsyncBufferSize = new AtomicLong(0); - private AtomicReference> type = new AtomicReference<>(null); private long writeBufferSize; private final int maxKeyValueSize; @@ -67,10 +59,8 @@ public class OHBufferedMutatorImpl implements BufferedMutator { private final ExecutorService pool; private int rpcTimeout; private int operationTimeout; - private static final long OB_VERSION_4_3_5_0 = calcVersion(4, (short) 3, (byte) 5, (byte) 0); - private static final long OB_VERSION_4_3_0_0 = calcVersion(4, (short) 3, (byte) 0, (byte) 0); - private static final long OB_VERSION_4_2_5_1 = calcVersion(4, (short) 2, (byte) 5, (byte) 1); - private static final long OB_VERSION_4_3_4_0 = calcVersion(4, (short) 3, (byte) 4, (byte) 0); + private static final long OB_VERSION_4_2_5_1 = calcVersion(4, (short) 2, + (byte) 5, (byte) 1); public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params, OHTable ohTable) throws IOException { @@ -92,18 +82,11 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam this.maxKeyValueSize = params.getMaxKeyValueSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params .getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize(); - if (isBatchSupport()) { - // create an OHTable object to do batch work - if (ohTable != null) { - this.ohTable = ohTable; - } else { - this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool); - } + // create an OHTable object to do batch work + if (ohTable != null) { + this.ohTable = ohTable; } else { - // create an ObTableClient object to execute batch operation request - this.obTableClient = ObTableClientManager.getOrCreateObTableClient(connectionConfig); - this.obTableClient.setRuntimeBatchExecutor(pool); - this.obTableClient.setRpcExecuteTimeout(rpcTimeout); + this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool); } } @@ -142,33 +125,15 @@ public void mutate(List mutations) throws IOException { } long toAddSize = 0; - if (isBatchSupport()) { - for (Mutation m : mutations) { - validateOperation(m); - toAddSize += m.heapSize(); - } - } else { - // version below 4_3_5 need the same type in one bufferedMutator - for (Mutation m : mutations) { - validateOperation(m); - Class curType = m.getClass(); - // set the type of this BufferedMutator - type.compareAndSet(null, curType); - if (!type.get().equals(curType)) { - throw new IllegalArgumentException("Not support different type in one batch."); - } - toAddSize += m.heapSize(); - } + for (Mutation m : mutations) { + validateOperation(m); + toAddSize += m.heapSize(); } currentAsyncBufferSize.addAndGet(toAddSize); asyncWriteBuffer.addAll(mutations); if (currentAsyncBufferSize.get() > writeBufferSize) { - if (isBatchSupport()) { - batchExecute(false); - } else { - normalExecute(false); - } + batchExecute(false); } } @@ -259,102 +224,6 @@ private void batchExecute(boolean flushAll) throws IOException { } } - /** - * This execute supports for server version below 4_3_5. - * Send the operations in the buffer to the servers. Does not wait for the server's answer. If - * there is an error, either throw the error, or use the listener to deal with the error. - * - * @param flushAll - if true, sends all the writes and wait for all of them to finish before - * returning. - */ - private void normalExecute(boolean flushAll) throws IOException { - LinkedList execBuffer = new LinkedList<>(); - ObTableBatchOperationRequest request = null; - // namespace n1, n1:table_name - // namespace default, table_name - String tableNameString = tableName.getNameAsString(); - try { - long dequeuedSize = 0L; - Mutation m; - while ((writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || flushAll) - && (m = asyncWriteBuffer.poll()) != null) { - execBuffer.add(m); - long size = m.heapSize(); - currentAsyncBufferSize.addAndGet(-size); - dequeuedSize += size; - } - // in concurrent situation, asyncWriteBuffer may be empty here - // for other threads flush all buffer - if (execBuffer.isEmpty()) { - return; - } - try{ - // for now, operations' family is the same - byte[] family = execBuffer.getFirst().getFamilyMap().firstKey(); - ObTableBatchOperation batch = buildObTableBatchOperation(execBuffer); - // table_name$cf_name - String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf); - request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName); - } catch (Exception ex) { - LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString() - + ": Errors unrelated to operations occur before mutation operation", ex); - throw new ObTableUnexpectedException(tableName.getNameAsString() + ": Errors occur before mutation operation", ex); - } - try { - ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); - } catch (Exception ex) { - LOGGER.debug(LCD.convert("01-00011"), tableName.getNameAsString() + - ": Errors occur during mutation operation", ex); - m = null; - try { - // retry every single operation - while (!execBuffer.isEmpty()) { - // poll elements from execBuffer to recollect remaining operations - m = execBuffer.poll(); - byte[] family = m.getFamilyMap().firstKey(); - ObTableBatchOperation batch = buildObTableBatchOperation(Collections.singletonList(m)); - String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf); - request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName); - ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); - } - } catch (Exception newEx) { - if (m != null) { - execBuffer.addFirst(m); - } - // if retry fails, only recollect remaining operations - while(!execBuffer.isEmpty()) { - m = execBuffer.poll(); - long size = m.heapSize(); - asyncWriteBuffer.add(m); - currentAsyncBufferSize.addAndGet(size); - } - throw newEx; - } - } - } catch (Exception ex) { - LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString() + - ": Errors occur during mutation operation", ex); - // if the cause is illegal argument, directly throw to user - if (ex instanceof ObTableUnexpectedException) { - throw (ObTableUnexpectedException) ex; - } - // TODO: need to collect error information and actions in old version - // TODO: maybe keep in ObTableBatchOperationResult - List throwables = new ArrayList(); - List actions = new ArrayList(); - List addresses = new ArrayList(); - throwables.add(ex); - RetriesExhaustedWithDetailsException error = new RetriesExhaustedWithDetailsException( - new ArrayList(throwables), - new ArrayList(actions), new ArrayList(addresses)); - if (listener == null) { - throw error; - } else { - listener.onException(error, this); - } - } - } - @Override public void close() throws IOException { if (closed) { @@ -396,16 +265,12 @@ private ObTableBatchOperation buildObTableBatchOperation(List= OB_VERSION_4_3_5_0; - } - /** * Only 4_2_5 BP1 - 4_3_0 and after 4_3_4 support multi-cf * */ boolean isMultiFamilySupport() { return (OB_VERSION >= OB_VERSION_4_2_5_1 && OB_VERSION < OB_VERSION_4_3_0_0) - || (OB_VERSION >= OB_VERSION_4_3_4_0); + || (OB_VERSION >= OB_VERSION_4_3_4_0); } /** @@ -414,11 +279,7 @@ boolean isMultiFamilySupport() { */ @Override public void flush() throws IOException { - if (isBatchSupport()) { - batchExecute(true); - } else { - normalExecute(true); - } + batchExecute(true); } @Override From 67f6370d89067ad83840a3a789e0aa7f875effb2 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Thu, 7 Nov 2024 12:12:22 +0800 Subject: [PATCH 06/15] correct test case when puts in bufferedMutator got wrong --- .../alipay/oceanbase/hbase/NativeHBaseTest.java | 10 ++++++++-- .../alipay/oceanbase/hbase/OHTableClientTest.java | 14 ++++++++++---- .../oceanbase/hbase/OHTableClientTestLoadTest.java | 11 ++++++++--- .../oceanbase/hbase/OHTablePoolLoadTest.java | 11 ++++++++--- .../alipay/oceanbase/hbase/OHTablePoolTest.java | 11 ++++++++--- .../com/alipay/oceanbase/hbase/OHTableTest.java | 11 ++++++++--- 6 files changed, 50 insertions(+), 18 deletions(-) diff --git a/src/test/java/com/alipay/oceanbase/hbase/NativeHBaseTest.java b/src/test/java/com/alipay/oceanbase/hbase/NativeHBaseTest.java index 545d95b9..06e565a1 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/NativeHBaseTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/NativeHBaseTest.java @@ -4,6 +4,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -39,8 +40,13 @@ public void cleanData() throws IOException { @AfterClass public static void finish() throws IOException { - hTable.close(); - multiCfHTable.close(); + try { + hTable.close(); + multiCfHTable.close(); + } catch (Exception e) { + Assert.assertSame(e.getClass(), IOException.class); + Assert.assertTrue(e.getMessage().contains("put table")); + } } } diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTest.java index 01c4057f..76ddce12 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTest.java @@ -20,6 +20,7 @@ import com.alipay.oceanbase.hbase.util.ObHTableTestUtil; import org.junit.*; +import java.io.IOException; import java.util.LinkedList; import java.util.List; @@ -37,7 +38,7 @@ public static void before() throws Exception { ((OHTableClient) multiCfHTable).init(); List tableGroups = new LinkedList<>(); tableGroups.add("test"); - tableGroups.add("test_multi_cf"); +// tableGroups.add("test_multi_cf"); ObHTableTestUtil.prepareClean(tableGroups); } @@ -77,8 +78,13 @@ public void testNew() throws Exception { @AfterClass public static void finish() throws Exception { - hTable.close(); - multiCfHTable.close(); - ObHTableTestUtil.closeConn(); + try { + hTable.close(); + multiCfHTable.close(); + ObHTableTestUtil.closeConn(); + } catch (Exception e) { + Assert.assertSame(e.getClass(), IOException.class); + Assert.assertTrue(e.getMessage().contains("put table")); + } } } diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTestLoadTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTestLoadTest.java index ed9df2fa..dfa96848 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTestLoadTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableClientTestLoadTest.java @@ -67,9 +67,14 @@ public void test_refresh_table_entry() throws Exception { @AfterClass public static void after() throws Exception { - hTable.close(); - multiCfHTable.close(); - ObHTableTestUtil.closeConn(); + try { + hTable.close(); + multiCfHTable.close(); + ObHTableTestUtil.closeConn(); + } catch (Exception e) { + Assert.assertSame(e.getClass(), IOException.class); + Assert.assertTrue(e.getMessage().contains("put table")); + } } @Test diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolLoadTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolLoadTest.java index d639d4a4..afe2c084 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolLoadTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolLoadTest.java @@ -167,8 +167,13 @@ public void testNew() throws IOException { @AfterClass public static void finish() throws IOException, SQLException { - hTable.close(); - multiCfHTable.close(); - ObHTableTestUtil.closeConn(); + try { + hTable.close(); + multiCfHTable.close(); + ObHTableTestUtil.closeConn(); + } catch (Exception e) { + Assert.assertSame(e.getClass(), IOException.class); + Assert.assertTrue(e.getMessage().contains("put table")); + } } } diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolTest.java index d915afdc..7f704774 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolTest.java @@ -73,9 +73,14 @@ public void prepareCase() { @AfterClass public static void finish() throws IOException, SQLException { - hTable.close(); - multiCfHTable.close(); - ObHTableTestUtil.closeConn(); + try { + hTable.close(); + multiCfHTable.close(); + ObHTableTestUtil.closeConn(); + } catch (Exception e) { + Assert.assertSame(e.getClass(), IOException.class); + Assert.assertTrue(e.getMessage().contains("put table")); + } } public void test_current_get_close(final OHTablePool ohTablePool, int concurrency, int maxSize) { diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java index b4df2595..d4fd6331 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java @@ -108,9 +108,14 @@ public void testNew() throws Exception { @AfterClass public static void finish() throws IOException, SQLException { - hTable.close(); - multiCfHTable.close(); - ObHTableTestUtil.closeConn(); + try { + hTable.close(); + multiCfHTable.close(); + ObHTableTestUtil.closeConn(); + } catch (Exception e) { + Assert.assertSame(e.getClass(), IOException.class); + Assert.assertTrue(e.getMessage().contains("put table")); + } } } From 13acb2b3dfef32444d158815166ec6c6f73115c2 Mon Sep 17 00:00:00 2001 From: Ziyu Shi <57038180+JackShi148@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:41:55 +0800 Subject: [PATCH 07/15] correct bufferedMutator to original hbase (#127) * add DepentdentFilter and SingleColumnValueExcludeFilter * add singleColumnValueExcludeFilter and DependentColumnFilter; add test cases and optimize bufferedMutator test cases * merge obkv branch-1.3 and correct test case for the mulfi-cf bug * optimize test case and add sql to create self-defined database * correct bufferedMutator to adjust original hbase * directly throw exceptions before execution * fix incorrect close --- .../com/alipay/oceanbase/hbase/OHTable.java | 2 +- .../hbase/util/OHBufferedMutatorImpl.java | 31 +++++-------------- .../hbase/util/ObTableClientManager.java | 13 ++++++++ 3 files changed, 21 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 85c8dc43..af046250 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -439,7 +439,7 @@ public byte[] getTableName() { @Override public TableName getName() { - throw new FeatureNotSupportedException("not supported yet."); + return TableName.valueOf(this.tableNameString); } @Override diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 329521e9..d3850255 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -19,7 +19,6 @@ import com.alipay.oceanbase.hbase.OHTable; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperation; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; @@ -48,9 +47,7 @@ public class OHBufferedMutatorImpl implements BufferedMutator { private volatile Configuration conf; private OHTable ohTable; - @VisibleForTesting final ConcurrentLinkedQueue asyncWriteBuffer = new ConcurrentLinkedQueue(); - @VisibleForTesting AtomicLong currentAsyncBufferSize = new AtomicLong(0); private long writeBufferSize; @@ -166,7 +163,6 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException { } /** - * This execute only supports for server version of 4_3_5. * Send the operations in the buffer to the servers. Does not wait for the server's answer. If * there is an error, either throw the error, or use the listener to deal with the error. * @@ -189,38 +185,25 @@ private void batchExecute(boolean flushAll) throws IOException { if (execBuffer.isEmpty()) { return; } - ohTable.batch(execBuffer); + Object[] results = new Object[execBuffer.size()]; + ohTable.batch(execBuffer, results); // if commit all successfully, clean execBuffer execBuffer.clear(); } catch (Exception ex) { + // do not recollect error operations, notify outside + LOGGER.error(LCD.convert("01-00026"), ex); if (ex.getCause() instanceof RetriesExhaustedWithDetailsException) { - LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString() - + ": One or more of the operations have failed after retries.", ex.getCause()); + LOGGER.error(tableName + ": One or more of the operations have failed after retries."); RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException) ex.getCause(); - // recollect mutations and log error information - execBuffer.clear(); - for (int i = 0; i < retryException.getNumExceptions(); ++i) { - Row failedOp = retryException.getRow(i); - execBuffer.add((Mutation) failedOp); - LOGGER.error(LCD.convert("01-00011"), failedOp, tableName.getNameAsString(), - currentAsyncBufferSize.get(), retryException.getCause(i)); - } if (listener != null) { listener.onException(retryException, this); } else { throw retryException; } } else { - LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString() - + ": Errors unrelated to operations occur during mutation operation", ex); + LOGGER.error("Errors unrelated to operations occur during mutation operation", ex); throw ex; } - } finally { - for (Mutation mutation : execBuffer) { - long size = mutation.heapSize(); - currentAsyncBufferSize.addAndGet(size); - asyncWriteBuffer.add(mutation); - } } } @@ -230,7 +213,7 @@ public void close() throws IOException { return; } try { - flush(); + batchExecute(true); } finally { // the pool in ObTableClient will be shut down too this.pool.shutdown(); diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java index 036ce265..e921f023 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.locks.ReentrantLock; import static com.alipay.oceanbase.hbase.constants.OHConstants.*; @@ -128,6 +129,18 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli return OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey); } + public static void clear() throws IOException { + try { + for (Map.Entry pair : OB_TABLE_CLIENT_INSTANCE.entrySet()) { + pair.getValue().close(); + } + } + catch (Exception e) { + throw new IOException("fail to close tableClient" , e); + } + OB_TABLE_CLIENT_INSTANCE.clear(); + } + public static class ObTableClientKey { private String paramUrl; private String fullUserName; From 04061a90f8b6dd66612695ddba6c7ff4706adf58 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Wed, 18 Dec 2024 20:28:43 +0800 Subject: [PATCH 08/15] remove useless comments and old code --- .../java/com/alipay/oceanbase/hbase/OHTable.java | 1 - .../hbase/util/ObTableClientManager.java | 15 --------------- 2 files changed, 16 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index af046250..3d81a69b 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -23,7 +23,6 @@ import com.alipay.oceanbase.hbase.filter.HBaseFilterUtils; import com.alipay.oceanbase.hbase.result.ClientStreamScanner; import com.alipay.oceanbase.hbase.util.*; -import com.alipay.oceanbase.rpc.ObGlobal; import com.alipay.oceanbase.rpc.ObTableClient; import com.alipay.oceanbase.rpc.exception.ObTableException; import com.alipay.oceanbase.rpc.location.model.partition.Partition; diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java index e921f023..5db940aa 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java @@ -21,14 +21,11 @@ import com.alipay.oceanbase.rpc.constant.Constants; import com.google.common.base.Objects; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.ConnectionConfiguration; import java.io.IOException; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.locks.ReentrantLock; import static com.alipay.oceanbase.hbase.constants.OHConstants.*; @@ -129,18 +126,6 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli return OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey); } - public static void clear() throws IOException { - try { - for (Map.Entry pair : OB_TABLE_CLIENT_INSTANCE.entrySet()) { - pair.getValue().close(); - } - } - catch (Exception e) { - throw new IOException("fail to close tableClient" , e); - } - OB_TABLE_CLIENT_INSTANCE.clear(); - } - public static class ObTableClientKey { private String paramUrl; private String fullUserName; From 9cd4464ff7328330a5b0167411a11d5a2d8158cb Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Mon, 10 Feb 2025 11:08:39 +0800 Subject: [PATCH 09/15] fix after rebase --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 3d81a69b..af046250 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -23,6 +23,7 @@ import com.alipay.oceanbase.hbase.filter.HBaseFilterUtils; import com.alipay.oceanbase.hbase.result.ClientStreamScanner; import com.alipay.oceanbase.hbase.util.*; +import com.alipay.oceanbase.rpc.ObGlobal; import com.alipay.oceanbase.rpc.ObTableClient; import com.alipay.oceanbase.rpc.exception.ObTableException; import com.alipay.oceanbase.rpc.location.model.partition.Partition; From 0a87f246a2124b4e6772fb1b59a05c2d248ef2e7 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Mon, 10 Feb 2025 16:20:58 +0800 Subject: [PATCH 10/15] remove useless method in bufferedMutatorImpl --- .../oceanbase/hbase/util/OHBufferedMutatorImpl.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index d3850255..41440de6 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -238,16 +238,6 @@ public void setWriteBufferSize(long writeBufferSize) throws IOException { } } - private ObTableBatchOperation buildObTableBatchOperation(List execBuffer) { - List keyValueList = new LinkedList<>(); - for (Mutation mutation : execBuffer) { - for (Map.Entry> entry : mutation.getFamilyMap().entrySet()) { - keyValueList.addAll(entry.getValue()); - } - } - return OHTable.buildObTableBatchOperation(keyValueList, false, null); - } - /** * Only 4_2_5 BP1 - 4_3_0 and after 4_3_4 support multi-cf * */ From 1d6cbe3d44947b2c1c3fe1d07fd28b08ff11e664 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 11 Feb 2025 10:17:08 +0800 Subject: [PATCH 11/15] fix test case and correct log info --- .../alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java | 4 ++-- src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 41440de6..3d3dd196 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -191,9 +191,9 @@ private void batchExecute(boolean flushAll) throws IOException { execBuffer.clear(); } catch (Exception ex) { // do not recollect error operations, notify outside - LOGGER.error(LCD.convert("01-00026"), ex); + LOGGER.error("error happens: table name = ", tableName.getNameAsString(), ex); if (ex.getCause() instanceof RetriesExhaustedWithDetailsException) { - LOGGER.error(tableName + ": One or more of the operations have failed after retries."); + LOGGER.error(tableName.getNameAsString() + ": One or more of the operations have failed after retries.", ex); RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException) ex.getCause(); if (listener != null) { listener.onException(retryException, this); diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index ec197fce..0f88af8b 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -5189,7 +5189,7 @@ public void testCellTTL() throws Exception { try { tryPut(hTable, errorPut); } catch (Exception e) { - assertTrue(e.getCause().toString().contains("Unknown column 'TTL'")); + assertTrue(e.getCause().getCause().toString().contains("Unknown column 'TTL'")); } // test put and get tryPut(hTable, put1); From a991be70234b4a281be7570c085dff89147388ac Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 11 Feb 2025 19:59:23 +0800 Subject: [PATCH 12/15] fix configuration creation and format code --- .../com/alipay/oceanbase/hbase/OHTable.java | 2 +- .../com/alipay/oceanbase/hbase/OHTablePool.java | 3 ++- .../alipay/oceanbase/hbase/util/BatchError.java | 17 +++++++++++++++++ .../oceanbase/hbase/util/CompatibilityUtil.java | 9 --------- .../oceanbase/hbase/util/OHTableFactory.java | 3 ++- .../alipay/oceanbase/hbase/OHTablePoolTest.java | 5 +++-- 6 files changed, 25 insertions(+), 14 deletions(-) delete mode 100644 src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index af046250..b64f7fec 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -304,7 +304,7 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient, this.cleanupConnectionOnClose = false; this.executePool = executePool; this.obTableClient = obTableClient; - this.configuration = new Configuration(); + this.configuration = HBaseConfiguration.create(); finishSetUp(); } diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java index f0d4a447..1a414ca5 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java @@ -26,6 +26,7 @@ import com.google.protobuf.Service; import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; @@ -67,7 +68,7 @@ public class OHTablePool implements Closeable { * Default Constructor. Default HBaseConfiguration and no limit on pool size. */ public OHTablePool() { - this(new Configuration(), Integer.MAX_VALUE); + this(HBaseConfiguration.create(), Integer.MAX_VALUE); } /** diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java b/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java index 49fe17e9..2bdae45a 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java @@ -1,3 +1,20 @@ +/*- + * #%L + * com.oceanbase:obkv-hbase-client + * %% + * Copyright (C) 2022 - 2025 OceanBase Group + * %% + * OBKV HBase Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + package com.alipay.oceanbase.hbase.util; import org.apache.hadoop.hbase.ServerName; diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java b/src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java deleted file mode 100644 index 39db637c..00000000 --- a/src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.alipay.oceanbase.hbase.util; - -import com.alipay.oceanbase.rpc.ObGlobal; - -public class CompatibilityUtil { - public static boolean isBatchSupport() { - return ObGlobal.OB_VERSION > ObGlobal.OB_VERSION_4_3_4_0; - } -} diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java index 8d42daa4..f74d184f 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java @@ -21,6 +21,7 @@ import com.alipay.oceanbase.hbase.OHTablePool; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTableFactory; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.util.Bytes; @@ -170,7 +171,7 @@ private Configuration adjustConfiguration(Configuration configuration, String ta } private Configuration copyConfiguration(Configuration origin) { - Configuration copy = new Configuration(); + Configuration copy = HBaseConfiguration.create(); for (Map.Entry entry : origin) { copy.set(entry.getKey(), entry.getValue()); diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolTest.java index 7f704774..5977f26f 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTablePoolTest.java @@ -20,6 +20,7 @@ import com.alipay.oceanbase.hbase.util.ObHTableTestUtil; import com.alipay.remoting.util.ConcurrentHashSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.util.PoolMap; import org.junit.*; @@ -37,7 +38,7 @@ public class OHTablePoolTest extends HTableTestBase { protected static OHTablePool ohTablePool; private static OHTablePool newOHTablePool(final int maxSize, final PoolMap.PoolType poolType) { - OHTablePool pool = new OHTablePool(new Configuration(), maxSize, poolType); + OHTablePool pool = new OHTablePool(HBaseConfiguration.create(), maxSize, poolType); pool.setFullUserName("test", ObHTableTestUtil.FULL_USER_NAME); pool.setPassword("test", ObHTableTestUtil.PASSWORD); if (ObHTableTestUtil.ODP_MODE) { @@ -55,7 +56,7 @@ private static OHTablePool newOHTablePool(final int maxSize, final PoolMap.PoolT @BeforeClass public static void setup() throws Exception { - Configuration c = new Configuration(); + Configuration c = HBaseConfiguration.create(); ohTablePool = newOHTablePool(10, null); ohTablePool.setRuntimeBatchExecutor("test", Executors.newFixedThreadPool(3)); hTable = ohTablePool.getTable("test"); From c4d5d0754725c147ad6aba4d3b939a6e6c86692c Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 11 Feb 2025 20:23:56 +0800 Subject: [PATCH 13/15] remove redundant exception catch --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index b64f7fec..fe07ede8 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -1526,13 +1526,7 @@ public void flushCommits() throws IOException { if (mutator == null) { return; } - try { - mutator.flush(); - } catch (Exception e) { - throw new IOException("put table " + tableNameString + " error codes " + null - + " auto flush " + autoFlush + " current buffer size " - + mutator.getCurrentBufferSize(), e); - } + mutator.flush(); } @Override From f13645906b11fb2cea92c144793558407705c0b4 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Wed, 12 Feb 2025 14:40:49 +0800 Subject: [PATCH 14/15] remove useless comments --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index fe07ede8..6956e6a4 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -147,11 +147,6 @@ public class OHTable implements HTableInterface { */ private long writeBufferSize; - /** - * decide whether clear the buffer when meet exception.the default - * value is true. Be careful about the correctness when set it false - */ - /** * whether flush the put automatically */ From ae01a2b6af34e7e7b77f04ee01852400da9fdf73 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Wed, 12 Feb 2025 14:53:52 +0800 Subject: [PATCH 15/15] remove useless function --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 6956e6a4..10728ee2 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -2262,10 +2262,6 @@ public Pair getStartEndKeys() throws IOException { return new Pair<>(getStartKeys(), getEndKeys()); } - public enum OHOpType { - Put, Append, Delete, Increment - } - private BufferedMutator getBufferedMutator() throws IOException { if (this.mutator == null) { this.mutator = (OHBufferedMutatorImpl) this.connection.getBufferedMutator(