diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 77122af9..4980b390 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -520,8 +520,10 @@ public Result call() throws IOException { .entrySet()) { family = entry.getKey(); obTableQuery = buildObTableQuery(get, entry.getValue()); - request = buildObTableQueryRequest(obTableQuery, - getTargetTableName(tableNameString, Bytes.toString(family))); + request = buildObTableQueryRequest( + obTableQuery, + getTargetTableName(tableNameString, Bytes.toString(family), + configuration)); clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient .execute(request); getKeyValueFromResult(clientQueryStreamResult, keyValueList, false, @@ -598,8 +600,10 @@ public ResultScanner call() throws IOException { scan.getMaxVersions(), entry.getValue()); obTableQuery = buildObTableQuery(filter, scan); - request = buildObTableQueryAsyncRequest(obTableQuery, - getTargetTableName(tableNameString, Bytes.toString(family))); + request = buildObTableQueryAsyncRequest( + obTableQuery, + getTargetTableName(tableNameString, Bytes.toString(family), + configuration)); clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient .execute(request); return new ClientStreamScanner(clientQueryAsyncStreamResult, @@ -749,7 +753,7 @@ private void innerDelete(Delete delete) throws IOException { .next(); BatchOperation batch = buildBatchOperation( - getTargetTableName(tableNameString, Bytes.toString(entry.getKey())), + getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration), entry.getValue(), false, null); BatchOperationResult results = batch.execute(); @@ -851,7 +855,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co ObTableBatchOperation batch = buildObTableBatchOperation(keyValueList, false, null); ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, - batch, getTargetTableName(tableNameString, Bytes.toString(family))); + batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration)); ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient .execute(request); return result.getAffectedRows() > 0; @@ -889,7 +893,8 @@ public Result append(Append append) throws IOException { queryAndMutate.setTableQuery(obTableQuery); queryAndMutate.setMutations(batchOperation); ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, - batchOperation, getTargetTableName(tableNameString, Bytes.toString(f))); + batchOperation, + getTargetTableName(tableNameString, Bytes.toString(f), configuration)); request.setReturningAffectedEntity(true); ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient .execute(request); @@ -949,7 +954,7 @@ public Result increment(Increment increment) throws IOException { queryAndMutate.setTableQuery(obTableQuery); ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, - batch, getTargetTableName(tableNameString, Bytes.toString(f))); + batch, getTargetTableName(tableNameString, Bytes.toString(f), configuration)); request.setReturningAffectedEntity(true); ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient .execute(request); @@ -999,7 +1004,7 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo queryAndMutate.setTableQuery(obTableQuery); ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, - batch, getTargetTableName(tableNameString, Bytes.toString(family))); + batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration)); request.setReturningAffectedEntity(true); ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient .execute(request); @@ -1063,7 +1068,7 @@ public void flushCommits() throws IOException { .getSecond().size()); try { String targetTableName = getTargetTableName(this.tableNameString, - entry.getKey()); + entry.getKey(), configuration); BatchOperation batch = buildBatchOperation(targetTableName, entry .getValue().getSecond(), false, null); @@ -1308,21 +1313,23 @@ T executeServerCallable(final ServerCallable serverCallable) throws IOExc } } - private String getTargetTableName(String tableNameString, String familyString) { + public static String getTargetTableName(String tableNameString, String familyString, + Configuration conf) { checkArgument(tableNameString != null, "tableNameString is null"); checkArgument(familyString != null, "familyString is null"); - if (configuration.getBoolean(HBASE_HTABLE_TEST_LOAD_ENABLE, false)) { - return getTestLoadTargetTableName(tableNameString, familyString); + if (conf.getBoolean(HBASE_HTABLE_TEST_LOAD_ENABLE, false)) { + return getTestLoadTargetTableName(tableNameString, familyString, conf); } return getNormalTargetTableName(tableNameString, familyString); } - private String getNormalTargetTableName(String tableNameString, String familyString) { + private static String getNormalTargetTableName(String tableNameString, String familyString) { return tableNameString + "$" + familyString; } - private String getTestLoadTargetTableName(String tableNameString, String familyString) { - String suffix = configuration.get(HBASE_HTABLE_TEST_LOAD_SUFFIX, + private static String getTestLoadTargetTableName(String tableNameString, String familyString, + Configuration conf) { + String suffix = conf.get(HBASE_HTABLE_TEST_LOAD_SUFFIX, DEFAULT_HBASE_HTABLE_TEST_LOAD_SUFFIX); return tableNameString + suffix + "$" + familyString; } @@ -1477,9 +1484,9 @@ private ObTableQuery buildObTableQuery(final Get get, Collection columnQ return obTableQuery; } - private ObTableBatchOperation buildObTableBatchOperation(List keyValueList, - boolean putToAppend, - List qualifiers) { + public static ObTableBatchOperation buildObTableBatchOperation(List keyValueList, + boolean putToAppend, + List qualifiers) { ObTableBatchOperation batch = new ObTableBatchOperation(); for (KeyValue kv : keyValueList) { if (qualifiers != null) { @@ -1536,7 +1543,7 @@ private BatchOperation buildBatchOperation(String tableName, List keyV return batch; } - private ObTableOperation buildObTableOperation(KeyValue kv, boolean putToAppend) { + public static ObTableOperation buildObTableOperation(KeyValue kv, boolean putToAppend) { KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType()); switch (kvType) { case Put: @@ -1585,13 +1592,15 @@ private ObTableQueryAsyncRequest buildObTableQueryAsyncRequest(ObTableQuery obTa return asyncRequest; } - private ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBatchOperation obTableBatchOperation, - String targetTableName) { + public static ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBatchOperation obTableBatchOperation, + String targetTableName, + ExecutorService pool) { ObTableBatchOperationRequest request = new ObTableBatchOperationRequest(); request.setTableName(targetTableName); request.setReturningAffectedRows(true); request.setEntityType(ObTableEntityType.HKV); request.setBatchOperation(obTableBatchOperation); + request.setPool(pool); return request; } @@ -1609,7 +1618,7 @@ private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQu return request; } - private void checkFamilyViolation(Collection families) { + public static void checkFamilyViolation(Collection families) { if (families == null || families.size() == 0) { throw new FeatureNotSupportedException("family is empty."); } @@ -1637,7 +1646,8 @@ public void refreshTableEntry(String familyString, boolean hasTestLoad) throws E getNormalTargetTableName(tableNameString, familyString), true, true); if (hasTestLoad) { this.obTableClient.getOrRefreshTableEntry( - getTestLoadTargetTableName(tableNameString, familyString), true, true); + getTestLoadTargetTableName(tableNameString, familyString, configuration), true, + true); } } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 83c63c7a..958f2d99 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -17,53 +17,302 @@ package com.alipay.oceanbase.hbase.util; +import com.alipay.oceanbase.hbase.OHTable; +import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.exception.ObTableEntryRefreshException; +import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.BufferedMutator; -import org.apache.hadoop.hbase.client.BufferedMutatorParams; -import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import sun.awt.image.ImageWatched; +import javax.ws.rs.PUT; import java.io.IOException; -import java.util.List; +import java.io.InterruptedIOException; +import java.rmi.UnexpectedException; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static com.alipay.oceanbase.hbase.constants.OHConstants.*; +import static com.alipay.oceanbase.hbase.constants.OHConstants.DEFAULT_HBASE_HTABLE_TEST_LOAD_SUFFIX; +import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument; +import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation.getInstance; +import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.*; +import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.DEL; +import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.LCD; +import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME; +import static org.apache.commons.lang.StringUtils.isBlank; @InterfaceAudience.Private public class OHBufferedMutatorImpl implements BufferedMutator { - public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params) { + private static final Logger LOGGER = TableHBaseLoggerFactory + .getLogger(OHBufferedMutatorImpl.class); + + private final ExceptionListener listener; + + protected final ObTableClient obTableClient; + private final TableName tableName; + private volatile Configuration conf; + private final OHConnectionConfiguration connectionConfig; + + final ConcurrentLinkedQueue asyncWriteBuffer = new ConcurrentLinkedQueue(); + AtomicLong currentAsyncBufferSize = new AtomicLong(0); + + private AtomicReference> type = new AtomicReference<>(null); + private final long writeBufferSize; + private final int maxKeyValueSize; + private boolean closed = false; + private final ExecutorService pool; + private final int rpcTimeout; + + public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params) + throws IOException { + if (ohConnection == null || ohConnection.isClosed()) { + throw new IllegalArgumentException("Connection is null or closed."); + } + // create a ObTableClient to do rpc operations + this.obTableClient = ObTableClientManager.getOrCreateObTableClient(ohConnection + .getOHConnectionConfiguration()); + + // init params in OHBufferedMutatorImpl: + // TableName + pool + Configuration + listener + writeBufferSize + maxKeyValueSize + rpcTimeout + operationTimeout + this.tableName = params.getTableName(); + this.conf = ohConnection.getConfiguration(); + this.connectionConfig = ohConnection.getOHConnectionConfiguration(); + this.listener = params.getListener(); + this.pool = params.getPool(); + + this.writeBufferSize = params.getWriteBufferSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params + .getWriteBufferSize() : connectionConfig.getWriteBufferSize(); + this.maxKeyValueSize = params.getMaxKeyValueSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params + .getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize(); + this.rpcTimeout = connectionConfig.getRpcTimeout(); + this.obTableClient.setRpcExecuteTimeout(rpcTimeout); } @Override public TableName getName() { - return null; + return this.tableName; } @Override public Configuration getConfiguration() { - return null; + return this.conf; } + /** + * Add the mutation into asyncWriteBuffer + * + * @param mutation - mutation operation + */ @Override public void mutate(Mutation mutation) throws IOException { - + mutate(Collections.singletonList(mutation)); } + /** + * Add all mutations in List into asyncWriteBuffer + * + * @param mutations - mutation operations + */ @Override - public void mutate(List list) throws IOException { + public void mutate(List mutations) throws IOException { + if (closed) { + throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); + } + if (mutations.isEmpty()) { + return; + } + + long toAddSize = 0; + // check if every mutation's family is the same + // check if mutations are the same type + for (Mutation m : mutations) { + OHTable.checkFamilyViolation(m.getFamilyMap().keySet()); + validateInsUpAndDelete(m); + Class curType = m.getClass(); + // set the type of this BufferedMutator + if (type.get() == null) { + type.compareAndSet(null, mutations.get(0).getClass()); + } + if (!type.get().equals(curType)) { + throw new IllegalArgumentException("Not support different type in one batch."); + } + toAddSize += m.heapSize(); + } + currentAsyncBufferSize.addAndGet(toAddSize); + asyncWriteBuffer.addAll(mutations); + + asyncExecute(false); + } + + /** + * Check whether the mutation is Put or Delete in 1.x + * @param mt - mutation operation + */ + private void validateInsUpAndDelete(Mutation mt) throws IllegalArgumentException { + if (!(mt instanceof Put) && !(mt instanceof Delete)) { + throw new IllegalArgumentException("Only support for Put and Delete for now."); + } + if (mt instanceof Put) { + HTable.validatePut((Put) mt, maxKeyValueSize); + } + } + + /** + * Send the operations in the buffer to the servers. Does not wait for the server's answer. If + * there is an error, either throw the error, or use the listener to deal with the error. + * + * @param flushAll - if true, sends all the writes and wait for all of them to finish before + * returning. + */ + private void asyncExecute(boolean flushAll) throws IOException { + LinkedList execBuffer = new LinkedList<>(); + ObTableBatchOperationRequest request = null; + // namespace n1, n1:table_name + // namespace default, table_name + String tableNameString = tableName.getNameAsString(); + try { + while (true) { + try{ + if (!flushAll || asyncWriteBuffer.isEmpty()) { + if (currentAsyncBufferSize.get() <= writeBufferSize) { + break; + } + } + Mutation m; + while ((m = asyncWriteBuffer.poll()) != null) { + execBuffer.add(m); + long size = m.heapSize(); + currentAsyncBufferSize.addAndGet(-size); + } + // in concurrent situation, asyncWriteBuffer may be empty here + // for other threads flush all buffer + if (execBuffer.isEmpty()) { + break; + } + // for now, operations' family is the same + byte[] family = execBuffer.getFirst().getFamilyMap().firstKey(); + ObTableBatchOperation batch = buildObTableBatchOperation(execBuffer); + // table_name$cf_name + String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf); + request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName, pool); + } catch (Exception ex) { + LOGGER.error("Errors occur before mutation operation", ex); + throw new IllegalArgumentException("Errors occur before mutation operation", ex); + } + try { + ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); + } catch (Exception ex) { + LOGGER.debug("Errors occur during mutation operation", ex); + Mutation m = null; + try { + // retry every single operation + while (!execBuffer.isEmpty()) { + // poll elements from execBuffer to recollect remaining operations + m = execBuffer.poll(); + byte[] family = m.getFamilyMap().firstKey(); + ObTableBatchOperation batch = buildObTableBatchOperation(Collections.singletonList(m)); + String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf); + request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName, pool); + ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); + } + } catch (Exception newEx) { + if (m != null) { + execBuffer.addFirst(m); + } + // if retry fails, only recollect remaining operations + while(!execBuffer.isEmpty()) { + m = execBuffer.poll(); + long size = m.heapSize(); + asyncWriteBuffer.add(m); + currentAsyncBufferSize.addAndGet(size); + } + throw newEx; + } + } + } + } catch (Exception ex) { + LOGGER.error(LCD.convert("01-00026"), ex); + // if the cause is illegal argument, directly throw to user + if (ex instanceof IllegalArgumentException) { + throw (IllegalArgumentException) ex; + } + // TODO: need to collect error information and actions during batch operations + // TODO: maybe keep in ObTableBatchOperationResult + List throwables = new ArrayList(); + List actions = new ArrayList(); + List addresses = new ArrayList(); + throwables.add(ex); + RetriesExhaustedWithDetailsException error = new RetriesExhaustedWithDetailsException( + new ArrayList(throwables), + new ArrayList(actions), new ArrayList(addresses)); + if (listener == null) { + throw error; + } else { + listener.onException(error, this); + } + } } @Override public void close() throws IOException { - + if (closed) { + return; + } + try { + asyncExecute(true); + } finally { + this.pool.shutdown(); + try { + if (!pool.awaitTermination(600, TimeUnit.SECONDS)) { + LOGGER + .warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); + } + } catch (InterruptedException e) { + LOGGER.warn("waitForTermination interrupted"); + Thread.currentThread().interrupt(); + } + closed = true; + } } + /** + * Force to commit all operations + * do not care whether the pool is shut down or this BufferedMutator is closed + */ @Override public void flush() throws IOException { - + asyncExecute(true); } @Override public long getWriteBufferSize() { - return 0; + return this.writeBufferSize; + } + + private ObTableBatchOperation buildObTableBatchOperation(List execBuffer) { + List keyValueList = new LinkedList<>(); + for (Mutation mutation : execBuffer) { + for (Map.Entry> entry : mutation.getFamilyMap().entrySet()) { + keyValueList.addAll(entry.getValue()); + } + } + return OHTable.buildObTableBatchOperation(keyValueList, false, null); } } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java index 12d2abb2..4256f316 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java @@ -40,7 +40,7 @@ public class OHConnectionImpl implements Connection { private static final Marker FATAL = MarkerFactory.getMarker("FATAL"); - private static final int BUFFERED_PARAM_UNSET = -1; + public static final int BUFFERED_PARAM_UNSET = -1; private volatile boolean closed; private volatile boolean aborted; @@ -69,6 +69,10 @@ public Configuration getConfiguration() { return this.conf; } + public OHConnectionConfiguration getOHConnectionConfiguration() { + return this.connectionConfig; + } + private ExecutorService getBatchPool() { if (batchPool == null) { synchronized (this) { diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java index e477ab34..b0c74d19 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java @@ -17,14 +17,21 @@ package com.alipay.oceanbase.hbase; +import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; +import com.alipay.oceanbase.hbase.util.OHBufferedMutatorImpl; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + import static org.apache.hadoop.hbase.util.Bytes.toBytes; public class OHConnectionTest { @@ -174,4 +181,485 @@ private void testBasic() throws Exception { } + /* + CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; + CREATE TABLE `test$family_group` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = test; + */ + @Test + public void testBufferedMutatorWithFlush() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + BufferedMutator putBufferMutator = null; + BufferedMutator delBufferedMutator = null; + try { + TableName tableName = TableName.valueOf("test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); + // use defualt params + putBufferMutator = connection.getBufferedMutator(tableName); + delBufferedMutator = connection.getBufferedMutator(tableName); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + // only support Put and Delete + // for other type of operations, BufferedMutator will not set its type for them + Append append = new Append(Bytes.toBytes(key)); + append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); + final BufferedMutator apMut = putBufferMutator; + Assert.assertThrows(IllegalArgumentException.class, () -> { + apMut.mutate(append); + }); + + List mutations = new ArrayList<>(); + // test Put + Put put1 = new Put(Bytes.toBytes(key)); + put1.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); + mutations.add(put1); + Put put2 = new Put(Bytes.toBytes(key)); + put2.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); + mutations.add(put2); + // test add Mutations with List + putBufferMutator.mutate(mutations); + putBufferMutator.flush(); + + Get get = new Get(toBytes(key)); + Result r = hTable.get(get); + Assert.assertEquals(2, r.raw().length); + + Delete del = new Delete(Bytes.toBytes(key)); + final BufferedMutator noCfMut = putBufferMutator; + // test mutation without setting family + Assert.assertThrows(FeatureNotSupportedException.class, () -> { + noCfMut.mutate(del); + }); + del.deleteFamily(Bytes.toBytes("family_group")); + // test reuse different type bufferedMutator + final BufferedMutator difTypeMut = putBufferMutator; + Assert.assertThrows(IllegalArgumentException.class, () -> { + difTypeMut.mutate(del); + }); + // test add Mutation directly + delBufferedMutator.mutate(del); + delBufferedMutator.flush(); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + if (putBufferMutator != null ) { + putBufferMutator.close(); + // test flush after closed + putBufferMutator.flush(); + } + if (delBufferedMutator != null) { + delBufferedMutator.close(); + // test add mutations after closed + Delete delete = new Delete(Bytes.toBytes("putKey")); + delete.deleteFamily(Bytes.toBytes("family_group")); + final BufferedMutator closedMutator = delBufferedMutator; + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.mutate(delete); + }); + } + } + } + + /* + CREATE TABLEGROUP `n1:test` SHARDING = 'ADAPTIVE'; + CREATE TABLE `n1:test$family_group` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = `n1:test`; + */ + @Test + public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + BufferedMutator putBufferMutator = null; + BufferedMutator delBufferedMutator = null; + try { + // use n1 database + TableName tableName = TableName.valueOf("n1:test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); + // use defualt params + putBufferMutator = connection.getBufferedMutator(tableName); + delBufferedMutator = connection.getBufferedMutator(tableName); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + // only support Put and Delete + // for other type of operations, BufferedMutator will not set its type for them + Append append = new Append(Bytes.toBytes(key)); + append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); + final BufferedMutator apMut = putBufferMutator; + Assert.assertThrows(IllegalArgumentException.class, () -> { + apMut.mutate(append); + }); + + List mutations = new ArrayList<>(); + // test Put + Put put1 = new Put(Bytes.toBytes(key)); + put1.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1), timestamp, Bytes.toBytes(value)); + mutations.add(put1); + Put put2 = new Put(Bytes.toBytes(key)); + put2.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); + mutations.add(put2); + // test add Mutations with List + putBufferMutator.mutate(mutations); + putBufferMutator.flush(); + + Get get = new Get(toBytes(key)); + Result r = hTable.get(get); + Assert.assertEquals(2, r.raw().length); + + Delete del = new Delete(Bytes.toBytes(key)); + final BufferedMutator noCfMut = putBufferMutator; + // test mutation without setting family + Assert.assertThrows(FeatureNotSupportedException.class, () -> { + noCfMut.mutate(del); + }); + del.deleteFamily(Bytes.toBytes("family_group")); + final BufferedMutator difTypeMut = putBufferMutator; + // test reuse different type bufferedMutator + Assert.assertThrows(IllegalArgumentException.class, () -> { + difTypeMut.mutate(del); + }); + // test add Mutation directly + delBufferedMutator.mutate(del); + delBufferedMutator.flush(); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + if (putBufferMutator != null ) { + putBufferMutator.close(); + // test flush after closed + putBufferMutator.flush(); + } + if (delBufferedMutator != null) { + delBufferedMutator.close(); + // test add mutations after closed + Delete delete = new Delete(Bytes.toBytes("putKey")); + delete.deleteFamily(Bytes.toBytes("family_group")); + final BufferedMutator closedMutator = delBufferedMutator; + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.mutate(delete); + }); + } + } + } + + /* + CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; + CREATE TABLE `test$family_group` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = test; + */ + @Test + public void testBufferedMutatorWithAutoFlush() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + BufferedMutator putBufferMutator = null; + BufferedMutatorParams params = null; + long bufferSize = 45000L; + int count = 0; + try { + TableName tableName = TableName.valueOf("test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); + // set params + params = new BufferedMutatorParams(tableName); + params.writeBufferSize(bufferSize); + putBufferMutator = connection.getBufferedMutator(params); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + List mutations = new ArrayList<>(); + for (int i = 0; i < 50; ++i) { + mutations.clear(); + for (int j = 0; j < 4; ++j) { + Put put = new Put(Bytes.toBytes(key)); + put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "_" + i + "_" + j), + timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); + mutations.add(put); + } + putBufferMutator.mutate(mutations); + } + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + if (putBufferMutator != null) { + putBufferMutator.close(); + Get get = new Get(toBytes("putKey")); + Result r = hTable.get(get); + for (KeyValue keyValue : r.raw()) { + ++count; + } + Assert.assertEquals(200, count); + Delete delete = new Delete(toBytes("putKey")); + delete.deleteFamily(toBytes("family_group")); + hTable.delete(delete); + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + + // test add mutations after closed + final BufferedMutator closedMutator = putBufferMutator; + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.mutate(delete); + }); + // test flush after closed + putBufferMutator.flush(); + } + + if (params != null) { + if (params.getPool() != null) { + Assert.assertTrue(params.getPool().isShutdown()); + } + } + } + } + + /* + CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; + CREATE TABLE `test$family_group` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = test; + */ + @Test + public void testBufferedMutatorWithUserPool() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + BufferedMutator ohBufferMutator = null; + BufferedMutatorParams params = null; + long bufferSize = 45000L; + int count = 0; + try { + TableName tableName = TableName.valueOf("test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); + // set params + params = new BufferedMutatorParams(tableName); + params.writeBufferSize(bufferSize); + + // set thread pool + long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); + ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); + pool.allowCoreThreadTimeOut(true); + params.pool(pool); + + ohBufferMutator = connection.getBufferedMutator(params); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + List mutations = new ArrayList<>(); + for (int i = 0; i < 50; ++i) { + mutations.clear(); + for (int j = 0; j < 4; ++j) { + Put put = new Put(Bytes.toBytes(key)); + put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(column1 + "_" + i + "_" + j), + timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); + mutations.add(put); + } + ohBufferMutator.mutate(mutations); + } + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + if (ohBufferMutator != null) { + ohBufferMutator.close(); + Get get = new Get(toBytes("putKey")); + Result r = hTable.get(get); + for (KeyValue keyValue : r.raw()) { + ++count; + } + Assert.assertEquals(200, count); + Delete delete = new Delete(toBytes("putKey")); + delete.deleteFamily(toBytes("family_group")); + hTable.delete(delete); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + // test add mutations after closed + final BufferedMutator closedMutator = ohBufferMutator; + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.mutate(delete); + }); + // test flush after closed + ohBufferMutator.flush(); + } + if (params != null) { + if (params.getPool() != null) { + Assert.assertTrue(params.getPool().isShutdown()); + } + } + } + } + + /* + CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; + CREATE TABLE `test$family_group` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = test; + */ + @Test + public void testBufferedMutatorConcurrent() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + BufferedMutator ohBufferMutator = null; + BufferedMutatorParams params = null; + ExecutorService executorService = Executors.newFixedThreadPool(10); + long bufferSize = 45000L; + int count = 0; + try { + TableName tableName = TableName.valueOf("test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); + // set params + params = new BufferedMutatorParams(tableName); + params.writeBufferSize(bufferSize); + + // set thread pool + long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); + ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); + pool.allowCoreThreadTimeOut(true); + params.pool(pool); + + ohBufferMutator = connection.getBufferedMutator(params); + + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + + for (int i = 0; i < 50; ++i) { + final int taskId = i; + final BufferedMutator thrBufferMutator = ohBufferMutator; + executorService.submit(() -> { + List mutations = new ArrayList<>(); + for (int j = 0; j < 4; ++j) { + String thrKey = key; + String thrColumn = column1 + "_" + taskId + "_" + j; + String thrValue = value + "_" + taskId + "_" + j; + long thrTimestamp = timestamp; + + Put put = new Put(Bytes.toBytes(thrKey)); + put.addColumn(Bytes.toBytes("family_group"), Bytes.toBytes(thrColumn), + thrTimestamp, Bytes.toBytes(thrValue)); + mutations.add(put); + } + try { + thrBufferMutator.mutate(mutations); + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } + }); + } + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + if (ohBufferMutator != null) { + ohBufferMutator.close(); + Get get = new Get(toBytes("putKey")); + Result r = hTable.get(get); + for (KeyValue keyValue : r.raw()) { + ++count; + } + Assert.assertEquals(200, count); + Delete delete = new Delete(toBytes("putKey")); + delete.deleteFamily(toBytes("family_group")); + hTable.delete(delete); + + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + // test add mutations after closed + final BufferedMutator closedMutator = ohBufferMutator; + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.mutate(delete); + }); + // test flush after closed + ohBufferMutator.flush(); + } + if (params != null) { + if (params.getPool() != null) { + Assert.assertTrue(params.getPool().isShutdown()); + } + } + } + } }