From fa09aa93cf7be642145b8d9616b055d92af6ccbe Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Wed, 11 Sep 2024 14:12:16 +0800 Subject: [PATCH 01/12] init bufferedMutator --- .../hbase/util/OHBufferedMutatorImpl.java | 314 +++++++++++++++++- .../hbase/util/OHConnectionImpl.java | 4 + 2 files changed, 305 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 83c63c7a..5371919d 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -17,53 +17,341 @@ package com.alipay.oceanbase.hbase.util; +import com.alipay.oceanbase.hbase.OHTable; +import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.BufferedMutator; -import org.apache.hadoop.hbase.client.BufferedMutatorParams; -import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import sun.awt.image.ImageWatched; +import javax.ws.rs.PUT; import java.io.IOException; -import java.util.List; +import java.io.InterruptedIOException; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static com.alipay.oceanbase.hbase.constants.OHConstants.*; +import static com.alipay.oceanbase.hbase.constants.OHConstants.DEFAULT_HBASE_HTABLE_TEST_LOAD_SUFFIX; +import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument; +import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation.getInstance; +import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.*; +import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.DEL; +import static org.apache.commons.lang.StringUtils.isBlank; @InterfaceAudience.Private public class OHBufferedMutatorImpl implements BufferedMutator { - public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params) { + private static final Logger LOGGER = TableHBaseLoggerFactory + .getLogger(OHConnectionImpl.class); + + private static final int BUFFERED_PARAM_UNSET = -1; + + private final ExceptionListener listener; + + protected final ObTableClient obTableClient; + private final TableName tableName; + private volatile Configuration conf; + private final OHConnectionConfiguration connectionConfig; + + @VisibleForTesting + final ConcurrentLinkedQueue asyncWriteBuffer = new ConcurrentLinkedQueue(); + @VisibleForTesting + AtomicLong currentAsyncBufferSize = new AtomicLong(0); + + private long writeBufferSize; + private final int maxKeyValueSize; + private boolean closed = false; + private final ExecutorService pool; + private int rpcTimeout; + private int operationTimeout; + private final boolean cleanupPoolOnClose; + + public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params) throws IOException { + if (ohConnection == null || ohConnection.isClosed()) { + throw new IllegalArgumentException("Connection is null or closed."); + } + // create a ObTableClient to do rpc operations + this.obTableClient = ObTableClientManager.getOrCreateObTableClient(ohConnection.getOHConnectionConfiguration()); + + // init params in OHBufferedMutatorImpl: + // TableName + pool + Configuration + listener + writeBufferSize + maxKeyValueSize + rpcTimeout + operationTimeout + this.tableName = params.getTableName(); + this.conf = ohConnection.getConfiguration(); + this.connectionConfig = ohConnection.getOHConnectionConfiguration(); + this.listener = params.getListener(); + + if (params.getPool() == null) { + this.pool = HTable.getDefaultExecutor(ohConnection.getConfiguration()); + this.cleanupPoolOnClose = true; + } else { + this.pool = params.getPool(); + this.cleanupPoolOnClose = false; + } + + this.writeBufferSize = params.getWriteBufferSize() != BUFFERED_PARAM_UNSET ? + params.getWriteBufferSize() : connectionConfig.getWriteBufferSize(); + this.maxKeyValueSize = params.getMaxKeyValueSize() != BUFFERED_PARAM_UNSET ? + params.getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize(); + this.rpcTimeout = connectionConfig.getRpcTimeout(); + this.operationTimeout = connectionConfig.getOperationTimeout(); } @Override public TableName getName() { - return null; + return this.tableName; } @Override public Configuration getConfiguration() { - return null; + return this.conf; } @Override - public void mutate(Mutation mutation) throws IOException { - + public void mutate(Mutation mutation) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + // convert mutation into list, use the interface below + mutate(Collections.singletonList(mutation)); } + // only support for Put and Delete for 1.x @Override - public void mutate(List list) throws IOException { + public void mutate(List mutations) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + // add the mutations into writeAsyncBuffer + // atomically add size of mutations into currentWriteBufferSize + // do the flush/backgroundFlushCommits if currentWriteBufferSize > writeBufferSize + if (closed) { + throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); + } + + long toAddSize = 0; + // check if every mutation's family is the same + if (!validateSameFamily(mutations)) { + throw new IllegalStateException("Family should keep the same in one batch."); + } + for (Mutation m : mutations) { + if (!validateInsUpAndDelete(m)) { + throw new IllegalArgumentException("Only support for Put and Delete for now."); + } + if (m instanceof Put) { + HTable.validatePut((Put) m, maxKeyValueSize); + } + toAddSize += m.heapSize(); + } + + currentAsyncBufferSize.addAndGet(toAddSize); + asyncWriteBuffer.addAll(mutations); + + asyncExecute(false); + } + + boolean validateInsUpAndDelete(Mutation mt) { + if (!(mt instanceof Put) && !(mt instanceof Delete)) { + return false; + } + return true; + } + + boolean validateSameFamily(List mutations) { + for (Mutation mutation : mutations) { + if (mutation.getFamilyMap().keySet() == null + || mutation.getFamilyMap().keySet().size() == 0) { + throw new IllegalArgumentException("Family is not provided in batch operations."); + } + if (mutation.getFamilyMap().keySet().size() > 1) { + return false; + } + } + return true; + } + + /** + * Send the operations in the buffer to the servers. Does not wait for the server's answer. If + * the is an error (max retried reach from a previous flush or bad operation), it tries to send + * all operations in the buffer and sends an exception. + * + * @param flushAll - if true, sends all the writes and wait for all of them to finish before + * returning. + */ + private void asyncExecute(boolean flushAll) throws + InterruptedIOException, + RetriesExhaustedWithDetailsException { + while (true) { + if (!flushAll && currentAsyncBufferSize.get() <= writeBufferSize) { + // There is the room to accept more mutations. + break; + } + try { + // namespace n1, n1:table_name + // namespace default, table_name + String tableNameString = tableName.getNameAsString(); + Map.Entry> entry = asyncWriteBuffer.peek().getFamilyMap().entrySet().iterator().next(); + byte[] family = entry.getKey(); + ObTableBatchOperation batch = buildObTableBatchOperation(asyncWriteBuffer); + String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family)); + ObTableBatchOperationRequest request = buildObTableBatchOperationRequest(batch, targetTableName, pool); + ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); + } catch (Exception ex) { + + } + } } @Override public void close() throws IOException { - + if (closed) { + return; + } + try { + asyncExecute(true); + } finally { + if (cleanupPoolOnClose) { + this.pool.shutdown(); + try { + if (!pool.awaitTermination(600, TimeUnit.SECONDS)) { + LOGGER.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); + } + } catch (InterruptedException e) { + LOGGER.warn("waitForTermination interrupted"); + Thread.currentThread().interrupt(); + } + } + closed = true; + } } @Override public void flush() throws IOException { - + if (closed) { + throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); + } + asyncExecute(true); } @Override public long getWriteBufferSize() { - return 0; + return this.writeBufferSize; + } + + private ObTableBatchOperation buildObTableBatchOperation(ConcurrentLinkedQueue asyncWriteBuffer) { + LinkedList execBuffer = new LinkedList<>(); + Mutation m; + while ((m = asyncWriteBuffer.poll()) != null) { + execBuffer.add(m); + long size = m.heapSize(); + currentAsyncBufferSize.addAndGet(-size); + } + List keyValueList = new LinkedList<>(); + for (Mutation mutation : execBuffer) { + checkFamilyViolation(mutation.getFamilyMap().keySet()); + for (Map.Entry> entry : mutation.getFamilyMap().entrySet()) { + keyValueList.addAll(entry.getValue()); + } + } + return buildObTableBatchOperation(keyValueList, false, null); + } + + private ObTableBatchOperation buildObTableBatchOperation(List keyValueList, + boolean putToAppend, + List qualifiers) { + ObTableBatchOperation batch = new ObTableBatchOperation(); + for (KeyValue kv : keyValueList) { + if (qualifiers != null) { + qualifiers.add(kv.getQualifier()); + } + batch.addTableOperation(buildObTableOperation(kv, putToAppend)); + } + batch.setSameType(true); + batch.setSamePropertiesNames(true); + return batch; + } + + private ObTableOperation buildObTableOperation(KeyValue kv, boolean putToAppend) { + KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType()); + switch (kvType) { + case Put: + ObTableOperationType operationType; + if (putToAppend) { + operationType = APPEND; + } else { + operationType = INSERT_OR_UPDATE; + } + return getInstance(operationType, + new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, V_COLUMNS, + new Object[] { kv.getValue() }); + case Delete: + return getInstance(DEL, + new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, null, null); + case DeleteColumn: + return getInstance(DEL, + new Object[] { kv.getRow(), kv.getQualifier(), -kv.getTimestamp() }, null, null); + case DeleteFamily: + return getInstance(DEL, new Object[] { kv.getRow(), null, -kv.getTimestamp() }, + null, null); + default: + throw new IllegalArgumentException("illegal mutation type " + kvType); + } + } + + private ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBatchOperation obTableBatchOperation, + String targetTableName, + ExecutorService pool) { + ObTableBatchOperationRequest request = new ObTableBatchOperationRequest(); + request.setTableName(targetTableName); + request.setReturningAffectedRows(true); + request.setEntityType(ObTableEntityType.HKV); + request.setBatchOperation(obTableBatchOperation); + request.setPool(pool); + return request; + } + + private void checkFamilyViolation(Collection families) { + if (families == null || families.size() == 0) { + throw new FeatureNotSupportedException("family is empty."); + } + + if (families.size() > 1) { + throw new FeatureNotSupportedException("multi family is not supported yet."); + } + + for (byte[] family : families) { + if (family == null || family.length == 0) { + throw new IllegalArgumentException("family is empty"); + } + if (isBlank(Bytes.toString(family))) { + throw new IllegalArgumentException("family is blank"); + } + } + + } + + private String getTargetTableName(String tableNameString, String familyString) { + checkArgument(tableNameString != null, "tableNameString is null"); + checkArgument(familyString != null, "familyString is null"); + if (conf.getBoolean(HBASE_HTABLE_TEST_LOAD_ENABLE, false)) { + return getTestLoadTargetTableName(tableNameString, familyString); + } + return getNormalTargetTableName(tableNameString, familyString); + } + + private String getTestLoadTargetTableName(String tableNameString, String familyString) { + String suffix = conf.get(HBASE_HTABLE_TEST_LOAD_SUFFIX, + DEFAULT_HBASE_HTABLE_TEST_LOAD_SUFFIX); + return tableNameString + suffix + "$" + familyString; + } + + private String getNormalTargetTableName(String tableNameString, String familyString) { + return tableNameString + "$" + familyString; } } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java index 12d2abb2..77eea825 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java @@ -69,6 +69,10 @@ public Configuration getConfiguration() { return this.conf; } + public OHConnectionConfiguration getOHConnectionConfiguration() { + return this.connectionConfig; + } + private ExecutorService getBatchPool() { if (batchPool == null) { synchronized (this) { From 41a02a8997fb077e8ae8cf91d22010b423fb2589 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Wed, 11 Sep 2024 16:03:59 +0800 Subject: [PATCH 02/12] finish validateFamily and asyncExecute --- .../hbase/util/OHBufferedMutatorImpl.java | 53 ++++++++++++++----- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 5371919d..70cce009 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -20,6 +20,7 @@ import com.alipay.oceanbase.hbase.OHTable; import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; @@ -35,6 +36,7 @@ import javax.ws.rs.PUT; import java.io.IOException; import java.io.InterruptedIOException; +import java.rmi.UnexpectedException; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -47,6 +49,7 @@ import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation.getInstance; import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.*; import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.DEL; +import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.LCD; import static org.apache.commons.lang.StringUtils.isBlank; @InterfaceAudience.Private @@ -163,14 +166,24 @@ boolean validateInsUpAndDelete(Mutation mt) { } boolean validateSameFamily(List mutations) { + byte[] family = null; + if (asyncWriteBuffer.isEmpty()) { + family = asyncWriteBuffer.peek().getFamilyMap().firstKey(); + } for (Mutation mutation : mutations) { - if (mutation.getFamilyMap().keySet() == null - || mutation.getFamilyMap().keySet().size() == 0) { + if (mutation.getFamilyMap() == null + || mutation.getFamilyMap().keySet().isEmpty()) { throw new IllegalArgumentException("Family is not provided in batch operations."); } if (mutation.getFamilyMap().keySet().size() > 1) { return false; } + if (family != null) { + byte[] curFamily = mutation.getFamilyMap().firstKey(); + if (!Bytes.equals(family, curFamily)) { + return false; + } + } } return true; } @@ -186,26 +199,39 @@ boolean validateSameFamily(List mutations) { private void asyncExecute(boolean flushAll) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - while (true) { - if (!flushAll && currentAsyncBufferSize.get() <= writeBufferSize) { - // There is the room to accept more mutations. - break; - } - try { + try { + while (true) { + if (!flushAll && currentAsyncBufferSize.get() <= writeBufferSize) { + // There is the room to accept more mutations. + break; + } // namespace n1, n1:table_name // namespace default, table_name String tableNameString = tableName.getNameAsString(); - Map.Entry> entry = asyncWriteBuffer.peek().getFamilyMap().entrySet().iterator().next(); - byte[] family = entry.getKey(); + // for now, operations' family is the same + byte[] family = asyncWriteBuffer.peek().getFamilyMap().firstKey(); ObTableBatchOperation batch = buildObTableBatchOperation(asyncWriteBuffer); String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family)); ObTableBatchOperationRequest request = buildObTableBatchOperationRequest(batch, targetTableName, pool); ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); - } catch (Exception ex) { - + } + } catch (Exception ex) { + LOGGER.error(LCD.convert("01-00026"), ex); + // TODO: need to collect error information and actions during batch operations + // TODO: maybe keep in ObTableBatchOperationResult + List throwables = new ArrayList(); + List actions = new ArrayList(); + List addresses = new ArrayList(); + throwables.add(ex); + RetriesExhaustedWithDetailsException error = new RetriesExhaustedWithDetailsException( + new ArrayList(throwables), + new ArrayList(actions), new ArrayList(addresses)); + if (listener == null) { + throw error; + } else { + listener.onException(error, this); } } - } @Override @@ -313,6 +339,7 @@ private ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBa request.setEntityType(ObTableEntityType.HKV); request.setBatchOperation(obTableBatchOperation); request.setPool(pool); + request.setTimeout(rpcTimeout); return request; } From 4731710ffc00eae2cf0e121cb2572d008c843ec1 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Wed, 11 Sep 2024 16:10:18 +0800 Subject: [PATCH 03/12] correct log in OHBufferedMutatorImpl --- .../com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 70cce009..50fbe43b 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -55,7 +55,7 @@ @InterfaceAudience.Private public class OHBufferedMutatorImpl implements BufferedMutator { private static final Logger LOGGER = TableHBaseLoggerFactory - .getLogger(OHConnectionImpl.class); + .getLogger(OHBufferedMutatorImpl.class); private static final int BUFFERED_PARAM_UNSET = -1; From 1755b262a8db8363d2c99d675bb1c2b165046a37 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Thu, 12 Sep 2024 15:17:45 +0800 Subject: [PATCH 04/12] pass self-test --- .../hbase/util/OHBufferedMutatorImpl.java | 97 ++-- .../hbase/OHBufferedMutatorTest.java | 469 ++++++++++++++++++ 2 files changed, 525 insertions(+), 41 deletions(-) create mode 100644 src/test/java/com/alipay/oceanbase/hbase/OHBufferedMutatorTest.java diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 50fbe43b..e4a8dcd6 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -71,13 +71,12 @@ public class OHBufferedMutatorImpl implements BufferedMutator { @VisibleForTesting AtomicLong currentAsyncBufferSize = new AtomicLong(0); - private long writeBufferSize; + private final long writeBufferSize; private final int maxKeyValueSize; private boolean closed = false; private final ExecutorService pool; - private int rpcTimeout; - private int operationTimeout; - private final boolean cleanupPoolOnClose; + private final int rpcTimeout; + private final int operationTimeout; public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params) throws IOException { if (ohConnection == null || ohConnection.isClosed()) { @@ -92,20 +91,14 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam this.conf = ohConnection.getConfiguration(); this.connectionConfig = ohConnection.getOHConnectionConfiguration(); this.listener = params.getListener(); - - if (params.getPool() == null) { - this.pool = HTable.getDefaultExecutor(ohConnection.getConfiguration()); - this.cleanupPoolOnClose = true; - } else { - this.pool = params.getPool(); - this.cleanupPoolOnClose = false; - } + this.pool = params.getPool(); this.writeBufferSize = params.getWriteBufferSize() != BUFFERED_PARAM_UNSET ? params.getWriteBufferSize() : connectionConfig.getWriteBufferSize(); this.maxKeyValueSize = params.getMaxKeyValueSize() != BUFFERED_PARAM_UNSET ? params.getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize(); this.rpcTimeout = connectionConfig.getRpcTimeout(); + this.obTableClient.setRpcExecuteTimeout(rpcTimeout); this.operationTimeout = connectionConfig.getOperationTimeout(); } @@ -119,20 +112,28 @@ public Configuration getConfiguration() { return this.conf; } + /** + * Add the mutation into asyncWriteBuffer + * + * @param mutation - mutation operation + */ @Override public void mutate(Mutation mutation) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - // convert mutation into list, use the interface below mutate(Collections.singletonList(mutation)); } - // only support for Put and Delete for 1.x + /** + * Add all mutations in List into asyncWriteBuffer + * + * @param mutations - mutation operations + */ @Override public void mutate(List mutations) throws InterruptedIOException, RetriesExhaustedWithDetailsException { // add the mutations into writeAsyncBuffer // atomically add size of mutations into currentWriteBufferSize - // do the flush/backgroundFlushCommits if currentWriteBufferSize > writeBufferSize + // do the flush if currentWriteBufferSize > writeBufferSize if (closed) { throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); } @@ -140,7 +141,7 @@ public void mutate(List mutations) throws InterruptedIOExcep long toAddSize = 0; // check if every mutation's family is the same if (!validateSameFamily(mutations)) { - throw new IllegalStateException("Family should keep the same in one batch."); + throw new IllegalArgumentException("Family should keep the same in one batch."); } for (Mutation m : mutations) { if (!validateInsUpAndDelete(m)) { @@ -158,6 +159,10 @@ public void mutate(List mutations) throws InterruptedIOExcep asyncExecute(false); } + /** + * Check whether the mutation is Put or Delete in 1.x + * @param mt - mutation operation + */ boolean validateInsUpAndDelete(Mutation mt) { if (!(mt instanceof Put) && !(mt instanceof Delete)) { return false; @@ -165,9 +170,13 @@ boolean validateInsUpAndDelete(Mutation mt) { return true; } + /** + * Check whether the family in this batch is the same + * @param mutations - mutation operations + */ boolean validateSameFamily(List mutations) { byte[] family = null; - if (asyncWriteBuffer.isEmpty()) { + if (!asyncWriteBuffer.isEmpty()) { family = asyncWriteBuffer.peek().getFamilyMap().firstKey(); } for (Mutation mutation : mutations) { @@ -190,8 +199,7 @@ boolean validateSameFamily(List mutations) { /** * Send the operations in the buffer to the servers. Does not wait for the server's answer. If - * the is an error (max retried reach from a previous flush or bad operation), it tries to send - * all operations in the buffer and sends an exception. + * there is an error, either throw the error, or use the listener to deal with the error. * * @param flushAll - if true, sends all the writes and wait for all of them to finish before * returning. @@ -201,16 +209,30 @@ private void asyncExecute(boolean flushAll) throws RetriesExhaustedWithDetailsException { try { while (true) { - if (!flushAll && currentAsyncBufferSize.get() <= writeBufferSize) { - // There is the room to accept more mutations. + if (!flushAll || asyncWriteBuffer.isEmpty()) { + if (currentAsyncBufferSize.get() <= writeBufferSize) { + break; + } + } + Mutation m; + LinkedList execBuffer = new LinkedList<>(); + while ((m = asyncWriteBuffer.poll()) != null) { + execBuffer.add(m); + long size = m.heapSize(); + currentAsyncBufferSize.addAndGet(-size); + } + // in concurrent situation, asyncWriteBuffer may be empty here + // for other threads flush all buffer + if (execBuffer.isEmpty()) { break; } // namespace n1, n1:table_name // namespace default, table_name String tableNameString = tableName.getNameAsString(); // for now, operations' family is the same - byte[] family = asyncWriteBuffer.peek().getFamilyMap().firstKey(); - ObTableBatchOperation batch = buildObTableBatchOperation(asyncWriteBuffer); + byte[] family = execBuffer.getFirst().getFamilyMap().firstKey(); + ObTableBatchOperation batch = buildObTableBatchOperation(execBuffer); + // table_name$cf_name String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family)); ObTableBatchOperationRequest request = buildObTableBatchOperationRequest(batch, targetTableName, pool); ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); @@ -242,21 +264,22 @@ public void close() throws IOException { try { asyncExecute(true); } finally { - if (cleanupPoolOnClose) { - this.pool.shutdown(); - try { - if (!pool.awaitTermination(600, TimeUnit.SECONDS)) { - LOGGER.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); - } - } catch (InterruptedException e) { - LOGGER.warn("waitForTermination interrupted"); - Thread.currentThread().interrupt(); + this.pool.shutdown(); + try { + if (!pool.awaitTermination(600, TimeUnit.SECONDS)) { + LOGGER.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); } + } catch (InterruptedException e) { + LOGGER.warn("waitForTermination interrupted"); + Thread.currentThread().interrupt(); } closed = true; } } + /** + * Force to commit all operations + */ @Override public void flush() throws IOException { if (closed) { @@ -270,14 +293,7 @@ public long getWriteBufferSize() { return this.writeBufferSize; } - private ObTableBatchOperation buildObTableBatchOperation(ConcurrentLinkedQueue asyncWriteBuffer) { - LinkedList execBuffer = new LinkedList<>(); - Mutation m; - while ((m = asyncWriteBuffer.poll()) != null) { - execBuffer.add(m); - long size = m.heapSize(); - currentAsyncBufferSize.addAndGet(-size); - } + private ObTableBatchOperation buildObTableBatchOperation(LinkedList execBuffer) { List keyValueList = new LinkedList<>(); for (Mutation mutation : execBuffer) { checkFamilyViolation(mutation.getFamilyMap().keySet()); @@ -339,7 +355,6 @@ private ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBa request.setEntityType(ObTableEntityType.HKV); request.setBatchOperation(obTableBatchOperation); request.setPool(pool); - request.setTimeout(rpcTimeout); return request; } diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHBufferedMutatorTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHBufferedMutatorTest.java new file mode 100644 index 00000000..26fc2308 --- /dev/null +++ b/src/test/java/com/alipay/oceanbase/hbase/OHBufferedMutatorTest.java @@ -0,0 +1,469 @@ +package com.alipay.oceanbase.hbase; + +import com.alipay.oceanbase.hbase.util.OHBufferedMutatorImpl; +import net.bytebuddy.implementation.bytecode.Throw; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +import static org.apache.hadoop.hbase.util.Bytes.toBytes; + +public class OHBufferedMutatorTest { + protected Table hTable; + protected Configuration conf; + @Before + public void setup() throws IOException { + conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + } + /* + CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; + CREATE TABLE `test$family_group` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = test; + */ + @Test + public void testBufferedMutatorWithFlush() throws Exception { + hTable = new OHTable(conf, "test"); + BufferedMutator ohBufferMutator = null; + try (Connection conn = ConnectionFactory.createConnection(conf)) { + TableName tableName = TableName.valueOf("test"); + + // use defualt params + ohBufferMutator = conn.getBufferedMutator(tableName); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + List mutations = new ArrayList<>(); + // test Put + Put put1 = new Put(Bytes.toBytes(key)); + put1.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); + mutations.add(put1); + Put put2 = new Put(Bytes.toBytes(key)); + put2.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); + mutations.add(put2); + // test add Mutations with List + ohBufferMutator.mutate(mutations); + ohBufferMutator.flush(); + + Get get = new Get(toBytes(key)); + Result r = hTable.get(get); + Assert.assertEquals(2, r.raw().length); + for (KeyValue keyValue : r.raw()) { + System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" + + new String(keyValue.getFamily()) + " columnQualifier:" + + new String(keyValue.getQualifier()) + " timestamp:" + + keyValue.getTimestamp() + " value:" + + new String(keyValue.getValue())); + } + + Delete del = new Delete(Bytes.toBytes(key)); + del.deleteFamily(Bytes.toBytes("family_group")); + // test add Mutation directly + ohBufferMutator.mutate(del); + ohBufferMutator.flush(); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + + // test NOT_SUPPORT type' + Put put = new Put(Bytes.toBytes(key)); + put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); + put.addColumn(Bytes.toBytes("family_group1"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); + // not support different family in one batch + final BufferedMutator difMut = ohBufferMutator; + final Put difPut= put; + Assert.assertThrows(IllegalArgumentException.class, () -> { + difMut.mutate(difPut); + }); + + Append append = new Append(Bytes.toBytes(key)); + append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); + // only support Put and Delete + final BufferedMutator apMut = ohBufferMutator; + Assert.assertThrows(IllegalArgumentException.class, () -> { + apMut.mutate(append); + }); + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + if (ohBufferMutator != null) { + ohBufferMutator.close(); + } + } + } + + /* + CREATE TABLEGROUP n1:test SHARDING = 'ADAPTIVE'; + CREATE TABLE `n1:test$family_group` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = n1:test; + */ + @Test + public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { + hTable = new OHTable(conf, "n1:test"); + BufferedMutator ohBufferMutator = null; + try (Connection conn = ConnectionFactory.createConnection(conf)) { + // in "n1" database + TableName tableName = TableName.valueOf("n1:test"); + + // use defualt params + ohBufferMutator = conn.getBufferedMutator(tableName); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + List mutations = new ArrayList<>(); + // test Put + Put put1 = new Put(Bytes.toBytes(key)); + put1.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); + mutations.add(put1); + Put put2 = new Put(Bytes.toBytes(key)); + put2.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); + mutations.add(put2); + // test add Mutations with List + ohBufferMutator.mutate(mutations); + ohBufferMutator.flush(); + + Get get = new Get(toBytes(key)); + Result r = hTable.get(get); + Assert.assertEquals(2, r.raw().length); + for (KeyValue keyValue : r.raw()) { + System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" + + new String(keyValue.getFamily()) + " columnQualifier:" + + new String(keyValue.getQualifier()) + " timestamp:" + + keyValue.getTimestamp() + " value:" + + new String(keyValue.getValue())); + } + + // test Delete + Delete del = new Delete(Bytes.toBytes(key)); + del.deleteFamily(Bytes.toBytes("family_group")); + // test add Mutation directly + ohBufferMutator.mutate(del); + ohBufferMutator.flush(); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + + // test NOT_SUPPORT type + Put put = new Put(Bytes.toBytes(key)); + put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); + put.addColumn(Bytes.toBytes("family_group1"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); + // not support different family in one batch + final BufferedMutator difMut = ohBufferMutator; + final Put difPut = put; + Assert.assertThrows(IllegalArgumentException.class, () -> { + difMut.mutate(difPut); + }); + + Append append = new Append(Bytes.toBytes(key)); + append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); + // only support Put and Delete + final BufferedMutator apMut = ohBufferMutator; + Assert.assertThrows(IllegalArgumentException.class, () -> { + apMut.mutate(append); + }); + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + if (ohBufferMutator != null) { + ohBufferMutator.close(); + } + } + } + + /* + CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; + CREATE TABLE `test$family_group` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = test; + */ + @Test + public void testBufferedMutatorWithAutoFlush() throws Exception { + hTable = new OHTable(conf, "test"); + BufferedMutator ohBufferMutator = null; + BufferedMutatorParams params = null; + long bufferSize = 45000L; + int count = 0; + try (Connection conn = ConnectionFactory.createConnection(conf)) { + // use default database + TableName tableName = TableName.valueOf("test"); + + // set params + params = new BufferedMutatorParams(tableName); + params.writeBufferSize(bufferSize); + + ohBufferMutator = conn.getBufferedMutator(params); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + List mutations = new ArrayList<>(); + for (int i = 0; i < 50; ++i) { + mutations.clear(); + for (int j = 0; j < 4; ++j) { + Put put = new Put(Bytes.toBytes(key)); + put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "_" + i + "_" + j), + timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); + mutations.add(put); + } + ohBufferMutator.mutate(mutations); + } + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + if (ohBufferMutator != null) { + ohBufferMutator.close(); + Get get = new Get(toBytes("putKey")); + Result r = hTable.get(get); + for (KeyValue keyValue : r.raw()) { + ++count; + System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" + + new String(keyValue.getFamily()) + " columnQualifier:" + + new String(keyValue.getQualifier()) + " timestamp:" + + keyValue.getTimestamp() + " value:" + + new String(keyValue.getValue())); + } + Assert.assertEquals(200, count); + Delete delete = new Delete(toBytes("putKey")); + delete.deleteFamily(toBytes("family_group")); + hTable.delete(delete); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + } + if (params != null) { + if (params.getPool() != null) { + System.out.println("Check if user's pool is shut down."); + Assert.assertTrue(params.getPool().isShutdown()); + } + } + } + } + + @Test + public void testBufferedMutatorWithUserPool() throws Exception { + hTable = new OHTable(conf, "test"); + BufferedMutator ohBufferMutator = null; + BufferedMutatorParams params = null; + long bufferSize = 45000L; + int count = 0; + try (Connection conn = ConnectionFactory.createConnection(conf)) { + // use default database + TableName tableName = TableName.valueOf("test"); + + // set params + params = new BufferedMutatorParams(tableName); + params.writeBufferSize(bufferSize); + + // set thread pool + long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); + ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); + pool.allowCoreThreadTimeOut(true); + params.pool(pool); + + ohBufferMutator = conn.getBufferedMutator(params); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + List mutations = new ArrayList<>(); + for (int i = 0; i < 50; ++i) { + mutations.clear(); + for (int j = 0; j < 4; ++j) { + Put put = new Put(Bytes.toBytes(key)); + put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "_" + i + "_" + j), + timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); + mutations.add(put); + } + ohBufferMutator.mutate(mutations); + } + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + if (ohBufferMutator != null) { + ohBufferMutator.close(); + Get get = new Get(toBytes("putKey")); + Result r = hTable.get(get); + for (KeyValue keyValue : r.raw()) { + ++count; + System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" + + new String(keyValue.getFamily()) + " columnQualifier:" + + new String(keyValue.getQualifier()) + " timestamp:" + + keyValue.getTimestamp() + " value:" + + new String(keyValue.getValue())); + } + Assert.assertEquals(200, count); + Delete delete = new Delete(toBytes("putKey")); + delete.deleteFamily(toBytes("family_group")); + hTable.delete(delete); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + } + if (params != null) { + if (params.getPool() != null) { + System.out.println("Check if user's pool is shut down."); + Assert.assertTrue(params.getPool().isShutdown()); + } + } + } + } + + @Test + public void testBufferedMutatorConcurrent() throws Exception { + hTable = new OHTable(conf, "test"); + BufferedMutator ohBufferMutator = null; + BufferedMutatorParams params = null; + ExecutorService executorService = Executors.newFixedThreadPool(10); + long bufferSize = 45000L; + int count = 0; + try (Connection conn = ConnectionFactory.createConnection(conf)) { + // use default database + TableName tableName = TableName.valueOf("test"); + + // set params + params = new BufferedMutatorParams(tableName); + params.writeBufferSize(bufferSize); + + // set thread pool + long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); + ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); + pool.allowCoreThreadTimeOut(true); + params.pool(pool); + + ohBufferMutator = conn.getBufferedMutator(params); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + for (int i = 0; i < 50; ++i) { + final int taskId = i; + final BufferedMutator thrBufferMutator = ohBufferMutator; + executorService.submit(() -> { + List mutations = new ArrayList<>(); + for (int j = 0; j < 4; ++j) { + String thrKey = key; + String thrColumn = column1 + "_" + taskId + "_" + j; + String thrValue = value + "_" + taskId + "_" + j; + long thrTimestamp = timestamp; + + Put put = new Put(Bytes.toBytes(thrKey)); + put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(thrColumn), + thrTimestamp, Bytes.toBytes(thrValue)); + mutations.add(put); + } + try { + thrBufferMutator.mutate(mutations); + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } + }); + } + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + if (ohBufferMutator != null) { + ohBufferMutator.close(); + Get get = new Get(toBytes("putKey")); + Result r = hTable.get(get); + for (KeyValue keyValue : r.raw()) { + ++count; + System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" + + new String(keyValue.getFamily()) + " columnQualifier:" + + new String(keyValue.getQualifier()) + " timestamp:" + + keyValue.getTimestamp() + " value:" + + new String(keyValue.getValue())); + } + Assert.assertEquals(200, count); + Delete delete = new Delete(toBytes("putKey")); + delete.deleteFamily(toBytes("family_group")); + hTable.delete(delete); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + } + if (params != null) { + if (params.getPool() != null) { + System.out.println("Check if user's pool is shut down."); + Assert.assertTrue(params.getPool().isShutdown()); + } + } + } + } +} From 3d9c6f19ecd2a60dddeb453d6e02d99e58c4ab36 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Thu, 12 Sep 2024 16:11:38 +0800 Subject: [PATCH 05/12] format code --- .../hbase/util/OHBufferedMutatorImpl.java | 66 ++++++++++--------- .../hbase/OHBufferedMutatorTest.java | 6 +- 2 files changed, 38 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index e4a8dcd6..bccfc9e9 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -54,36 +54,38 @@ @InterfaceAudience.Private public class OHBufferedMutatorImpl implements BufferedMutator { - private static final Logger LOGGER = TableHBaseLoggerFactory - .getLogger(OHBufferedMutatorImpl.class); + private static final Logger LOGGER = TableHBaseLoggerFactory + .getLogger(OHBufferedMutatorImpl.class); - private static final int BUFFERED_PARAM_UNSET = -1; + private static final int BUFFERED_PARAM_UNSET = -1; - private final ExceptionListener listener; + private final ExceptionListener listener; - protected final ObTableClient obTableClient; - private final TableName tableName; - private volatile Configuration conf; + protected final ObTableClient obTableClient; + private final TableName tableName; + private volatile Configuration conf; private final OHConnectionConfiguration connectionConfig; @VisibleForTesting - final ConcurrentLinkedQueue asyncWriteBuffer = new ConcurrentLinkedQueue(); + final ConcurrentLinkedQueue asyncWriteBuffer = new ConcurrentLinkedQueue(); @VisibleForTesting - AtomicLong currentAsyncBufferSize = new AtomicLong(0); + AtomicLong currentAsyncBufferSize = new AtomicLong(0); - private final long writeBufferSize; - private final int maxKeyValueSize; - private boolean closed = false; - private final ExecutorService pool; - private final int rpcTimeout; - private final int operationTimeout; + private final long writeBufferSize; + private final int maxKeyValueSize; + private boolean closed = false; + private final ExecutorService pool; + private final int rpcTimeout; + private final int operationTimeout; - public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params) throws IOException { + public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params) + throws IOException { if (ohConnection == null || ohConnection.isClosed()) { throw new IllegalArgumentException("Connection is null or closed."); } // create a ObTableClient to do rpc operations - this.obTableClient = ObTableClientManager.getOrCreateObTableClient(ohConnection.getOHConnectionConfiguration()); + this.obTableClient = ObTableClientManager.getOrCreateObTableClient(ohConnection + .getOHConnectionConfiguration()); // init params in OHBufferedMutatorImpl: // TableName + pool + Configuration + listener + writeBufferSize + maxKeyValueSize + rpcTimeout + operationTimeout @@ -93,10 +95,10 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam this.listener = params.getListener(); this.pool = params.getPool(); - this.writeBufferSize = params.getWriteBufferSize() != BUFFERED_PARAM_UNSET ? - params.getWriteBufferSize() : connectionConfig.getWriteBufferSize(); - this.maxKeyValueSize = params.getMaxKeyValueSize() != BUFFERED_PARAM_UNSET ? - params.getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize(); + this.writeBufferSize = params.getWriteBufferSize() != BUFFERED_PARAM_UNSET ? params + .getWriteBufferSize() : connectionConfig.getWriteBufferSize(); + this.maxKeyValueSize = params.getMaxKeyValueSize() != BUFFERED_PARAM_UNSET ? params + .getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize(); this.rpcTimeout = connectionConfig.getRpcTimeout(); this.obTableClient.setRpcExecuteTimeout(rpcTimeout); this.operationTimeout = connectionConfig.getOperationTimeout(); @@ -119,7 +121,7 @@ public Configuration getConfiguration() { */ @Override public void mutate(Mutation mutation) throws InterruptedIOException, - RetriesExhaustedWithDetailsException { + RetriesExhaustedWithDetailsException { mutate(Collections.singletonList(mutation)); } @@ -130,7 +132,7 @@ public void mutate(Mutation mutation) throws InterruptedIOException, */ @Override public void mutate(List mutations) throws InterruptedIOException, - RetriesExhaustedWithDetailsException { + RetriesExhaustedWithDetailsException { // add the mutations into writeAsyncBuffer // atomically add size of mutations into currentWriteBufferSize // do the flush if currentWriteBufferSize > writeBufferSize @@ -180,8 +182,7 @@ boolean validateSameFamily(List mutations) { family = asyncWriteBuffer.peek().getFamilyMap().firstKey(); } for (Mutation mutation : mutations) { - if (mutation.getFamilyMap() == null - || mutation.getFamilyMap().keySet().isEmpty()) { + if (mutation.getFamilyMap() == null || mutation.getFamilyMap().keySet().isEmpty()) { throw new IllegalArgumentException("Family is not provided in batch operations."); } if (mutation.getFamilyMap().keySet().size() > 1) { @@ -267,7 +268,8 @@ public void close() throws IOException { this.pool.shutdown(); try { if (!pool.awaitTermination(600, TimeUnit.SECONDS)) { - LOGGER.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); + LOGGER + .warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); } } catch (InterruptedException e) { LOGGER.warn("waitForTermination interrupted"); @@ -330,17 +332,17 @@ private ObTableOperation buildObTableOperation(KeyValue kv, boolean putToAppend) operationType = INSERT_OR_UPDATE; } return getInstance(operationType, - new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, V_COLUMNS, - new Object[] { kv.getValue() }); + new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, V_COLUMNS, + new Object[] { kv.getValue() }); case Delete: return getInstance(DEL, - new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, null, null); + new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, null, null); case DeleteColumn: return getInstance(DEL, - new Object[] { kv.getRow(), kv.getQualifier(), -kv.getTimestamp() }, null, null); + new Object[] { kv.getRow(), kv.getQualifier(), -kv.getTimestamp() }, null, null); case DeleteFamily: return getInstance(DEL, new Object[] { kv.getRow(), null, -kv.getTimestamp() }, - null, null); + null, null); default: throw new IllegalArgumentException("illegal mutation type " + kvType); } @@ -389,7 +391,7 @@ private String getTargetTableName(String tableNameString, String familyString) { private String getTestLoadTargetTableName(String tableNameString, String familyString) { String suffix = conf.get(HBASE_HTABLE_TEST_LOAD_SUFFIX, - DEFAULT_HBASE_HTABLE_TEST_LOAD_SUFFIX); + DEFAULT_HBASE_HTABLE_TEST_LOAD_SUFFIX); return tableNameString + suffix + "$" + familyString; } diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHBufferedMutatorTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHBufferedMutatorTest.java index 26fc2308..e395e932 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHBufferedMutatorTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHBufferedMutatorTest.java @@ -21,13 +21,15 @@ import static org.apache.hadoop.hbase.util.Bytes.toBytes; public class OHBufferedMutatorTest { - protected Table hTable; - protected Configuration conf; + protected Table hTable; + protected Configuration conf; + @Before public void setup() throws IOException { conf = ObHTableTestUtil.newConfiguration(); conf.set("rs.list.acquire.read.timeout", "10000"); } + /* CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; CREATE TABLE `test$family_group` ( From 9d6cb88d5033e72e753e81d2df4bff15b4ab2257 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Fri, 13 Sep 2024 10:36:37 +0800 Subject: [PATCH 06/12] add retry when batch fails --- .../com/alipay/oceanbase/hbase/OHTable.java | 16 +- .../hbase/util/OHBufferedMutatorImpl.java | 219 +++----- .../hbase/util/OHConnectionImpl.java | 2 +- .../hbase/OHBufferedMutatorTest.java | 471 ------------------ .../oceanbase/hbase/OHConnectionTest.java | 428 ++++++++++++++++ 5 files changed, 516 insertions(+), 620 deletions(-) delete mode 100644 src/test/java/com/alipay/oceanbase/hbase/OHBufferedMutatorTest.java diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 7efb7669..b2aac800 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -1475,9 +1475,9 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) { return obTableQuery; } - private ObTableBatchOperation buildObTableBatchOperation(List keyValueList, - boolean putToAppend, - List qualifiers) { + public static ObTableBatchOperation buildObTableBatchOperation(List keyValueList, + boolean putToAppend, + List qualifiers) { ObTableBatchOperation batch = new ObTableBatchOperation(); for (KeyValue kv : keyValueList) { if (qualifiers != null) { @@ -1534,7 +1534,7 @@ private BatchOperation buildBatchOperation(String tableName, List keyV return batch; } - private ObTableOperation buildObTableOperation(KeyValue kv, boolean putToAppend) { + public static ObTableOperation buildObTableOperation(KeyValue kv, boolean putToAppend) { KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType()); switch (kvType) { case Put: @@ -1583,13 +1583,15 @@ private ObTableQueryAsyncRequest buildObTableQueryAsyncRequest(ObTableQuery obTa return asyncRequest; } - private ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBatchOperation obTableBatchOperation, - String targetTableName) { + public static ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBatchOperation obTableBatchOperation, + String targetTableName, + ExecutorService pool) { ObTableBatchOperationRequest request = new ObTableBatchOperationRequest(); request.setTableName(targetTableName); request.setReturningAffectedRows(true); request.setEntityType(ObTableEntityType.HKV); request.setBatchOperation(obTableBatchOperation); + request.setPool(pool); return request; } @@ -1607,7 +1609,7 @@ private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQu return request; } - private void checkFamilyViolation(Collection families) { + public static void checkFamilyViolation(Collection families) { if (families == null || families.size() == 0) { throw new FeatureNotSupportedException("family is empty."); } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index bccfc9e9..34863027 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -57,8 +57,6 @@ public class OHBufferedMutatorImpl implements BufferedMutator { private static final Logger LOGGER = TableHBaseLoggerFactory .getLogger(OHBufferedMutatorImpl.class); - private static final int BUFFERED_PARAM_UNSET = -1; - private final ExceptionListener listener; protected final ObTableClient obTableClient; @@ -66,9 +64,7 @@ public class OHBufferedMutatorImpl implements BufferedMutator { private volatile Configuration conf; private final OHConnectionConfiguration connectionConfig; - @VisibleForTesting final ConcurrentLinkedQueue asyncWriteBuffer = new ConcurrentLinkedQueue(); - @VisibleForTesting AtomicLong currentAsyncBufferSize = new AtomicLong(0); private final long writeBufferSize; @@ -95,9 +91,9 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam this.listener = params.getListener(); this.pool = params.getPool(); - this.writeBufferSize = params.getWriteBufferSize() != BUFFERED_PARAM_UNSET ? params + this.writeBufferSize = params.getWriteBufferSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params .getWriteBufferSize() : connectionConfig.getWriteBufferSize(); - this.maxKeyValueSize = params.getMaxKeyValueSize() != BUFFERED_PARAM_UNSET ? params + this.maxKeyValueSize = params.getMaxKeyValueSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params .getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize(); this.rpcTimeout = connectionConfig.getRpcTimeout(); this.obTableClient.setRpcExecuteTimeout(rpcTimeout); @@ -131,7 +127,8 @@ public void mutate(Mutation mutation) throws InterruptedIOException, * @param mutations - mutation operations */ @Override - public void mutate(List mutations) throws InterruptedIOException, + public void mutate(List mutations) throws IllegalArgumentException, + InterruptedIOException, RetriesExhaustedWithDetailsException { // add the mutations into writeAsyncBuffer // atomically add size of mutations into currentWriteBufferSize @@ -142,16 +139,9 @@ public void mutate(List mutations) throws InterruptedIOExcep long toAddSize = 0; // check if every mutation's family is the same - if (!validateSameFamily(mutations)) { - throw new IllegalArgumentException("Family should keep the same in one batch."); - } for (Mutation m : mutations) { - if (!validateInsUpAndDelete(m)) { - throw new IllegalArgumentException("Only support for Put and Delete for now."); - } - if (m instanceof Put) { - HTable.validatePut((Put) m, maxKeyValueSize); - } + OHTable.checkFamilyViolation(m.getFamilyMap().keySet()); + validateInsUpAndDelete(m); toAddSize += m.heapSize(); } @@ -165,37 +155,27 @@ public void mutate(List mutations) throws InterruptedIOExcep * Check whether the mutation is Put or Delete in 1.x * @param mt - mutation operation */ - boolean validateInsUpAndDelete(Mutation mt) { + private void validateInsUpAndDelete(Mutation mt) throws IllegalArgumentException { if (!(mt instanceof Put) && !(mt instanceof Delete)) { - return false; + throw new IllegalArgumentException("Only support for Put and Delete for now."); + } + if (mt instanceof Put) { + HTable.validatePut((Put) mt, maxKeyValueSize); } - return true; } /** - * Check whether the family in this batch is the same - * @param mutations - mutation operations + * Check whether the mutations in this batch are the same type + * @param execBuffer - mutation operations */ - boolean validateSameFamily(List mutations) { - byte[] family = null; - if (!asyncWriteBuffer.isEmpty()) { - family = asyncWriteBuffer.peek().getFamilyMap().firstKey(); - } - for (Mutation mutation : mutations) { - if (mutation.getFamilyMap() == null || mutation.getFamilyMap().keySet().isEmpty()) { - throw new IllegalArgumentException("Family is not provided in batch operations."); - } - if (mutation.getFamilyMap().keySet().size() > 1) { - return false; - } - if (family != null) { - byte[] curFamily = mutation.getFamilyMap().firstKey(); - if (!Bytes.equals(family, curFamily)) { - return false; - } + private void checkAllOpsIsSameType(LinkedList execBuffer) { + Class type = execBuffer.get(0).getClass(); + for (Mutation m : execBuffer) { + Class curType = m.getClass(); + if (type != curType) { + throw new IllegalArgumentException("Not support different type in one batch."); } } - return true; } /** @@ -208,35 +188,69 @@ boolean validateSameFamily(List mutations) { private void asyncExecute(boolean flushAll) throws InterruptedIOException, RetriesExhaustedWithDetailsException { + LinkedList execBuffer = new LinkedList<>(); + ObTableBatchOperationRequest request = null; + // namespace n1, n1:table_name + // namespace default, table_name + String tableNameString = tableName.getNameAsString(); try { while (true) { - if (!flushAll || asyncWriteBuffer.isEmpty()) { - if (currentAsyncBufferSize.get() <= writeBufferSize) { + try{ + if (!flushAll || asyncWriteBuffer.isEmpty()) { + if (currentAsyncBufferSize.get() <= writeBufferSize) { + break; + } + } + System.out.println("Flush background."); + System.out.println("Threshold: " + writeBufferSize + ", flush size: " + currentAsyncBufferSize.get()); + Mutation m; + while ((m = asyncWriteBuffer.poll()) != null) { + execBuffer.add(m); + long size = m.heapSize(); + currentAsyncBufferSize.addAndGet(-size); + } + // in concurrent situation, asyncWriteBuffer may be empty here + // for other threads flush all buffer + if (execBuffer.isEmpty()) { break; } + checkAllOpsIsSameType(execBuffer); + // for now, operations' family is the same + byte[] family = execBuffer.getFirst().getFamilyMap().firstKey(); + ObTableBatchOperation batch = buildObTableBatchOperation(execBuffer); + // table_name$cf_name + String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family)); + request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName, pool); + } catch (Exception ex) { + LOGGER.error("Errors occur before mutation operation", ex); + throw ex; } - Mutation m; - LinkedList execBuffer = new LinkedList<>(); - while ((m = asyncWriteBuffer.poll()) != null) { - execBuffer.add(m); - long size = m.heapSize(); - currentAsyncBufferSize.addAndGet(-size); - } - // in concurrent situation, asyncWriteBuffer may be empty here - // for other threads flush all buffer - if (execBuffer.isEmpty()) { - break; + try { + ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); + } catch (Exception ex) { + LOGGER.debug("Errors occur during mutation operation", ex); + try { + // retry every single operation + while (!execBuffer.isEmpty()) { + // poll elements from execBuffer to recollect remaining operations + Mutation m = execBuffer.poll(); + byte[] family = m.getFamilyMap().firstKey(); + ObTableBatchOperation batch = buildObTableBatchOperation(new LinkedList<>(Collections.singletonList(m))); + String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family)); + request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName, pool); + ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); + } + } catch (Exception newEx) { + // if retry fails, only recollect remaining operations + while(!execBuffer.isEmpty()) { + Mutation m = execBuffer.poll(); + long size = m.heapSize(); + asyncWriteBuffer.add(m); + currentAsyncBufferSize.addAndGet(size); + } + throw newEx; + } } - // namespace n1, n1:table_name - // namespace default, table_name - String tableNameString = tableName.getNameAsString(); - // for now, operations' family is the same - byte[] family = execBuffer.getFirst().getFamilyMap().firstKey(); - ObTableBatchOperation batch = buildObTableBatchOperation(execBuffer); - // table_name$cf_name - String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family)); - ObTableBatchOperationRequest request = buildObTableBatchOperationRequest(batch, targetTableName, pool); - ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); } } catch (Exception ex) { LOGGER.error(LCD.convert("01-00026"), ex); @@ -281,12 +295,10 @@ public void close() throws IOException { /** * Force to commit all operations + * do not care whether the pool is shut down or this BufferedMutator is closed */ @Override public void flush() throws IOException { - if (closed) { - throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); - } asyncExecute(true); } @@ -298,86 +310,11 @@ public long getWriteBufferSize() { private ObTableBatchOperation buildObTableBatchOperation(LinkedList execBuffer) { List keyValueList = new LinkedList<>(); for (Mutation mutation : execBuffer) { - checkFamilyViolation(mutation.getFamilyMap().keySet()); for (Map.Entry> entry : mutation.getFamilyMap().entrySet()) { keyValueList.addAll(entry.getValue()); } } - return buildObTableBatchOperation(keyValueList, false, null); - } - - private ObTableBatchOperation buildObTableBatchOperation(List keyValueList, - boolean putToAppend, - List qualifiers) { - ObTableBatchOperation batch = new ObTableBatchOperation(); - for (KeyValue kv : keyValueList) { - if (qualifiers != null) { - qualifiers.add(kv.getQualifier()); - } - batch.addTableOperation(buildObTableOperation(kv, putToAppend)); - } - batch.setSameType(true); - batch.setSamePropertiesNames(true); - return batch; - } - - private ObTableOperation buildObTableOperation(KeyValue kv, boolean putToAppend) { - KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType()); - switch (kvType) { - case Put: - ObTableOperationType operationType; - if (putToAppend) { - operationType = APPEND; - } else { - operationType = INSERT_OR_UPDATE; - } - return getInstance(operationType, - new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, V_COLUMNS, - new Object[] { kv.getValue() }); - case Delete: - return getInstance(DEL, - new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, null, null); - case DeleteColumn: - return getInstance(DEL, - new Object[] { kv.getRow(), kv.getQualifier(), -kv.getTimestamp() }, null, null); - case DeleteFamily: - return getInstance(DEL, new Object[] { kv.getRow(), null, -kv.getTimestamp() }, - null, null); - default: - throw new IllegalArgumentException("illegal mutation type " + kvType); - } - } - - private ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBatchOperation obTableBatchOperation, - String targetTableName, - ExecutorService pool) { - ObTableBatchOperationRequest request = new ObTableBatchOperationRequest(); - request.setTableName(targetTableName); - request.setReturningAffectedRows(true); - request.setEntityType(ObTableEntityType.HKV); - request.setBatchOperation(obTableBatchOperation); - request.setPool(pool); - return request; - } - - private void checkFamilyViolation(Collection families) { - if (families == null || families.size() == 0) { - throw new FeatureNotSupportedException("family is empty."); - } - - if (families.size() > 1) { - throw new FeatureNotSupportedException("multi family is not supported yet."); - } - - for (byte[] family : families) { - if (family == null || family.length == 0) { - throw new IllegalArgumentException("family is empty"); - } - if (isBlank(Bytes.toString(family))) { - throw new IllegalArgumentException("family is blank"); - } - } - + return OHTable.buildObTableBatchOperation(keyValueList, false, null); } private String getTargetTableName(String tableNameString, String familyString) { diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java index 77eea825..4256f316 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java @@ -40,7 +40,7 @@ public class OHConnectionImpl implements Connection { private static final Marker FATAL = MarkerFactory.getMarker("FATAL"); - private static final int BUFFERED_PARAM_UNSET = -1; + public static final int BUFFERED_PARAM_UNSET = -1; private volatile boolean closed; private volatile boolean aborted; diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHBufferedMutatorTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHBufferedMutatorTest.java deleted file mode 100644 index e395e932..00000000 --- a/src/test/java/com/alipay/oceanbase/hbase/OHBufferedMutatorTest.java +++ /dev/null @@ -1,471 +0,0 @@ -package com.alipay.oceanbase.hbase; - -import com.alipay.oceanbase.hbase.util.OHBufferedMutatorImpl; -import net.bytebuddy.implementation.bytecode.Throw; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Threads; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.*; - -import static org.apache.hadoop.hbase.util.Bytes.toBytes; - -public class OHBufferedMutatorTest { - protected Table hTable; - protected Configuration conf; - - @Before - public void setup() throws IOException { - conf = ObHTableTestUtil.newConfiguration(); - conf.set("rs.list.acquire.read.timeout", "10000"); - } - - /* - CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; - CREATE TABLE `test$family_group` ( - `K` varbinary(1024) NOT NULL, - `Q` varbinary(256) NOT NULL, - `T` bigint(20) NOT NULL, - `V` varbinary(1024) DEFAULT NULL, - PRIMARY KEY (`K`, `Q`, `T`) - ) TABLEGROUP = test; - */ - @Test - public void testBufferedMutatorWithFlush() throws Exception { - hTable = new OHTable(conf, "test"); - BufferedMutator ohBufferMutator = null; - try (Connection conn = ConnectionFactory.createConnection(conf)) { - TableName tableName = TableName.valueOf("test"); - - // use defualt params - ohBufferMutator = conn.getBufferedMutator(tableName); - - String key = "putKey"; - String column1 = "putColumn1"; - String value = "value333444"; - long timestamp = System.currentTimeMillis(); - - List mutations = new ArrayList<>(); - // test Put - Put put1 = new Put(Bytes.toBytes(key)); - put1.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); - mutations.add(put1); - Put put2 = new Put(Bytes.toBytes(key)); - put2.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); - mutations.add(put2); - // test add Mutations with List - ohBufferMutator.mutate(mutations); - ohBufferMutator.flush(); - - Get get = new Get(toBytes(key)); - Result r = hTable.get(get); - Assert.assertEquals(2, r.raw().length); - for (KeyValue keyValue : r.raw()) { - System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" - + new String(keyValue.getFamily()) + " columnQualifier:" - + new String(keyValue.getQualifier()) + " timestamp:" - + keyValue.getTimestamp() + " value:" - + new String(keyValue.getValue())); - } - - Delete del = new Delete(Bytes.toBytes(key)); - del.deleteFamily(Bytes.toBytes("family_group")); - // test add Mutation directly - ohBufferMutator.mutate(del); - ohBufferMutator.flush(); - - r = hTable.get(get); - Assert.assertEquals(0, r.raw().length); - - // test NOT_SUPPORT type' - Put put = new Put(Bytes.toBytes(key)); - put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); - put.addColumn(Bytes.toBytes("family_group1"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); - // not support different family in one batch - final BufferedMutator difMut = ohBufferMutator; - final Put difPut= put; - Assert.assertThrows(IllegalArgumentException.class, () -> { - difMut.mutate(difPut); - }); - - Append append = new Append(Bytes.toBytes(key)); - append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); - // only support Put and Delete - final BufferedMutator apMut = ohBufferMutator; - Assert.assertThrows(IllegalArgumentException.class, () -> { - apMut.mutate(append); - }); - } catch (Exception ex) { - if (ex instanceof RetriesExhaustedWithDetailsException) { - ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); - } else { - ex.printStackTrace(); - } - Assert.assertTrue(false); - } finally { - if (ohBufferMutator != null) { - ohBufferMutator.close(); - } - } - } - - /* - CREATE TABLEGROUP n1:test SHARDING = 'ADAPTIVE'; - CREATE TABLE `n1:test$family_group` ( - `K` varbinary(1024) NOT NULL, - `Q` varbinary(256) NOT NULL, - `T` bigint(20) NOT NULL, - `V` varbinary(1024) DEFAULT NULL, - PRIMARY KEY (`K`, `Q`, `T`) - ) TABLEGROUP = n1:test; - */ - @Test - public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { - hTable = new OHTable(conf, "n1:test"); - BufferedMutator ohBufferMutator = null; - try (Connection conn = ConnectionFactory.createConnection(conf)) { - // in "n1" database - TableName tableName = TableName.valueOf("n1:test"); - - // use defualt params - ohBufferMutator = conn.getBufferedMutator(tableName); - - String key = "putKey"; - String column1 = "putColumn1"; - String value = "value333444"; - long timestamp = System.currentTimeMillis(); - - List mutations = new ArrayList<>(); - // test Put - Put put1 = new Put(Bytes.toBytes(key)); - put1.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); - mutations.add(put1); - Put put2 = new Put(Bytes.toBytes(key)); - put2.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); - mutations.add(put2); - // test add Mutations with List - ohBufferMutator.mutate(mutations); - ohBufferMutator.flush(); - - Get get = new Get(toBytes(key)); - Result r = hTable.get(get); - Assert.assertEquals(2, r.raw().length); - for (KeyValue keyValue : r.raw()) { - System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" - + new String(keyValue.getFamily()) + " columnQualifier:" - + new String(keyValue.getQualifier()) + " timestamp:" - + keyValue.getTimestamp() + " value:" - + new String(keyValue.getValue())); - } - - // test Delete - Delete del = new Delete(Bytes.toBytes(key)); - del.deleteFamily(Bytes.toBytes("family_group")); - // test add Mutation directly - ohBufferMutator.mutate(del); - ohBufferMutator.flush(); - - r = hTable.get(get); - Assert.assertEquals(0, r.raw().length); - - // test NOT_SUPPORT type - Put put = new Put(Bytes.toBytes(key)); - put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); - put.addColumn(Bytes.toBytes("family_group1"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); - // not support different family in one batch - final BufferedMutator difMut = ohBufferMutator; - final Put difPut = put; - Assert.assertThrows(IllegalArgumentException.class, () -> { - difMut.mutate(difPut); - }); - - Append append = new Append(Bytes.toBytes(key)); - append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); - // only support Put and Delete - final BufferedMutator apMut = ohBufferMutator; - Assert.assertThrows(IllegalArgumentException.class, () -> { - apMut.mutate(append); - }); - } catch (Exception ex) { - if (ex instanceof RetriesExhaustedWithDetailsException) { - ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); - } else { - ex.printStackTrace(); - } - Assert.assertTrue(false); - } finally { - if (ohBufferMutator != null) { - ohBufferMutator.close(); - } - } - } - - /* - CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; - CREATE TABLE `test$family_group` ( - `K` varbinary(1024) NOT NULL, - `Q` varbinary(256) NOT NULL, - `T` bigint(20) NOT NULL, - `V` varbinary(1024) DEFAULT NULL, - PRIMARY KEY (`K`, `Q`, `T`) - ) TABLEGROUP = test; - */ - @Test - public void testBufferedMutatorWithAutoFlush() throws Exception { - hTable = new OHTable(conf, "test"); - BufferedMutator ohBufferMutator = null; - BufferedMutatorParams params = null; - long bufferSize = 45000L; - int count = 0; - try (Connection conn = ConnectionFactory.createConnection(conf)) { - // use default database - TableName tableName = TableName.valueOf("test"); - - // set params - params = new BufferedMutatorParams(tableName); - params.writeBufferSize(bufferSize); - - ohBufferMutator = conn.getBufferedMutator(params); - - String key = "putKey"; - String column1 = "putColumn1"; - String value = "value333444"; - long timestamp = System.currentTimeMillis(); - - List mutations = new ArrayList<>(); - for (int i = 0; i < 50; ++i) { - mutations.clear(); - for (int j = 0; j < 4; ++j) { - Put put = new Put(Bytes.toBytes(key)); - put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "_" + i + "_" + j), - timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); - mutations.add(put); - } - ohBufferMutator.mutate(mutations); - } - } catch (Exception ex) { - if (ex instanceof RetriesExhaustedWithDetailsException) { - ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); - } else { - ex.printStackTrace(); - } - Assert.assertTrue(false); - } finally { - if (ohBufferMutator != null) { - ohBufferMutator.close(); - Get get = new Get(toBytes("putKey")); - Result r = hTable.get(get); - for (KeyValue keyValue : r.raw()) { - ++count; - System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" - + new String(keyValue.getFamily()) + " columnQualifier:" - + new String(keyValue.getQualifier()) + " timestamp:" - + keyValue.getTimestamp() + " value:" - + new String(keyValue.getValue())); - } - Assert.assertEquals(200, count); - Delete delete = new Delete(toBytes("putKey")); - delete.deleteFamily(toBytes("family_group")); - hTable.delete(delete); - - r = hTable.get(get); - Assert.assertEquals(0, r.raw().length); - } - if (params != null) { - if (params.getPool() != null) { - System.out.println("Check if user's pool is shut down."); - Assert.assertTrue(params.getPool().isShutdown()); - } - } - } - } - - @Test - public void testBufferedMutatorWithUserPool() throws Exception { - hTable = new OHTable(conf, "test"); - BufferedMutator ohBufferMutator = null; - BufferedMutatorParams params = null; - long bufferSize = 45000L; - int count = 0; - try (Connection conn = ConnectionFactory.createConnection(conf)) { - // use default database - TableName tableName = TableName.valueOf("test"); - - // set params - params = new BufferedMutatorParams(tableName); - params.writeBufferSize(bufferSize); - - // set thread pool - long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); - ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); - pool.allowCoreThreadTimeOut(true); - params.pool(pool); - - ohBufferMutator = conn.getBufferedMutator(params); - - String key = "putKey"; - String column1 = "putColumn1"; - String value = "value333444"; - long timestamp = System.currentTimeMillis(); - - List mutations = new ArrayList<>(); - for (int i = 0; i < 50; ++i) { - mutations.clear(); - for (int j = 0; j < 4; ++j) { - Put put = new Put(Bytes.toBytes(key)); - put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "_" + i + "_" + j), - timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); - mutations.add(put); - } - ohBufferMutator.mutate(mutations); - } - } catch (Exception ex) { - if (ex instanceof RetriesExhaustedWithDetailsException) { - ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); - } else { - ex.printStackTrace(); - } - Assert.assertTrue(false); - } finally { - if (ohBufferMutator != null) { - ohBufferMutator.close(); - Get get = new Get(toBytes("putKey")); - Result r = hTable.get(get); - for (KeyValue keyValue : r.raw()) { - ++count; - System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" - + new String(keyValue.getFamily()) + " columnQualifier:" - + new String(keyValue.getQualifier()) + " timestamp:" - + keyValue.getTimestamp() + " value:" - + new String(keyValue.getValue())); - } - Assert.assertEquals(200, count); - Delete delete = new Delete(toBytes("putKey")); - delete.deleteFamily(toBytes("family_group")); - hTable.delete(delete); - - r = hTable.get(get); - Assert.assertEquals(0, r.raw().length); - } - if (params != null) { - if (params.getPool() != null) { - System.out.println("Check if user's pool is shut down."); - Assert.assertTrue(params.getPool().isShutdown()); - } - } - } - } - - @Test - public void testBufferedMutatorConcurrent() throws Exception { - hTable = new OHTable(conf, "test"); - BufferedMutator ohBufferMutator = null; - BufferedMutatorParams params = null; - ExecutorService executorService = Executors.newFixedThreadPool(10); - long bufferSize = 45000L; - int count = 0; - try (Connection conn = ConnectionFactory.createConnection(conf)) { - // use default database - TableName tableName = TableName.valueOf("test"); - - // set params - params = new BufferedMutatorParams(tableName); - params.writeBufferSize(bufferSize); - - // set thread pool - long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); - ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); - pool.allowCoreThreadTimeOut(true); - params.pool(pool); - - ohBufferMutator = conn.getBufferedMutator(params); - - String key = "putKey"; - String column1 = "putColumn1"; - String value = "value333444"; - long timestamp = System.currentTimeMillis(); - - for (int i = 0; i < 50; ++i) { - final int taskId = i; - final BufferedMutator thrBufferMutator = ohBufferMutator; - executorService.submit(() -> { - List mutations = new ArrayList<>(); - for (int j = 0; j < 4; ++j) { - String thrKey = key; - String thrColumn = column1 + "_" + taskId + "_" + j; - String thrValue = value + "_" + taskId + "_" + j; - long thrTimestamp = timestamp; - - Put put = new Put(Bytes.toBytes(thrKey)); - put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(thrColumn), - thrTimestamp, Bytes.toBytes(thrValue)); - mutations.add(put); - } - try { - thrBufferMutator.mutate(mutations); - } catch (Exception ex) { - if (ex instanceof RetriesExhaustedWithDetailsException) { - ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); - } else { - ex.printStackTrace(); - } - Assert.assertTrue(false); - } - }); - } - } catch (Exception ex) { - if (ex instanceof RetriesExhaustedWithDetailsException) { - ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); - } else { - ex.printStackTrace(); - } - Assert.assertTrue(false); - } finally { - executorService.shutdown(); - try { - if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { - executorService.shutdownNow(); - } - } catch (InterruptedException e) { - executorService.shutdownNow(); - Thread.currentThread().interrupt(); - } - if (ohBufferMutator != null) { - ohBufferMutator.close(); - Get get = new Get(toBytes("putKey")); - Result r = hTable.get(get); - for (KeyValue keyValue : r.raw()) { - ++count; - System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" - + new String(keyValue.getFamily()) + " columnQualifier:" - + new String(keyValue.getQualifier()) + " timestamp:" - + keyValue.getTimestamp() + " value:" - + new String(keyValue.getValue())); - } - Assert.assertEquals(200, count); - Delete delete = new Delete(toBytes("putKey")); - delete.deleteFamily(toBytes("family_group")); - hTable.delete(delete); - - r = hTable.get(get); - Assert.assertEquals(0, r.raw().length); - } - if (params != null) { - if (params.getPool() != null) { - System.out.println("Check if user's pool is shut down."); - Assert.assertTrue(params.getPool().isShutdown()); - } - } - } - } -} diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java index e477ab34..76256cd2 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java @@ -22,9 +22,14 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + import static org.apache.hadoop.hbase.util.Bytes.toBytes; public class OHConnectionTest { @@ -174,4 +179,427 @@ private void testBasic() throws Exception { } + /* + CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; + CREATE TABLE `test$family_group` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = test; + */ + @Test + public void testBufferedMutatorWithFlush() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + TableName tableName = TableName.valueOf("test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); + BufferedMutator ohBufferMutator = null; + try (Connection conn = ConnectionFactory.createConnection(conf)) { + // use defualt params + ohBufferMutator = conn.getBufferedMutator(tableName); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + List mutations = new ArrayList<>(); + // test Put + Put put1 = new Put(Bytes.toBytes(key)); + put1.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); + mutations.add(put1); + Put put2 = new Put(Bytes.toBytes(key)); + put2.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); + mutations.add(put2); + // test add Mutations with List + ohBufferMutator.mutate(mutations); + ohBufferMutator.flush(); + + Get get = new Get(toBytes(key)); + Result r = hTable.get(get); + Assert.assertEquals(2, r.raw().length); + for (KeyValue keyValue : r.raw()) { + System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" + + new String(keyValue.getFamily()) + " columnQualifier:" + + new String(keyValue.getQualifier()) + " timestamp:" + + keyValue.getTimestamp() + " value:" + + new String(keyValue.getValue())); + } + + Delete del = new Delete(Bytes.toBytes(key)); + del.deleteFamily(Bytes.toBytes("family_group")); + // test add Mutation directly + ohBufferMutator.mutate(del); + ohBufferMutator.flush(); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + + Append append = new Append(Bytes.toBytes(key)); + append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); + // only support Put and Delete + final BufferedMutator apMut = ohBufferMutator; + Assert.assertThrows(IllegalArgumentException.class, () -> { + apMut.mutate(append); + }); + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + if (ohBufferMutator != null) { + ohBufferMutator.close(); + } + } + } + + /* + CREATE TABLEGROUP n1:test SHARDING = 'ADAPTIVE'; + CREATE TABLE `n1:test$family_group` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = n1:test; + */ + @Test + public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + // in "n1" database + TableName tableName = TableName.valueOf("n1:test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); + BufferedMutator ohBufferMutator = null; + try (Connection conn = ConnectionFactory.createConnection(conf)) { + // use defualt params + ohBufferMutator = conn.getBufferedMutator(tableName); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + List mutations = new ArrayList<>(); + // test Put + Put put1 = new Put(Bytes.toBytes(key)); + put1.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); + mutations.add(put1); + Put put2 = new Put(Bytes.toBytes(key)); + put2.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); + mutations.add(put2); + // test add Mutations with List + ohBufferMutator.mutate(mutations); + ohBufferMutator.flush(); + + Get get = new Get(toBytes(key)); + Result r = hTable.get(get); + Assert.assertEquals(2, r.raw().length); + for (KeyValue keyValue : r.raw()) { + System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" + + new String(keyValue.getFamily()) + " columnQualifier:" + + new String(keyValue.getQualifier()) + " timestamp:" + + keyValue.getTimestamp() + " value:" + + new String(keyValue.getValue())); + } + + // test Delete + Delete del = new Delete(Bytes.toBytes(key)); + del.deleteFamily(Bytes.toBytes("family_group")); + // test add Mutation directly + ohBufferMutator.mutate(del); + ohBufferMutator.flush(); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + + Append append = new Append(Bytes.toBytes(key)); + append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); + // only support Put and Delete + final BufferedMutator apMut = ohBufferMutator; + Assert.assertThrows(IllegalArgumentException.class, () -> { + apMut.mutate(append); + }); + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + if (ohBufferMutator != null) { + ohBufferMutator.close(); + } + } + } + + /* + CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; + CREATE TABLE `test$family_group` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = test; + */ + @Test + public void testBufferedMutatorWithAutoFlush() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + TableName tableName = TableName.valueOf("test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); + BufferedMutator ohBufferMutator = null; + BufferedMutatorParams params = null; + long bufferSize = 45000L; + int count = 0; + try (Connection conn = ConnectionFactory.createConnection(conf)) { + // set params + params = new BufferedMutatorParams(tableName); + params.writeBufferSize(bufferSize); + + ohBufferMutator = conn.getBufferedMutator(params); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + List mutations = new ArrayList<>(); + for (int i = 0; i < 50; ++i) { + mutations.clear(); + for (int j = 0; j < 4; ++j) { + Put put = new Put(Bytes.toBytes(key)); + put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "_" + i + "_" + j), + timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); + mutations.add(put); + } + ohBufferMutator.mutate(mutations); + } + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + if (ohBufferMutator != null) { + ohBufferMutator.close(); + Get get = new Get(toBytes("putKey")); + Result r = hTable.get(get); + for (KeyValue keyValue : r.raw()) { + ++count; + System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" + + new String(keyValue.getFamily()) + " columnQualifier:" + + new String(keyValue.getQualifier()) + " timestamp:" + + keyValue.getTimestamp() + " value:" + + new String(keyValue.getValue())); + } + Assert.assertEquals(200, count); + Delete delete = new Delete(toBytes("putKey")); + delete.deleteFamily(toBytes("family_group")); + hTable.delete(delete); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + } + if (params != null) { + if (params.getPool() != null) { + System.out.println("Check if user's pool is shut down."); + Assert.assertTrue(params.getPool().isShutdown()); + } + } + } + } + + @Test + public void testBufferedMutatorWithUserPool() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + TableName tableName = TableName.valueOf("test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); + BufferedMutator ohBufferMutator = null; + BufferedMutatorParams params = null; + long bufferSize = 45000L; + int count = 0; + try (Connection conn = ConnectionFactory.createConnection(conf)) { + // set params + params = new BufferedMutatorParams(tableName); + params.writeBufferSize(bufferSize); + + // set thread pool + long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); + ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); + pool.allowCoreThreadTimeOut(true); + params.pool(pool); + + ohBufferMutator = conn.getBufferedMutator(params); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + List mutations = new ArrayList<>(); + for (int i = 0; i < 50; ++i) { + mutations.clear(); + for (int j = 0; j < 4; ++j) { + Put put = new Put(Bytes.toBytes(key)); + put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "_" + i + "_" + j), + timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); + mutations.add(put); + } + ohBufferMutator.mutate(mutations); + } + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + if (ohBufferMutator != null) { + ohBufferMutator.close(); + Get get = new Get(toBytes("putKey")); + Result r = hTable.get(get); + for (KeyValue keyValue : r.raw()) { + ++count; + System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" + + new String(keyValue.getFamily()) + " columnQualifier:" + + new String(keyValue.getQualifier()) + " timestamp:" + + keyValue.getTimestamp() + " value:" + + new String(keyValue.getValue())); + } + Assert.assertEquals(200, count); + Delete delete = new Delete(toBytes("putKey")); + delete.deleteFamily(toBytes("family_group")); + hTable.delete(delete); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + } + if (params != null) { + if (params.getPool() != null) { + System.out.println("Check if user's pool is shut down."); + Assert.assertTrue(params.getPool().isShutdown()); + } + } + } + } + + @Test + public void testBufferedMutatorConcurrent() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + TableName tableName = TableName.valueOf("test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); + BufferedMutator ohBufferMutator = null; + BufferedMutatorParams params = null; + ExecutorService executorService = Executors.newFixedThreadPool(10); + long bufferSize = 45000L; + int count = 0; + try (Connection conn = ConnectionFactory.createConnection(conf)) { + // set params + params = new BufferedMutatorParams(tableName); + params.writeBufferSize(bufferSize); + + // set thread pool + long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); + ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); + pool.allowCoreThreadTimeOut(true); + params.pool(pool); + + ohBufferMutator = conn.getBufferedMutator(params); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + for (int i = 0; i < 50; ++i) { + final int taskId = i; + final BufferedMutator thrBufferMutator = ohBufferMutator; + executorService.submit(() -> { + List mutations = new ArrayList<>(); + for (int j = 0; j < 4; ++j) { + String thrKey = key; + String thrColumn = column1 + "_" + taskId + "_" + j; + String thrValue = value + "_" + taskId + "_" + j; + long thrTimestamp = timestamp; + + Put put = new Put(Bytes.toBytes(thrKey)); + put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(thrColumn), + thrTimestamp, Bytes.toBytes(thrValue)); + mutations.add(put); + } + try { + thrBufferMutator.mutate(mutations); + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } + }); + } + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + if (ohBufferMutator != null) { + ohBufferMutator.close(); + Get get = new Get(toBytes("putKey")); + Result r = hTable.get(get); + for (KeyValue keyValue : r.raw()) { + ++count; + System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" + + new String(keyValue.getFamily()) + " columnQualifier:" + + new String(keyValue.getQualifier()) + " timestamp:" + + keyValue.getTimestamp() + " value:" + + new String(keyValue.getValue())); + } + Assert.assertEquals(200, count); + Delete delete = new Delete(toBytes("putKey")); + delete.deleteFamily(toBytes("family_group")); + hTable.delete(delete); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + } + if (params != null) { + if (params.getPool() != null) { + System.out.println("Check if user's pool is shut down."); + Assert.assertTrue(params.getPool().isShutdown()); + } + } + } + } } From cc17288ade9f798953bd553f596acea17cac5a1c Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Fri, 13 Sep 2024 10:40:31 +0800 Subject: [PATCH 07/12] remove test print --- .../com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 34863027..89a24b57 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -201,8 +201,6 @@ private void asyncExecute(boolean flushAll) throws break; } } - System.out.println("Flush background."); - System.out.println("Threshold: " + writeBufferSize + ", flush size: " + currentAsyncBufferSize.get()); Mutation m; while ((m = asyncWriteBuffer.poll()) != null) { execBuffer.add(m); From 46167f9c2d15f2ba7f9498295f6ab364e3250d89 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Fri, 13 Sep 2024 12:18:06 +0800 Subject: [PATCH 08/12] format code --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index d741045f..84fb4113 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -1478,8 +1478,8 @@ private ObTableQuery buildObTableQuery(final Get get, Collection columnQ } public static ObTableBatchOperation buildObTableBatchOperation(List keyValueList, - boolean putToAppend, - List qualifiers) { + boolean putToAppend, + List qualifiers) { ObTableBatchOperation batch = new ObTableBatchOperation(); for (KeyValue kv : keyValueList) { if (qualifiers != null) { From 48f22abe377f6517acc2ff9bba2721a1226df3cf Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Fri, 13 Sep 2024 14:50:55 +0800 Subject: [PATCH 09/12] make interface more generalized --- .../alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 89a24b57..1c5b6cb4 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -168,7 +168,7 @@ private void validateInsUpAndDelete(Mutation mt) throws IllegalArgumentException * Check whether the mutations in this batch are the same type * @param execBuffer - mutation operations */ - private void checkAllOpsIsSameType(LinkedList execBuffer) { + private void checkAllOpsIsSameType(List execBuffer) { Class type = execBuffer.get(0).getClass(); for (Mutation m : execBuffer) { Class curType = m.getClass(); @@ -233,7 +233,7 @@ private void asyncExecute(boolean flushAll) throws // poll elements from execBuffer to recollect remaining operations Mutation m = execBuffer.poll(); byte[] family = m.getFamilyMap().firstKey(); - ObTableBatchOperation batch = buildObTableBatchOperation(new LinkedList<>(Collections.singletonList(m))); + ObTableBatchOperation batch = buildObTableBatchOperation(Collections.singletonList(m)); String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family)); request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName, pool); ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); @@ -305,7 +305,7 @@ public long getWriteBufferSize() { return this.writeBufferSize; } - private ObTableBatchOperation buildObTableBatchOperation(LinkedList execBuffer) { + private ObTableBatchOperation buildObTableBatchOperation(List execBuffer) { List keyValueList = new LinkedList<>(); for (Mutation mutation : execBuffer) { for (Map.Entry> entry : mutation.getFamilyMap().entrySet()) { From 5aed42259f1fdb09e017128f0ba18037ed6d97b2 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Sat, 14 Sep 2024 10:59:18 +0800 Subject: [PATCH 10/12] format BufferedMutator test case --- .../oceanbase/hbase/OHConnectionTest.java | 83 +++++++------------ 1 file changed, 29 insertions(+), 54 deletions(-) diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java index 76256cd2..b4a5bf25 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java @@ -193,13 +193,14 @@ PRIMARY KEY (`K`, `Q`, `T`) public void testBufferedMutatorWithFlush() throws Exception { Configuration conf = ObHTableTestUtil.newConfiguration(); conf.set("rs.list.acquire.read.timeout", "10000"); - TableName tableName = TableName.valueOf("test"); - connection = ConnectionFactory.createConnection(conf); - hTable = connection.getTable(tableName); BufferedMutator ohBufferMutator = null; - try (Connection conn = ConnectionFactory.createConnection(conf)) { + try { + TableName tableName = TableName.valueOf("test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); // use defualt params - ohBufferMutator = conn.getBufferedMutator(tableName); + ohBufferMutator = connection.getBufferedMutator(tableName); + hTable = connection.getTable(tableName); String key = "putKey"; String column1 = "putColumn1"; @@ -221,13 +222,6 @@ public void testBufferedMutatorWithFlush() throws Exception { Get get = new Get(toBytes(key)); Result r = hTable.get(get); Assert.assertEquals(2, r.raw().length); - for (KeyValue keyValue : r.raw()) { - System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" - + new String(keyValue.getFamily()) + " columnQualifier:" - + new String(keyValue.getQualifier()) + " timestamp:" - + keyValue.getTimestamp() + " value:" - + new String(keyValue.getValue())); - } Delete del = new Delete(Bytes.toBytes(key)); del.deleteFamily(Bytes.toBytes("family_group")); @@ -260,27 +254,27 @@ public void testBufferedMutatorWithFlush() throws Exception { } /* - CREATE TABLEGROUP n1:test SHARDING = 'ADAPTIVE'; + CREATE TABLEGROUP `n1:test` SHARDING = 'ADAPTIVE'; CREATE TABLE `n1:test$family_group` ( `K` varbinary(1024) NOT NULL, `Q` varbinary(256) NOT NULL, `T` bigint(20) NOT NULL, `V` varbinary(1024) DEFAULT NULL, PRIMARY KEY (`K`, `Q`, `T`) - ) TABLEGROUP = n1:test; + ) TABLEGROUP = `n1:test`; */ @Test public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { Configuration conf = ObHTableTestUtil.newConfiguration(); conf.set("rs.list.acquire.read.timeout", "10000"); - // in "n1" database - TableName tableName = TableName.valueOf("n1:test"); - connection = ConnectionFactory.createConnection(conf); - hTable = connection.getTable(tableName); BufferedMutator ohBufferMutator = null; - try (Connection conn = ConnectionFactory.createConnection(conf)) { + try { + // in "n1" database + TableName tableName = TableName.valueOf("n1:test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); // use defualt params - ohBufferMutator = conn.getBufferedMutator(tableName); + ohBufferMutator = connection.getBufferedMutator(tableName); String key = "putKey"; String column1 = "putColumn1"; @@ -355,19 +349,18 @@ PRIMARY KEY (`K`, `Q`, `T`) public void testBufferedMutatorWithAutoFlush() throws Exception { Configuration conf = ObHTableTestUtil.newConfiguration(); conf.set("rs.list.acquire.read.timeout", "10000"); - TableName tableName = TableName.valueOf("test"); - connection = ConnectionFactory.createConnection(conf); - hTable = connection.getTable(tableName); BufferedMutator ohBufferMutator = null; BufferedMutatorParams params = null; long bufferSize = 45000L; int count = 0; - try (Connection conn = ConnectionFactory.createConnection(conf)) { + try { + TableName tableName = TableName.valueOf("test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); // set params params = new BufferedMutatorParams(tableName); params.writeBufferSize(bufferSize); - - ohBufferMutator = conn.getBufferedMutator(params); + ohBufferMutator = connection.getBufferedMutator(params); String key = "putKey"; String column1 = "putColumn1"; @@ -399,11 +392,6 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { Result r = hTable.get(get); for (KeyValue keyValue : r.raw()) { ++count; - System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" - + new String(keyValue.getFamily()) + " columnQualifier:" - + new String(keyValue.getQualifier()) + " timestamp:" - + keyValue.getTimestamp() + " value:" - + new String(keyValue.getValue())); } Assert.assertEquals(200, count); Delete delete = new Delete(toBytes("putKey")); @@ -415,7 +403,6 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { } if (params != null) { if (params.getPool() != null) { - System.out.println("Check if user's pool is shut down."); Assert.assertTrue(params.getPool().isShutdown()); } } @@ -426,14 +413,14 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { public void testBufferedMutatorWithUserPool() throws Exception { Configuration conf = ObHTableTestUtil.newConfiguration(); conf.set("rs.list.acquire.read.timeout", "10000"); - TableName tableName = TableName.valueOf("test"); - connection = ConnectionFactory.createConnection(conf); - hTable = connection.getTable(tableName); BufferedMutator ohBufferMutator = null; BufferedMutatorParams params = null; long bufferSize = 45000L; int count = 0; - try (Connection conn = ConnectionFactory.createConnection(conf)) { + try { + TableName tableName = TableName.valueOf("test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); // set params params = new BufferedMutatorParams(tableName); params.writeBufferSize(bufferSize); @@ -444,7 +431,7 @@ public void testBufferedMutatorWithUserPool() throws Exception { pool.allowCoreThreadTimeOut(true); params.pool(pool); - ohBufferMutator = conn.getBufferedMutator(params); + ohBufferMutator = connection.getBufferedMutator(params); String key = "putKey"; String column1 = "putColumn1"; @@ -476,11 +463,6 @@ public void testBufferedMutatorWithUserPool() throws Exception { Result r = hTable.get(get); for (KeyValue keyValue : r.raw()) { ++count; - System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" - + new String(keyValue.getFamily()) + " columnQualifier:" - + new String(keyValue.getQualifier()) + " timestamp:" - + keyValue.getTimestamp() + " value:" - + new String(keyValue.getValue())); } Assert.assertEquals(200, count); Delete delete = new Delete(toBytes("putKey")); @@ -492,7 +474,6 @@ public void testBufferedMutatorWithUserPool() throws Exception { } if (params != null) { if (params.getPool() != null) { - System.out.println("Check if user's pool is shut down."); Assert.assertTrue(params.getPool().isShutdown()); } } @@ -503,15 +484,15 @@ public void testBufferedMutatorWithUserPool() throws Exception { public void testBufferedMutatorConcurrent() throws Exception { Configuration conf = ObHTableTestUtil.newConfiguration(); conf.set("rs.list.acquire.read.timeout", "10000"); - TableName tableName = TableName.valueOf("test"); - connection = ConnectionFactory.createConnection(conf); - hTable = connection.getTable(tableName); BufferedMutator ohBufferMutator = null; BufferedMutatorParams params = null; ExecutorService executorService = Executors.newFixedThreadPool(10); long bufferSize = 45000L; int count = 0; - try (Connection conn = ConnectionFactory.createConnection(conf)) { + try { + TableName tableName = TableName.valueOf("test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); // set params params = new BufferedMutatorParams(tableName); params.writeBufferSize(bufferSize); @@ -522,7 +503,7 @@ public void testBufferedMutatorConcurrent() throws Exception { pool.allowCoreThreadTimeOut(true); params.pool(pool); - ohBufferMutator = conn.getBufferedMutator(params); + ohBufferMutator = connection.getBufferedMutator(params); String key = "putKey"; String column1 = "putColumn1"; @@ -580,11 +561,6 @@ public void testBufferedMutatorConcurrent() throws Exception { Result r = hTable.get(get); for (KeyValue keyValue : r.raw()) { ++count; - System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" - + new String(keyValue.getFamily()) + " columnQualifier:" - + new String(keyValue.getQualifier()) + " timestamp:" - + keyValue.getTimestamp() + " value:" - + new String(keyValue.getValue())); } Assert.assertEquals(200, count); Delete delete = new Delete(toBytes("putKey")); @@ -596,7 +572,6 @@ public void testBufferedMutatorConcurrent() throws Exception { } if (params != null) { if (params.getPool() != null) { - System.out.println("Check if user's pool is shut down."); Assert.assertTrue(params.getPool().isShutdown()); } } From cc3c2442b30196000f34a598e3dc707d9d87358d Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Sat, 14 Sep 2024 11:04:12 +0800 Subject: [PATCH 11/12] remove redundancy, add some comments --- .../oceanbase/hbase/OHConnectionTest.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java index b4a5bf25..36902cbc 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java @@ -200,7 +200,6 @@ public void testBufferedMutatorWithFlush() throws Exception { hTable = connection.getTable(tableName); // use defualt params ohBufferMutator = connection.getBufferedMutator(tableName); - hTable = connection.getTable(tableName); String key = "putKey"; String column1 = "putColumn1"; @@ -409,6 +408,16 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { } } + /* + CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; + CREATE TABLE `test$family_group` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = test; + */ @Test public void testBufferedMutatorWithUserPool() throws Exception { Configuration conf = ObHTableTestUtil.newConfiguration(); @@ -480,6 +489,16 @@ public void testBufferedMutatorWithUserPool() throws Exception { } } + /* + CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; + CREATE TABLE `test$family_group` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = test; + */ @Test public void testBufferedMutatorConcurrent() throws Exception { Configuration conf = ObHTableTestUtil.newConfiguration(); From 0f5ab578008c9164c1252c868d19bb5acdb0e4d0 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Sat, 14 Sep 2024 16:00:58 +0800 Subject: [PATCH 12/12] fix type of a bufferedMutator. Optimize by review --- .../com/alipay/oceanbase/hbase/OHTable.java | 42 +++-- .../hbase/util/OHBufferedMutatorImpl.java | 86 ++++------ .../oceanbase/hbase/OHConnectionTest.java | 160 +++++++++++++----- 3 files changed, 172 insertions(+), 116 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 84fb4113..4980b390 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -520,8 +520,10 @@ public Result call() throws IOException { .entrySet()) { family = entry.getKey(); obTableQuery = buildObTableQuery(get, entry.getValue()); - request = buildObTableQueryRequest(obTableQuery, - getTargetTableName(tableNameString, Bytes.toString(family))); + request = buildObTableQueryRequest( + obTableQuery, + getTargetTableName(tableNameString, Bytes.toString(family), + configuration)); clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient .execute(request); getKeyValueFromResult(clientQueryStreamResult, keyValueList, false, @@ -598,8 +600,10 @@ public ResultScanner call() throws IOException { scan.getMaxVersions(), entry.getValue()); obTableQuery = buildObTableQuery(filter, scan); - request = buildObTableQueryAsyncRequest(obTableQuery, - getTargetTableName(tableNameString, Bytes.toString(family))); + request = buildObTableQueryAsyncRequest( + obTableQuery, + getTargetTableName(tableNameString, Bytes.toString(family), + configuration)); clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient .execute(request); return new ClientStreamScanner(clientQueryAsyncStreamResult, @@ -749,7 +753,7 @@ private void innerDelete(Delete delete) throws IOException { .next(); BatchOperation batch = buildBatchOperation( - getTargetTableName(tableNameString, Bytes.toString(entry.getKey())), + getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration), entry.getValue(), false, null); BatchOperationResult results = batch.execute(); @@ -851,7 +855,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co ObTableBatchOperation batch = buildObTableBatchOperation(keyValueList, false, null); ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, - batch, getTargetTableName(tableNameString, Bytes.toString(family))); + batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration)); ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient .execute(request); return result.getAffectedRows() > 0; @@ -889,7 +893,8 @@ public Result append(Append append) throws IOException { queryAndMutate.setTableQuery(obTableQuery); queryAndMutate.setMutations(batchOperation); ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, - batchOperation, getTargetTableName(tableNameString, Bytes.toString(f))); + batchOperation, + getTargetTableName(tableNameString, Bytes.toString(f), configuration)); request.setReturningAffectedEntity(true); ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient .execute(request); @@ -949,7 +954,7 @@ public Result increment(Increment increment) throws IOException { queryAndMutate.setTableQuery(obTableQuery); ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, - batch, getTargetTableName(tableNameString, Bytes.toString(f))); + batch, getTargetTableName(tableNameString, Bytes.toString(f), configuration)); request.setReturningAffectedEntity(true); ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient .execute(request); @@ -999,7 +1004,7 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo queryAndMutate.setTableQuery(obTableQuery); ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, - batch, getTargetTableName(tableNameString, Bytes.toString(family))); + batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration)); request.setReturningAffectedEntity(true); ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient .execute(request); @@ -1063,7 +1068,7 @@ public void flushCommits() throws IOException { .getSecond().size()); try { String targetTableName = getTargetTableName(this.tableNameString, - entry.getKey()); + entry.getKey(), configuration); BatchOperation batch = buildBatchOperation(targetTableName, entry .getValue().getSecond(), false, null); @@ -1308,21 +1313,23 @@ T executeServerCallable(final ServerCallable serverCallable) throws IOExc } } - private String getTargetTableName(String tableNameString, String familyString) { + public static String getTargetTableName(String tableNameString, String familyString, + Configuration conf) { checkArgument(tableNameString != null, "tableNameString is null"); checkArgument(familyString != null, "familyString is null"); - if (configuration.getBoolean(HBASE_HTABLE_TEST_LOAD_ENABLE, false)) { - return getTestLoadTargetTableName(tableNameString, familyString); + if (conf.getBoolean(HBASE_HTABLE_TEST_LOAD_ENABLE, false)) { + return getTestLoadTargetTableName(tableNameString, familyString, conf); } return getNormalTargetTableName(tableNameString, familyString); } - private String getNormalTargetTableName(String tableNameString, String familyString) { + private static String getNormalTargetTableName(String tableNameString, String familyString) { return tableNameString + "$" + familyString; } - private String getTestLoadTargetTableName(String tableNameString, String familyString) { - String suffix = configuration.get(HBASE_HTABLE_TEST_LOAD_SUFFIX, + private static String getTestLoadTargetTableName(String tableNameString, String familyString, + Configuration conf) { + String suffix = conf.get(HBASE_HTABLE_TEST_LOAD_SUFFIX, DEFAULT_HBASE_HTABLE_TEST_LOAD_SUFFIX); return tableNameString + suffix + "$" + familyString; } @@ -1639,7 +1646,8 @@ public void refreshTableEntry(String familyString, boolean hasTestLoad) throws E getNormalTargetTableName(tableNameString, familyString), true, true); if (hasTestLoad) { this.obTableClient.getOrRefreshTableEntry( - getTestLoadTargetTableName(tableNameString, familyString), true, true); + getTestLoadTargetTableName(tableNameString, familyString, configuration), true, + true); } } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 1c5b6cb4..958f2d99 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -20,6 +20,7 @@ import com.alipay.oceanbase.hbase.OHTable; import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.exception.ObTableEntryRefreshException; import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; import com.google.common.annotations.VisibleForTesting; @@ -42,6 +43,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import static com.alipay.oceanbase.hbase.constants.OHConstants.*; import static com.alipay.oceanbase.hbase.constants.OHConstants.DEFAULT_HBASE_HTABLE_TEST_LOAD_SUFFIX; @@ -50,6 +54,7 @@ import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.*; import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.DEL; import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.LCD; +import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME; import static org.apache.commons.lang.StringUtils.isBlank; @InterfaceAudience.Private @@ -67,12 +72,12 @@ public class OHBufferedMutatorImpl implements BufferedMutator { final ConcurrentLinkedQueue asyncWriteBuffer = new ConcurrentLinkedQueue(); AtomicLong currentAsyncBufferSize = new AtomicLong(0); + private AtomicReference> type = new AtomicReference<>(null); private final long writeBufferSize; private final int maxKeyValueSize; private boolean closed = false; private final ExecutorService pool; private final int rpcTimeout; - private final int operationTimeout; public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params) throws IOException { @@ -97,7 +102,6 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam .getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize(); this.rpcTimeout = connectionConfig.getRpcTimeout(); this.obTableClient.setRpcExecuteTimeout(rpcTimeout); - this.operationTimeout = connectionConfig.getOperationTimeout(); } @Override @@ -116,8 +120,7 @@ public Configuration getConfiguration() { * @param mutation - mutation operation */ @Override - public void mutate(Mutation mutation) throws InterruptedIOException, - RetriesExhaustedWithDetailsException { + public void mutate(Mutation mutation) throws IOException { mutate(Collections.singletonList(mutation)); } @@ -127,21 +130,28 @@ public void mutate(Mutation mutation) throws InterruptedIOException, * @param mutations - mutation operations */ @Override - public void mutate(List mutations) throws IllegalArgumentException, - InterruptedIOException, - RetriesExhaustedWithDetailsException { - // add the mutations into writeAsyncBuffer - // atomically add size of mutations into currentWriteBufferSize - // do the flush if currentWriteBufferSize > writeBufferSize + public void mutate(List mutations) throws IOException { if (closed) { throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); } + if (mutations.isEmpty()) { + return; + } long toAddSize = 0; // check if every mutation's family is the same + // check if mutations are the same type for (Mutation m : mutations) { OHTable.checkFamilyViolation(m.getFamilyMap().keySet()); validateInsUpAndDelete(m); + Class curType = m.getClass(); + // set the type of this BufferedMutator + if (type.get() == null) { + type.compareAndSet(null, mutations.get(0).getClass()); + } + if (!type.get().equals(curType)) { + throw new IllegalArgumentException("Not support different type in one batch."); + } toAddSize += m.heapSize(); } @@ -164,20 +174,6 @@ private void validateInsUpAndDelete(Mutation mt) throws IllegalArgumentException } } - /** - * Check whether the mutations in this batch are the same type - * @param execBuffer - mutation operations - */ - private void checkAllOpsIsSameType(List execBuffer) { - Class type = execBuffer.get(0).getClass(); - for (Mutation m : execBuffer) { - Class curType = m.getClass(); - if (type != curType) { - throw new IllegalArgumentException("Not support different type in one batch."); - } - } - } - /** * Send the operations in the buffer to the servers. Does not wait for the server's answer. If * there is an error, either throw the error, or use the listener to deal with the error. @@ -185,9 +181,7 @@ private void checkAllOpsIsSameType(List execBuffer) { * @param flushAll - if true, sends all the writes and wait for all of them to finish before * returning. */ - private void asyncExecute(boolean flushAll) throws - InterruptedIOException, - RetriesExhaustedWithDetailsException { + private void asyncExecute(boolean flushAll) throws IOException { LinkedList execBuffer = new LinkedList<>(); ObTableBatchOperationRequest request = null; // namespace n1, n1:table_name @@ -212,36 +206,39 @@ private void asyncExecute(boolean flushAll) throws if (execBuffer.isEmpty()) { break; } - checkAllOpsIsSameType(execBuffer); // for now, operations' family is the same byte[] family = execBuffer.getFirst().getFamilyMap().firstKey(); ObTableBatchOperation batch = buildObTableBatchOperation(execBuffer); // table_name$cf_name - String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family)); + String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf); request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName, pool); } catch (Exception ex) { LOGGER.error("Errors occur before mutation operation", ex); - throw ex; + throw new IllegalArgumentException("Errors occur before mutation operation", ex); } try { ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); } catch (Exception ex) { LOGGER.debug("Errors occur during mutation operation", ex); + Mutation m = null; try { // retry every single operation while (!execBuffer.isEmpty()) { // poll elements from execBuffer to recollect remaining operations - Mutation m = execBuffer.poll(); + m = execBuffer.poll(); byte[] family = m.getFamilyMap().firstKey(); ObTableBatchOperation batch = buildObTableBatchOperation(Collections.singletonList(m)); - String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family)); + String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf); request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName, pool); ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); } } catch (Exception newEx) { + if (m != null) { + execBuffer.addFirst(m); + } // if retry fails, only recollect remaining operations while(!execBuffer.isEmpty()) { - Mutation m = execBuffer.poll(); + m = execBuffer.poll(); long size = m.heapSize(); asyncWriteBuffer.add(m); currentAsyncBufferSize.addAndGet(size); @@ -252,6 +249,10 @@ private void asyncExecute(boolean flushAll) throws } } catch (Exception ex) { LOGGER.error(LCD.convert("01-00026"), ex); + // if the cause is illegal argument, directly throw to user + if (ex instanceof IllegalArgumentException) { + throw (IllegalArgumentException) ex; + } // TODO: need to collect error information and actions during batch operations // TODO: maybe keep in ObTableBatchOperationResult List throwables = new ArrayList(); @@ -314,23 +315,4 @@ private ObTableBatchOperation buildObTableBatchOperation(List { + apMut.mutate(append); + }); + List mutations = new ArrayList<>(); // test Put Put put1 = new Put(Bytes.toBytes(key)); @@ -215,29 +228,31 @@ public void testBufferedMutatorWithFlush() throws Exception { put2.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); mutations.add(put2); // test add Mutations with List - ohBufferMutator.mutate(mutations); - ohBufferMutator.flush(); + putBufferMutator.mutate(mutations); + putBufferMutator.flush(); Get get = new Get(toBytes(key)); Result r = hTable.get(get); Assert.assertEquals(2, r.raw().length); Delete del = new Delete(Bytes.toBytes(key)); + final BufferedMutator noCfMut = putBufferMutator; + // test mutation without setting family + Assert.assertThrows(FeatureNotSupportedException.class, () -> { + noCfMut.mutate(del); + }); del.deleteFamily(Bytes.toBytes("family_group")); + // test reuse different type bufferedMutator + final BufferedMutator difTypeMut = putBufferMutator; + Assert.assertThrows(IllegalArgumentException.class, () -> { + difTypeMut.mutate(del); + }); // test add Mutation directly - ohBufferMutator.mutate(del); - ohBufferMutator.flush(); + delBufferedMutator.mutate(del); + delBufferedMutator.flush(); r = hTable.get(get); Assert.assertEquals(0, r.raw().length); - - Append append = new Append(Bytes.toBytes(key)); - append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); - // only support Put and Delete - final BufferedMutator apMut = ohBufferMutator; - Assert.assertThrows(IllegalArgumentException.class, () -> { - apMut.mutate(append); - }); } catch (Exception ex) { if (ex instanceof RetriesExhaustedWithDetailsException) { ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); @@ -246,8 +261,20 @@ public void testBufferedMutatorWithFlush() throws Exception { } Assert.assertTrue(false); } finally { - if (ohBufferMutator != null) { - ohBufferMutator.close(); + if (putBufferMutator != null ) { + putBufferMutator.close(); + // test flush after closed + putBufferMutator.flush(); + } + if (delBufferedMutator != null) { + delBufferedMutator.close(); + // test add mutations after closed + Delete delete = new Delete(Bytes.toBytes("putKey")); + delete.deleteFamily(Bytes.toBytes("family_group")); + final BufferedMutator closedMutator = delBufferedMutator; + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.mutate(delete); + }); } } } @@ -266,20 +293,31 @@ PRIMARY KEY (`K`, `Q`, `T`) public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { Configuration conf = ObHTableTestUtil.newConfiguration(); conf.set("rs.list.acquire.read.timeout", "10000"); - BufferedMutator ohBufferMutator = null; + BufferedMutator putBufferMutator = null; + BufferedMutator delBufferedMutator = null; try { - // in "n1" database + // use n1 database TableName tableName = TableName.valueOf("n1:test"); connection = ConnectionFactory.createConnection(conf); hTable = connection.getTable(tableName); // use defualt params - ohBufferMutator = connection.getBufferedMutator(tableName); + putBufferMutator = connection.getBufferedMutator(tableName); + delBufferedMutator = connection.getBufferedMutator(tableName); String key = "putKey"; String column1 = "putColumn1"; String value = "value333444"; long timestamp = System.currentTimeMillis(); + // only support Put and Delete + // for other type of operations, BufferedMutator will not set its type for them + Append append = new Append(Bytes.toBytes(key)); + append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); + final BufferedMutator apMut = putBufferMutator; + Assert.assertThrows(IllegalArgumentException.class, () -> { + apMut.mutate(append); + }); + List mutations = new ArrayList<>(); // test Put Put put1 = new Put(Bytes.toBytes(key)); @@ -289,37 +327,31 @@ public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { put2.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); mutations.add(put2); // test add Mutations with List - ohBufferMutator.mutate(mutations); - ohBufferMutator.flush(); + putBufferMutator.mutate(mutations); + putBufferMutator.flush(); Get get = new Get(toBytes(key)); Result r = hTable.get(get); Assert.assertEquals(2, r.raw().length); - for (KeyValue keyValue : r.raw()) { - System.out.println("rowKey: " + new String(keyValue.getRow()) + " family :" - + new String(keyValue.getFamily()) + " columnQualifier:" - + new String(keyValue.getQualifier()) + " timestamp:" - + keyValue.getTimestamp() + " value:" - + new String(keyValue.getValue())); - } - // test Delete Delete del = new Delete(Bytes.toBytes(key)); + final BufferedMutator noCfMut = putBufferMutator; + // test mutation without setting family + Assert.assertThrows(FeatureNotSupportedException.class, () -> { + noCfMut.mutate(del); + }); del.deleteFamily(Bytes.toBytes("family_group")); + final BufferedMutator difTypeMut = putBufferMutator; + // test reuse different type bufferedMutator + Assert.assertThrows(IllegalArgumentException.class, () -> { + difTypeMut.mutate(del); + }); // test add Mutation directly - ohBufferMutator.mutate(del); - ohBufferMutator.flush(); + delBufferedMutator.mutate(del); + delBufferedMutator.flush(); r = hTable.get(get); Assert.assertEquals(0, r.raw().length); - - Append append = new Append(Bytes.toBytes(key)); - append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); - // only support Put and Delete - final BufferedMutator apMut = ohBufferMutator; - Assert.assertThrows(IllegalArgumentException.class, () -> { - apMut.mutate(append); - }); } catch (Exception ex) { if (ex instanceof RetriesExhaustedWithDetailsException) { ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); @@ -328,8 +360,20 @@ public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { } Assert.assertTrue(false); } finally { - if (ohBufferMutator != null) { - ohBufferMutator.close(); + if (putBufferMutator != null ) { + putBufferMutator.close(); + // test flush after closed + putBufferMutator.flush(); + } + if (delBufferedMutator != null) { + delBufferedMutator.close(); + // test add mutations after closed + Delete delete = new Delete(Bytes.toBytes("putKey")); + delete.deleteFamily(Bytes.toBytes("family_group")); + final BufferedMutator closedMutator = delBufferedMutator; + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.mutate(delete); + }); } } } @@ -348,7 +392,7 @@ PRIMARY KEY (`K`, `Q`, `T`) public void testBufferedMutatorWithAutoFlush() throws Exception { Configuration conf = ObHTableTestUtil.newConfiguration(); conf.set("rs.list.acquire.read.timeout", "10000"); - BufferedMutator ohBufferMutator = null; + BufferedMutator putBufferMutator = null; BufferedMutatorParams params = null; long bufferSize = 45000L; int count = 0; @@ -359,7 +403,7 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { // set params params = new BufferedMutatorParams(tableName); params.writeBufferSize(bufferSize); - ohBufferMutator = connection.getBufferedMutator(params); + putBufferMutator = connection.getBufferedMutator(params); String key = "putKey"; String column1 = "putColumn1"; @@ -375,7 +419,7 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); mutations.add(put); } - ohBufferMutator.mutate(mutations); + putBufferMutator.mutate(mutations); } } catch (Exception ex) { if (ex instanceof RetriesExhaustedWithDetailsException) { @@ -385,8 +429,8 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { } Assert.assertTrue(false); } finally { - if (ohBufferMutator != null) { - ohBufferMutator.close(); + if (putBufferMutator != null) { + putBufferMutator.close(); Get get = new Get(toBytes("putKey")); Result r = hTable.get(get); for (KeyValue keyValue : r.raw()) { @@ -396,10 +440,18 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { Delete delete = new Delete(toBytes("putKey")); delete.deleteFamily(toBytes("family_group")); hTable.delete(delete); - r = hTable.get(get); Assert.assertEquals(0, r.raw().length); + + // test add mutations after closed + final BufferedMutator closedMutator = putBufferMutator; + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.mutate(delete); + }); + // test flush after closed + putBufferMutator.flush(); } + if (params != null) { if (params.getPool() != null) { Assert.assertTrue(params.getPool().isShutdown()); @@ -480,6 +532,13 @@ public void testBufferedMutatorWithUserPool() throws Exception { r = hTable.get(get); Assert.assertEquals(0, r.raw().length); + // test add mutations after closed + final BufferedMutator closedMutator = ohBufferMutator; + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.mutate(delete); + }); + // test flush after closed + ohBufferMutator.flush(); } if (params != null) { if (params.getPool() != null) { @@ -588,6 +647,13 @@ public void testBufferedMutatorConcurrent() throws Exception { r = hTable.get(get); Assert.assertEquals(0, r.raw().length); + // test add mutations after closed + final BufferedMutator closedMutator = ohBufferMutator; + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.mutate(delete); + }); + // test flush after closed + ohBufferMutator.flush(); } if (params != null) { if (params.getPool() != null) {