From 37b4ca8d74a9888c14b468f8d0a8e981151c6b52 Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Mon, 21 Jul 2025 11:55:12 +0800 Subject: [PATCH 1/5] hbase new protocal --- .../com/alipay/oceanbase/hbase/OHTable.java | 78 ++++- .../OHTableSecondaryPartPutTest.java | 273 ++++++++++++++++-- 2 files changed, 318 insertions(+), 33 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index a72f21fb..b52225fb 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -1232,11 +1232,11 @@ private void doPut(List puts) throws IOException { // we need to periodically see if the writebuffer is full instead of waiting until the end of the List n++; if (n % putWriteBufferCheck == 0 && currentWriteBufferSize > writeBufferSize) { - flushCommits(); + flushCommitsV2(); } } if (autoFlush || currentWriteBufferSize > writeBufferSize) { - flushCommits(); + flushCommitsV2(); } } @@ -1664,6 +1664,40 @@ public void flushCommits() throws IOException { } } + public void flushCommitsV2() throws IOException { + try { + if (writeBuffer.isEmpty()) { + return; + } + try { + String realTableName = getTargetTableName(writeBuffer); + ObHbaseRequest request = buildHbaseRequest(realTableName, writeBuffer); + try { + obTableClient.execute(request); + } catch (Exception e) { + throw new IOException(tableNameString + " table occurred unexpected error." , e); + } + } catch (Exception e) { + logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush, + writeBuffer.size(), e); + if (e instanceof IOException) { + throw (IOException) e; + } + } + } finally { + if (clearBufferOnFail) { + writeBuffer.clear(); + currentWriteBufferSize = 0; + } else { + // the write buffer was adjusted by processBatchOfPuts + currentWriteBufferSize = 0; + for (Put aPut : writeBuffer) { + currentWriteBufferSize += aPut.heapSize(); + } + } + } + } + @Override public void close() throws IOException { if (cleanupPoolOnClose) { @@ -2318,6 +2352,46 @@ private BatchOperation buildBatchOperation(String tableName, List return batch; } + private ObHbaseRequest buildHbaseRequest(String tableName, List actions) + throws FeatureNotSupportedException, + IllegalArgumentException, + IOException { + ObHbaseRequest request = new ObHbaseRequest(); + request.setTableName(tableName); + List keys = new ArrayList<>(); + List cellNumArray = new ArrayList<>(); + List cells = new ArrayList<>(); + for (Row row : actions) { + if (row instanceof Put) { + Put put = (Put) row; + if (put.isEmpty()) { + throw new IllegalArgumentException("No columns to put for item"); + } + keys.add(ObObj.getInstance(put.getRow())); + int cellCount = 0; + for (Map.Entry> entry : put.getFamilyCellMap().entrySet()) { + List keyValueList = entry.getValue(); + for (Cell kv : keyValueList) { + cellCount++; + ObHbaseQTV cell = new ObHbaseQTV(); + cell.setQ(ObObj.getInstance(CellUtil.cloneQualifier(kv))); + cell.setT(ObObj.getInstance(kv.getTimestamp())); + cell.setV(ObObj.getInstance(CellUtil.cloneValue(kv))); + cells.add(cell); + } + } + cellNumArray.add(cellCount); + } else { + throw new FeatureNotSupportedException( + "not supported other type in batch yet,only support get, put and delete"); + } + } + request.setKeys(keys); + request.setCellNumArray(cellNumArray); + request.setCells(cells); + return request; + } + public static ObTableOperation buildObTableOperation(Cell kv, ObTableOperationType operationType, Long TTL) { diff --git a/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java b/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java index 32449005..60ec3aee 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java @@ -17,6 +17,7 @@ package com.alipay.oceanbase.hbase.secondary; +import com.alipay.oceanbase.hbase.OHTable; import com.alipay.oceanbase.hbase.OHTableClient; import com.alipay.oceanbase.hbase.util.ObHTableTestUtil; import com.alipay.oceanbase.hbase.util.TableTemplateManager; @@ -29,6 +30,8 @@ import org.junit.*; import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import static com.alipay.oceanbase.hbase.util.ObHTableSecondaryPartUtil.*; import static com.alipay.oceanbase.hbase.util.ObHTableTestUtil.*; @@ -42,6 +45,9 @@ public class OHTableSecondaryPartPutTest { public static void before() throws Exception { openDistributedExecute(); for (TableTemplateManager.TableType type : TableTemplateManager.NORMAL_TABLES) { + if (type != TableTemplateManager.TableType.SECONDARY_PARTITIONED_RANGE_KEY_GEN) { + continue; + } createTables(type, tableNames, group2tableNames, true); } } @@ -133,54 +139,239 @@ public static void testBatchPutImpl(String tableName) throws Exception { { long timestamp = System.currentTimeMillis(); List puts = new ArrayList<>(); - Get get = new Get(toBytes(key)); for (int i = 0; i < 10; ++i) { - Put put = new Put(toBytes(key)); - put.addColumn(family.getBytes(), column1.getBytes(), timestamp, toBytes(column1 + value + timestamp + i)); - put.addColumn(family.getBytes(), column2.getBytes(), timestamp, toBytes(column2 + value + timestamp + i)); + Put put = new Put(toBytes(key + i)); + put.addColumn(family.getBytes(), column1.getBytes(), timestamp, toBytes(value)); + put.addColumn(family.getBytes(), column2.getBytes(), timestamp, toBytes(value)); puts.add(put); - get.addColumn(family.getBytes(), column1.getBytes()); - get.addColumn(family.getBytes(), column2.getBytes()); - } - Result[] results = new Result[puts.size()]; - hTable.batch(puts, results); - Result result = hTable.get(get); - Assert(tableName, ()->Assert.assertEquals(2, result.size())); - for (Cell cell : result.rawCells()) { - Assert(tableName, ()->Assert.assertEquals(timestamp, cell.getTimestamp())); } - Assert(tableName, () -> Assert.assertTrue(secureCompare((column1 + value + timestamp + 9).getBytes(), result.getValue(family.getBytes(), column1.getBytes())))); + hTable.put(puts); } + { - long timestamp = System.currentTimeMillis(); List puts = new ArrayList<>(); - List gets = new ArrayList<>(); - for (int i = 0; i < 10; ++i) { Put put = new Put(toBytes(key + i)); - put.addColumn(family.getBytes(), column1.getBytes(), timestamp, toBytes(column1 + value + timestamp + i)); - put.addColumn(family.getBytes(), column2.getBytes(), timestamp, toBytes(column2 + value + timestamp + i)); + put.addColumn(family.getBytes(), column1.getBytes(), toBytes(value)); + put.addColumn(family.getBytes(), column2.getBytes(), toBytes(value)); puts.add(put); - Get get = new Get(toBytes(key + i)); + } + hTable.put(puts); + } + + hTable.close(); + } + + public static void testBatchPutConcurrentImpl(String tableName) throws Exception { + + String family = getColumnFamilyName(tableName); + String key = "putKey"; + String column1 = "putColumn1"; + String column2 = "putColumn2"; + String value = "value"; + + // 创建线程池 + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 100, TimeUnit.SECONDS, new LinkedBlockingQueue()); + AtomicInteger successCount = new AtomicInteger(0); + CountDownLatch countDownLatch = new CountDownLatch(50); + + // 并发执行50个任务 + for (int i = 0; i < 50; i++) { + final int taskId = i; + threadPoolExecutor.submit(() -> { + try { + // 每个线程执行批量put操作 + OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(tableName)); + hTable.init(); + List puts = new ArrayList<>(); + for (int j = 0; j < 10; ++j) { + Put put = new Put(toBytes(key + taskId + "_" + j)); + put.addColumn(family.getBytes(), column1.getBytes(), toBytes(value + "_" + taskId + "_" + j)); + put.addColumn(family.getBytes(), column2.getBytes(), toBytes(value + "_" + taskId + "_" + j)); + puts.add(put); + } + hTable.put(puts); + successCount.incrementAndGet(); + hTable.close(); + } catch (Exception e) { + // 记录异常但不中断测试 + System.err.println("Task " + taskId + " failed: " + e.getMessage()); + } finally { + countDownLatch.countDown(); + } + }); + } + + // 等待所有任务完成 + countDownLatch.await(30, TimeUnit.SECONDS); + threadPoolExecutor.shutdown(); + + // 验证结果 + System.out.println("Concurrent batch put completed. Success count: " + successCount.get()); + Assert.assertTrue("At least some operations should succeed", successCount.get() > 0); + } + + public static void testMixedOperationsConcurrentImpl(String tableName) throws Exception { + OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(tableName)); + hTable.init(); + String family = getColumnFamilyName(tableName); + String key = "mixedKey"; + String column1 = "mixedColumn1"; + String column2 = "mixedColumn2"; + String value = "mixedValue"; + + // 创建线程池 + ThreadPoolExecutor threadPoolExecutor = OHTable.createDefaultThreadPoolExecutor(1, 30, 100); + AtomicInteger putSuccessCount = new AtomicInteger(0); + CountDownLatch countDownLatch = new CountDownLatch(100); + + // 并发执行混合操作:50个put任务 + for (int i = 0; i < 50; i++) { + final int taskId = i; + // Put任务 + threadPoolExecutor.submit(() -> { + try { + long timestamp = System.currentTimeMillis(); + List puts = new ArrayList<>(); + for (int j = 0; j < 3; ++j) { + Put put = new Put(toBytes(key + taskId + "_" + j)); + put.addColumn(family.getBytes(), column1.getBytes(), timestamp, toBytes(value + "_" + taskId + "_" + j)); + put.addColumn(family.getBytes(), column2.getBytes(), timestamp, toBytes(value + "_" + taskId + "_" + j)); + puts.add(put); + } + hTable.put(puts); + putSuccessCount.incrementAndGet(); + } catch (Exception e) { + System.err.println("Put task " + taskId + " failed: " + e.getMessage()); + } finally { + countDownLatch.countDown(); + } + }); + } + + // 等待所有任务完成 + countDownLatch.await(60, TimeUnit.SECONDS); + threadPoolExecutor.shutdown(); + + // 验证结果 + System.out.println("Mixed operations completed. Put success: " + putSuccessCount.get() + ", Get success: " + getSuccessCount.get()); + Assert.assertTrue("At least some put operations should succeed", putSuccessCount.get() > 0); + + // 验证最终数据一致性 + for (int i = 0; i < Math.min(10, putSuccessCount.get()); i++) { + for (int j = 0; j < 3; j++) { + Get get = new Get(toBytes(key + i + "_" + j)); get.addColumn(family.getBytes(), column1.getBytes()); get.addColumn(family.getBytes(), column2.getBytes()); - gets.add(get); + Result result = hTable.get(get); + if (result != null && !result.isEmpty()) { + Assert.assertEquals(2, result.size()); + Assert.assertTrue(ObHTableTestUtil.secureCompare( + toBytes(value + "_" + i + "_" + j), + result.getValue(family.getBytes(), column1.getBytes()))); + Assert.assertTrue(ObHTableTestUtil.secureCompare( + toBytes(value + "_" + i + "_" + j), + result.getValue(family.getBytes(), column2.getBytes()))); + } } + } + + hTable.close(); + } - Result[] results = new Result[puts.size()]; - hTable.batch(puts, results); - + public static void testBatchPutConsistencyImpl(String tableName) throws Exception { + OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(tableName)); + hTable.init(); + String family = getColumnFamilyName(tableName); + String key = "putKey"; + String column1 = "putColumn1"; + String column2 = "putColumn2"; + String value = "value"; + { + long timestamp = System.currentTimeMillis(); + List puts = new ArrayList<>(); for (int i = 0; i < 10; ++i) { - Result result = hTable.get(gets.get(i)); - Assert(tableName, ()->Assert.assertEquals(2, result.size())); - for (Cell cell : result.rawCells()) { - Assert(tableName, ()->Assert.assertEquals(timestamp, cell.getTimestamp())); - } - int finalI = i; - Assert(tableName, () -> Assert.assertTrue(secureCompare((column1 + value + timestamp + finalI).getBytes(), result.getValue(family.getBytes(), column1.getBytes())))); + Put put = new Put(toBytes(key + i)); + put.addColumn(family.getBytes(), column1.getBytes(), timestamp, toBytes(value)); + put.addColumn(family.getBytes(), column2.getBytes(), timestamp, toBytes(value)); + puts.add(put); } + hTable.put(puts); + } + + hTable.close(); + } + + public static void testMultiCFConcurrentImpl(Map.Entry> entry) throws Exception { + String key = "multiCFConcurrentKey"; + String column1 = "multiCFColumn1"; + String column2 = "multiCFColumn2"; + String value = "multiCFValue"; + OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(entry.getKey())); + hTable.init(); + + // 创建线程池 + ThreadPoolExecutor threadPoolExecutor = OHTable.createDefaultThreadPoolExecutor(1, 25, 100); + AtomicInteger successCount = new AtomicInteger(0); + CountDownLatch countDownLatch = new CountDownLatch(40); + + // 并发执行多列族操作 + for (int i = 0; i < 40; i++) { + final int taskId = i; + threadPoolExecutor.submit(() -> { + try { + long timestamp = System.currentTimeMillis(); + Put put = new Put(toBytes(key + taskId)); + Get get = new Get(toBytes(key + taskId)); + + // 为每个列族添加数据 + for (String tableName : entry.getValue()) { + String family = getColumnFamilyName(tableName); + put.addColumn(family.getBytes(), column1.getBytes(), timestamp, toBytes(column1 + value + "_" + taskId)); + put.addColumn(family.getBytes(), column2.getBytes(), timestamp, toBytes(column2 + value + "_" + taskId)); + get.addColumn(family.getBytes(), column1.getBytes()); + get.addColumn(family.getBytes(), column2.getBytes()); + } + + hTable.put(put); + successCount.incrementAndGet(); + } catch (Exception e) { + System.err.println("MultiCF task " + taskId + " failed: " + e.getMessage()); + } finally { + countDownLatch.countDown(); + } + }); } + // 等待所有任务完成 + countDownLatch.await(45, TimeUnit.SECONDS); + threadPoolExecutor.shutdown(); + + // 验证结果 + System.out.println("MultiCF concurrent operations completed. Success count: " + successCount.get()); + Assert.assertTrue("At least some operations should succeed", successCount.get() > 0); + + // 验证部分数据 + for (int verifyIndex = 0; verifyIndex < Math.min(5, successCount.get()); verifyIndex++) { + final int finalVerifyIndex = verifyIndex; + Get get = new Get(toBytes(key + verifyIndex)); + for (String tableName : entry.getValue()) { + String family = getColumnFamilyName(tableName); + get.addColumn(family.getBytes(), column1.getBytes()); + get.addColumn(family.getBytes(), column2.getBytes()); + } + Result r = hTable.get(get); + Assert(entry.getValue(), ()->Assert.assertEquals(entry.getValue().size() * 2, r.size())); + for (String tableName : entry.getValue()) { + String family = getColumnFamilyName(tableName); + Assert(entry.getValue(), () -> Assert.assertTrue(secureCompare( + toBytes(column1 + value + "_" + finalVerifyIndex), + r.getValue(family.getBytes(), column1.getBytes())))); + Assert(entry.getValue(), () -> Assert.assertTrue(secureCompare( + toBytes(column2 + value + "_" + finalVerifyIndex), + r.getValue(family.getBytes(), column2.getBytes())))); + } + } hTable.close(); } @@ -322,6 +513,26 @@ public void testBatchPut() throws Throwable { FOR_EACH(tableNames, OHTableSecondaryPartPutTest::testBatchPutImpl); } + @Test + public void testBatchPutConcurrent() throws Throwable { + FOR_EACH(tableNames, OHTableSecondaryPartPutTest::testBatchPutConcurrentImpl); + } + + @Test + public void testMixedOperationsConcurrent() throws Throwable { + FOR_EACH(tableNames, OHTableSecondaryPartPutTest::testMixedOperationsConcurrentImpl); + } + + @Test + public void testMultiCFConcurrent() throws Throwable { + FOR_EACH(group2tableNames, OHTableSecondaryPartPutTest::testMultiCFConcurrentImpl); + } + + @Test + public void testBatchPutConsistency() throws Throwable { + FOR_EACH(tableNames, OHTableSecondaryPartPutTest::testBatchPutConsistencyImpl); + } + @Test public void testMultiCFPut() throws Throwable { FOR_EACH(group2tableNames, OHTableSecondaryPartPutTest::testMultiCFPutImpl); From 882b8b7cefe59e76f435125b408d8d19afd4ad98 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 29 Jul 2025 17:02:40 +0800 Subject: [PATCH 2/5] enable new hbase put performance multi-cf --- .../com/alipay/oceanbase/hbase/OHTable.java | 76 ++++++++++++++----- .../oceanbase/hbase/util/OHBaseFuncUtils.java | 26 +++++++ .../OHTableSecondaryPartPutTest.java | 2 +- 3 files changed, 85 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index b52225fb..cb9d6784 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -718,6 +718,19 @@ public void batch(final List actions, final Object[] results) thr } catch (Exception e) { throw new IOException(tableNameString + " table occurred unexpected error." , e); } + } else if (OHBaseFuncUtils.isAllPut(actions) && OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) { + // only support Put now + ObHbaseRequest request = buildHbaseRequest(actions); + try { + ObHbaseResult result = (ObHbaseResult) obTableClient.execute(request); + if (results != null) { + for (int i = 0; i < results.length; ++i) { + results[i] = new Result(); + } + } + } catch (Exception e) { + throw new IOException(tableNameString + " table occurred unexpected error." , e); + } } else { String realTableName = getTargetTableName(actions); BatchOperation batch = buildBatchOperation(realTableName, actions, @@ -726,7 +739,7 @@ public void batch(final List actions, final Object[] results) thr try { tmpResults = batch.execute(); } catch (Exception e) { - throw new IOException(tableNameString + " table occurred unexpected error." , e); + throw new IOException(tableNameString + " table occurred unexpected error.", e); } int index = 0; for (int i = 0; i != actions.size(); ++i) { @@ -1232,11 +1245,19 @@ private void doPut(List puts) throws IOException { // we need to periodically see if the writebuffer is full instead of waiting until the end of the List n++; if (n % putWriteBufferCheck == 0 && currentWriteBufferSize > writeBufferSize) { - flushCommitsV2(); + if (OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) { + flushCommitsV2(); + } else { + flushCommits(); + } } } if (autoFlush || currentWriteBufferSize > writeBufferSize) { - flushCommitsV2(); + if (OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) { + flushCommitsV2(); + } else { + flushCommits(); + } } } @@ -1670,10 +1691,9 @@ public void flushCommitsV2() throws IOException { return; } try { - String realTableName = getTargetTableName(writeBuffer); - ObHbaseRequest request = buildHbaseRequest(realTableName, writeBuffer); + ObHbaseRequest request = buildHbaseRequest(writeBuffer); try { - obTableClient.execute(request); + ObHbaseResult result = (ObHbaseResult) obTableClient.execute(request); } catch (Exception e) { throw new IOException(tableNameString + " table occurred unexpected error." , e); } @@ -2352,43 +2372,63 @@ private BatchOperation buildBatchOperation(String tableName, List return batch; } - private ObHbaseRequest buildHbaseRequest(String tableName, List actions) + private ObHbaseRequest buildHbaseRequest(List actions) throws FeatureNotSupportedException, IllegalArgumentException, IOException { ObHbaseRequest request = new ObHbaseRequest(); - request.setTableName(tableName); - List keys = new ArrayList<>(); - List cellNumArray = new ArrayList<>(); - List cells = new ArrayList<>(); + ObTableOperationType opType = null; + List keys = new ArrayList<>(); + List cfRowsArray = new ArrayList<>(); + Map cfRowsMap = new HashMap<>(); + int keyIndex = 0; for (Row row : actions) { if (row instanceof Put) { + opType = INSERT_OR_UPDATE; Put put = (Put) row; if (put.isEmpty()) { throw new IllegalArgumentException("No columns to put for item"); } + boolean isCellTTL = false; + long ttl = put.getTTL(); + if (ttl != Long.MAX_VALUE) { + isCellTTL = true; + } keys.add(ObObj.getInstance(put.getRow())); - int cellCount = 0; for (Map.Entry> entry : put.getFamilyCellMap().entrySet()) { + String family = Bytes.toString(entry.getKey()); + ObHbaseCfRows sameCfRows = cfRowsMap.get(family); + if (sameCfRows == null) { + sameCfRows = new ObHbaseCfRows(); + String realTableName = getTargetTableName(tableNameString, family, configuration); + sameCfRows.setRealTableName(realTableName); + cfRowsMap.put(family, sameCfRows); + cfRowsArray.add(sameCfRows); + } List keyValueList = entry.getValue(); + List cells = new ArrayList<>(); for (Cell kv : keyValueList) { - cellCount++; - ObHbaseQTV cell = new ObHbaseQTV(); + ObHbaseCell cell = new ObHbaseCell(isCellTTL); cell.setQ(ObObj.getInstance(CellUtil.cloneQualifier(kv))); - cell.setT(ObObj.getInstance(kv.getTimestamp())); + cell.setT(ObObj.getInstance(-kv.getTimestamp())); // set timestamp as negative cell.setV(ObObj.getInstance(CellUtil.cloneValue(kv))); + if (isCellTTL) { + cell.setTTL(ObObj.getInstance(ttl)); + } cells.add(cell); } + sameCfRows.add(keyIndex, cells.size(), cells); } - cellNumArray.add(cellCount); } else { throw new FeatureNotSupportedException( "not supported other type in batch yet,only support get, put and delete"); } + ++keyIndex; } + request.setTableName(tableNameString); request.setKeys(keys); - request.setCellNumArray(cellNumArray); - request.setCells(cells); + request.setOpType(opType); + request.setCfRows(cfRowsArray); return request; } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java index 9932f554..81bbe072 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java @@ -17,9 +17,15 @@ package com.alipay.oceanbase.hbase.util; +import com.alipay.oceanbase.rpc.ObGlobal; +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.exception.FeatureNotSupportedException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; import java.util.Arrays; +import java.util.List; @InterfaceAudience.Private public class OHBaseFuncUtils { @@ -42,4 +48,24 @@ public static byte[][] extractFamilyFromQualifier(byte[] qualifier) throws Excep byte[] newQualifier = Arrays.copyOfRange(qualifier, familyLen + 1, qualifier.length); return new byte[][] { family, newQualifier }; } + + public static boolean isHBasePutPefSupport(ObTableClient tableClient) { + if (tableClient.isOdpMode()) { + throw new FeatureNotSupportedException("not supported yet"); + } else { + // server version support and distributed capacity is enabled + return ObGlobal.isHBasePutPerfSupport() && tableClient.getServerCapacity().isSupportDistributedExecute(); + } + } + + public static boolean isAllPut(List actions) { + boolean isAllPut = true; + for (Row action : actions) { + if (!(action instanceof Put)) { + isAllPut = false; + break; + } + } + return isAllPut; + } } diff --git a/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java b/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java index 60ec3aee..74e65d8b 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java @@ -254,7 +254,7 @@ public static void testMixedOperationsConcurrentImpl(String tableName) throws Ex threadPoolExecutor.shutdown(); // 验证结果 - System.out.println("Mixed operations completed. Put success: " + putSuccessCount.get() + ", Get success: " + getSuccessCount.get()); + System.out.println("Mixed operations completed. Put success: " + putSuccessCount.get()); Assert.assertTrue("At least some put operations should succeed", putSuccessCount.get() > 0); // 验证最终数据一致性 From 79fc026494799df01e84e884814e8a0fb9394e7f Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Fri, 1 Aug 2025 10:06:22 +0800 Subject: [PATCH 3/5] add odp version control --- .../com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java index 81bbe072..f7701215 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java @@ -19,7 +19,6 @@ import com.alipay.oceanbase.rpc.ObGlobal; import com.alipay.oceanbase.rpc.ObTableClient; -import com.alipay.oceanbase.rpc.exception.FeatureNotSupportedException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; @@ -51,10 +50,14 @@ public static byte[][] extractFamilyFromQualifier(byte[] qualifier) throws Excep public static boolean isHBasePutPefSupport(ObTableClient tableClient) { if (tableClient.isOdpMode()) { - throw new FeatureNotSupportedException("not supported yet"); + // server version support and distributed capacity is enabled and odp version support + return ObGlobal.isHBasePutPerfSupport() + && tableClient.getServerCapacity().isSupportDistributedExecute() + && ObGlobal.OB_PROXY_VERSION >= ObGlobal.OB_PROXY_VERSION_4_3_6_0; } else { // server version support and distributed capacity is enabled - return ObGlobal.isHBasePutPerfSupport() && tableClient.getServerCapacity().isSupportDistributedExecute(); + return ObGlobal.isHBasePutPerfSupport() + && tableClient.getServerCapacity().isSupportDistributedExecute(); } } From af8c4bf9f4a14412944538ce68f48a3a655b593b Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 5 Aug 2025 10:34:55 +0800 Subject: [PATCH 4/5] sort HBase get/scan result compatiable to original HBase --- .../com/alipay/oceanbase/hbase/OHTable.java | 76 +++++++++++-------- .../oceanbase/hbase/util/OHBaseFuncUtils.java | 36 ++++++++- 2 files changed, 78 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index cb9d6784..e894148e 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -154,7 +154,7 @@ public class OHTable implements Table { /** * the buffer of put request */ - private final ArrayList writeBuffer = new ArrayList(); + private final ArrayList writeBuffer = new ArrayList(); /** * when the put request reach the write buffer size the do put will * flush commits automatically @@ -195,7 +195,8 @@ public class OHTable implements Table { private int scannerTimeout; - private RegionLocator regionLocator; + private RegionLocator regionLocator; + /** * Creates an object to access a HBase table. * Shares oceanbase table obTableClient and other resources with other OHTable instances @@ -460,8 +461,8 @@ private void finishSetUp() { } public static OHConnectionConfiguration setUserDefinedNamespace(String tableNameString, - OHConnectionConfiguration ohConnectionConf) - throws IllegalArgumentException { + OHConnectionConfiguration ohConnectionConf) + throws IllegalArgumentException { if (tableNameString.indexOf(':') != -1) { String[] params = tableNameString.split(":"); if (params.length != 2) { @@ -501,13 +502,15 @@ public Configuration getConfiguration() { @Override public HTableDescriptor getTableDescriptor() throws IOException { - OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, obTableClient); + OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, + obTableClient); return executor.getTableDescriptor(); } @Override public TableDescriptor getDescriptor() throws IOException { - OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, obTableClient); + OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, + obTableClient); return executor.getTableDescriptor(); } @@ -923,10 +926,12 @@ private void processColumnFilters(NavigableSet columnFilters, byte[] family = entry.getKey(); if (entry.getValue() != null) { for (byte[] columnName : entry.getValue()) { - byte[] newQualifier = new byte[family.length + 1/* length of "." */ + columnName.length]; + byte[] newQualifier = new byte[family.length + 1/* length of "." */ + + columnName.length]; System.arraycopy(family, 0, newQualifier, 0, family.length); newQualifier[family.length] = 0x2E; // 0x2E in utf-8 is "." - System.arraycopy(columnName, 0, newQualifier, family.length + 1, columnName.length); + System.arraycopy(columnName, 0, newQualifier, family.length + 1, + columnName.length); columnFilters.add(newQualifier); } } else { @@ -1007,6 +1012,8 @@ public Result call() throws IOException { if (get.isCheckExistenceOnly()) { return Result.create(null, !keyValueList.isEmpty()); } + // sort keyValues + OHBaseFuncUtils.sortHBaseResult(keyValueList); return Result.create(keyValueList); } }; @@ -1695,7 +1702,7 @@ public void flushCommitsV2() throws IOException { try { ObHbaseResult result = (ObHbaseResult) obTableClient.execute(request); } catch (Exception e) { - throw new IOException(tableNameString + " table occurred unexpected error." , e); + throw new IOException(tableNameString + " table occurred unexpected error.", e); } } catch (Exception e) { logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush, @@ -1938,7 +1945,7 @@ private ObHTableFilter buildObHTableFilter(byte[] filterString, TimeRange timeRa if (columnQualifier == null) { obHTableFilter.addSelectColumnQualifier(new byte[0]); } else { - obHTableFilter.addSelectColumnQualifier(columnQualifier); + obHTableFilter.addSelectColumnQualifier(columnQualifier); } } } @@ -1996,11 +2003,11 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) { filter.setOffsetPerRowPerCf(scan.getRowOffsetPerColumnFamily()); } if (scan.isReversed()) { - obTableQuery = buildObTableQuery(filter, scan.getStopRow(), scan.includeStopRow(), scan.getStartRow(), - scan.includeStartRow(), true, ts); + obTableQuery = buildObTableQuery(filter, scan.getStopRow(), scan.includeStopRow(), + scan.getStartRow(), scan.includeStartRow(), true, ts); } else { - obTableQuery = buildObTableQuery(filter, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), - scan.includeStopRow(), false, ts); + obTableQuery = buildObTableQuery(filter, scan.getStartRow(), scan.includeStartRow(), + scan.getStopRow(), scan.includeStopRow(), false, ts); } obTableQuery.setBatchSize(scan.getBatch()); obTableQuery.setLimit(scan.getLimit()); @@ -2080,10 +2087,11 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv, KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType().getCode()); com.alipay.oceanbase.rpc.mutation.Mutation tableMutation = buildMutation(kv, operationType, isTableGroup, family, TTL); - if(isTableGroup) { + if (isTableGroup) { // construct new_kv otherwise filter will fail to match targeted columns byte[] oldQualifier = CellUtil.cloneQualifier(kv); - byte[] newQualifier = new byte[family.length + 1/* length of "." */ + oldQualifier.length]; + byte[] newQualifier = new byte[family.length + 1/* length of "." */ + + oldQualifier.length]; System.arraycopy(family, 0, newQualifier, 0, family.length); newQualifier[family.length] = 0x2E; // 0x2E in utf-8 is "." System.arraycopy(oldQualifier, 0, newQualifier, family.length + 1, oldQualifier.length); @@ -2137,7 +2145,8 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv, if (!isTableGroup) { filter = buildObHTableFilter(null, null, Integer.MAX_VALUE); } else { - filter = buildObHTableFilter(null, null, Integer.MAX_VALUE, CellUtil.cloneQualifier(kv)); + filter = buildObHTableFilter(null, null, Integer.MAX_VALUE, + CellUtil.cloneQualifier(kv)); } } else { range.setStartKey(ObRowKey.getInstance(CellUtil.cloneRow(kv), ObObj.getMin(), @@ -2160,7 +2169,8 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv, range.setEndKey(ObRowKey.getInstance(CellUtil.cloneRow(kv), ObObj.getMax(), ObObj.getMax())); // [MAX_VALUE, MAX_VALUE), delete nothing - filter = buildObHTableFilter(null, new TimeRange(Long.MAX_VALUE), Integer.MAX_VALUE); + filter = buildObHTableFilter(null, new TimeRange(Long.MAX_VALUE), + Integer.MAX_VALUE); } else { TimeRange timeRange = new TimeRange(kv.getTimestamp(), kv.getTimestamp() + 1); range.setStartKey(ObRowKey.getInstance(CellUtil.cloneRow(kv), ObObj.getMin(), @@ -2168,13 +2178,10 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv, range.setEndKey(ObRowKey.getInstance(CellUtil.cloneRow(kv), ObObj.getMax(), ObObj.getMax())); if (!isTableGroup) { - filter = buildObHTableFilter(null, - timeRange, - Integer.MAX_VALUE); + filter = buildObHTableFilter(null, timeRange, Integer.MAX_VALUE); } else { - filter = buildObHTableFilter(null, - timeRange, - Integer.MAX_VALUE, CellUtil.cloneQualifier(kv)); + filter = buildObHTableFilter(null, timeRange, Integer.MAX_VALUE, + CellUtil.cloneQualifier(kv)); } } break; @@ -2193,7 +2200,8 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(Cell kv, Cell newCell = kv; if (isTableGroup && family != null) { byte[] oldQualifier = CellUtil.cloneQualifier(kv); - byte[] newQualifier = new byte[family.length + 1/* length of "." */ + oldQualifier.length]; + byte[] newQualifier = new byte[family.length + 1/* length of "." */ + + oldQualifier.length]; System.arraycopy(family, 0, newQualifier, 0, family.length); newQualifier[family.length] = 0x2E; // 0x2E in utf-8 is "." System.arraycopy(oldQualifier, 0, newQualifier, family.length + 1, oldQualifier.length); @@ -2553,7 +2561,8 @@ public byte[][] getStartKeys() throws IOException { if (ObGlobal.isHBaseAdminSupport()) { // 4.3.5.3 and above, OCP and ODP mode use regionLocator to getStartKeys if (regionLocator == null) { - OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(tableNameString, obTableClient); + OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(tableNameString, + obTableClient); regionLocator = executor.getRegionLocator(tableNameString); } startKeys = regionLocator.getStartKeys(); @@ -2562,10 +2571,12 @@ public byte[][] getStartKeys() throws IOException { try { startKeys = obTableClient.getHBaseTableStartKeys(tableNameString); } catch (Exception e) { - throw new IOException("Fail to get start keys of HBase Table: " + tableNameString, e); + throw new IOException("Fail to get start keys of HBase Table: " + tableNameString, + e); } } else { - throw new IOException("not supported yet in odp mode, only support in ObServer 4.3.5.3 and above"); + throw new IOException( + "not supported yet in odp mode, only support in ObServer 4.3.5.3 and above"); } return startKeys; @@ -2576,7 +2587,8 @@ public byte[][] getEndKeys() throws IOException { if (ObGlobal.isHBaseAdminSupport()) { // 4.3.5.3 and above, OCP and ODP mode use regionLocator to getEndKeys if (regionLocator == null) { - OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(tableNameString, obTableClient); + OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(tableNameString, + obTableClient); regionLocator = executor.getRegionLocator(tableNameString); } endKeys = regionLocator.getEndKeys(); @@ -2585,10 +2597,12 @@ public byte[][] getEndKeys() throws IOException { try { endKeys = obTableClient.getHBaseTableEndKeys(tableNameString); } catch (Exception e) { - throw new IOException("Fail to get start keys of HBase Table: " + tableNameString, e); + throw new IOException("Fail to get start keys of HBase Table: " + tableNameString, + e); } } else { - throw new IOException("not supported yet in odp mode, only support in ObServer 4.3.5.3 and above"); + throw new IOException( + "not supported yet in odp mode, only support in ObServer 4.3.5.3 and above"); } return endKeys; diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java index f7701215..3896f383 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java @@ -20,10 +20,13 @@ import com.alipay.oceanbase.rpc.ObGlobal; import com.alipay.oceanbase.rpc.ObTableClient; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.util.Bytes; import java.util.Arrays; +import java.util.Comparator; import java.util.List; @InterfaceAudience.Private @@ -52,12 +55,12 @@ public static boolean isHBasePutPefSupport(ObTableClient tableClient) { if (tableClient.isOdpMode()) { // server version support and distributed capacity is enabled and odp version support return ObGlobal.isHBasePutPerfSupport() - && tableClient.getServerCapacity().isSupportDistributedExecute() - && ObGlobal.OB_PROXY_VERSION >= ObGlobal.OB_PROXY_VERSION_4_3_6_0; + && tableClient.getServerCapacity().isSupportDistributedExecute() + && ObGlobal.OB_PROXY_VERSION >= ObGlobal.OB_PROXY_VERSION_4_3_6_0; } else { // server version support and distributed capacity is enabled return ObGlobal.isHBasePutPerfSupport() - && tableClient.getServerCapacity().isSupportDistributedExecute(); + && tableClient.getServerCapacity().isSupportDistributedExecute(); } } @@ -71,4 +74,31 @@ public static boolean isAllPut(List actions) { } return isAllPut; } + + public static void sortHBaseResult(List cells) { + cells.sort(new Comparator() { + @Override + public int compare(Cell cell1, Cell cell2) { + // 1. sort family in lexicographical order + int familyComparison = Bytes.compareTo(cell1.getFamilyArray(), + cell1.getFamilyOffset(), cell1.getFamilyLength(), cell2.getFamilyArray(), + cell2.getFamilyOffset(), cell2.getFamilyLength()); + if (familyComparison != 0) { + return familyComparison; + } + + // 2: sort qualifier in lexicographical order + int qualifierComparison = Bytes.compareTo(cell1.getQualifierArray(), + cell1.getQualifierOffset(), cell1.getQualifierLength(), + cell2.getQualifierArray(), cell2.getQualifierOffset(), + cell2.getQualifierLength()); + if (qualifierComparison != 0) { + return qualifierComparison; + } + + // 3: sort timestamp in descend order + return Long.compare(cell2.getTimestamp(), cell1.getTimestamp()); + } + }); + } } From a899b6453dfdeafe0305a2f98324fd7dbf49210d Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Mon, 11 Aug 2025 14:39:31 +0800 Subject: [PATCH 5/5] fix concurrent put --- .../com/alipay/oceanbase/hbase/OHTable.java | 62 ++++++++----------- .../hbase/result/ClientStreamScanner.java | 2 + .../hbase/util/OHBufferedMutatorImpl.java | 2 +- .../OHTableSecondaryPartPutTest.java | 26 ++++---- 4 files changed, 41 insertions(+), 51 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index e894148e..759da56b 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -69,6 +69,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import static com.alipay.oceanbase.hbase.constants.OHConstants.*; import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument; @@ -84,7 +85,6 @@ import static org.apache.commons.lang.StringUtils.isBlank; import static org.apache.commons.lang.StringUtils.isNotBlank; import static com.alipay.oceanbase.hbase.filter.HBaseFilterUtils.writeBytesWithEscape; -import static org.apache.hadoop.hbase.KeyValue.Type.*; public class OHTable implements Table { @@ -154,7 +154,7 @@ public class OHTable implements Table { /** * the buffer of put request */ - private final ArrayList writeBuffer = new ArrayList(); + private final List writeBuffer = new CopyOnWriteArrayList<>(); /** * when the put request reach the write buffer size the do put will * flush commits automatically @@ -179,7 +179,7 @@ public class OHTable implements Table { /** * current buffer size */ - private long currentWriteBufferSize; + private AtomicLong currentWriteBufferSize = new AtomicLong(0); /** * the max size of put key value @@ -704,7 +704,7 @@ private void compatOldServerBatch(final List actions, final Objec @Override public void batch(final List actions, final Object[] results) throws IOException { - if (actions == null) { + if (actions == null || actions.isEmpty()) { return; } if (results != null) { @@ -926,12 +926,10 @@ private void processColumnFilters(NavigableSet columnFilters, byte[] family = entry.getKey(); if (entry.getValue() != null) { for (byte[] columnName : entry.getValue()) { - byte[] newQualifier = new byte[family.length + 1/* length of "." */ - + columnName.length]; + byte[] newQualifier = new byte[family.length + 1/* length of "." */ + columnName.length]; System.arraycopy(family, 0, newQualifier, 0, family.length); newQualifier[family.length] = 0x2E; // 0x2E in utf-8 is "." - System.arraycopy(columnName, 0, newQualifier, family.length + 1, - columnName.length); + System.arraycopy(columnName, 0, newQualifier, family.length + 1, columnName.length); columnFilters.add(newQualifier); } } else { @@ -1247,11 +1245,11 @@ private void doPut(List puts) throws IOException { validatePut(put); checkFamilyViolation(put.getFamilyCellMap().keySet(), true); writeBuffer.add(put); - currentWriteBufferSize += put.heapSize(); + currentWriteBufferSize.addAndGet(put.heapSize()); // we need to periodically see if the writebuffer is full instead of waiting until the end of the List n++; - if (n % putWriteBufferCheck == 0 && currentWriteBufferSize > writeBufferSize) { + if (n % putWriteBufferCheck == 0 && currentWriteBufferSize.get() > writeBufferSize) { if (OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) { flushCommitsV2(); } else { @@ -1259,7 +1257,7 @@ private void doPut(List puts) throws IOException { } } } - if (autoFlush || currentWriteBufferSize > writeBufferSize) { + if (autoFlush || currentWriteBufferSize.get() > writeBufferSize) { if (OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) { flushCommitsV2(); } else { @@ -1681,12 +1679,12 @@ public void flushCommits() throws IOException { } finally { if (clearBufferOnFail) { writeBuffer.clear(); - currentWriteBufferSize = 0; + currentWriteBufferSize.set(0); } else { // the write buffer was adjusted by processBatchOfPuts - currentWriteBufferSize = 0; + currentWriteBufferSize.set(0); for (Put aPut : writeBuffer) { - currentWriteBufferSize += aPut.heapSize(); + currentWriteBufferSize.addAndGet(aPut.heapSize()); } } } @@ -1714,12 +1712,12 @@ public void flushCommitsV2() throws IOException { } finally { if (clearBufferOnFail) { writeBuffer.clear(); - currentWriteBufferSize = 0; + currentWriteBufferSize.set(0); } else { // the write buffer was adjusted by processBatchOfPuts - currentWriteBufferSize = 0; + currentWriteBufferSize.set(0); for (Put aPut : writeBuffer) { - currentWriteBufferSize += aPut.heapSize(); + currentWriteBufferSize.addAndGet(aPut.heapSize()); } } } @@ -2090,8 +2088,7 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv, if (isTableGroup) { // construct new_kv otherwise filter will fail to match targeted columns byte[] oldQualifier = CellUtil.cloneQualifier(kv); - byte[] newQualifier = new byte[family.length + 1/* length of "." */ - + oldQualifier.length]; + byte[] newQualifier = new byte[family.length + 1/* length of "." */ + oldQualifier.length]; System.arraycopy(family, 0, newQualifier, 0, family.length); newQualifier[family.length] = 0x2E; // 0x2E in utf-8 is "." System.arraycopy(oldQualifier, 0, newQualifier, family.length + 1, oldQualifier.length); @@ -2145,8 +2142,7 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv, if (!isTableGroup) { filter = buildObHTableFilter(null, null, Integer.MAX_VALUE); } else { - filter = buildObHTableFilter(null, null, Integer.MAX_VALUE, - CellUtil.cloneQualifier(kv)); + filter = buildObHTableFilter(null, null, Integer.MAX_VALUE, CellUtil.cloneQualifier(kv)); } } else { range.setStartKey(ObRowKey.getInstance(CellUtil.cloneRow(kv), ObObj.getMin(), @@ -2169,8 +2165,7 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv, range.setEndKey(ObRowKey.getInstance(CellUtil.cloneRow(kv), ObObj.getMax(), ObObj.getMax())); // [MAX_VALUE, MAX_VALUE), delete nothing - filter = buildObHTableFilter(null, new TimeRange(Long.MAX_VALUE), - Integer.MAX_VALUE); + filter = buildObHTableFilter(null, new TimeRange(Long.MAX_VALUE), Integer.MAX_VALUE); } else { TimeRange timeRange = new TimeRange(kv.getTimestamp(), kv.getTimestamp() + 1); range.setStartKey(ObRowKey.getInstance(CellUtil.cloneRow(kv), ObObj.getMin(), @@ -2200,8 +2195,7 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(Cell kv, Cell newCell = kv; if (isTableGroup && family != null) { byte[] oldQualifier = CellUtil.cloneQualifier(kv); - byte[] newQualifier = new byte[family.length + 1/* length of "." */ - + oldQualifier.length]; + byte[] newQualifier = new byte[family.length + 1/* length of "." */ + oldQualifier.length]; System.arraycopy(family, 0, newQualifier, 0, family.length); newQualifier[family.length] = 0x2E; // 0x2E in utf-8 is "." System.arraycopy(oldQualifier, 0, newQualifier, family.length + 1, oldQualifier.length); @@ -2561,8 +2555,7 @@ public byte[][] getStartKeys() throws IOException { if (ObGlobal.isHBaseAdminSupport()) { // 4.3.5.3 and above, OCP and ODP mode use regionLocator to getStartKeys if (regionLocator == null) { - OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(tableNameString, - obTableClient); + OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(tableNameString, obTableClient); regionLocator = executor.getRegionLocator(tableNameString); } startKeys = regionLocator.getStartKeys(); @@ -2571,12 +2564,10 @@ public byte[][] getStartKeys() throws IOException { try { startKeys = obTableClient.getHBaseTableStartKeys(tableNameString); } catch (Exception e) { - throw new IOException("Fail to get start keys of HBase Table: " + tableNameString, - e); + throw new IOException("Fail to get start keys of HBase Table: " + tableNameString, e); } } else { - throw new IOException( - "not supported yet in odp mode, only support in ObServer 4.3.5.3 and above"); + throw new IOException("not supported yet in odp mode, only support in ObServer 4.3.5.3 and above"); } return startKeys; @@ -2587,8 +2578,7 @@ public byte[][] getEndKeys() throws IOException { if (ObGlobal.isHBaseAdminSupport()) { // 4.3.5.3 and above, OCP and ODP mode use regionLocator to getEndKeys if (regionLocator == null) { - OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(tableNameString, - obTableClient); + OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(tableNameString, obTableClient); regionLocator = executor.getRegionLocator(tableNameString); } endKeys = regionLocator.getEndKeys(); @@ -2597,12 +2587,10 @@ public byte[][] getEndKeys() throws IOException { try { endKeys = obTableClient.getHBaseTableEndKeys(tableNameString); } catch (Exception e) { - throw new IOException("Fail to get start keys of HBase Table: " + tableNameString, - e); + throw new IOException("Fail to get start keys of HBase Table: " + tableNameString, e); } } else { - throw new IOException( - "not supported yet in odp mode, only support in ObServer 4.3.5.3 and above"); + throw new IOException("not supported yet in odp mode, only support in ObServer 4.3.5.3 and above"); } return endKeys; diff --git a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java index dca76d20..b5f41c9c 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java +++ b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java @@ -127,6 +127,8 @@ public Result next() throws IOException { break; } } + // sort keyValues + OHBaseFuncUtils.sortHBaseResult(keyValues); return Result.create(keyValues); } catch (Exception e) { logger.error(LCD.convert("01-00000"), streamResult.getTableName(), e); diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 76ddf9f7..66a80ee7 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -301,7 +301,7 @@ public void disableWriteBufferPeriodicFlush() { } @Override - public void close() throws IOException { + public synchronized void close() throws IOException { if (closed) { return; } diff --git a/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java b/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java index 74e65d8b..5639f1d2 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableSecondaryPartPutTest.java @@ -45,9 +45,6 @@ public class OHTableSecondaryPartPutTest { public static void before() throws Exception { openDistributedExecute(); for (TableTemplateManager.TableType type : TableTemplateManager.NORMAL_TABLES) { - if (type != TableTemplateManager.TableType.SECONDARY_PARTITIONED_RANGE_KEY_GEN) { - continue; - } createTables(type, tableNames, group2tableNames, true); } } @@ -171,12 +168,13 @@ public static void testBatchPutConcurrentImpl(String tableName) throws Exception String value = "value"; // 创建线程池 + int taskCount = 50; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 100, TimeUnit.SECONDS, new LinkedBlockingQueue()); AtomicInteger successCount = new AtomicInteger(0); - CountDownLatch countDownLatch = new CountDownLatch(50); + CountDownLatch countDownLatch = new CountDownLatch(taskCount); // 并发执行50个任务 - for (int i = 0; i < 50; i++) { + for (int i = 0; i < taskCount; i++) { final int taskId = i; threadPoolExecutor.submit(() -> { try { @@ -204,7 +202,7 @@ public static void testBatchPutConcurrentImpl(String tableName) throws Exception // 等待所有任务完成 countDownLatch.await(30, TimeUnit.SECONDS); - threadPoolExecutor.shutdown(); + threadPoolExecutor.shutdownNow(); // 验证结果 System.out.println("Concurrent batch put completed. Success count: " + successCount.get()); @@ -221,12 +219,13 @@ public static void testMixedOperationsConcurrentImpl(String tableName) throws Ex String value = "mixedValue"; // 创建线程池 - ThreadPoolExecutor threadPoolExecutor = OHTable.createDefaultThreadPoolExecutor(1, 30, 100); + int taskCount = 50; + ThreadPoolExecutor threadPoolExecutor = OHTable.createDefaultThreadPoolExecutor(1, taskCount, 100); AtomicInteger putSuccessCount = new AtomicInteger(0); - CountDownLatch countDownLatch = new CountDownLatch(100); + CountDownLatch countDownLatch = new CountDownLatch(taskCount); // 并发执行混合操作:50个put任务 - for (int i = 0; i < 50; i++) { + for (int i = 0; i < taskCount; i++) { final int taskId = i; // Put任务 threadPoolExecutor.submit(() -> { @@ -251,7 +250,7 @@ public static void testMixedOperationsConcurrentImpl(String tableName) throws Ex // 等待所有任务完成 countDownLatch.await(60, TimeUnit.SECONDS); - threadPoolExecutor.shutdown(); + threadPoolExecutor.shutdownNow(); // 验证结果 System.out.println("Mixed operations completed. Put success: " + putSuccessCount.get()); @@ -311,12 +310,13 @@ public static void testMultiCFConcurrentImpl(Map.Entry> ent hTable.init(); // 创建线程池 - ThreadPoolExecutor threadPoolExecutor = OHTable.createDefaultThreadPoolExecutor(1, 25, 100); + int taskCount = 40; + ThreadPoolExecutor threadPoolExecutor = OHTable.createDefaultThreadPoolExecutor(1, taskCount, 100); AtomicInteger successCount = new AtomicInteger(0); - CountDownLatch countDownLatch = new CountDownLatch(40); + CountDownLatch countDownLatch = new CountDownLatch(taskCount); // 并发执行多列族操作 - for (int i = 0; i < 40; i++) { + for (int i = 0; i < taskCount; i++) { final int taskId = i; threadPoolExecutor.submit(() -> { try {