diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index a72f21fb..759da56b 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -69,6 +69,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import static com.alipay.oceanbase.hbase.constants.OHConstants.*; import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument; @@ -84,7 +85,6 @@ import static org.apache.commons.lang.StringUtils.isBlank; import static org.apache.commons.lang.StringUtils.isNotBlank; import static com.alipay.oceanbase.hbase.filter.HBaseFilterUtils.writeBytesWithEscape; -import static org.apache.hadoop.hbase.KeyValue.Type.*; public class OHTable implements Table { @@ -154,7 +154,7 @@ public class OHTable implements Table { /** * the buffer of put request */ - private final ArrayList writeBuffer = new ArrayList(); + private final List writeBuffer = new CopyOnWriteArrayList<>(); /** * when the put request reach the write buffer size the do put will * flush commits automatically @@ -179,7 +179,7 @@ public class OHTable implements Table { /** * current buffer size */ - private long currentWriteBufferSize; + private AtomicLong currentWriteBufferSize = new AtomicLong(0); /** * the max size of put key value @@ -195,7 +195,8 @@ public class OHTable implements Table { private int scannerTimeout; - private RegionLocator regionLocator; + private RegionLocator regionLocator; + /** * Creates an object to access a HBase table. * Shares oceanbase table obTableClient and other resources with other OHTable instances @@ -460,8 +461,8 @@ private void finishSetUp() { } public static OHConnectionConfiguration setUserDefinedNamespace(String tableNameString, - OHConnectionConfiguration ohConnectionConf) - throws IllegalArgumentException { + OHConnectionConfiguration ohConnectionConf) + throws IllegalArgumentException { if (tableNameString.indexOf(':') != -1) { String[] params = tableNameString.split(":"); if (params.length != 2) { @@ -501,13 +502,15 @@ public Configuration getConfiguration() { @Override public HTableDescriptor getTableDescriptor() throws IOException { - OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, obTableClient); + OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, + obTableClient); return executor.getTableDescriptor(); } @Override public TableDescriptor getDescriptor() throws IOException { - OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, obTableClient); + OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, + obTableClient); return executor.getTableDescriptor(); } @@ -701,7 +704,7 @@ private void compatOldServerBatch(final List actions, final Objec @Override public void batch(final List actions, final Object[] results) throws IOException { - if (actions == null) { + if (actions == null || actions.isEmpty()) { return; } if (results != null) { @@ -718,6 +721,19 @@ public void batch(final List actions, final Object[] results) thr } catch (Exception e) { throw new IOException(tableNameString + " table occurred unexpected error." , e); } + } else if (OHBaseFuncUtils.isAllPut(actions) && OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) { + // only support Put now + ObHbaseRequest request = buildHbaseRequest(actions); + try { + ObHbaseResult result = (ObHbaseResult) obTableClient.execute(request); + if (results != null) { + for (int i = 0; i < results.length; ++i) { + results[i] = new Result(); + } + } + } catch (Exception e) { + throw new IOException(tableNameString + " table occurred unexpected error." , e); + } } else { String realTableName = getTargetTableName(actions); BatchOperation batch = buildBatchOperation(realTableName, actions, @@ -726,7 +742,7 @@ public void batch(final List actions, final Object[] results) thr try { tmpResults = batch.execute(); } catch (Exception e) { - throw new IOException(tableNameString + " table occurred unexpected error." , e); + throw new IOException(tableNameString + " table occurred unexpected error.", e); } int index = 0; for (int i = 0; i != actions.size(); ++i) { @@ -994,6 +1010,8 @@ public Result call() throws IOException { if (get.isCheckExistenceOnly()) { return Result.create(null, !keyValueList.isEmpty()); } + // sort keyValues + OHBaseFuncUtils.sortHBaseResult(keyValueList); return Result.create(keyValueList); } }; @@ -1227,16 +1245,24 @@ private void doPut(List puts) throws IOException { validatePut(put); checkFamilyViolation(put.getFamilyCellMap().keySet(), true); writeBuffer.add(put); - currentWriteBufferSize += put.heapSize(); + currentWriteBufferSize.addAndGet(put.heapSize()); // we need to periodically see if the writebuffer is full instead of waiting until the end of the List n++; - if (n % putWriteBufferCheck == 0 && currentWriteBufferSize > writeBufferSize) { - flushCommits(); + if (n % putWriteBufferCheck == 0 && currentWriteBufferSize.get() > writeBufferSize) { + if (OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) { + flushCommitsV2(); + } else { + flushCommits(); + } } } - if (autoFlush || currentWriteBufferSize > writeBufferSize) { - flushCommits(); + if (autoFlush || currentWriteBufferSize.get() > writeBufferSize) { + if (OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) { + flushCommitsV2(); + } else { + flushCommits(); + } } } @@ -1653,12 +1679,45 @@ public void flushCommits() throws IOException { } finally { if (clearBufferOnFail) { writeBuffer.clear(); - currentWriteBufferSize = 0; + currentWriteBufferSize.set(0); + } else { + // the write buffer was adjusted by processBatchOfPuts + currentWriteBufferSize.set(0); + for (Put aPut : writeBuffer) { + currentWriteBufferSize.addAndGet(aPut.heapSize()); + } + } + } + } + + public void flushCommitsV2() throws IOException { + try { + if (writeBuffer.isEmpty()) { + return; + } + try { + ObHbaseRequest request = buildHbaseRequest(writeBuffer); + try { + ObHbaseResult result = (ObHbaseResult) obTableClient.execute(request); + } catch (Exception e) { + throw new IOException(tableNameString + " table occurred unexpected error.", e); + } + } catch (Exception e) { + logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush, + writeBuffer.size(), e); + if (e instanceof IOException) { + throw (IOException) e; + } + } + } finally { + if (clearBufferOnFail) { + writeBuffer.clear(); + currentWriteBufferSize.set(0); } else { // the write buffer was adjusted by processBatchOfPuts - currentWriteBufferSize = 0; + currentWriteBufferSize.set(0); for (Put aPut : writeBuffer) { - currentWriteBufferSize += aPut.heapSize(); + currentWriteBufferSize.addAndGet(aPut.heapSize()); } } } @@ -1884,7 +1943,7 @@ private ObHTableFilter buildObHTableFilter(byte[] filterString, TimeRange timeRa if (columnQualifier == null) { obHTableFilter.addSelectColumnQualifier(new byte[0]); } else { - obHTableFilter.addSelectColumnQualifier(columnQualifier); + obHTableFilter.addSelectColumnQualifier(columnQualifier); } } } @@ -1942,11 +2001,11 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) { filter.setOffsetPerRowPerCf(scan.getRowOffsetPerColumnFamily()); } if (scan.isReversed()) { - obTableQuery = buildObTableQuery(filter, scan.getStopRow(), scan.includeStopRow(), scan.getStartRow(), - scan.includeStartRow(), true, ts); + obTableQuery = buildObTableQuery(filter, scan.getStopRow(), scan.includeStopRow(), + scan.getStartRow(), scan.includeStartRow(), true, ts); } else { - obTableQuery = buildObTableQuery(filter, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), - scan.includeStopRow(), false, ts); + obTableQuery = buildObTableQuery(filter, scan.getStartRow(), scan.includeStartRow(), + scan.getStopRow(), scan.includeStopRow(), false, ts); } obTableQuery.setBatchSize(scan.getBatch()); obTableQuery.setLimit(scan.getLimit()); @@ -2026,7 +2085,7 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv, KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType().getCode()); com.alipay.oceanbase.rpc.mutation.Mutation tableMutation = buildMutation(kv, operationType, isTableGroup, family, TTL); - if(isTableGroup) { + if (isTableGroup) { // construct new_kv otherwise filter will fail to match targeted columns byte[] oldQualifier = CellUtil.cloneQualifier(kv); byte[] newQualifier = new byte[family.length + 1/* length of "." */ + oldQualifier.length]; @@ -2114,13 +2173,10 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv, range.setEndKey(ObRowKey.getInstance(CellUtil.cloneRow(kv), ObObj.getMax(), ObObj.getMax())); if (!isTableGroup) { - filter = buildObHTableFilter(null, - timeRange, - Integer.MAX_VALUE); + filter = buildObHTableFilter(null, timeRange, Integer.MAX_VALUE); } else { - filter = buildObHTableFilter(null, - timeRange, - Integer.MAX_VALUE, CellUtil.cloneQualifier(kv)); + filter = buildObHTableFilter(null, timeRange, Integer.MAX_VALUE, + CellUtil.cloneQualifier(kv)); } } break; @@ -2318,6 +2374,66 @@ private BatchOperation buildBatchOperation(String tableName, List return batch; } + private ObHbaseRequest buildHbaseRequest(List actions) + throws FeatureNotSupportedException, + IllegalArgumentException, + IOException { + ObHbaseRequest request = new ObHbaseRequest(); + ObTableOperationType opType = null; + List keys = new ArrayList<>(); + List cfRowsArray = new ArrayList<>(); + Map cfRowsMap = new HashMap<>(); + int keyIndex = 0; + for (Row row : actions) { + if (row instanceof Put) { + opType = INSERT_OR_UPDATE; + Put put = (Put) row; + if (put.isEmpty()) { + throw new IllegalArgumentException("No columns to put for item"); + } + boolean isCellTTL = false; + long ttl = put.getTTL(); + if (ttl != Long.MAX_VALUE) { + isCellTTL = true; + } + keys.add(ObObj.getInstance(put.getRow())); + for (Map.Entry> entry : put.getFamilyCellMap().entrySet()) { + String family = Bytes.toString(entry.getKey()); + ObHbaseCfRows sameCfRows = cfRowsMap.get(family); + if (sameCfRows == null) { + sameCfRows = new ObHbaseCfRows(); + String realTableName = getTargetTableName(tableNameString, family, configuration); + sameCfRows.setRealTableName(realTableName); + cfRowsMap.put(family, sameCfRows); + cfRowsArray.add(sameCfRows); + } + List keyValueList = entry.getValue(); + List cells = new ArrayList<>(); + for (Cell kv : keyValueList) { + ObHbaseCell cell = new ObHbaseCell(isCellTTL); + cell.setQ(ObObj.getInstance(CellUtil.cloneQualifier(kv))); + cell.setT(ObObj.getInstance(-kv.getTimestamp())); // set timestamp as negative + cell.setV(ObObj.getInstance(CellUtil.cloneValue(kv))); + if (isCellTTL) { + cell.setTTL(ObObj.getInstance(ttl)); + } + cells.add(cell); + } + sameCfRows.add(keyIndex, cells.size(), cells); + } + } else { + throw new FeatureNotSupportedException( + "not supported other type in batch yet,only support get, put and delete"); + } + ++keyIndex; + } + request.setTableName(tableNameString); + request.setKeys(keys); + request.setOpType(opType); + request.setCfRows(cfRowsArray); + return request; + } + public static ObTableOperation buildObTableOperation(Cell kv, ObTableOperationType operationType, Long TTL) { diff --git a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java index dca76d20..b5f41c9c 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java +++ b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java @@ -127,6 +127,8 @@ public Result next() throws IOException { break; } } + // sort keyValues + OHBaseFuncUtils.sortHBaseResult(keyValues); return Result.create(keyValues); } catch (Exception e) { logger.error(LCD.convert("01-00000"), streamResult.getTableName(), e); diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java index 9932f554..3896f383 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java @@ -17,9 +17,17 @@ package com.alipay.oceanbase.hbase.util; +import com.alipay.oceanbase.rpc.ObGlobal; +import com.alipay.oceanbase.rpc.ObTableClient; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.util.Bytes; import java.util.Arrays; +import java.util.Comparator; +import java.util.List; @InterfaceAudience.Private public class OHBaseFuncUtils { @@ -42,4 +50,55 @@ public static byte[][] extractFamilyFromQualifier(byte[] qualifier) throws Excep byte[] newQualifier = Arrays.copyOfRange(qualifier, familyLen + 1, qualifier.length); return new byte[][] { family, newQualifier }; } + + public static boolean isHBasePutPefSupport(ObTableClient tableClient) { + if (tableClient.isOdpMode()) { + // server version support and distributed capacity is enabled and odp version support + return ObGlobal.isHBasePutPerfSupport() + && tableClient.getServerCapacity().isSupportDistributedExecute() + && ObGlobal.OB_PROXY_VERSION >= ObGlobal.OB_PROXY_VERSION_4_3_6_0; + } else { + // server version support and distributed capacity is enabled + return ObGlobal.isHBasePutPerfSupport() + && tableClient.getServerCapacity().isSupportDistributedExecute(); + } + } + + public static boolean isAllPut(List actions) { + boolean isAllPut = true; + for (Row action : actions) { + if (!(action instanceof Put)) { + isAllPut = false; + break; + } + } + return isAllPut; + } + + public static void sortHBaseResult(List cells) { + cells.sort(new Comparator() { + @Override + public int compare(Cell cell1, Cell cell2) { + // 1. sort family in lexicographical order + int familyComparison = Bytes.compareTo(cell1.getFamilyArray(), + cell1.getFamilyOffset(), cell1.getFamilyLength(), cell2.getFamilyArray(), + cell2.getFamilyOffset(), cell2.getFamilyLength()); + if (familyComparison != 0) { + return familyComparison; + } + + // 2: sort qualifier in lexicographical order + int qualifierComparison = Bytes.compareTo(cell1.getQualifierArray(), + cell1.getQualifierOffset(), cell1.getQualifierLength(), + cell2.getQualifierArray(), cell2.getQualifierOffset(), + cell2.getQualifierLength()); + if (qualifierComparison != 0) { + return qualifierComparison; + } + + // 3: sort timestamp in descend order + return Long.compare(cell2.getTimestamp(), cell1.getTimestamp()); + } + }); + } } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 76ddf9f7..66a80ee7 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -301,7 +301,7 @@ public void disableWriteBufferPeriodicFlush() { } @Override - public void close() throws IOException { + public synchronized void close() throws IOException { if (closed) { return; } diff --git a/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java b/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java index 32449005..5639f1d2 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java @@ -17,6 +17,7 @@ package com.alipay.oceanbase.hbase.secondary; +import com.alipay.oceanbase.hbase.OHTable; import com.alipay.oceanbase.hbase.OHTableClient; import com.alipay.oceanbase.hbase.util.ObHTableTestUtil; import com.alipay.oceanbase.hbase.util.TableTemplateManager; @@ -29,6 +30,8 @@ import org.junit.*; import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import static com.alipay.oceanbase.hbase.util.ObHTableSecondaryPartUtil.*; import static com.alipay.oceanbase.hbase.util.ObHTableTestUtil.*; @@ -133,54 +136,242 @@ public static void testBatchPutImpl(String tableName) throws Exception { { long timestamp = System.currentTimeMillis(); List puts = new ArrayList<>(); - Get get = new Get(toBytes(key)); for (int i = 0; i < 10; ++i) { - Put put = new Put(toBytes(key)); - put.addColumn(family.getBytes(), column1.getBytes(), timestamp, toBytes(column1 + value + timestamp + i)); - put.addColumn(family.getBytes(), column2.getBytes(), timestamp, toBytes(column2 + value + timestamp + i)); + Put put = new Put(toBytes(key + i)); + put.addColumn(family.getBytes(), column1.getBytes(), timestamp, toBytes(value)); + put.addColumn(family.getBytes(), column2.getBytes(), timestamp, toBytes(value)); puts.add(put); - get.addColumn(family.getBytes(), column1.getBytes()); - get.addColumn(family.getBytes(), column2.getBytes()); } - Result[] results = new Result[puts.size()]; - hTable.batch(puts, results); - Result result = hTable.get(get); - Assert(tableName, ()->Assert.assertEquals(2, result.size())); - for (Cell cell : result.rawCells()) { - Assert(tableName, ()->Assert.assertEquals(timestamp, cell.getTimestamp())); - } - Assert(tableName, () -> Assert.assertTrue(secureCompare((column1 + value + timestamp + 9).getBytes(), result.getValue(family.getBytes(), column1.getBytes())))); + hTable.put(puts); } + { - long timestamp = System.currentTimeMillis(); List puts = new ArrayList<>(); - List gets = new ArrayList<>(); - for (int i = 0; i < 10; ++i) { Put put = new Put(toBytes(key + i)); - put.addColumn(family.getBytes(), column1.getBytes(), timestamp, toBytes(column1 + value + timestamp + i)); - put.addColumn(family.getBytes(), column2.getBytes(), timestamp, toBytes(column2 + value + timestamp + i)); + put.addColumn(family.getBytes(), column1.getBytes(), toBytes(value)); + put.addColumn(family.getBytes(), column2.getBytes(), toBytes(value)); puts.add(put); - Get get = new Get(toBytes(key + i)); + } + hTable.put(puts); + } + + hTable.close(); + } + + public static void testBatchPutConcurrentImpl(String tableName) throws Exception { + + String family = getColumnFamilyName(tableName); + String key = "putKey"; + String column1 = "putColumn1"; + String column2 = "putColumn2"; + String value = "value"; + + // 创建线程池 + int taskCount = 50; + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 100, TimeUnit.SECONDS, new LinkedBlockingQueue()); + AtomicInteger successCount = new AtomicInteger(0); + CountDownLatch countDownLatch = new CountDownLatch(taskCount); + + // 并发执行50个任务 + for (int i = 0; i < taskCount; i++) { + final int taskId = i; + threadPoolExecutor.submit(() -> { + try { + // 每个线程执行批量put操作 + OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(tableName)); + hTable.init(); + List puts = new ArrayList<>(); + for (int j = 0; j < 10; ++j) { + Put put = new Put(toBytes(key + taskId + "_" + j)); + put.addColumn(family.getBytes(), column1.getBytes(), toBytes(value + "_" + taskId + "_" + j)); + put.addColumn(family.getBytes(), column2.getBytes(), toBytes(value + "_" + taskId + "_" + j)); + puts.add(put); + } + hTable.put(puts); + successCount.incrementAndGet(); + hTable.close(); + } catch (Exception e) { + // 记录异常但不中断测试 + System.err.println("Task " + taskId + " failed: " + e.getMessage()); + } finally { + countDownLatch.countDown(); + } + }); + } + + // 等待所有任务完成 + countDownLatch.await(30, TimeUnit.SECONDS); + threadPoolExecutor.shutdownNow(); + + // 验证结果 + System.out.println("Concurrent batch put completed. Success count: " + successCount.get()); + Assert.assertTrue("At least some operations should succeed", successCount.get() > 0); + } + + public static void testMixedOperationsConcurrentImpl(String tableName) throws Exception { + OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(tableName)); + hTable.init(); + String family = getColumnFamilyName(tableName); + String key = "mixedKey"; + String column1 = "mixedColumn1"; + String column2 = "mixedColumn2"; + String value = "mixedValue"; + + // 创建线程池 + int taskCount = 50; + ThreadPoolExecutor threadPoolExecutor = OHTable.createDefaultThreadPoolExecutor(1, taskCount, 100); + AtomicInteger putSuccessCount = new AtomicInteger(0); + CountDownLatch countDownLatch = new CountDownLatch(taskCount); + + // 并发执行混合操作:50个put任务 + for (int i = 0; i < taskCount; i++) { + final int taskId = i; + // Put任务 + threadPoolExecutor.submit(() -> { + try { + long timestamp = System.currentTimeMillis(); + List puts = new ArrayList<>(); + for (int j = 0; j < 3; ++j) { + Put put = new Put(toBytes(key + taskId + "_" + j)); + put.addColumn(family.getBytes(), column1.getBytes(), timestamp, toBytes(value + "_" + taskId + "_" + j)); + put.addColumn(family.getBytes(), column2.getBytes(), timestamp, toBytes(value + "_" + taskId + "_" + j)); + puts.add(put); + } + hTable.put(puts); + putSuccessCount.incrementAndGet(); + } catch (Exception e) { + System.err.println("Put task " + taskId + " failed: " + e.getMessage()); + } finally { + countDownLatch.countDown(); + } + }); + } + + // 等待所有任务完成 + countDownLatch.await(60, TimeUnit.SECONDS); + threadPoolExecutor.shutdownNow(); + + // 验证结果 + System.out.println("Mixed operations completed. Put success: " + putSuccessCount.get()); + Assert.assertTrue("At least some put operations should succeed", putSuccessCount.get() > 0); + + // 验证最终数据一致性 + for (int i = 0; i < Math.min(10, putSuccessCount.get()); i++) { + for (int j = 0; j < 3; j++) { + Get get = new Get(toBytes(key + i + "_" + j)); get.addColumn(family.getBytes(), column1.getBytes()); get.addColumn(family.getBytes(), column2.getBytes()); - gets.add(get); + Result result = hTable.get(get); + if (result != null && !result.isEmpty()) { + Assert.assertEquals(2, result.size()); + Assert.assertTrue(ObHTableTestUtil.secureCompare( + toBytes(value + "_" + i + "_" + j), + result.getValue(family.getBytes(), column1.getBytes()))); + Assert.assertTrue(ObHTableTestUtil.secureCompare( + toBytes(value + "_" + i + "_" + j), + result.getValue(family.getBytes(), column2.getBytes()))); + } } + } + + hTable.close(); + } - Result[] results = new Result[puts.size()]; - hTable.batch(puts, results); - + public static void testBatchPutConsistencyImpl(String tableName) throws Exception { + OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(tableName)); + hTable.init(); + String family = getColumnFamilyName(tableName); + String key = "putKey"; + String column1 = "putColumn1"; + String column2 = "putColumn2"; + String value = "value"; + { + long timestamp = System.currentTimeMillis(); + List puts = new ArrayList<>(); for (int i = 0; i < 10; ++i) { - Result result = hTable.get(gets.get(i)); - Assert(tableName, ()->Assert.assertEquals(2, result.size())); - for (Cell cell : result.rawCells()) { - Assert(tableName, ()->Assert.assertEquals(timestamp, cell.getTimestamp())); - } - int finalI = i; - Assert(tableName, () -> Assert.assertTrue(secureCompare((column1 + value + timestamp + finalI).getBytes(), result.getValue(family.getBytes(), column1.getBytes())))); + Put put = new Put(toBytes(key + i)); + put.addColumn(family.getBytes(), column1.getBytes(), timestamp, toBytes(value)); + put.addColumn(family.getBytes(), column2.getBytes(), timestamp, toBytes(value)); + puts.add(put); } + hTable.put(puts); + } + + hTable.close(); + } + + public static void testMultiCFConcurrentImpl(Map.Entry> entry) throws Exception { + String key = "multiCFConcurrentKey"; + String column1 = "multiCFColumn1"; + String column2 = "multiCFColumn2"; + String value = "multiCFValue"; + OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(entry.getKey())); + hTable.init(); + + // 创建线程池 + int taskCount = 40; + ThreadPoolExecutor threadPoolExecutor = OHTable.createDefaultThreadPoolExecutor(1, taskCount, 100); + AtomicInteger successCount = new AtomicInteger(0); + CountDownLatch countDownLatch = new CountDownLatch(taskCount); + + // 并发执行多列族操作 + for (int i = 0; i < taskCount; i++) { + final int taskId = i; + threadPoolExecutor.submit(() -> { + try { + long timestamp = System.currentTimeMillis(); + Put put = new Put(toBytes(key + taskId)); + Get get = new Get(toBytes(key + taskId)); + + // 为每个列族添加数据 + for (String tableName : entry.getValue()) { + String family = getColumnFamilyName(tableName); + put.addColumn(family.getBytes(), column1.getBytes(), timestamp, toBytes(column1 + value + "_" + taskId)); + put.addColumn(family.getBytes(), column2.getBytes(), timestamp, toBytes(column2 + value + "_" + taskId)); + get.addColumn(family.getBytes(), column1.getBytes()); + get.addColumn(family.getBytes(), column2.getBytes()); + } + + hTable.put(put); + successCount.incrementAndGet(); + } catch (Exception e) { + System.err.println("MultiCF task " + taskId + " failed: " + e.getMessage()); + } finally { + countDownLatch.countDown(); + } + }); } + // 等待所有任务完成 + countDownLatch.await(45, TimeUnit.SECONDS); + threadPoolExecutor.shutdown(); + + // 验证结果 + System.out.println("MultiCF concurrent operations completed. Success count: " + successCount.get()); + Assert.assertTrue("At least some operations should succeed", successCount.get() > 0); + + // 验证部分数据 + for (int verifyIndex = 0; verifyIndex < Math.min(5, successCount.get()); verifyIndex++) { + final int finalVerifyIndex = verifyIndex; + Get get = new Get(toBytes(key + verifyIndex)); + for (String tableName : entry.getValue()) { + String family = getColumnFamilyName(tableName); + get.addColumn(family.getBytes(), column1.getBytes()); + get.addColumn(family.getBytes(), column2.getBytes()); + } + Result r = hTable.get(get); + Assert(entry.getValue(), ()->Assert.assertEquals(entry.getValue().size() * 2, r.size())); + for (String tableName : entry.getValue()) { + String family = getColumnFamilyName(tableName); + Assert(entry.getValue(), () -> Assert.assertTrue(secureCompare( + toBytes(column1 + value + "_" + finalVerifyIndex), + r.getValue(family.getBytes(), column1.getBytes())))); + Assert(entry.getValue(), () -> Assert.assertTrue(secureCompare( + toBytes(column2 + value + "_" + finalVerifyIndex), + r.getValue(family.getBytes(), column2.getBytes())))); + } + } hTable.close(); } @@ -322,6 +513,26 @@ public void testBatchPut() throws Throwable { FOR_EACH(tableNames, OHTableSecondaryPartPutTest::testBatchPutImpl); } + @Test + public void testBatchPutConcurrent() throws Throwable { + FOR_EACH(tableNames, OHTableSecondaryPartPutTest::testBatchPutConcurrentImpl); + } + + @Test + public void testMixedOperationsConcurrent() throws Throwable { + FOR_EACH(tableNames, OHTableSecondaryPartPutTest::testMixedOperationsConcurrentImpl); + } + + @Test + public void testMultiCFConcurrent() throws Throwable { + FOR_EACH(group2tableNames, OHTableSecondaryPartPutTest::testMultiCFConcurrentImpl); + } + + @Test + public void testBatchPutConsistency() throws Throwable { + FOR_EACH(tableNames, OHTableSecondaryPartPutTest::testBatchPutConsistencyImpl); + } + @Test public void testMultiCFPut() throws Throwable { FOR_EACH(group2tableNames, OHTableSecondaryPartPutTest::testMultiCFPutImpl);