diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index f03ab9d5..083b8abd 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -36,7 +36,6 @@ import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.*; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest; import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult; -import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult; import com.alipay.oceanbase.rpc.table.ObHBaseParams; import com.alipay.oceanbase.rpc.table.ObKVParams; import com.alipay.sofa.common.thread.SofaThreadPoolExecutor; @@ -198,8 +197,13 @@ public OHTable(Configuration configuration, String tableName) throws IOException DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME); this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime); OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration); + int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace( this.tableNameString, ohConnectionConf)); + this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout()); + this.obTableClient.setRuntimeRetryTimes(numRetries); + setOperationTimeout(ohConnectionConf.getOperationTimeout()); finishSetUp(); } @@ -246,8 +250,13 @@ public OHTable(Configuration configuration, final byte[] tableName, this.executePool = executePool; this.cleanupPoolOnClose = false; OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration); + int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace( this.tableNameString, ohConnectionConf)); + this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout()); + this.obTableClient.setRuntimeRetryTimes(numRetries); + setOperationTimeout(ohConnectionConf.getOperationTimeout()); finishSetUp(); } @@ -311,8 +320,15 @@ public OHTable(TableName tableName, Connection connection, DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK); this.writeBufferSize = connectionConfig.getWriteBufferSize(); this.tableName = tableName.getName(); + int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace( this.tableNameString, connectionConfig)); + this.obTableClient.setRpcExecuteTimeout(rpcTimeout); + this.obTableClient.setRuntimeRetryTimes(numRetries); + setOperationTimeout(operationTimeout); + + finishSetUp(); } /** @@ -833,7 +849,8 @@ private void validatePut(Put put) { throw new IllegalArgumentException("family is empty"); } for (Cell kv : entry.getValue()) { - if (kv.getRowLength() + kv.getValueLength() + kv.getQualifierLength() + Bytes.toBytes(kv.getTimestamp()).length + kv.getFamilyLength() > maxKeyValueSize) { + if (kv.getRowLength() + kv.getValueLength() + kv.getQualifierLength() + + Bytes.toBytes(kv.getTimestamp()).length + kv.getFamilyLength() > maxKeyValueSize) { throw new IllegalArgumentException("KeyValue size too large"); } } diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java index 67183aa6..87ec2113 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java @@ -49,19 +49,19 @@ public class OHTablePool implements Closeable { - private String originTabelName = null; - private final PoolMap tables; - private final int maxSize; - private final PoolMap.PoolType poolType; - private final Configuration config; - private final OHTableFactory tableFactory; + private String originTabelName = null; + private final PoolMap tables; + private final int maxSize; + private final PoolMap.PoolType poolType; + private final Configuration config; + private final OHTableFactory tableFactory; // A map of table attributes used for the table created by this pool. The map // key is composed of Table_Name + SEPARATOR + Attribute_Name, and the value // is byte value of attribute. - private ConcurrentHashMap tableAttributes; + private ConcurrentHashMap tableAttributes; - private ConcurrentHashMap tableExtendAttributes; + private ConcurrentHashMap tableExtendAttributes; /** * Default Constructor. Default HBaseConfiguration and no limit on pool size. diff --git a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java index e10ef0b1..b4e70677 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java +++ b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java @@ -157,7 +157,7 @@ public final class OHConstants { public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152L; - public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1; + public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760; public static final String SOCKET_TIMEOUT = "ipc.socket.timeout"; diff --git a/src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java b/src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java index 4a14021a..3b8dec3c 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java +++ b/src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java @@ -355,7 +355,8 @@ private static void toParseableByteArray(ByteArrayOutputStream byteStream, Times } // ColumnValueFilter('cf','q') - private static void toParseableByteArray(ByteArrayOutputStream byteStream, ColumnValueFilter filter) throws IOException { + private static void toParseableByteArray(ByteArrayOutputStream byteStream, + ColumnValueFilter filter) throws IOException { byteStream.write(filter.getClass().getSimpleName().getBytes()); byteStream.write('('); byteStream.write("'".getBytes()); diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index f07bae28..d6e63d40 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -18,17 +18,12 @@ package com.alipay.oceanbase.hbase.util; import com.alipay.oceanbase.hbase.OHTable; -import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; -import com.alipay.oceanbase.rpc.ObTableClient; -import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import java.io.IOException; @@ -36,55 +31,86 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.LCD; @InterfaceAudience.Private public class OHBufferedMutatorImpl implements BufferedMutator { - private static final Logger LOGGER = TableHBaseLoggerFactory - .getLogger(OHBufferedMutatorImpl.class); + private static final Logger LOGGER = TableHBaseLoggerFactory + .getLogger(OHBufferedMutatorImpl.class); + + private final ExceptionListener listener; + + private final OHTable ohTable; + private final TableName tableName; + private volatile Configuration conf; + private final OHConnectionConfiguration connectionConfig; - private final ExceptionListener listener; + private final ConcurrentLinkedQueue asyncWriteBuffer = new ConcurrentLinkedQueue(); + private final AtomicLong currentAsyncBufferSize = new AtomicLong( + 0); - protected final ObTableClient obTableClient; - private final TableName tableName; - private volatile Configuration conf; - private final OHConnectionConfiguration connectionConfig; + private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong( + 0); + private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong( + 0); - final ConcurrentLinkedQueue asyncWriteBuffer = new ConcurrentLinkedQueue(); - AtomicLong currentAsyncBufferSize = new AtomicLong(0); + private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong( + 0); + private final AtomicLong writeBufferPeriodicFlushTimerTickMs = new AtomicLong( + MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); + private Timer writeBufferPeriodicFlushTimer = null; - private AtomicReference> type = new AtomicReference<>(null); - private final long writeBufferSize; - private final int maxKeyValueSize; - private boolean closed = false; - private final ExecutorService pool; - private int rpcTimeout; + private final long writeBufferSize; + private final int maxKeyValueSize; + private final ExecutorService pool; + private final AtomicInteger undealtMutationCount = new AtomicInteger( + 0); + private final AtomicInteger rpcTimeout; + private final AtomicInteger operationTimeout; + private final boolean cleanupPoolOnClose; + private volatile boolean closed = false; public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params) throws IOException { if (ohConnection == null || ohConnection.isClosed()) { throw new IllegalArgumentException("Connection is null or closed."); } - // create a ObTableClient to do rpc operations - this.obTableClient = ObTableClientManager.getOrCreateObTableClient(ohConnection - .getOHConnectionConfiguration()); - // init params in OHBufferedMutatorImpl this.tableName = params.getTableName(); this.conf = ohConnection.getConfiguration(); this.connectionConfig = ohConnection.getOHConnectionConfiguration(); this.listener = params.getListener(); - this.pool = params.getPool(); - this.obTableClient.setRuntimeBatchExecutor(pool); + if (params.getPool() == null) { // need to verify necessity + this.pool = HTable.getDefaultExecutor(conf); + this.cleanupPoolOnClose = true; + } else { + this.pool = params.getPool(); + this.cleanupPoolOnClose = false; + } + this.rpcTimeout = new AtomicInteger( + params.getRpcTimeout() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params + .getRpcTimeout() : connectionConfig.getRpcTimeout()); + this.operationTimeout = new AtomicInteger( + params.getOperationTimeout() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params + .getOperationTimeout() : connectionConfig.getOperationTimeout()); + + long newPeriodicFlushTimeoutMs = params.getWriteBufferPeriodicFlushTimeoutMs() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params + .getWriteBufferPeriodicFlushTimeoutMs() : connectionConfig + .getWriteBufferPeriodicFlushTimeoutMs(); + long newPeriodicFlushTimeIntervalMs = params.getWriteBufferPeriodicFlushTimerTickMs() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params + .getWriteBufferPeriodicFlushTimerTickMs() : connectionConfig + .getWriteBufferPeriodicFlushTimerTickMs(); + this.setWriteBufferPeriodicFlush(newPeriodicFlushTimeoutMs, newPeriodicFlushTimeIntervalMs); this.writeBufferSize = params.getWriteBufferSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params .getWriteBufferSize() : connectionConfig.getWriteBufferSize(); this.maxKeyValueSize = params.getMaxKeyValueSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params .getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize(); - this.rpcTimeout = connectionConfig.getRpcTimeout(); - this.obTableClient.setRpcExecuteTimeout(rpcTimeout); + + // create an OHTable object to do batch work + this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool); } @Override @@ -114,46 +140,108 @@ public void mutate(Mutation mutation) throws IOException { */ @Override public void mutate(List mutations) throws IOException { - if (closed) { - throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); - } + checkClose(); if (mutations.isEmpty()) { return; } long toAddSize = 0; - // check if every mutation's family is the same - // check if mutations are the same type + int toAddCount = 0; for (Mutation m : mutations) { - OHTable.checkFamilyViolation(m.getFamilyCellMap().keySet(), true); - validateInsUpAndDelete(m); - Class curType = m.getClass(); - // set the type of this BufferedMutator - if (type.get() == null) { - type.compareAndSet(null, mutations.get(0).getClass()); - } - if (!type.get().equals(curType)) { - throw new IllegalArgumentException("Not support different type in one batch."); - } + validateOperation(m); toAddSize += m.heapSize(); + ++toAddCount; } + if (currentAsyncBufferSize.get() == 0) { + firstRecordInBufferTimestamp.set(System.currentTimeMillis()); + } + undealtMutationCount.addAndGet(toAddCount); currentAsyncBufferSize.addAndGet(toAddSize); asyncWriteBuffer.addAll(mutations); - asyncExecute(false); + execute(false); + } + + private void checkClose() { + if (closed) { + throw new IllegalStateException("The BufferedMutator is closed."); + } } /** - * Check whether the mutation is Put or Delete in 1.x + * Check mutations in 2.x * @param mt - mutation operation */ - private void validateInsUpAndDelete(Mutation mt) throws IllegalArgumentException { + private void validateOperation(Mutation mt) throws IllegalArgumentException { + if (mt == null) { + throw new IllegalArgumentException("Mutation operation cannot be null."); + } if (!(mt instanceof Put) && !(mt instanceof Delete)) { throw new IllegalArgumentException("Only support for Put and Delete for now."); } if (mt instanceof Put) { + // family empty check is in validatePut HTable.validatePut((Put) mt, maxKeyValueSize); + OHTable.checkFamilyViolation(mt.getFamilyCellMap().keySet(), true); + } else { + OHTable.checkFamilyViolation(mt.getFamilyCellMap().keySet(), false); + } + } + + /** + * triggered to do periodic flush if reach the time limit + * */ + public void timeTriggerForWriteBufferPeriodicFlush() { + if (currentAsyncBufferSize.get() == 0) { + return; + } + long now = System.currentTimeMillis(); + if (firstRecordInBufferTimestamp.get() + writeBufferPeriodicFlushTimeoutMs.get() > now) { + // too soon to execute + return; + } + try { + executedWriteBufferPeriodicFlushes.incrementAndGet(); + flush(); + } catch (Exception e) { + LOGGER.error("Errors occur during timeTriggerForWriteBufferPeriodicFlush: { " + + e.getMessage() + " }"); + } + } + + /** + * set time for periodic flush timer + * @param timeoutMs control when to flush from collecting first mutation + * @param timerTickMs control time interval to trigger the timer + * */ + @Override + public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) { + long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs.get(); + long originalTimeTickMs = this.writeBufferPeriodicFlushTimerTickMs.get(); + + writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs)); + writeBufferPeriodicFlushTimerTickMs.set(Math.max( + MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs)); + + // if time parameters are updated, stop the old timer + if (writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs + || writeBufferPeriodicFlushTimerTickMs.get() != originalTimeTickMs) { + if (writeBufferPeriodicFlushTimer != null) { + writeBufferPeriodicFlushTimer.cancel(); + writeBufferPeriodicFlushTimer = null; + } + } + + if (writeBufferPeriodicFlushTimer == null && writeBufferPeriodicFlushTimeoutMs.get() > 0) { + writeBufferPeriodicFlushTimer = new Timer(true); + writeBufferPeriodicFlushTimer.schedule(new TimerTask() { + @Override + public void run() { + OHBufferedMutatorImpl.this.timeTriggerForWriteBufferPeriodicFlush(); + } + }, this.writeBufferPeriodicFlushTimerTickMs.get(), + this.writeBufferPeriodicFlushTimerTickMs.get()); } } @@ -161,116 +249,90 @@ private void validateInsUpAndDelete(Mutation mt) throws IllegalArgumentException * Send the operations in the buffer to the servers. Does not wait for the server's answer. If * there is an error, either throw the error, or use the listener to deal with the error. * - * @param flushAll - if true, sends all the writes and wait for all of them to finish before - * returning. + * @param flushAll - if true, force to commit all mutations in asyncWriteBuffer; else to commit only if + * larger than writeBufferSize */ - private void asyncExecute(boolean flushAll) throws IOException { + private void execute(boolean flushAll) throws IOException { LinkedList execBuffer = new LinkedList<>(); - ObTableBatchOperationRequest request = null; - // namespace n1, n1:table_name - // namespace default, table_name - String tableNameString = tableName.getNameAsString(); try { - while (true) { - try{ - if (!flushAll || asyncWriteBuffer.isEmpty()) { - if (currentAsyncBufferSize.get() <= writeBufferSize) { - break; - } - } - Mutation m; - while ((m = asyncWriteBuffer.poll()) != null) { - execBuffer.add(m); - long size = m.heapSize(); - currentAsyncBufferSize.addAndGet(-size); - } - // in concurrent situation, asyncWriteBuffer may be empty here - // for other threads flush all buffer - if (execBuffer.isEmpty()) { - break; - } - // for now, operations' family is the same - byte[] family = execBuffer.getFirst().getFamilyCellMap().firstKey(); - ObTableBatchOperation batch = buildObTableBatchOperation(execBuffer); - // table_name$cf_name - String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf); - request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName); - } catch (Exception ex) { - LOGGER.error("Errors occur before mutation operation", ex); - throw new IllegalArgumentException("Errors occur before mutation operation", ex); - } - try { - ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); - } catch (Exception ex) { - LOGGER.debug("Errors occur during mutation operation", ex); - Mutation m = null; - try { - // retry every single operation - while (!execBuffer.isEmpty()) { - // poll elements from execBuffer to recollect remaining operations - m = execBuffer.poll(); - byte[] family = m.getFamilyCellMap().firstKey(); - ObTableBatchOperation batch = buildObTableBatchOperation(Collections.singletonList(m)); - String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf); - request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName); - ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request); - } - } catch (Exception newEx) { - if (m != null) { - execBuffer.addFirst(m); - } - // if retry fails, only recollect remaining operations - while(!execBuffer.isEmpty()) { - m = execBuffer.poll(); - long size = m.heapSize(); - asyncWriteBuffer.add(m); - currentAsyncBufferSize.addAndGet(size); - } - throw newEx; - } + if (flushAll || currentAsyncBufferSize.get() > writeBufferSize) { + Mutation m; + int dealtCount = 0; + while ((m = asyncWriteBuffer.poll())!= null) { + execBuffer.add(m); + long size = m.heapSize(); + currentAsyncBufferSize.addAndGet(-size); + ++dealtCount; } + undealtMutationCount.addAndGet(-dealtCount); + } + + if (execBuffer.isEmpty()) { + return; } + Object[] results = new Object[execBuffer.size()]; + ohTable.batch(execBuffer, results); + // if commit all successfully, clean execBuffer + execBuffer.clear(); } catch (Exception ex) { LOGGER.error(LCD.convert("01-00026"), ex); - // if the cause is illegal argument, directly throw to user - if (ex instanceof IllegalArgumentException) { - throw (IllegalArgumentException) ex; - } - // TODO: need to collect error information and actions during batch operations - // TODO: maybe keep in ObTableBatchOperationResult - List throwables = new ArrayList(); - List actions = new ArrayList(); - List addresses = new ArrayList(); - throwables.add(ex); - RetriesExhaustedWithDetailsException error = new RetriesExhaustedWithDetailsException( - new ArrayList(throwables), - new ArrayList(actions), new ArrayList(addresses)); - if (listener == null) { - throw error; + if (ex.getCause() instanceof RetriesExhaustedWithDetailsException) { + LOGGER.error(tableName + ": One or more of the operations have failed after retries."); + RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException) ex.getCause(); + // recollect failed mutations + execBuffer.clear(); + for (int i = 0; i < retryException.getNumExceptions(); ++i) { + execBuffer.add((Mutation) retryException.getRow(i)); + } + if (listener != null) { + listener.onException(retryException, this); + } else { + throw retryException; + } } else { - listener.onException(error, this); + LOGGER.error("Errors unrelated to operations occur during mutation operation", ex); + throw ex; + } + } finally { + for (Mutation mutation : execBuffer) { + long size = mutation.heapSize(); + currentAsyncBufferSize.addAndGet(size); + asyncWriteBuffer.add(mutation); + undealtMutationCount.incrementAndGet(); } } } + /** + * reset the time parameters and cancel the timer (if exists) + * */ + @Override + public void disableWriteBufferPeriodicFlush() { + setWriteBufferPeriodicFlush(0, MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); + } + @Override public void close() throws IOException { if (closed) { return; } + // reset timeout, timeTick and Timer + disableWriteBufferPeriodicFlush(); try { - asyncExecute(true); + execute(true); } finally { - // the pool in ObTableClient will be shut down too - this.pool.shutdown(); - try { - if (!pool.awaitTermination(600, TimeUnit.SECONDS)) { - LOGGER - .warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); + if (cleanupPoolOnClose) { + // the pool in ObTableClient will be shut down too + this.pool.shutdown(); + try { + if (!pool.awaitTermination(600, TimeUnit.SECONDS)) { + LOGGER + .warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); + } + } catch (InterruptedException e) { + LOGGER.warn("waitForTermination interrupted"); + Thread.currentThread().interrupt(); } - } catch (InterruptedException e) { - LOGGER.warn("waitForTermination interrupted"); - Thread.currentThread().interrupt(); } closed = true; } @@ -278,11 +340,21 @@ public void close() throws IOException { /** * Force to commit all operations - * do not care whether the pool is shut down or this BufferedMutator is closed */ @Override public void flush() throws IOException { - asyncExecute(true); + checkClose(); + execute(true); + } + + @Override + public long getWriteBufferPeriodicFlushTimeoutMs() { + return writeBufferPeriodicFlushTimeoutMs.get(); + } + + @Override + public long getWriteBufferPeriodicFlushTimerTickMs() { + return writeBufferPeriodicFlushTimerTickMs.get(); } @Override @@ -291,22 +363,32 @@ public long getWriteBufferSize() { } @Override - public void setRpcTimeout(int i) { - this.rpcTimeout = i; + public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout.set(rpcTimeout); + this.ohTable.setRpcTimeout(rpcTimeout); } @Override - public void setOperationTimeout(int i) { - throw new FeatureNotSupportedException("not supported yet'"); + public void setOperationTimeout(int operationTimeout) { + this.operationTimeout.set(operationTimeout); + this.ohTable.setOperationTimeout(operationTimeout); } - private ObTableBatchOperation buildObTableBatchOperation(List execBuffer) { - List keyValueList = new LinkedList<>(); - for (Mutation mutation : execBuffer) { - for (Map.Entry> entry : mutation.getFamilyCellMap().entrySet()) { - keyValueList.addAll(entry.getValue()); - } - } - return OHTable.buildObTableBatchOperation(keyValueList, false, null); + /** + * Count the mutations which haven't been processed. + */ + @VisibleForTesting + public int size() { + return undealtMutationCount.get(); + } + + @VisibleForTesting + public ExecutorService getPool() { + return pool; + } + + @VisibleForTesting + protected long getExecutedWriteBufferPeriodicFlushes() { + return executedWriteBufferPeriodicFlushes.get(); } } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java index f602fdda..fd032267 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java @@ -25,6 +25,10 @@ import java.util.Properties; import static com.alipay.oceanbase.hbase.constants.OHConstants.*; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT; import static org.apache.commons.lang.StringUtils.isBlank; import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNECT; import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT; @@ -48,6 +52,8 @@ public class OHConnectionConfiguration { private final int maxKeyValueSize; private final int rpcTimeout; private final int rpcConnectTimeout; + private final long writeBufferPeriodicFlushTimeoutMs; + private final long writeBufferPeriodicFlushTimerTickMs; public OHConnectionConfiguration(Configuration conf) { this.paramUrl = conf.get(HBASE_OCEANBASE_PARAM_URL); @@ -64,9 +70,15 @@ public OHConnectionConfiguration(Configuration conf) { } this.database = database; this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); - this.operationTimeout = conf.getInt("hbase.client.operation.timeout", 1200000); + this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.writeBufferPeriodicFlushTimeoutMs = conf.getLong( + WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT); + this.writeBufferPeriodicFlushTimerTickMs = conf.getLong( + WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, + WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT); int rpcConnectTimeout = -1; if (conf.get(SOCKET_TIMEOUT_CONNECT) != null) { rpcConnectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); @@ -79,9 +91,10 @@ public OHConnectionConfiguration(Configuration conf) { } } this.rpcConnectTimeout = rpcConnectTimeout; - this.scannerCaching = conf.getInt("hbase.client.scanner.caching", Integer.MAX_VALUE); - this.scannerMaxResultSize = conf.getLong("hbase.client.scanner.max.result.size", - WRITE_BUFFER_SIZE_DEFAULT); + this.scannerCaching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, + Integer.MAX_VALUE); + this.scannerMaxResultSize = conf.getLong( + HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); properties = new Properties(); for (Property property : Property.values()) { @@ -167,4 +180,12 @@ public boolean isOdpMode() { public String getDatabase() { return this.database; } + + public long getWriteBufferPeriodicFlushTimeoutMs() { + return this.writeBufferPeriodicFlushTimeoutMs; + } + + public long getWriteBufferPeriodicFlushTimerTickMs() { + return this.writeBufferPeriodicFlushTimerTickMs; + } } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java index ad0219f7..3964f52f 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java @@ -56,8 +56,7 @@ public class OHConnectionImpl implements Connection { private final OHConnectionConfiguration connectionConfig; - OHConnectionImpl(Configuration conf, ExecutorService pool, - final User user) throws IOException { + OHConnectionImpl(Configuration conf, ExecutorService pool, final User user) throws IOException { this.conf = conf; this.batchPool = pool; this.connectionConfig = new OHConnectionConfiguration(conf); @@ -129,9 +128,7 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I if (params.getTableName() == null) { throw new IllegalArgumentException("TableName cannot be null."); } - if (params.getPool() == null) { - params.pool(HTable.getDefaultExecutor(getConfiguration())); - } + // do not check whether the pool is null, cause now the bufferedMutator will take over the control if (params.getWriteBufferSize() == BUFFERED_PARAM_UNSET) { params.writeBufferSize(connectionConfig.getWriteBufferSize()); } diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java index 6df0b8ef..0eaef049 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java @@ -17,9 +17,13 @@ package com.alipay.oceanbase.hbase; +import com.alipay.oceanbase.hbase.util.OHBufferedMutatorImpl; +import com.alipay.oceanbase.hbase.util.ObHTableTestUtil; +import org.apache.hadoop.conf.Configuration; import com.alipay.oceanbase.rpc.mutation.result.MutationResult; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.filter.PrefixFilter; @@ -93,29 +97,36 @@ public void testDeleteFamilyVerison() throws Exception { toBytes(value1)); Put putKey3Fam1Column1Ts1 = new Put(toBytes(key3)); - putKey3Fam1Column1Ts1.addColumn(toBytes(family1), toBytes(column1), timeStamp1, toBytes(value2)); + putKey3Fam1Column1Ts1.addColumn(toBytes(family1), toBytes(column1), timeStamp1, + toBytes(value2)); Put putKey1Fam1Column2MinTs = new Put(toBytes(key1)); putKey1Fam1Column2MinTs.addColumn(toBytes(family1), toBytes(column2), minTimeStamp, toBytes(value1)); Put putKey1Fam1Column2Ts3 = new Put(toBytes(key1)); - putKey1Fam1Column2Ts3.addColumn(toBytes(family1), toBytes(column2), timeStamp3, toBytes(value2)); + putKey1Fam1Column2Ts3.addColumn(toBytes(family1), toBytes(column2), timeStamp3, + toBytes(value2)); Put putKey2Fam1Column2Ts3 = new Put(toBytes(key2)); - putKey2Fam1Column2Ts3.addColumn(toBytes(family1), toBytes(column2), timeStamp3, toBytes(value2)); + putKey2Fam1Column2Ts3.addColumn(toBytes(family1), toBytes(column2), timeStamp3, + toBytes(value2)); Put putKey2Fam1Column3Ts1 = new Put(toBytes(key2)); - putKey2Fam1Column3Ts1.addColumn(toBytes(family1), toBytes(column3), timeStamp1, toBytes(value2)); + putKey2Fam1Column3Ts1.addColumn(toBytes(family1), toBytes(column3), timeStamp1, + toBytes(value2)); Put putKey3Fam1Column3Ts1 = new Put(toBytes(key3)); - putKey3Fam1Column3Ts1.addColumn(toBytes(family1), toBytes(column3), timeStamp1, toBytes(value2)); + putKey3Fam1Column3Ts1.addColumn(toBytes(family1), toBytes(column3), timeStamp1, + toBytes(value2)); Put putKey3Fam1Column2Ts4 = new Put(toBytes(key3)); - putKey3Fam1Column2Ts4.addColumn(toBytes(family1), toBytes(column2), timeStamp4, toBytes(value1)); + putKey3Fam1Column2Ts4.addColumn(toBytes(family1), toBytes(column2), timeStamp4, + toBytes(value1)); Put putKey2Fam1Column3Ts3 = new Put(toBytes(key2)); - putKey2Fam1Column3Ts3.addColumn(toBytes(family1), toBytes(column3), timeStamp3, toBytes(value1)); + putKey2Fam1Column3Ts3.addColumn(toBytes(family1), toBytes(column3), timeStamp3, + toBytes(value1)); tryPut(multiCfHTable, putKey1Fam1Column1MinTs); tryPut(multiCfHTable, putKey3Fam1Column1Ts1); @@ -133,21 +144,21 @@ public void testDeleteFamilyVerison() throws Exception { get.setTimeStamp(minTimeStamp); get.setMaxVersions(10); Result r = multiCfHTable.get(get); - Assert.assertEquals(2, r.getRow().length); + Assert.assertEquals(2, r.rawCells().length); get = new Get(toBytes(key3)); get.addFamily(toBytes(family1)); get.setTimeStamp(timeStamp1); get.setMaxVersions(10); r = multiCfHTable.get(get); - Assert.assertEquals(2, r.getRow().length); + Assert.assertEquals(2, r.rawCells().length); get = new Get(toBytes(key2)); get.addFamily(toBytes(family1)); get.setTimeStamp(timeStamp3); get.setMaxVersions(10); r = multiCfHTable.get(get); - Assert.assertEquals(2, r.getRow().length); + Assert.assertEquals(2, r.rawCells().length); Delete delKey1MinTs = new Delete(toBytes(key1)); delKey1MinTs.addFamilyVersion(toBytes(family1), minTimeStamp); @@ -158,7 +169,7 @@ public void testDeleteFamilyVerison() throws Exception { get.setTimeStamp(minTimeStamp); get.setMaxVersions(10); r = multiCfHTable.get(get); - Assert.assertEquals(0, r.getRow().length); + Assert.assertEquals(0, r.rawCells().length); Delete delKey3Ts1 = new Delete(toBytes(key3)); delKey3Ts1.addFamilyVersion(toBytes(family1), timeStamp1); @@ -169,7 +180,7 @@ public void testDeleteFamilyVerison() throws Exception { get.setTimeStamp(timeStamp1); get.setMaxVersions(10); r = multiCfHTable.get(get); - Assert.assertEquals(0, r.getRow().length); + Assert.assertEquals(0, r.rawCells().length); Delete delKey2Ts3 = new Delete(toBytes(key2)); delKey2Ts3.addFamilyVersion(family1.getBytes(), timeStamp3); @@ -180,7 +191,7 @@ public void testDeleteFamilyVerison() throws Exception { get.setTimeStamp(timeStamp3); get.setMaxVersions(10); r = multiCfHTable.get(get); - Assert.assertEquals(0, r.getRow().length); + Assert.assertEquals(0, r.rawCells().length); Scan scan = new Scan(); scan.setStartRow(toBytes(key1)); @@ -210,40 +221,52 @@ public void testDeleteFamilyVerison() throws Exception { // test DeleteFamilyVersion multiple cf Put putKey1Fam1Column3Ts4 = new Put(toBytes(key1)); - putKey1Fam1Column3Ts4.addColumn(toBytes(family1), toBytes(column3), timeStamp4, toBytes(value3)); + putKey1Fam1Column3Ts4.addColumn(toBytes(family1), toBytes(column3), timeStamp4, + toBytes(value3)); Put putKey1Fam2Column2Ts2 = new Put(toBytes(key1)); - putKey1Fam2Column2Ts2.addColumn(toBytes(family2), toBytes(column2), timeStamp2, toBytes(value1)); + putKey1Fam2Column2Ts2.addColumn(toBytes(family2), toBytes(column2), timeStamp2, + toBytes(value1)); Put putKey1Fam2Column3Ts2 = new Put(toBytes(key1)); - putKey1Fam2Column3Ts2.addColumn(toBytes(family2), toBytes(column3), timeStamp2, toBytes(value1)); + putKey1Fam2Column3Ts2.addColumn(toBytes(family2), toBytes(column3), timeStamp2, + toBytes(value1)); Put putKey1Fam1Column2Ts1 = new Put(toBytes(key1)); - putKey1Fam1Column2Ts1.addColumn(toBytes(family1), toBytes(column2), timeStamp1, toBytes(value2)); + putKey1Fam1Column2Ts1.addColumn(toBytes(family1), toBytes(column2), timeStamp1, + toBytes(value2)); Put putKey2Fam1Column2Ts5 = new Put(toBytes(key2)); - putKey2Fam1Column2Ts5.addColumn(toBytes(family1), toBytes(column2), timeStamp5, toBytes(value2)); + putKey2Fam1Column2Ts5.addColumn(toBytes(family1), toBytes(column2), timeStamp5, + toBytes(value2)); Put putKey2Fam2Column3Ts1 = new Put(toBytes(key2)); - putKey2Fam2Column3Ts1.addColumn(toBytes(family2), toBytes(column3), timeStamp3, toBytes(value3)); + putKey2Fam2Column3Ts1.addColumn(toBytes(family2), toBytes(column3), timeStamp3, + toBytes(value3)); Put putKey2Fam1Column1Ts5 = new Put(toBytes(key2)); - putKey2Fam1Column1Ts5.addColumn(toBytes(family1), toBytes(column1), timeStamp5, toBytes(value1)); + putKey2Fam1Column1Ts5.addColumn(toBytes(family1), toBytes(column1), timeStamp5, + toBytes(value1)); Put putKey2Fam2Column1Ts3 = new Put(toBytes(key2)); - putKey2Fam2Column1Ts3.addColumn(toBytes(family2), toBytes(column1), timeStamp3, toBytes(value2)); + putKey2Fam2Column1Ts3.addColumn(toBytes(family2), toBytes(column1), timeStamp3, + toBytes(value2)); Put putKey3Fam1Column2Ts6 = new Put(toBytes(key3)); - putKey3Fam1Column2Ts6.addColumn(toBytes(family1), toBytes(column2), timeStamp6, toBytes(value2)); + putKey3Fam1Column2Ts6.addColumn(toBytes(family1), toBytes(column2), timeStamp6, + toBytes(value2)); Put putKey3Fam2Column3Ts7 = new Put(toBytes(key3)); - putKey3Fam2Column3Ts7.addColumn(toBytes(family2), toBytes(column3), timeStamp7, toBytes(value1)); + putKey3Fam2Column3Ts7.addColumn(toBytes(family2), toBytes(column3), timeStamp7, + toBytes(value1)); Put putKey3Fam2Column1Ts7 = new Put(toBytes(key3)); - putKey3Fam2Column1Ts7.addColumn(toBytes(family2), toBytes(column1), timeStamp7, toBytes(value2)); + putKey3Fam2Column1Ts7.addColumn(toBytes(family2), toBytes(column1), timeStamp7, + toBytes(value2)); Put putKey3Fam1Column2Ts2 = new Put(toBytes(key3)); - putKey3Fam1Column2Ts2.addColumn(toBytes(family1), toBytes(column2), timeStamp2, toBytes(value1)); + putKey3Fam1Column2Ts2.addColumn(toBytes(family1), toBytes(column2), timeStamp2, + toBytes(value1)); tryPut(multiCfHTable, putKey1Fam1Column3Ts4); tryPut(multiCfHTable, putKey1Fam2Column2Ts2); @@ -263,21 +286,21 @@ public void testDeleteFamilyVerison() throws Exception { getKey1.addFamily(toBytes(family2)); getKey1.setMaxVersions(10); r = multiCfHTable.get(getKey1); - Assert.assertEquals(4, r.getRow().length); + Assert.assertEquals(4, r.rawCells().length); Get getKey2 = new Get(toBytes(key2)); getKey2.addFamily(toBytes(family1)); getKey2.addFamily(toBytes(family2)); getKey2.setMaxVersions(10); r = multiCfHTable.get(getKey2); - Assert.assertEquals(4, r.getRow().length); + Assert.assertEquals(4, r.rawCells().length); Get getKey3 = new Get(toBytes(key3)); getKey3.addFamily(toBytes(family1)); getKey3.addFamily(toBytes(family2)); getKey3.setMaxVersions(10); r = multiCfHTable.get(getKey3); - Assert.assertEquals(4, r.getRow().length); + Assert.assertEquals(4, r.rawCells().length); Delete delKey1Ts_6_2 = new Delete(toBytes(key1)); delKey1Ts_6_2.addFamilyVersion(toBytes(family1), timeStamp4); @@ -289,7 +312,7 @@ public void testDeleteFamilyVerison() throws Exception { getKey1.addFamily(toBytes(family2)); getKey1.setMaxVersions(10); r = multiCfHTable.get(getKey1); - Assert.assertEquals(1, r.getRow().length); + Assert.assertEquals(1, r.rawCells().length); for (Cell cell : r.rawCells()) { Assert.assertEquals(timeStamp1, cell.getTimestamp()); } @@ -304,7 +327,7 @@ public void testDeleteFamilyVerison() throws Exception { getKey2.addFamily(toBytes(family2)); getKey2.setMaxVersions(10); r = multiCfHTable.get(getKey2); - Assert.assertEquals(0, r.getRow().length); + Assert.assertEquals(0, r.rawCells().length); Delete delKey3Ts_2_7 = new Delete(toBytes(key3)); delKey3Ts_2_7.addFamilyVersion(toBytes(family1), timeStamp2); @@ -316,7 +339,7 @@ public void testDeleteFamilyVerison() throws Exception { getKey3.addFamily(toBytes(family2)); getKey3.setMaxVersions(10); r = multiCfHTable.get(getKey3); - Assert.assertEquals(1, r.getRow().length); + Assert.assertEquals(1, r.rawCells().length); for (Cell cell : r.rawCells()) { Assert.assertEquals(timeStamp6, cell.getTimestamp()); } @@ -346,6 +369,264 @@ public void testDeleteFamilyVerison() throws Exception { multiCfHTable.delete(deleteKey3Family); } + @Test + public void testMultiColumnFamilyBufferedMutator() throws Exception { + byte[] family1 = "family_with_group1".getBytes(); + byte[] family2 = "family_with_group2".getBytes(); + byte[] family3 = "family_with_group3".getBytes(); + + byte[] family1_column1 = "family1_column1".getBytes(); + byte[] family1_column2 = "family1_column2".getBytes(); + byte[] family1_column3 = "family1_column3".getBytes(); + byte[] family2_column1 = "family2_column1".getBytes(); + byte[] family2_column2 = "family2_column2".getBytes(); + byte[] family3_column1 = "family3_column1".getBytes(); + byte[] family3_column2 = "family3_column2".getBytes(); + byte[] family1_value = "VVV1".getBytes(); + byte[] family2_value = "VVV2".getBytes(); + byte[] family3_value = "VVV3".getBytes(); + + Configuration conf = ObHTableTestUtil.newConfiguration(); + TableName tableName = TableName.valueOf("test_multi_cf"); + Connection connection = ConnectionFactory.createConnection(conf); + BufferedMutator mutator = connection.getBufferedMutator(tableName); + + int rows = 10; + List keys = new ArrayList<>(); + List mutations = new ArrayList<>(); + for (int i = 0; i < rows; ++i) { + String key = "Key" + i; + keys.add(key); + Delete delete = new Delete(toBytes(key)); + mutations.add(delete); + Put put = new Put(toBytes(key)); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family1, family1_column2, family1_value); + put.addColumn(family1, family1_column3, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family2, family2_column2, family2_value); + put.addColumn(family3, family3_column1, family3_value); + mutations.add(put); + } + mutator.mutate(mutations); + + // test force flush + mutator.flush(); + Get get = new Get(toBytes("Key2")); + get.addFamily(family1); + get.addFamily(family2); + Result result = multiCfHTable.get(get); + Assert.assertEquals(5, result.rawCells().length); + + mutations.clear(); + for (int i = 0; i < rows; ++i) { + if (i % 5 == 0) { // 0, 5 + Delete delete = new Delete(toBytes("Key" + i)); + delete.addFamily(family2); + delete.addFamily(family3); + mutations.add(delete); + } + } + mutator.mutate(mutations); + mutator.flush(); + + get = new Get(toBytes("Key0")); + result = multiCfHTable.get(get); + Assert.assertEquals(3, result.rawCells().length); + Assert.assertFalse(result.containsColumn(family2, family2_column1)); + Assert.assertFalse(result.containsColumn(family2, family2_column2)); + Assert.assertFalse(result.containsColumn(family3, family3_column1)); + + get = new Get(toBytes("Key5")); + result = multiCfHTable.get(get); + Assert.assertEquals(3, result.rawCells().length); + Assert.assertFalse(result.containsColumn(family2, family2_column1)); + Assert.assertFalse(result.containsColumn(family2, family2_column2)); + Assert.assertFalse(result.containsColumn(family3, family3_column1)); + + mutations.clear(); + for (String key : keys) { + Delete delete = new Delete(toBytes(key)); + mutations.add(delete); + } + mutator.mutate(mutations); + mutator.flush(); + + Scan scan = new Scan(); + scan.setStartRow(toBytes("Key0")); + scan.setStopRow(toBytes("Key10")); + scan.addFamily(family1); + scan.addFamily(family2); + scan.addFamily(family3); + ResultScanner scanner = multiCfHTable.getScanner(scan); + int count = 0; + for (Result r : scanner) { + count += r.rawCells().length; + } + Assert.assertEquals(0, count); + + // test auto flush + long bufferSize = 45000L; + BufferedMutatorParams params = new BufferedMutatorParams(tableName); + params.writeBufferSize(bufferSize); + mutator = connection.getBufferedMutator(params); + + while (true) { + for (int i = 0; i < rows; ++i) { + mutations.clear(); + Put put = new Put(toBytes(keys.get(i))); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family1, family1_column2, family1_value); + put.addColumn(family1, family1_column3, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family3, family3_column1, family2_value); + put.addColumn(family3, family3_column2, family3_value); + mutations.add(put); + if (i % 3 == 0) { // 0, 3, 6, 9 + Delete delete = new Delete(toBytes(keys.get(i))); + delete.addFamily(family1); + delete.addFamily(family2); + mutations.add(delete); + } + mutator.mutate(mutations); + } + + get = new Get(toBytes("Key0")); + result = multiCfHTable.get(get); + if (!result.isEmpty()) { + break; + } + } + get = new Get(toBytes("Key2")); + result = multiCfHTable.get(get); + Assert.assertEquals(6 , result.rawCells().length); + Assert.assertTrue(result.containsColumn(family1, family1_column1)); + Assert.assertTrue(result.containsColumn(family1, family1_column2)); + Assert.assertTrue(result.containsColumn(family1, family1_column3)); + Assert.assertTrue(result.containsColumn(family2, family2_column1)); + Assert.assertTrue(result.containsColumn(family3, family3_column1)); + Assert.assertTrue(result.containsColumn(family3, family3_column2)); + + get = new Get(toBytes("Key3")); + result = multiCfHTable.get(get); + if (result.containsColumn(family1, family1_column1) || result.containsColumn(family2, family2_column1)) { + mutator.flush(); + } + get = new Get(toBytes("Key3")); + result = multiCfHTable.get(get); + Assert.assertEquals(2, result.rawCells().length); + Assert.assertFalse(result.containsColumn(family1, family1_column1)); + Assert.assertFalse(result.containsColumn(family1, family1_column2)); + Assert.assertFalse(result.containsColumn(family1, family1_column3)); + Assert.assertFalse(result.containsColumn(family2, family2_column1)); + Assert.assertTrue(result.containsColumn(family3, family3_column1)); + Assert.assertTrue(result.containsColumn(family3, family3_column2)); + + get = new Get(toBytes("Key9")); + result = multiCfHTable.get(get); + if (result.containsColumn(family1, family1_column1) || result.containsColumn(family2, family2_column1)) { + mutator.flush(); + } + get = new Get(toBytes("Key9")); + result = multiCfHTable.get(get); + Assert.assertEquals(2, result.rawCells().length); + Assert.assertFalse(result.containsColumn(family1, family1_column1)); + Assert.assertFalse(result.containsColumn(family1, family1_column2)); + Assert.assertFalse(result.containsColumn(family1, family1_column3)); + Assert.assertFalse(result.containsColumn(family2, family2_column1)); + Assert.assertTrue(result.containsColumn(family3, family3_column1)); + Assert.assertTrue(result.containsColumn(family3, family3_column2)); + + mutations.clear(); + for (String key : keys) { + Delete delete = new Delete(toBytes(key)); + mutations.add(delete); + } + mutator.mutate(mutations); + mutator.flush(); + + scan = new Scan(); + scan.setStartRow(toBytes("Key0")); + scan.setStopRow(toBytes("Key10")); + scan.addFamily(family1); + scan.addFamily(family2); + scan.addFamily(family3); + scanner = multiCfHTable.getScanner(scan); + count = 0; + for (Result r : scanner) { + count += r.rawCells().length; + } + Assert.assertEquals(0, count); + + // test periodic flush + params.setWriteBufferPeriodicFlushTimeoutMs(100); + mutator = connection.getBufferedMutator(params); + while (true) { + for (int i = 0; i < rows; ++i) { + mutations.clear(); + Put put = new Put(toBytes(keys.get(i))); + put.addColumn(family1, family1_column1, family1_value); + put.addColumn(family1, family1_column2, family1_value); + put.addColumn(family1, family1_column3, family1_value); + put.addColumn(family2, family2_column1, family2_value); + put.addColumn(family3, family3_column1, family2_value); + put.addColumn(family3, family3_column2, family3_value); + mutations.add(put); + if (i % 3 == 0) { // 0, 3, 6, 9 + Delete delete = new Delete(toBytes(keys.get(i))); + delete.addFamily(family1); + delete.addFamily(family2); + mutations.add(delete); + } + mutator.mutate(mutations); + } + + get = new Get(toBytes("Key0")); + result = multiCfHTable.get(get); + if (!result.isEmpty()) { + break; + } + } + get = new Get(toBytes("Key2")); + get.setMaxVersions(); + result = multiCfHTable.get(get); + count = result.rawCells().length; + Assert.assertTrue(count > 0); + // test timer periodic flush + int lastUndealtCount = ((OHBufferedMutatorImpl) mutator).size(); + Thread.sleep(1000); + int currentUndealtCount = ((OHBufferedMutatorImpl) mutator).size(); + Assert.assertNotEquals(lastUndealtCount, currentUndealtCount); + // after periodic flush, all mutations will be committed + Assert.assertEquals(0, currentUndealtCount); + result = multiCfHTable.get(get); + int newCount = result.rawCells().length; + Assert.assertNotEquals(count, newCount); + + // clean data + mutations.clear(); + for (String key : keys) { + Delete delete = new Delete(toBytes(key)); + mutations.add(delete); + } + mutator.mutate(mutations); + mutator.flush(); + mutator.close(); + + scan = new Scan(); + scan.setStartRow(toBytes("Key0")); + scan.setStopRow(toBytes("Key10")); + scan.addFamily(family1); + scan.addFamily(family2); + scan.addFamily(family3); + scanner = multiCfHTable.getScanner(scan); + count = 0; + for (Result r : scanner) { + count += r.rawCells().length; + } + Assert.assertEquals(0, count); + } + @Test public void testMultiColumnFamilyBatch() throws Exception { byte[] family1 = "family_with_group1".getBytes(); @@ -938,6 +1219,13 @@ public void testMultiColumnFamilyGet() throws Exception { @Test public void testMultiColumnFamilyDelete() throws Exception { + String key1 = "scanKey1x"; + String key2 = "scanKey2x"; + String key3 = "scanKey3x"; + String value1 = "value1"; + String value2 = "value2"; + String value3 = "value3"; + byte[] family1 = "family_with_group1".getBytes(); byte[] family2 = "family_with_group2".getBytes(); byte[] family3 = "family_with_group3".getBytes(); @@ -947,6 +1235,7 @@ public void testMultiColumnFamilyDelete() throws Exception { byte[] family1_column3 = "family1_column3".getBytes(); byte[] family2_column1 = "family2_column1".getBytes(); byte[] family2_column2 = "family2_column2".getBytes(); + byte[] family2_column3 = "family2_column3".getBytes(); byte[] family3_column1 = "family3_column1".getBytes(); byte[] family1_value = "VVV1".getBytes(); byte[] family2_value = "VVV2".getBytes(); @@ -1064,5 +1353,301 @@ public void testMultiColumnFamilyDelete() throws Exception { assertEquals(oldTimestamp, result.getColumnCells(family2, family2_column1).get(0) .getTimestamp()); assertTrue(lastTimestamp > oldTimestamp); + + // delete previous data + Delete deleteKey1Family = new Delete(toBytes(key1)); + deleteKey1Family.addFamily(family1); + deleteKey1Family.addFamily(family2); + Delete deleteKey2Family = new Delete(toBytes(key2)); + deleteKey2Family.addFamily(family1); + deleteKey2Family.addFamily(family2); + Delete deleteKey3Family = new Delete(toBytes(key3)); + deleteKey3Family.addFamily(family1); + deleteKey3Family.addFamily(family2); + + multiCfHTable.delete(deleteKey1Family); + multiCfHTable.delete(deleteKey2Family); + multiCfHTable.delete(deleteKey3Family); + + long minTimeStamp = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp1 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp2 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp3 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp4 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp5 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp6 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp7 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp8 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp9 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp10 = System.currentTimeMillis(); + Thread.sleep(5); + long timeStamp11 = System.currentTimeMillis(); + Thread.sleep(5); + long maxTimeStamp = System.currentTimeMillis(); + + Put putKey1Fam1Column1MinTs = new Put(toBytes(key1)); + putKey1Fam1Column1MinTs.addColumn(family1, family1_column1, minTimeStamp, toBytes(value1)); + + Put putKey3Fam1Column1Ts1 = new Put(toBytes(key3)); + putKey3Fam1Column1Ts1.addColumn(family1, family1_column1, timeStamp1, toBytes(value2)); + + Put putKey1Fam1Column2MinTs = new Put(toBytes(key1)); + putKey1Fam1Column2MinTs.addColumn(family1, family1_column2, minTimeStamp, toBytes(value1)); + + Put putKey1Fam1Column2Ts3 = new Put(toBytes(key1)); + putKey1Fam1Column2Ts3.addColumn(family1, family1_column2, timeStamp3, toBytes(value2)); + + Put putKey2Fam1Column2Ts3 = new Put(toBytes(key2)); + putKey2Fam1Column2Ts3.addColumn(family1, family1_column2, timeStamp3, toBytes(value2)); + + Put putKey2Fam1Column3Ts1 = new Put(toBytes(key2)); + putKey2Fam1Column3Ts1.addColumn(family1, family1_column3, timeStamp1, toBytes(value2)); + + Put putKey3Fam1Column3Ts1 = new Put(toBytes(key3)); + putKey3Fam1Column3Ts1.addColumn(family1, family1_column3, timeStamp1, toBytes(value2)); + + Put putKey3Fam1Column2Ts6 = new Put(toBytes(key3)); + putKey3Fam1Column2Ts6.addColumn(family1, family1_column2, timeStamp6, toBytes(value1)); + + Put putKey2Fam1Column3Ts6 = new Put(toBytes(key2)); + putKey2Fam1Column3Ts6.addColumn(family1, family1_column3, timeStamp3, toBytes(value1)); + + multiCfHTable.put(putKey1Fam1Column1MinTs); + multiCfHTable.put(putKey3Fam1Column1Ts1); + multiCfHTable.put(putKey1Fam1Column2MinTs); + multiCfHTable.put(putKey1Fam1Column2Ts3); + multiCfHTable.put(putKey2Fam1Column2Ts3); + multiCfHTable.put(putKey2Fam1Column3Ts1); + multiCfHTable.put(putKey3Fam1Column3Ts1); + multiCfHTable.put(putKey3Fam1Column2Ts6); + multiCfHTable.put(putKey2Fam1Column3Ts6); + + // test DeleteFamilyVersion single cf + get = new Get(toBytes(key1)); + get.addFamily(family1); + get.setTimeStamp(minTimeStamp); + get.setMaxVersions(10); + Result r = multiCfHTable.get(get); + Assert.assertEquals(2, r.rawCells().length); + + get = new Get(toBytes(key3)); + get.addFamily(family1); + get.setTimeStamp(timeStamp1); + get.setMaxVersions(10); + r = multiCfHTable.get(get); + Assert.assertEquals(2, r.rawCells().length); + + get = new Get(toBytes(key2)); + get.addFamily(family1); + get.setTimeStamp(timeStamp3); + get.setMaxVersions(10); + r = multiCfHTable.get(get); + Assert.assertEquals(2, r.rawCells().length); + + Delete delKey1MinTs = new Delete(toBytes(key1)); + delKey1MinTs.addFamilyVersion(family1, minTimeStamp); + multiCfHTable.delete(delKey1MinTs); + + get = new Get(toBytes(key1)); + get.addFamily(family1); + get.setTimeStamp(minTimeStamp); + get.setMaxVersions(10); + r = multiCfHTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + + Delete delKey3Ts1 = new Delete(toBytes(key3)); + delKey3Ts1.addFamilyVersion(family1, timeStamp1); + multiCfHTable.delete(delKey3Ts1); + + get = new Get(toBytes(key3)); + get.addFamily(family1); + get.setTimeStamp(timeStamp1); + get.setMaxVersions(10); + r = multiCfHTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + + Delete delKey2Ts3 = new Delete(toBytes(key2)); + delKey2Ts3.addFamilyVersion(family1, timeStamp3); + multiCfHTable.delete(delKey2Ts3); + + get = new Get(toBytes(key2)); + get.addFamily(family1); + get.setTimeStamp(timeStamp3); + get.setMaxVersions(10); + r = multiCfHTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + + Scan scan = new Scan(); + scan.setStartRow(toBytes(key1)); + scan.setStopRow("scanKey4x".getBytes()); + scan.addFamily(family1); + scan.setMaxVersions(10); + ResultScanner scanner = multiCfHTable.getScanner(scan); + int key1Cnt = 0, key2Cnt = 0, key3Cnt = 0; + for (Result res : scanner) { + for (Cell kv : res.rawCells()) { + if (key1.equals(Bytes.toString(CellUtil.cloneRow(kv)))) { + ++key1Cnt; + } else if (key2.equals(Bytes.toString(CellUtil.cloneRow(kv)))) { + ++key2Cnt; + } else { + ++key3Cnt; + } + } + } + Assert.assertEquals(1, key1Cnt); + Assert.assertEquals(1, key2Cnt); + Assert.assertEquals(1, key3Cnt); + + multiCfHTable.delete(deleteKey1Family); + multiCfHTable.delete(deleteKey2Family); + multiCfHTable.delete(deleteKey3Family); + + // test DeleteFamilyVersion multiple cf + Put putKey1Fam1Column3Ts6 = new Put(toBytes(key1)); + putKey1Fam1Column3Ts6.addColumn(family1, family1_column3, timeStamp6, toBytes(value3)); + + Put putKey1Fam2Column2Ts2 = new Put(toBytes(key1)); + putKey1Fam2Column2Ts2.addColumn(family2, family2_column2, timeStamp2, toBytes(value1)); + + Put putKey1Fam2Column3Ts2 = new Put(toBytes(key1)); + putKey1Fam2Column3Ts2.addColumn(family2, family2_column3, timeStamp2, toBytes(value1)); + + Put putKey1Fam1Column2Ts1 = new Put(toBytes(key1)); + putKey1Fam1Column2Ts1.addColumn(family1, family1_column2, timeStamp1, toBytes(value2)); + + Put putKey2Fam1Column2Ts8 = new Put(toBytes(key2)); + putKey2Fam1Column2Ts8.addColumn(family1, family1_column2, timeStamp8, toBytes(value2)); + + Put putKey2Fam2Column3Ts1 = new Put(toBytes(key2)); + putKey2Fam2Column3Ts1.addColumn(family2, family2_column3, timeStamp3, toBytes(value3)); + + Put putKey2Fam1Column1Ts1 = new Put(toBytes(key2)); + putKey2Fam1Column1Ts1.addColumn(family1, family1_column1, timeStamp8, toBytes(value1)); + + Put putKey2Fam2Column1Ts3 = new Put(toBytes(key2)); + putKey2Fam2Column1Ts3.addColumn(family2, family2_column1, timeStamp3, toBytes(value2)); + + Put putKey3Fam1Column2Ts9 = new Put(toBytes(key3)); + putKey3Fam1Column2Ts9.addColumn(family1, family1_column2, timeStamp9, toBytes(value2)); + + Put putKey3Fam2Column3Ts10 = new Put(toBytes(key3)); + putKey3Fam2Column3Ts10.addColumn(family2, family2_column3, timeStamp10, toBytes(value1)); + + Put putKey3Fam2Column1Ts10 = new Put(toBytes(key3)); + putKey3Fam2Column1Ts10.addColumn(family2, family2_column1, timeStamp10, toBytes(value2)); + + Put putKey3Fam1Column2Ts2 = new Put(toBytes(key3)); + putKey3Fam1Column2Ts2.addColumn(family1, family1_column2, timeStamp2, toBytes(value1)); + + multiCfHTable.put(putKey1Fam1Column3Ts6); + multiCfHTable.put(putKey1Fam2Column2Ts2); + multiCfHTable.put(putKey1Fam2Column3Ts2); + multiCfHTable.put(putKey1Fam1Column2Ts1); + multiCfHTable.put(putKey2Fam1Column2Ts8); + multiCfHTable.put(putKey2Fam2Column3Ts1); + multiCfHTable.put(putKey2Fam1Column1Ts1); + multiCfHTable.put(putKey2Fam2Column1Ts3); + multiCfHTable.put(putKey3Fam1Column2Ts9); + multiCfHTable.put(putKey3Fam2Column3Ts10); + multiCfHTable.put(putKey3Fam2Column1Ts10); + multiCfHTable.put(putKey3Fam1Column2Ts2); + + Get getKey1 = new Get(toBytes(key1)); + getKey1.addFamily(family1); + getKey1.addFamily(family2); + getKey1.setMaxVersions(10); + r = multiCfHTable.get(getKey1); + Assert.assertEquals(4, r.rawCells().length); + + Get getKey2 = new Get(toBytes(key2)); + getKey2.addFamily(family1); + getKey2.addFamily(family2); + getKey2.setMaxVersions(10); + r = multiCfHTable.get(getKey2); + Assert.assertEquals(4, r.rawCells().length); + + Get getKey3 = new Get(toBytes(key3)); + getKey3.addFamily(family1); + getKey3.addFamily(family2); + getKey3.setMaxVersions(10); + r = multiCfHTable.get(getKey3); + Assert.assertEquals(4, r.rawCells().length); + + Delete delKey1Ts_6_2 = new Delete(toBytes(key1)); + delKey1Ts_6_2.addFamilyVersion(family1, timeStamp6); + delKey1Ts_6_2.addFamilyVersion(family2, timeStamp2); + multiCfHTable.delete(delKey1Ts_6_2); + + getKey1 = new Get(toBytes(key1)); + getKey1.addFamily(family1); + getKey1.addFamily(family2); + getKey1.setMaxVersions(10); + r = multiCfHTable.get(getKey1); + Assert.assertEquals(1, r.rawCells().length); + for (Cell kv : r.rawCells()) { + Assert.assertEquals(timeStamp1, kv.getTimestamp()); + } + + Delete delKey2Ts_8_3 = new Delete(toBytes(key2)); + delKey2Ts_8_3.addFamilyVersion(family1, timeStamp8); + delKey2Ts_8_3.addFamilyVersion(family2, timeStamp3); + multiCfHTable.delete(delKey2Ts_8_3); + + getKey2 = new Get(toBytes(key2)); + getKey2.addFamily(family1); + getKey2.addFamily(family2); + getKey2.setMaxVersions(10); + r = multiCfHTable.get(getKey2); + Assert.assertEquals(0, r.rawCells().length); + + Delete delKey3Ts_2_10 = new Delete(toBytes(key3)); + delKey3Ts_2_10.addFamilyVersion(family1, timeStamp2); + delKey3Ts_2_10.addFamilyVersion(family2, timeStamp10); + multiCfHTable.delete(delKey3Ts_2_10); + + getKey3 = new Get(toBytes(key3)); + getKey3.addFamily(family1); + getKey3.addFamily(family2); + getKey3.setMaxVersions(10); + r = multiCfHTable.get(getKey3); + Assert.assertEquals(1, r.rawCells().length); + for (Cell kv : r.rawCells()) { + Assert.assertEquals(timeStamp9, kv.getTimestamp()); + } + + scan = new Scan(); + scan.setStartRow(toBytes(key1)); + scan.setStopRow("scanKey4x".getBytes()); + scan.addFamily(family1); + scan.addFamily(family2); + scan.setMaxVersions(10); + scanner = multiCfHTable.getScanner(scan); + int ts1Cnt = 0, ts9Cnt = 0; + for (Result res : scanner) { + for (Cell kv : res.rawCells()) { + if (kv.getTimestamp() == timeStamp1) { + ++ts1Cnt; + } else if (kv.getTimestamp() == timeStamp9) { + ++ts9Cnt; + } + } + } + Assert.assertEquals(1, ts1Cnt); + Assert.assertEquals(1, ts9Cnt); + + multiCfHTable.delete(deleteKey1Family); + multiCfHTable.delete(deleteKey2Family); + multiCfHTable.delete(deleteKey3Family); } } diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index a8951458..a7c402fa 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -66,7 +66,8 @@ PRIMARY KEY (`K`, `Q`, `T`) long timestamp = System.currentTimeMillis(); // put data Put put = new Put(toBytes(key)); - KeyValue kv = new KeyValue(toBytes(key), "family_group".getBytes(), column1.getBytes(), timestamp, toBytes(value + "1")); + KeyValue kv = new KeyValue(toBytes(key), "family_group".getBytes(), column1.getBytes(), + timestamp, toBytes(value + "1")); put.add(kv); hTable.put(put); // test get with empty family @@ -111,8 +112,9 @@ PRIMARY KEY (`K`, `Q`, `T`) Assert.assertEquals(column1, Bytes.toString(CellUtil.cloneQualifier(keyValue))); Assert.assertEquals(timestamp, keyValue.getTimestamp()); Assert.assertEquals(value + "1", Bytes.toString(CellUtil.cloneValue(keyValue))); - System.out.println("rowKey: " + new String(CellUtil.cloneRow(keyValue)) + " family :" - + new String(CellUtil.cloneFamily(keyValue)) + " columnQualifier:" + System.out.println("rowKey: " + new String(CellUtil.cloneRow(keyValue)) + + " family :" + new String(CellUtil.cloneFamily(keyValue)) + + " columnQualifier:" + new String(CellUtil.cloneQualifier(keyValue)) + " timestamp:" + keyValue.getTimestamp() + " value:" + new String(CellUtil.cloneValue(keyValue))); @@ -133,7 +135,8 @@ private void testBasic(String family) throws Exception { long timestamp = System.currentTimeMillis(); Put put = new Put(toBytes(key)); - KeyValue kv = new KeyValue(toBytes(key), family.getBytes(), column1.getBytes(), timestamp, toBytes(value)); + KeyValue kv = new KeyValue(toBytes(key), family.getBytes(), column1.getBytes(), timestamp, + toBytes(value)); put.add(kv); hTable.put(put); Get get = new Get(toBytes(key)); @@ -142,7 +145,8 @@ private void testBasic(String family) throws Exception { Assert.assertEquals(1, r.rawCells().length); for (Cell keyValue : r.rawCells()) { - System.out.println("rowKey: " + new String(CellUtil.cloneRow(keyValue)) + " columnQualifier:" + System.out.println("rowKey: " + new String(CellUtil.cloneRow(keyValue)) + + " columnQualifier:" + new String(CellUtil.cloneQualifier(keyValue)) + " timestamp:" + keyValue.getTimestamp() + " value:" + new String(CellUtil.cloneValue(keyValue))); @@ -153,7 +157,8 @@ private void testBasic(String family) throws Exception { } put = new Put(toBytes(key)); - kv = new KeyValue(toBytes(key), family.getBytes(), column1.getBytes(), timestamp + 1, toBytes(value)); + kv = new KeyValue(toBytes(key), family.getBytes(), column1.getBytes(), timestamp + 1, + toBytes(value)); put.add(kv); hTable.put(put); get = new Get(toBytes(key)); @@ -171,7 +176,8 @@ private void testBasic(String family) throws Exception { hTable.delete(delete); for (Cell keyValue : r.rawCells()) { - System.out.println("rowKey: " + new String(CellUtil.cloneRow(keyValue)) + " columnQualifier:" + System.out.println("rowKey: " + new String(CellUtil.cloneRow(keyValue)) + + " columnQualifier:" + new String(CellUtil.cloneQualifier(keyValue)) + " timestamp:" + keyValue.getTimestamp() + " value:" + new String(CellUtil.cloneValue(keyValue))); @@ -202,12 +208,15 @@ private void testBasic(String family) throws Exception { boolean countAdd = true; for (Cell keyValue : result.rawCells()) { System.out.println("rowKey: " + new String(CellUtil.cloneRow(keyValue)) - + " columnQualifier:" + new String(CellUtil.cloneQualifier(keyValue)) + + " columnQualifier:" + + new String(CellUtil.cloneQualifier(keyValue)) + " timestamp:" + keyValue.getTimestamp() + " value:" + new String(CellUtil.cloneValue(keyValue))); Assert.assertEquals(key + "_" + i, Bytes.toString(CellUtil.cloneRow(keyValue))); - Assert.assertTrue(column1.equals(Bytes.toString(CellUtil.cloneQualifier(keyValue))) - || column2.equals(Bytes.toString(CellUtil.cloneQualifier(keyValue)))); + Assert.assertTrue(column1.equals(Bytes.toString(CellUtil + .cloneQualifier(keyValue))) + || column2.equals(Bytes.toString(CellUtil + .cloneQualifier(keyValue)))); Assert.assertEquals(timestamp + 2, keyValue.getTimestamp()); Assert.assertEquals(value, Bytes.toString(CellUtil.cloneValue(keyValue))); if (countAdd) { @@ -230,12 +239,15 @@ private void testBasic(String family) throws Exception { boolean countAdd = true; for (Cell keyValue : result.rawCells()) { System.out.println("rowKey: " + new String(CellUtil.cloneRow(keyValue)) - + " columnQualifier:" + new String(CellUtil.cloneQualifier(keyValue)) + + " columnQualifier:" + + new String(CellUtil.cloneQualifier(keyValue)) + " timestamp:" + keyValue.getTimestamp() + " value:" + new String(CellUtil.cloneValue(keyValue))); Assert.assertEquals(key + "_" + i, Bytes.toString(CellUtil.cloneRow(keyValue))); - Assert.assertTrue(column1.equals(Bytes.toString(CellUtil.cloneQualifier(keyValue))) - || column2.equals(Bytes.toString(CellUtil.cloneQualifier(keyValue)))); + Assert.assertTrue(column1.equals(Bytes.toString(CellUtil + .cloneQualifier(keyValue))) + || column2.equals(Bytes.toString(CellUtil + .cloneQualifier(keyValue)))); Assert.assertEquals(value, Bytes.toString(CellUtil.cloneValue(keyValue))); if (countAdd) { countAdd = false; @@ -274,7 +286,8 @@ public void testHugeData() throws IOException { System.arraycopy(keyBytes, 0, rowkey, 0, keyBytes.length); System.arraycopy(currentBytes, 0, rowkey, keyBytes.length, currentBytes.length); Put put = new Put(rowkey); - KeyValue kv = new KeyValue(rowkey, "family1".getBytes(), column1.getBytes(), toBytes(value)); + KeyValue kv = new KeyValue(rowkey, "family1".getBytes(), column1.getBytes(), + toBytes(value)); put.add(kv); kv = new KeyValue(rowkey, "family1".getBytes(), column2.getBytes(), toBytes(value)); put.add(kv); @@ -897,7 +910,8 @@ public void testFilter() throws Exception { long timestamp = System.currentTimeMillis(); putKey1Column1Value1 = new Put(toBytes(key1)); - putKey1Column1Value1.addColumn(toBytes(family), toBytes(column1), timestamp, toBytes(value1)); + putKey1Column1Value1.addColumn(toBytes(family), toBytes(column1), timestamp, + toBytes(value1)); putKey1Column1Value2 = new Put(toBytes(key1)); putKey1Column1Value2.addColumn(toBytes(family), toBytes(column1), toBytes(value2)); @@ -909,7 +923,8 @@ public void testFilter() throws Exception { putKey1Column2Value1.addColumn(toBytes(family), toBytes(column2), toBytes(value1)); putKey2Column1Value1 = new Put(toBytes(key2)); - putKey2Column1Value1.addColumn(toBytes(family), toBytes(column1), timestamp, toBytes(value1)); + putKey2Column1Value1.addColumn(toBytes(family), toBytes(column1), timestamp, + toBytes(value1)); putKey2Column1Value2 = new Put(toBytes(key2)); putKey2Column1Value2.addColumn(toBytes(family), toBytes(column1), toBytes(value2)); @@ -1278,13 +1293,13 @@ public void testColumnRangeFilter() throws Exception { int res_count = 0; for (Result result : scanner) { for (Cell keyValue : result.rawCells()) { - System.out.printf("Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", Bytes.toString(result.getRow()), Bytes.toString(CellUtil.cloneFamily(keyValue)), - Bytes.toString(CellUtil.cloneQualifier(keyValue)), - keyValue.getTimestamp(), - Bytes.toString(CellUtil.cloneValue(keyValue)) - ); + Bytes.toString(CellUtil.cloneQualifier(keyValue)), keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue))); res_count += 1; } } @@ -1301,13 +1316,13 @@ public void testColumnRangeFilter() throws Exception { res_count = 0; for (Result result : scanner) { for (Cell keyValue : result.rawCells()) { - System.out.printf("Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", Bytes.toString(result.getRow()), Bytes.toString(CellUtil.cloneFamily(keyValue)), - Bytes.toString(CellUtil.cloneQualifier(keyValue)), - keyValue.getTimestamp(), - Bytes.toString(CellUtil.cloneValue(keyValue)) - ); + Bytes.toString(CellUtil.cloneQualifier(keyValue)), keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue))); res_count += 1; } } @@ -1324,13 +1339,13 @@ public void testColumnRangeFilter() throws Exception { res_count = 0; for (Result result : scanner) { for (Cell keyValue : result.rawCells()) { - System.out.printf("Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", Bytes.toString(result.getRow()), Bytes.toString(CellUtil.cloneFamily(keyValue)), - Bytes.toString(CellUtil.cloneQualifier(keyValue)), - keyValue.getTimestamp(), - Bytes.toString(CellUtil.cloneValue(keyValue)) - ); + Bytes.toString(CellUtil.cloneQualifier(keyValue)), keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue))); res_count += 1; } } @@ -1349,13 +1364,13 @@ public void testColumnRangeFilter() throws Exception { res_count = 0; for (Result result : scanner) { for (Cell keyValue : result.rawCells()) { - System.out.printf("Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", Bytes.toString(result.getRow()), Bytes.toString(CellUtil.cloneFamily(keyValue)), - Bytes.toString(CellUtil.cloneQualifier(keyValue)), - keyValue.getTimestamp(), - Bytes.toString(CellUtil.cloneValue(keyValue)) - ); + Bytes.toString(CellUtil.cloneQualifier(keyValue)), keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue))); res_count += 1; } } @@ -1375,13 +1390,13 @@ public void testColumnRangeFilter() throws Exception { res_count = 0; for (Result result : scanner) { for (Cell keyValue : result.rawCells()) { - System.out.printf("Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", Bytes.toString(result.getRow()), Bytes.toString(CellUtil.cloneFamily(keyValue)), - Bytes.toString(CellUtil.cloneQualifier(keyValue)), - keyValue.getTimestamp(), - Bytes.toString(CellUtil.cloneValue(keyValue)) - ); + Bytes.toString(CellUtil.cloneQualifier(keyValue)), keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue))); res_count += 1; } } @@ -1453,13 +1468,13 @@ public void testFilterNullRange() throws Exception { int res_count = 0; for (Result result : scanner) { for (Cell keyValue : result.rawCells()) { - System.out.printf("Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", Bytes.toString(result.getRow()), Bytes.toString(CellUtil.cloneFamily(keyValue)), - Bytes.toString(CellUtil.cloneQualifier(keyValue)), - keyValue.getTimestamp(), - Bytes.toString(CellUtil.cloneValue(keyValue)) - ); + Bytes.toString(CellUtil.cloneQualifier(keyValue)), keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue))); Assert.assertArrayEquals(key2.getBytes(), CellUtil.cloneRow(keyValue)); res_count += 1; } @@ -1845,20 +1860,21 @@ public void testColumnValueFilter() throws Exception { scan = new Scan(); scan.addFamily(family.getBytes()); scan.setMaxVersions(10); - ColumnValueFilter filter = new ColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column2), CompareOperator.EQUAL, Bytes.toBytes(value1)); + ColumnValueFilter filter = new ColumnValueFilter(Bytes.toBytes(family), + Bytes.toBytes(column2), CompareOperator.EQUAL, Bytes.toBytes(value1)); scan.setFilter(filter); ResultScanner scanner = hTable.getScanner(scan); int res_count = 0; for (Result result : scanner) { for (Cell keyValue : result.rawCells()) { - System.out.printf("Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", Bytes.toString(result.getRow()), Bytes.toString(CellUtil.cloneFamily(keyValue)), Bytes.toString(CellUtil.cloneQualifier(keyValue)), - keyValue.getTimestamp(), - Bytes.toString(CellUtil.cloneValue(keyValue)) - ); + keyValue.getTimestamp(), Bytes.toString(CellUtil.cloneValue(keyValue))); res_count += 1; } } @@ -1868,20 +1884,21 @@ public void testColumnValueFilter() throws Exception { scan = new Scan(); scan.addFamily(family.getBytes()); scan.setMaxVersions(10); - filter = new ColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column2), CompareOperator.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("value"))); + filter = new ColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column2), + CompareOperator.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("value"))); scan.setFilter(filter); scanner = hTable.getScanner(scan); res_count = 0; for (Result result : scanner) { for (Cell keyValue : result.rawCells()) { - System.out.printf("Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", Bytes.toString(result.getRow()), Bytes.toString(CellUtil.cloneFamily(keyValue)), Bytes.toString(CellUtil.cloneQualifier(keyValue)), - keyValue.getTimestamp(), - Bytes.toString(CellUtil.cloneValue(keyValue)) - ); + keyValue.getTimestamp(), Bytes.toString(CellUtil.cloneValue(keyValue))); res_count += 1; } } @@ -1891,20 +1908,21 @@ public void testColumnValueFilter() throws Exception { scan = new Scan(); scan.addFamily(family.getBytes()); scan.setMaxVersions(10); - filter = new ColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column2), CompareOperator.GREATER, Bytes.toBytes(value1)); + filter = new ColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column2), + CompareOperator.GREATER, Bytes.toBytes(value1)); scan.setFilter(filter); scanner = hTable.getScanner(scan); res_count = 0; for (Result result : scanner) { for (Cell keyValue : result.rawCells()) { - System.out.printf("Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", Bytes.toString(result.getRow()), Bytes.toString(CellUtil.cloneFamily(keyValue)), Bytes.toString(CellUtil.cloneQualifier(keyValue)), - keyValue.getTimestamp(), - Bytes.toString(CellUtil.cloneValue(keyValue)) - ); + keyValue.getTimestamp(), Bytes.toString(CellUtil.cloneValue(keyValue))); res_count += 1; } } @@ -1914,20 +1932,21 @@ public void testColumnValueFilter() throws Exception { scan = new Scan(); scan.addFamily(family.getBytes()); scan.setMaxVersions(10); - filter = new ColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column1), CompareOperator.NOT_EQUAL, Bytes.toBytes(value1)); + filter = new ColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column1), + CompareOperator.NOT_EQUAL, Bytes.toBytes(value1)); scan.setFilter(filter); scanner = hTable.getScanner(scan); res_count = 0; for (Result result : scanner) { for (Cell keyValue : result.rawCells()) { - System.out.printf("Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", Bytes.toString(result.getRow()), Bytes.toString(CellUtil.cloneFamily(keyValue)), Bytes.toString(CellUtil.cloneQualifier(keyValue)), - keyValue.getTimestamp(), - Bytes.toString(CellUtil.cloneValue(keyValue)) - ); + keyValue.getTimestamp(), Bytes.toString(CellUtil.cloneValue(keyValue))); res_count += 1; } } @@ -1937,20 +1956,21 @@ public void testColumnValueFilter() throws Exception { scan = new Scan(); scan.addFamily(family.getBytes()); scan.setMaxVersions(10); - filter = new ColumnValueFilter(Bytes.toBytes("ff"), Bytes.toBytes(column1), CompareOperator.NOT_EQUAL, Bytes.toBytes(value1)); + filter = new ColumnValueFilter(Bytes.toBytes("ff"), Bytes.toBytes(column1), + CompareOperator.NOT_EQUAL, Bytes.toBytes(value1)); scan.setFilter(filter); scanner = hTable.getScanner(scan); res_count = 0; for (Result result : scanner) { for (Cell keyValue : result.rawCells()) { - System.out.printf("Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", Bytes.toString(result.getRow()), Bytes.toString(CellUtil.cloneFamily(keyValue)), Bytes.toString(CellUtil.cloneQualifier(keyValue)), - keyValue.getTimestamp(), - Bytes.toString(CellUtil.cloneValue(keyValue)) - ); + keyValue.getTimestamp(), Bytes.toString(CellUtil.cloneValue(keyValue))); res_count += 1; } } @@ -2801,7 +2821,8 @@ public void testGetFilter() throws Exception { get.setFilter(filterList); r = hTable.get(get); Assert.assertEquals(2, r.rawCells().length); - Assert.assertFalse(Bytes.equals(CellUtil.cloneQualifier(r.rawCells()[0]), CellUtil.cloneQualifier(r.rawCells()[1]))); + Assert.assertFalse(Bytes.equals(CellUtil.cloneQualifier(r.rawCells()[0]), + CellUtil.cloneQualifier(r.rawCells()[1]))); filterList = new FilterList(); filterList.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, @@ -5173,8 +5194,9 @@ public void testQualifyNull() throws Exception { r = hTable.get(get); for (Cell kv : r.rawCells()) { System.out.println("K = [" + Bytes.toString(CellUtil.cloneRow(kv)) + "] Q =[" - + Bytes.toString(CellUtil.cloneQualifier(kv)) + "] T = [" + kv.getTimestamp() - + "] V = [" + Bytes.toString(CellUtil.cloneValue(kv)) + "]"); + + Bytes.toString(CellUtil.cloneQualifier(kv)) + "] T = [" + + kv.getTimestamp() + "] V = [" + + Bytes.toString(CellUtil.cloneValue(kv)) + "]"); } Assert.assertEquals(1, r.rawCells().length); Assert.assertEquals(key, Bytes.toString(CellUtil.cloneRow(r.rawCells()[0]))); @@ -5401,8 +5423,8 @@ public void testColumnQualifier() throws Exception { Result result = hTable.get(get); Assert.assertEquals("", 1, result.rawCells().length); Assert.assertEquals("", 0, result.rawCells()[0].getQualifierLength()); - Assert - .assertArrayEquals("", "value1_qualifier_null".getBytes(), CellUtil.cloneValue(result.rawCells()[0])); + Assert.assertArrayEquals("", "value1_qualifier_null".getBytes(), + CellUtil.cloneValue(result.rawCells()[0])); put = new Put(("key_c_q_empty").getBytes()); put.addColumn("family1".getBytes(), "".getBytes(), "value1_qualifier_empty".getBytes()); @@ -5455,14 +5477,20 @@ public void testTTLColumnLevel() throws Exception { get.setMaxVersions(1); Result result = hTable.get(get); Assert.assertEquals("", 2, result.rawCells().length); - Assert.assertEquals("", 1, result - .getColumnCells("family_ttl".getBytes(), ("column1").getBytes()).size()); - Assert.assertEquals("", 1, result - .getColumnCells("family_ttl".getBytes(), ("column2").getBytes()).size()); - Assert.assertArrayEquals("", "column1_value1_ttl_column".getBytes(), - CellUtil.cloneValue(result.getColumnCells("family_ttl".getBytes(), ("column1").getBytes()).get(0))); - Assert.assertArrayEquals("", "column2_value1_ttl_column".getBytes(), - CellUtil.cloneValue(result.getColumnCells("family_ttl".getBytes(), ("column2").getBytes()).get(0))); + Assert.assertEquals("", 1, + result.getColumnCells("family_ttl".getBytes(), ("column1").getBytes()).size()); + Assert.assertEquals("", 1, + result.getColumnCells("family_ttl".getBytes(), ("column2").getBytes()).size()); + Assert.assertArrayEquals( + "", + "column1_value1_ttl_column".getBytes(), + CellUtil.cloneValue(result.getColumnCells("family_ttl".getBytes(), + ("column1").getBytes()).get(0))); + Assert.assertArrayEquals( + "", + "column2_value1_ttl_column".getBytes(), + CellUtil.cloneValue(result.getColumnCells("family_ttl".getBytes(), + ("column2").getBytes()).get(0))); //过期之后不能再查出数据 Thread.sleep(4 * 1000L); @@ -5483,14 +5511,20 @@ public void testTTLFamilyLevel() throws Exception { get.setMaxVersions(1); Result result = hTable.get(get); Assert.assertEquals("", 2, result.rawCells().length); - Assert.assertEquals("", 1, result - .getColumnCells("family_ttl".getBytes(), ("column1").getBytes()).size()); - Assert.assertEquals("", 1, result - .getColumnCells("family_ttl".getBytes(), ("column2").getBytes()).size()); - Assert.assertArrayEquals("", "column1_value_ttl_family".getBytes(), - CellUtil.cloneValue(result.getColumnCells("family_ttl".getBytes(), ("column1").getBytes()).get(0))); - Assert.assertArrayEquals("", "column2_value_ttl_family".getBytes(), - CellUtil.cloneValue(result.getColumnCells("family_ttl".getBytes(), ("column2").getBytes()).get(0))); + Assert.assertEquals("", 1, + result.getColumnCells("family_ttl".getBytes(), ("column1").getBytes()).size()); + Assert.assertEquals("", 1, + result.getColumnCells("family_ttl".getBytes(), ("column2").getBytes()).size()); + Assert.assertArrayEquals( + "", + "column1_value_ttl_family".getBytes(), + CellUtil.cloneValue(result.getColumnCells("family_ttl".getBytes(), + ("column1").getBytes()).get(0))); + Assert.assertArrayEquals( + "", + "column2_value_ttl_family".getBytes(), + CellUtil.cloneValue(result.getColumnCells("family_ttl".getBytes(), + ("column2").getBytes()).get(0))); //过期之后不能再查出数据 Thread.sleep(4 * 1000L); diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java index 778ef6da..6ac69b6b 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHConnectionTest.java @@ -17,12 +17,11 @@ package com.alipay.oceanbase.hbase; -import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; +import com.alipay.oceanbase.hbase.util.OHBufferedMutatorImpl; import com.alipay.oceanbase.hbase.util.ObHTableTestUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; @@ -73,13 +72,6 @@ public void testConnectionByXml() throws Exception { hTable.close(); } - @BeforeClass - public static void before() throws Exception { - // use self-defined namespace "n1" - hTable = ObHTableTestUtil.newOHTableClient("n1:test"); - ((OHTableClient) hTable).init(); - } - @AfterClass public static void finish() throws IOException { hTable.close(); @@ -87,6 +79,8 @@ public static void finish() throws IOException { @Test public void testRefreshTableEntry() throws Exception { + hTable = ObHTableTestUtil.newOHTableClient("n1:test"); + ((OHTableClient) hTable).init(); ((OHTableClient) hTable).refreshTableEntry("family1", false); ((OHTableClient) hTable).refreshTableEntry("family1", true); } @@ -140,7 +134,8 @@ private void testBasic() throws Exception { hTable.delete(delete); for (Cell keyValue : r.rawCells()) { - System.out.println("rowKey: " + new String(CellUtil.cloneRow(keyValue)) + " columnQualifier:" + System.out.println("rowKey: " + new String(CellUtil.cloneRow(keyValue)) + + " columnQualifier:" + new String(CellUtil.cloneQualifier(keyValue)) + " timestamp:" + keyValue.getTimestamp() + " value:" + new String(CellUtil.cloneValue(keyValue))); @@ -171,8 +166,10 @@ private void testBasic() throws Exception { boolean countAdd = true; for (Cell keyValue : result.rawCells()) { Assert.assertEquals(key + "_" + i, Bytes.toString(CellUtil.cloneRow(keyValue))); - Assert.assertTrue(column1.equals(Bytes.toString(CellUtil.cloneQualifier(keyValue))) - || column2.equals(Bytes.toString(CellUtil.cloneQualifier(keyValue)))); + Assert.assertTrue(column1.equals(Bytes.toString(CellUtil + .cloneQualifier(keyValue))) + || column2.equals(Bytes.toString(CellUtil + .cloneQualifier(keyValue)))); Assert.assertEquals(timestamp + 2, keyValue.getTimestamp()); Assert.assertEquals(value, Bytes.toString(CellUtil.cloneValue(keyValue))); if (countAdd) { @@ -194,8 +191,10 @@ private void testBasic() throws Exception { boolean countAdd = true; for (Cell keyValue : result.rawCells()) { Assert.assertEquals(key + "_" + i, Bytes.toString(CellUtil.cloneRow(keyValue))); - Assert.assertTrue(column1.equals(Bytes.toString(CellUtil.cloneQualifier(keyValue))) - || column2.equals(Bytes.toString(CellUtil.cloneQualifier(keyValue)))); + Assert.assertTrue(column1.equals(Bytes.toString(CellUtil + .cloneQualifier(keyValue))) + || column2.equals(Bytes.toString(CellUtil + .cloneQualifier(keyValue)))); Assert.assertEquals(value, Bytes.toString(CellUtil.cloneValue(keyValue))); if (countAdd) { countAdd = false; @@ -231,8 +230,7 @@ public void testBufferedMutatorWithFlush() throws Exception { Configuration conf = ObHTableTestUtil.newConfiguration(); conf.set("rs.list.acquire.read.timeout", "10000"); conf.set(SOCKET_TIMEOUT_CONNECT, "15000"); - BufferedMutator putBufferMutator = null; - BufferedMutator delBufferedMutator = null; + BufferedMutator bufferMutator = null; String key = "putKey"; String column1 = "putColumn1"; String value = "value333444"; @@ -247,17 +245,15 @@ public void testBufferedMutatorWithFlush() throws Exception { hTable.delete(delete); // use defualt params - putBufferMutator = connection.getBufferedMutator(tableName); - delBufferedMutator = connection.getBufferedMutator(tableName); + bufferMutator = connection.getBufferedMutator(tableName); long timestamp = System.currentTimeMillis(); // only support Put and Delete - // for other type of operations, BufferedMutator will not set its type for them Append append = new Append(Bytes.toBytes(key)); - append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); - final BufferedMutator apMut = putBufferMutator; + append.addColumn("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); + final BufferedMutator apMut = bufferMutator; Assert.assertThrows(IllegalArgumentException.class, () -> { apMut.mutate(append); }); @@ -271,31 +267,47 @@ public void testBufferedMutatorWithFlush() throws Exception { put2.addColumn(Bytes.toBytes(family), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); mutations.add(put2); // test add Mutations with List - putBufferMutator.mutate(mutations); - putBufferMutator.flush(); + bufferMutator.mutate(mutations); + bufferMutator.flush(); Get get = new Get(toBytes(key)); Result r = hTable.get(get); Assert.assertEquals(2, r.rawCells().length); - Delete del = new Delete(Bytes.toBytes(key)); - final BufferedMutator noCfMut = putBufferMutator; - // test mutation without setting family - Assert.assertThrows(FeatureNotSupportedException.class, () -> { - noCfMut.mutate(del); - }); - del.addFamily(Bytes.toBytes(family)); - // test reuse different type bufferedMutator - final BufferedMutator difTypeMut = putBufferMutator; + Put put3 = new Put(Bytes.toBytes(key)); + final BufferedMutator noCfMut = bufferMutator; + // test Put without setting family Assert.assertThrows(IllegalArgumentException.class, () -> { - difTypeMut.mutate(del); + noCfMut.mutate(put3); }); + put3.addColumn(Bytes.toBytes(family), Bytes.toBytes(column1 + "2"), timestamp, Bytes.toBytes(value)); // test add Mutation directly - delBufferedMutator.mutate(del); - delBufferedMutator.flush(); + bufferMutator.mutate(put3); + bufferMutator.flush(); + r = hTable.get(get); + Assert.assertEquals(3, r.rawCells().length); + + // test Delete + Delete del = new Delete(toBytes(key)); + del.addFamily(toBytes(family)); + // test without setting family, delete all + bufferMutator.mutate(del); + bufferMutator.flush(); r = hTable.get(get); Assert.assertEquals(0, r.rawCells().length); + + // test hybrid mutations + mutations.clear(); + mutations.add(put1); + mutations.add(put2); + mutations.add(del); + mutations.add(put3); + bufferMutator.mutate(mutations); + bufferMutator.flush(); + + r = hTable.get(get); + Assert.assertEquals(1, r.rawCells().length); } catch (Exception ex) { if (ex instanceof RetriesExhaustedWithDetailsException) { ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); @@ -304,20 +316,22 @@ public void testBufferedMutatorWithFlush() throws Exception { } Assert.assertTrue(false); } finally { - if (putBufferMutator != null ) { - putBufferMutator.close(); + if (bufferMutator != null ) { + bufferMutator.close(); // test flush after closed - putBufferMutator.flush(); - } - if (delBufferedMutator != null) { - delBufferedMutator.close(); + final BufferedMutator closedMutator = bufferMutator; + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.flush(); + }); // test add mutations after closed Delete delete = new Delete(Bytes.toBytes(key)); delete.addFamily(Bytes.toBytes(family)); - final BufferedMutator closedMutator = delBufferedMutator; Assert.assertThrows(IllegalStateException.class, () -> { closedMutator.mutate(delete); }); + if (((OHBufferedMutatorImpl) bufferMutator).getPool() != null) { + Assert.assertTrue(((OHBufferedMutatorImpl) bufferMutator).getPool().isShutdown()); + } } } } @@ -338,32 +352,31 @@ public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { Configuration conf = ObHTableTestUtil.newConfiguration(); conf.set("rs.list.acquire.read.timeout", "10000"); conf.set(SOCKET_TIMEOUT_CONNECT, "15000"); - BufferedMutator putBufferMutator = null; - BufferedMutator delBufferedMutator = null; + BufferedMutator bufferMutator = null; String key = "putKey"; String column1 = "putColumn1"; String value = "value333444"; String family = "family_group"; - long timestamp = System.currentTimeMillis(); try { // use n1 database TableName tableName = TableName.valueOf("n1","test"); connection = ConnectionFactory.createConnection(conf); hTable = connection.getTable(tableName); + Assert.assertEquals("n1:test", hTable.getName().getNameAsString()); Delete delete= new Delete(toBytes(key)); delete.addFamily(toBytes(family)); hTable.delete(delete); // use defualt params - putBufferMutator = connection.getBufferedMutator(tableName); - delBufferedMutator = connection.getBufferedMutator(tableName); + bufferMutator = connection.getBufferedMutator(tableName); + + long timestamp = System.currentTimeMillis(); // only support Put and Delete - // for other type of operations, BufferedMutator will not set its type for them Append append = new Append(Bytes.toBytes(key)); - append.add(toBytes(family), toBytes(column1), toBytes("_suffix")); - final BufferedMutator apMut = putBufferMutator; + append.add("family_group".getBytes(), column1.getBytes(), toBytes("_suffix")); + final BufferedMutator apMut = bufferMutator; Assert.assertThrows(IllegalArgumentException.class, () -> { apMut.mutate(append); }); @@ -377,31 +390,46 @@ public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { put2.addColumn(Bytes.toBytes(family), Bytes.toBytes(column1 + "1"), timestamp, Bytes.toBytes(value + "4")); mutations.add(put2); // test add Mutations with List - putBufferMutator.mutate(mutations); - putBufferMutator.flush(); + bufferMutator.mutate(mutations); + bufferMutator.flush(); Get get = new Get(toBytes(key)); Result r = hTable.get(get); Assert.assertEquals(2, r.rawCells().length); - Delete del = new Delete(Bytes.toBytes(key)); - final BufferedMutator noCfMut = putBufferMutator; - // test mutation without setting family - Assert.assertThrows(FeatureNotSupportedException.class, () -> { - noCfMut.mutate(del); - }); - del.addFamily(Bytes.toBytes(family)); - final BufferedMutator difTypeMut = putBufferMutator; - // test reuse different type bufferedMutator + Put put3 = new Put(Bytes.toBytes(key)); + final BufferedMutator noCfMut = bufferMutator; + // test Put without setting family Assert.assertThrows(IllegalArgumentException.class, () -> { - difTypeMut.mutate(del); + noCfMut.mutate(put3); }); + put3.addColumn(Bytes.toBytes(family), Bytes.toBytes(column1 + "2"), timestamp, Bytes.toBytes(value)); // test add Mutation directly - delBufferedMutator.mutate(del); - delBufferedMutator.flush(); + bufferMutator.mutate(put3); + bufferMutator.flush(); + r = hTable.get(get); + Assert.assertEquals(3, r.rawCells().length); + + // test Delete + Delete del = new Delete(toBytes(key)); + // test without setting family, delete all + bufferMutator.mutate(del); + bufferMutator.flush(); r = hTable.get(get); Assert.assertEquals(0, r.rawCells().length); + + // test hybrid mutations + mutations.clear(); + mutations.add(put1); + mutations.add(put2); + mutations.add(del); + mutations.add(put3); + bufferMutator.mutate(mutations); + bufferMutator.flush(); + + r = hTable.get(get); + Assert.assertEquals(1, r.rawCells().length); } catch (Exception ex) { if (ex instanceof RetriesExhaustedWithDetailsException) { ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); @@ -410,20 +438,22 @@ public void testBufferedMutatorUseNameSpaceWithFlush() throws Exception { } Assert.assertTrue(false); } finally { - if (putBufferMutator != null ) { - putBufferMutator.close(); + if (bufferMutator != null ) { + bufferMutator.close(); // test flush after closed - putBufferMutator.flush(); - } - if (delBufferedMutator != null) { - delBufferedMutator.close(); + final BufferedMutator closedMutator = bufferMutator; + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.flush(); + }); // test add mutations after closed - Delete delete = new Delete(Bytes.toBytes("putKey")); - delete.addFamily(Bytes.toBytes("family_group")); - final BufferedMutator closedMutator = delBufferedMutator; + Delete delete = new Delete(Bytes.toBytes(key)); + delete.addFamily(Bytes.toBytes(family)); Assert.assertThrows(IllegalStateException.class, () -> { closedMutator.mutate(delete); }); + if (((OHBufferedMutatorImpl) bufferMutator).getPool() != null) { + Assert.assertTrue(((OHBufferedMutatorImpl) bufferMutator).getPool().isShutdown()); + } } } } @@ -442,7 +472,7 @@ PRIMARY KEY (`K`, `Q`, `T`) public void testBufferedMutatorWithAutoFlush() throws Exception { Configuration conf = ObHTableTestUtil.newConfiguration(); conf.set("rs.list.acquire.read.timeout", "10000"); - BufferedMutator putBufferMutator = null; + BufferedMutator bufferMutator = null; BufferedMutatorParams params = null; long bufferSize = 45000L; int count = 0; @@ -463,7 +493,7 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { // set params params = new BufferedMutatorParams(tableName); params.writeBufferSize(bufferSize); - putBufferMutator = connection.getBufferedMutator(params); + bufferMutator = connection.getBufferedMutator(params); List mutations = new ArrayList<>(); for (int i = 0; i < 50; ++i) { @@ -474,8 +504,20 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); mutations.add(put); } - putBufferMutator.mutate(mutations); + if (i % 10 == 0) { // 0, 10, 20, 30, 40 + for(int j = 0; j < 4; ++j) { + Delete del = new Delete(Bytes.toBytes(key)); + del.addColumns(toBytes(family), toBytes(column1 + "_" + i + "_" + j)); + mutations.add(del); + } + } + bufferMutator.mutate(mutations); } + Get get = new Get(toBytes(key)); + get.addFamily(toBytes(family)); + Result r = hTable.get(get); + count = r.rawCells().length; + Assert.assertTrue(count > 0); } catch (Exception ex) { if (ex instanceof RetriesExhaustedWithDetailsException) { ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); @@ -484,14 +526,13 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { } Assert.assertTrue(false); } finally { - if (putBufferMutator != null) { - putBufferMutator.close(); + if (bufferMutator != null) { + bufferMutator.close(); Get get = new Get(toBytes(key)); + get.addFamily(toBytes(family)); Result r = hTable.get(get); - for (Cell keyValue : r.rawCells()) { - ++count; - } - Assert.assertEquals(200, count); + count = r.rawCells().length; + Assert.assertEquals(180, count); Delete delete = new Delete(toBytes(key)); delete.addFamily(toBytes(family)); hTable.delete(delete); @@ -499,17 +540,16 @@ public void testBufferedMutatorWithAutoFlush() throws Exception { Assert.assertEquals(0, r.rawCells().length); // test add mutations after closed - final BufferedMutator closedMutator = putBufferMutator; + final BufferedMutator closedMutator = bufferMutator; Assert.assertThrows(IllegalStateException.class, () -> { closedMutator.mutate(delete); }); // test flush after closed - putBufferMutator.flush(); - } - - if (params != null) { - if (params.getPool() != null) { - Assert.assertTrue(params.getPool().isShutdown()); + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.flush(); + }); + if (((OHBufferedMutatorImpl) bufferMutator).getPool() != null) { + Assert.assertTrue(((OHBufferedMutatorImpl) bufferMutator).getPool().isShutdown()); } } } @@ -531,6 +571,7 @@ public void testBufferedMutatorWithUserPool() throws Exception { conf.set("rs.list.acquire.read.timeout", "10000"); BufferedMutator ohBufferMutator = null; BufferedMutatorParams params = null; + ThreadPoolExecutor pool = null; long bufferSize = 45000L; int count = 0; String key = "putKey"; @@ -553,7 +594,7 @@ public void testBufferedMutatorWithUserPool() throws Exception { // set thread pool long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); - ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); + pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); pool.allowCoreThreadTimeOut(true); params.pool(pool); @@ -568,8 +609,20 @@ public void testBufferedMutatorWithUserPool() throws Exception { timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); mutations.add(put); } + if (i % 10 == 0) { // 0, 10, 20, 30, 40 + for(int j = 0; j < 4; ++j) { + Delete del = new Delete(Bytes.toBytes(key)); + del.addColumns(toBytes(family), toBytes(column1 + "_" + i + "_" + j)); + mutations.add(del); + } + } ohBufferMutator.mutate(mutations); } + Get get = new Get(toBytes(key)); + get.addFamily(toBytes(family)); + Result r = hTable.get(get); + count = r.rawCells().length; + Assert.assertTrue(count > 0); } catch (Exception ex) { if (ex instanceof RetriesExhaustedWithDetailsException) { ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); @@ -581,11 +634,10 @@ public void testBufferedMutatorWithUserPool() throws Exception { if (ohBufferMutator != null) { ohBufferMutator.close(); Get get = new Get(toBytes(key)); + get.addFamily(toBytes(family)); Result r = hTable.get(get); - for (Cell keyValue : r.rawCells()) { - ++count; - } - Assert.assertEquals(200, count); + count = r.rawCells().length; + Assert.assertEquals(180, count); Delete delete = new Delete(toBytes(key)); delete.addFamily(toBytes(family)); hTable.delete(delete); @@ -598,11 +650,23 @@ public void testBufferedMutatorWithUserPool() throws Exception { closedMutator.mutate(delete); }); // test flush after closed - ohBufferMutator.flush(); - } - if (params != null) { - if (params.getPool() != null) { - Assert.assertTrue(params.getPool().isShutdown()); + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.flush(); + }); + ExecutorService bufferPool = ((OHBufferedMutatorImpl) ohBufferMutator).getPool(); + if (bufferPool != null) { + // self-defined pool must be shutdown by users + Assert.assertFalse(bufferPool.isShutdown()); + bufferPool.shutdown(); + try { + if (!bufferPool.awaitTermination(10, TimeUnit.SECONDS)) { + System.out.println("close() failed to terminate pool after 10 seconds. Abandoning pool."); + } + } catch (InterruptedException e) { + System.out.println("waitForTermination interrupted"); + Thread.currentThread().interrupt(); + } + Assert.assertTrue(bufferPool.isShutdown()); } } } @@ -624,6 +688,7 @@ public void testBufferedMutatorConcurrent() throws Exception { conf.set("rs.list.acquire.read.timeout", "10000"); BufferedMutator ohBufferMutator = null; BufferedMutatorParams params = null; + ThreadPoolExecutor pool = null; ExecutorService executorService = Executors.newFixedThreadPool(10); long bufferSize = 45000L; int count = 0; @@ -647,24 +712,24 @@ public void testBufferedMutatorConcurrent() throws Exception { // set thread pool long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); - ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); + pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); pool.allowCoreThreadTimeOut(true); params.pool(pool); ohBufferMutator = connection.getBufferedMutator(params); + // BufferedMutator is not concurrently safe for (int i = 0; i < 50; ++i) { final int taskId = i; final BufferedMutator thrBufferMutator = ohBufferMutator; executorService.submit(() -> { List mutations = new ArrayList<>(); for (int j = 0; j < 4; ++j) { - String thrKey = key; String thrColumn = column1 + "_" + taskId + "_" + j; String thrValue = value + "_" + taskId + "_" + j; long thrTimestamp = timestamp; - Put put = new Put(Bytes.toBytes(thrKey)); + Put put = new Put(Bytes.toBytes(key)); put.addColumn(Bytes.toBytes(family), Bytes.toBytes(thrColumn), thrTimestamp, Bytes.toBytes(thrValue)); mutations.add(put); @@ -691,7 +756,7 @@ public void testBufferedMutatorConcurrent() throws Exception { } finally { executorService.shutdown(); try { - if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException e) { @@ -701,15 +766,13 @@ public void testBufferedMutatorConcurrent() throws Exception { if (ohBufferMutator != null) { ohBufferMutator.close(); Get get = new Get(toBytes(key)); + get.addFamily(toBytes(family)); Result r = hTable.get(get); - for (Cell keyValue : r.rawCells()) { - ++count; - } + count = r.rawCells().length; Assert.assertEquals(200, count); Delete delete = new Delete(toBytes(key)); delete.addFamily(toBytes(family)); hTable.delete(delete); - r = hTable.get(get); Assert.assertEquals(0, r.rawCells().length); // test add mutations after closed @@ -718,11 +781,146 @@ public void testBufferedMutatorConcurrent() throws Exception { closedMutator.mutate(delete); }); // test flush after closed - ohBufferMutator.flush(); + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.flush(); + }); + ExecutorService bufferPool = ((OHBufferedMutatorImpl) ohBufferMutator).getPool(); + if (bufferPool != null) { + // self-defined pool must be shutdown by users + Assert.assertFalse(bufferPool.isShutdown()); + bufferPool.shutdown(); + try { + if (!bufferPool.awaitTermination(10, TimeUnit.SECONDS)) { + System.out.println("close() failed to terminate pool after 10 seconds. Abandoning pool."); + } + } catch (InterruptedException e) { + System.out.println("waitForTermination interrupted"); + Thread.currentThread().interrupt(); + } + Assert.assertTrue(bufferPool.isShutdown()); + } + } + } + } + + @Test + public void testBufferedMutatorPeriodicFlush() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.set("rs.list.acquire.read.timeout", "10000"); + BufferedMutator ohBufferMutator = null; + BufferedMutatorParams params = null; + ThreadPoolExecutor pool = null; + long bufferSize = 45000L; + int count = 0; + String key = "putKey"; + String column1 = "putColumn1"; + String value = "value333444"; + long timestamp = System.currentTimeMillis(); + String family = "family_group"; + try { + TableName tableName = TableName.valueOf("test"); + connection = ConnectionFactory.createConnection(conf); + hTable = connection.getTable(tableName); + + Delete delete= new Delete(toBytes(key)); + delete.addFamily(toBytes(family)); + hTable.delete(delete); + + // set params + params = new BufferedMutatorParams(tableName); + params.writeBufferSize(bufferSize); + // set periodic flush timeout to enable Timer + params.setWriteBufferPeriodicFlushTimeoutMs(100); + + // set thread pool + long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); + pool = new ThreadPoolExecutor(10, 256, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); + pool.allowCoreThreadTimeOut(true); + params.pool(pool); + + ohBufferMutator = connection.getBufferedMutator(params); + + List mutations = new ArrayList<>(); + for (int i = 0; i < 50; ++i) { + mutations.clear(); + for (int j = 0; j < 4; ++j) { + Put put = new Put(Bytes.toBytes(key)); + put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column1 + "_" + i + "_" + j), + timestamp, Bytes.toBytes(value + "_" + i + "_" + j)); + mutations.add(put); + } + if (i % 10 == 0) { // 0, 10, 20, 30, 40 + for(int j = 0; j < 4; ++j) { + Delete del = new Delete(Bytes.toBytes(key)); + del.addColumns(toBytes(family), toBytes(column1 + "_" + i + "_" + j)); + mutations.add(del); + } + } + ohBufferMutator.mutate(mutations); } - if (params != null) { - if (params.getPool() != null) { - Assert.assertTrue(params.getPool().isShutdown()); + + // test auto flush + Get get = new Get(toBytes(key)); + get.setMaxVersions(); + get.addFamily(toBytes(family)); + Result r = hTable.get(get); + count = r.rawCells().length; + Assert.assertTrue(count > 0); + + // test timer periodic flush + int lastUndealtCount = ((OHBufferedMutatorImpl) ohBufferMutator).size(); + Thread.sleep(1000); + int currentUndealtCount = ((OHBufferedMutatorImpl) ohBufferMutator).size(); + Assert.assertNotEquals(lastUndealtCount, currentUndealtCount); + // after periodic flush, all mutations will be committed + Assert.assertEquals(0, currentUndealtCount); + r = hTable.get(get); + int newCount = r.rawCells().length; + Assert.assertNotEquals(count, newCount); + } catch (Exception ex) { + if (ex instanceof RetriesExhaustedWithDetailsException) { + ((RetriesExhaustedWithDetailsException) ex).getCauses().get(0).printStackTrace(); + } else { + ex.printStackTrace(); + } + Assert.assertTrue(false); + } finally { + if (ohBufferMutator != null) { + ohBufferMutator.close(); + Get get = new Get(toBytes(key)); + get.addFamily(toBytes(family)); + Result r = hTable.get(get); + count = r.rawCells().length; + Assert.assertEquals(180, count); + Delete delete = new Delete(toBytes(key)); + delete.addFamily(toBytes(family)); + hTable.delete(delete); + + r = hTable.get(get); + Assert.assertEquals(0, r.rawCells().length); + // test add mutations after closed + final BufferedMutator closedMutator = ohBufferMutator; + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.mutate(delete); + }); + // test flush after closed + Assert.assertThrows(IllegalStateException.class, () -> { + closedMutator.flush(); + }); + ExecutorService bufferPool = ((OHBufferedMutatorImpl) ohBufferMutator).getPool(); + if (bufferPool != null) { + // self-defined pool must be shutdown by users + Assert.assertFalse(bufferPool.isShutdown()); + bufferPool.shutdown(); + try { + if (!bufferPool.awaitTermination(10, TimeUnit.SECONDS)) { + System.out.println("close() failed to terminate pool after 10 seconds. Abandoning pool."); + } + } catch (InterruptedException e) { + System.out.println("waitForTermination interrupted"); + Thread.currentThread().interrupt(); + } + Assert.assertTrue(bufferPool.isShutdown()); } } } diff --git a/src/test/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtilsTest.java b/src/test/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtilsTest.java index b379295d..30a06fbe 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtilsTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtilsTest.java @@ -210,10 +210,11 @@ public void testFuzzyRowFilter() throws IOException { @Test public void testColumnValueFilter() throws IOException { - ColumnValueFilter filter = new ColumnValueFilter(Bytes.toBytes("cf"), Bytes.toBytes("q"), CompareOperator.EQUAL, Bytes.toBytes("v")); + ColumnValueFilter filter = new ColumnValueFilter(Bytes.toBytes("cf"), Bytes.toBytes("q"), + CompareOperator.EQUAL, Bytes.toBytes("v")); System.out.println(Bytes.toString(HBaseFilterUtils.toParseableByteArray(filter))); Assert.assertArrayEquals("ColumnValueFilter('cf','q',=,'binary:v')".getBytes(), - HBaseFilterUtils.toParseableByteArray(filter)); + HBaseFilterUtils.toParseableByteArray(filter)); } @Test diff --git a/src/test/java/unit_test_db.sql b/src/test/java/unit_test_db.sql index c5681bd7..8fe8a9b3 100644 --- a/src/test/java/unit_test_db.sql +++ b/src/test/java/unit_test_db.sql @@ -168,7 +168,7 @@ CREATE TABLE `test$family'1` ( `T` bigint(20) NOT NULL, `V` varbinary(1024) DEFAULT NULL, PRIMARY KEY (`K`, `Q`, `T`) -) TABLEGROUP = test; +); CREATE TABLE `test_t$family'1` ( `K` varbinary(1024) NOT NULL, @@ -176,7 +176,7 @@ CREATE TABLE `test_t$family'1` ( `T` bigint(20) NOT NULL, `V` varbinary(1024) DEFAULT NULL, PRIMARY KEY (`K`, `Q`, `T`) -) TABLEGROUP = test_t; +); CREATE TABLEGROUP test_multi_cf SHARDING = 'ADAPTIVE'; @@ -228,7 +228,7 @@ CREATE TABLE `n1:test$family'1` ( `T` bigint(20) NOT NULL, `V` varbinary(1024) DEFAULT NULL, PRIMARY KEY (`K`, `Q`, `T`) -) TABLEGROUP = `n1:test`; +); CREATE TABLE `n1:test$family_with_local_index` ( `K` varbinary(1024) NOT NULL,