From 83c823d1d1753ce1e80a679c8ac41efad711eeb6 Mon Sep 17 00:00:00 2001 From: GroundWu <1175416256@qq.com> Date: Fri, 9 Jan 2026 16:00:29 +0800 Subject: [PATCH 1/2] add auto generate ts in client --- .../com/alipay/oceanbase/hbase/OHTable.java | 41 +- .../hbase/constants/OHConstants.java | 7 +- .../OHTableFillTimestampInClientTest.java | 435 ++++++++++++++++++ 3 files changed, 473 insertions(+), 10 deletions(-) create mode 100644 src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableFillTimestampInClientTest.java diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 8f0c2a17..c13c9898 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -186,6 +186,12 @@ public class OHTable implements HTableInterface { private final OHMetrics metrics; + /** + * whether to fill current system time when timestamp is not specified during write operations. + * Default is false (disabled). + */ + private final boolean fillTimestampInClient; + /** * Creates an object to access a HBase table. * Shares oceanbase table obTableClient and other resources with other OHTable instances @@ -222,7 +228,7 @@ public OHTable(Configuration configuration, String tableName) throws IOException } else { this.metrics = null; } - + this.fillTimestampInClient = configuration.getBoolean(HBASE_HTABLE_AUTO_FILL_TIMESTAMP_IN_CLIENT, false); finishSetUp(); } @@ -279,7 +285,7 @@ public OHTable(Configuration configuration, final byte[] tableName, } else { this.metrics = null; } - + this.fillTimestampInClient = configuration.getBoolean(HBASE_HTABLE_AUTO_FILL_TIMESTAMP_IN_CLIENT, false); finishSetUp(); } @@ -309,6 +315,7 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient, this.obTableClient = obTableClient; this.configuration = new Configuration(); this.metrics = null; + this.fillTimestampInClient = configuration.getBoolean(HBASE_HTABLE_AUTO_FILL_TIMESTAMP_IN_CLIENT, false); finishSetUp(); } @@ -353,7 +360,7 @@ public OHTable(TableName tableName, Connection connection, } else { this.metrics = null; } - + this.fillTimestampInClient = configuration.getBoolean(HBASE_HTABLE_AUTO_FILL_TIMESTAMP_IN_CLIENT, false); finishSetUp(); } @@ -1667,7 +1674,7 @@ Long execute() throws IOException { ObTableBatchOperation batch = new ObTableBatchOperation(); batch.addTableOperation(getInstance(INCREMENT, new Object[] { row, qualifier, - Long.MAX_VALUE }, V_COLUMNS, new Object[] { Bytes.toBytes(amount) })); + getEffectiveTimestampForWrite(Long.MAX_VALUE) }, V_COLUMNS, new Object[] { Bytes.toBytes(amount) })); ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers); @@ -2182,7 +2189,7 @@ public boolean isWeakRead(Object query) { return "weak".equalsIgnoreCase(consistencyStr); } - public static ObTableBatchOperation buildObTableBatchOperation(List rowList, + public ObTableBatchOperation buildObTableBatchOperation(List rowList, List qualifiers) { ObTableBatchOperation batch = new ObTableBatchOperation(); ObTableOperationType opType; @@ -2341,7 +2348,7 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv, } return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(operationType, ROW_KEY_COLUMNS, - new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, + new Object[] { kv.getRow(), kv.getQualifier(), getEffectiveTimestampForWrite(kv.getTimestamp()) }, property_columns, property); case Delete: return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, ROW_KEY_COLUMNS, @@ -2371,6 +2378,22 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv, } } + + /** + * Get effective timestamp for write operations (Put, Increment, Append). + * If fillTimestampInClient is enabled and timestamp is Long.MAX_VALUE, + * returns current system time, otherwise returns the original timestamp. + * + * @param timestamp the original timestamp + * @return effective timestamp to use + */ + private long getEffectiveTimestampForWrite(long timestamp) { + if (fillTimestampInClient && timestamp == Long.MAX_VALUE) { + return System.currentTimeMillis(); + } + return timestamp; + } + private KeyValue modifyQualifier(KeyValue original, byte[] newQualifier) { // Extract existing components byte[] row = original.getRow(); @@ -2579,7 +2602,7 @@ private ObHbaseRequest buildHbaseRequest(List actions, OHOperatio for (Cell kv : keyValueList) { ObHbaseCell cell = new ObHbaseCell(isCellTTL); cell.setQ(ObObj.getInstance(CellUtil.cloneQualifier(kv))); - cell.setT(ObObj.getInstance(-kv.getTimestamp())); // set timestamp as negative + cell.setT(ObObj.getInstance(-getEffectiveTimestampForWrite(kv.getTimestamp()))); // set timestamp as negative cell.setV(ObObj.getInstance(CellUtil.cloneValue(kv))); if (isCellTTL) { cell.setTTL(ObObj.getInstance(ttl)); @@ -2603,7 +2626,7 @@ private ObHbaseRequest buildHbaseRequest(List actions, OHOperatio return request; } - public static ObTableOperation buildObTableOperation(KeyValue kv, + public ObTableOperation buildObTableOperation(KeyValue kv, ObTableOperationType operationType, Long TTL) { KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType()); @@ -2618,7 +2641,7 @@ public static ObTableOperation buildObTableOperation(KeyValue kv, return getInstance( operationType, new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), - kv.getTimestamp() }, property_columns, property); + getEffectiveTimestampForWrite(kv.getTimestamp()) }, property_columns, property); case Delete: return getInstance( DEL, diff --git a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java index 935ef22a..7cca15ac 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java +++ b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java @@ -154,7 +154,12 @@ public final class OHConstants { * use to specify the route policy when performing a query. */ public static final String HBASE_HTABLE_CLIENT_ROUTE_POLICY = "hbase.htable.client.route.policy"; - + + /** + * use to specify whether to automatically fill current system time in client when timestamp is not specified during write operations. + * Default is false (disabled). + */ + public static final String HBASE_HTABLE_AUTO_FILL_TIMESTAMP_IN_CLIENT = "hbase.htable.auto.fill.timestamp.in.client"; /*-------------------------------------------------------------------------------------------------------------*/ /** diff --git a/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableFillTimestampInClientTest.java b/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableFillTimestampInClientTest.java new file mode 100644 index 00000000..244a309e --- /dev/null +++ b/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableFillTimestampInClientTest.java @@ -0,0 +1,435 @@ +/*- + * #%L + * OBKV HBase Client Framework + * %% + * Copyright (C) 2025 OceanBase Group + * %% + * OBKV HBase Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.hbase.secondary; + +import com.alipay.oceanbase.hbase.OHTableClient; +import com.alipay.oceanbase.hbase.util.ObHTableTestUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.*; + +import java.sql.Connection; +import java.sql.SQLSyntaxErrorException; +import java.util.*; + +import static com.alipay.oceanbase.hbase.constants.OHConstants.HBASE_HTABLE_AUTO_FILL_TIMESTAMP_IN_CLIENT; +import static com.alipay.oceanbase.hbase.util.ObHTableTestUtil.*; +import static org.apache.hadoop.hbase.util.Bytes.toBytes; + +public class OHTableFillTimestampInClientTest { + private static final String TABLE_NAME = "test"; + private static final String TABLE_GROUP = "test_fill_timestamp_group"; + private static final String COLUMN_FAMILY = "cf"; + private static final String TABLE_FULL_NAME = TABLE_NAME + "$" + COLUMN_FAMILY; + private static final long ONE_DAY_MS = 24 * 60 * 60 * 1000L; + + @BeforeClass + public static void before() throws Exception { + createDynamicPartitionTable(); + } + + @AfterClass + public static void finish() throws Exception { + dropTable(); + } + + @Before + public void prepareCase() throws Exception { + truncateTable(); + } + + /** + * Create dynamic partition table for testing + */ + private static void createDynamicPartitionTable() throws Exception { + Connection conn = ObHTableTestUtil.getConnection(); + + // Create table group + String tableGroupSql = String.format("CREATE TABLEGROUP IF NOT EXISTS `%s` SHARDING = 'ADAPTIVE';", TABLE_GROUP); + try { + conn.createStatement().execute(tableGroupSql); + System.out.println("Created table group: " + TABLE_GROUP); + } catch (SQLSyntaxErrorException e) { + if (!e.getMessage().contains("already exists")) { + throw e; + } + System.out.println("Table group already exists: " + TABLE_GROUP); + } + + // Calculate partition boundary (current time + 1 day) + long currentTime = System.currentTimeMillis(); + long partitionBoundary = currentTime + ONE_DAY_MS; + + // Create table with dynamic partition policy + String createTableSql = String.format( + "CREATE TABLE IF NOT EXISTS `%s` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(10240) DEFAULT NULL,\n" + + " `G` bigint(20) GENERATED ALWAYS AS (ABS(T)),\n" + + " `K_PREFIX` varbinary(1024) GENERATED ALWAYS AS (substring(`K`, 1, 18)),\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = `%s`\n" + + " kv_attributes ='{\"HBase\": {}}'\n" + + " enable_macro_block_bloom_filter = True\n" + + " DYNAMIC_PARTITION_POLICY(\n" + + " ENABLE = true,\n" + + " TIME_UNIT = 'month',\n" + + " PRECREATE_TIME = '1 month',\n" + + " EXPIRE_TIME = '1 month',\n" + + " BIGINT_PRECISION = 'ms')\n" + + " PARTITION BY RANGE COLUMNS(`G`) SUBPARTITION BY KEY(`K_PREFIX`) SUBPARTITIONS 2 (\n" + + " PARTITION `p0` VALUES LESS THAN (%d)\n" + + " );", + TABLE_FULL_NAME, TABLE_GROUP, partitionBoundary); + + try { + conn.createStatement().execute(createTableSql); + System.out.println("Created table: " + TABLE_FULL_NAME); + System.out.println("Partition boundary: " + partitionBoundary + " (" + new java.util.Date(partitionBoundary) + ")"); + } catch (SQLSyntaxErrorException e) { + if (!e.getMessage().contains("already exists")) { + throw e; + } + System.out.println("Table already exists: " + TABLE_FULL_NAME); + } + } + + /** + * Drop test table + */ + private static void dropTable() throws Exception { + Connection conn = ObHTableTestUtil.getConnection(); + try { + String dropTableSql = "DROP TABLE IF EXISTS `" + TABLE_FULL_NAME + "`;"; + conn.createStatement().execute(dropTableSql); + System.out.println("Dropped table: " + TABLE_FULL_NAME); + } catch (Exception e) { + System.out.println("Failed to drop table: " + e.getMessage()); + } + } + + /** + * Truncate test table + */ + private static void truncateTable() throws Exception { + Connection conn = ObHTableTestUtil.getConnection(); + try { + String truncateSql = "TRUNCATE TABLE `" + TABLE_FULL_NAME + "`;"; + conn.createStatement().execute(truncateSql); + } catch (Exception e) { + System.out.println("Failed to truncate table: " + e.getMessage()); + } + } + + /** + * Create OHTableClient with fillTimestampInClient configuration + */ + private static OHTableClient newOHTableClientWithConfig(boolean fillTimestamp) throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + conf.setBoolean(HBASE_HTABLE_AUTO_FILL_TIMESTAMP_IN_CLIENT, fillTimestamp); + OHTableClient client = new OHTableClient(TABLE_NAME, conf); + client.init(); + return client; + } + + /** + * Test Put operation with fillTimestampInClient enabled + */ + @Test + public void testPutWithFillTimestampEnabled() throws Exception { + OHTableClient hTable = newOHTableClientWithConfig(true); + + String key = "putKey"; + String column = "column1"; + String value = "value1"; + + // Put without timestamp should succeed (uses current time within partition range) + Put put = new Put(toBytes(key)); + put.addColumn(COLUMN_FAMILY.getBytes(), column.getBytes(), toBytes(value)); + hTable.put(put); // Should succeed + + // Verify data was written + Get get = new Get(toBytes(key)); + get.addColumn(COLUMN_FAMILY.getBytes(), column.getBytes()); + Result r = hTable.get(get); + Assert.assertEquals(1, r.size()); + Assert.assertTrue(secureCompare(toBytes(value), r.getValue(COLUMN_FAMILY.getBytes(), column.getBytes()))); + + // Verify timestamp is within valid range (current time) + Cell cell = r.rawCells()[0]; + long timestamp = cell.getTimestamp(); + long currentTime = System.currentTimeMillis(); + Assert.assertTrue("Timestamp should be current time", + timestamp > currentTime - 5000 && timestamp <= currentTime + 1000); + Assert.assertNotEquals("Timestamp should not be Long.MAX_VALUE", Long.MAX_VALUE, timestamp); + + hTable.close(); + } + + /** + * Test Put operation with fillTimestampInClient disabled (should fail) + */ + @Test + public void testPutWithFillTimestampDisabled() throws Exception { + OHTableClient hTable = newOHTableClientWithConfig(false); + + String key = "putKey"; + String column = "column1"; + String value = "value1"; + + // Put without timestamp should fail (Long.MAX_VALUE is out of partition range) + Put put = new Put(toBytes(key)); + put.addColumn(COLUMN_FAMILY.getBytes(), column.getBytes(), toBytes(value)); + + try { + hTable.put(put); + Assert.fail("Put without timestamp should fail when fillTimestampInClient is disabled"); + } catch (Exception e) { + // Expected: should throw exception because Long.MAX_VALUE is out of partition range + System.out.println("Expected exception: " + e.getMessage()); + Assert.assertTrue("Should throw exception about partition or timestamp", + e.getMessage() != null); + } + + hTable.close(); + } + + /** + * Test Put with explicit timestamp (should work regardless of config) + */ + @Test + public void testPutWithExplicitTimestamp() throws Exception { + OHTableClient hTable = newOHTableClientWithConfig(true); + + String key = "putKey"; + String column = "column1"; + String value = "value1"; + long explicitTimestamp = System.currentTimeMillis() - 1000; // 1 second ago + + // Put with explicit timestamp should use the explicit timestamp + Put put = new Put(toBytes(key)); + put.addColumn(COLUMN_FAMILY.getBytes(), column.getBytes(), explicitTimestamp, toBytes(value)); + hTable.put(put); + + // Verify explicit timestamp is preserved + Get get = new Get(toBytes(key)); + get.addColumn(COLUMN_FAMILY.getBytes(), column.getBytes()); + Result r = hTable.get(get); + Assert.assertEquals(1, r.size()); + Cell cell = r.rawCells()[0]; + Assert.assertEquals("Explicit timestamp should be preserved", explicitTimestamp, cell.getTimestamp()); + + hTable.close(); + } + + /** + * Test batch Put with fillTimestampInClient enabled + */ + @Test + public void testBatchPutWithFillTimestampEnabled() throws Exception { + OHTableClient hTable = newOHTableClientWithConfig(true); + + String keyPrefix = "batchKey"; + String column = "column1"; + String value = "value"; + + // Batch Put without timestamp should succeed + List puts = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + Put put = new Put(toBytes(keyPrefix + i)); + put.addColumn(COLUMN_FAMILY.getBytes(), column.getBytes(), toBytes(value + i)); + puts.add(put); + } + hTable.put(puts); // Should succeed + + // Verify all puts were written + for (int i = 0; i < 5; i++) { + Get get = new Get(toBytes(keyPrefix + i)); + get.addColumn(COLUMN_FAMILY.getBytes(), column.getBytes()); + Result r = hTable.get(get); + Assert.assertEquals(1, r.size()); + Assert.assertTrue(secureCompare(toBytes(value + i), + r.getValue(COLUMN_FAMILY.getBytes(), column.getBytes()))); + } + + hTable.close(); + } + + /** + * Test batch Put with fillTimestampInClient disabled (should fail) + */ + @Test + public void testBatchPutWithFillTimestampDisabled() throws Exception { + OHTableClient hTable = newOHTableClientWithConfig(false); + + String keyPrefix = "batchKey"; + String column = "column1"; + String value = "value"; + + // Batch Put without timestamp should fail + List puts = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + Put put = new Put(toBytes(keyPrefix + i)); + put.addColumn(COLUMN_FAMILY.getBytes(), column.getBytes(), toBytes(value + i)); + puts.add(put); + } + + try { + hTable.put(puts); + Assert.fail("Batch Put without timestamp should fail when fillTimestampInClient is disabled"); + } catch (Exception e) { + // Expected: should throw exception + System.out.println("Expected exception: " + e.getMessage()); + Assert.assertTrue("Should throw exception about partition or timestamp", + e.getMessage() != null); + } + + hTable.close(); + } + + /** + * Test Increment operation with fillTimestampInClient enabled + */ + @Test + public void testIncrementWithFillTimestampEnabled() throws Exception { + OHTableClient hTable = newOHTableClientWithConfig(true); + + String key = "incrementKey"; + String column = "column1"; + + // First put a value with explicit timestamp + long putTimestamp = System.currentTimeMillis() - 1000; + Put put = new Put(toBytes(key)); + put.addColumn(COLUMN_FAMILY.getBytes(), column.getBytes(), putTimestamp, Bytes.toBytes(10L)); + hTable.put(put); + + // Increment should succeed (creates new row with current time) + long result = hTable.incrementColumnValue( + toBytes(key), + COLUMN_FAMILY.getBytes(), + column.getBytes(), + 5L); + + Assert.assertEquals(15L, result); + + // Verify increment created a new cell + Get get = new Get(toBytes(key)); + get.addColumn(COLUMN_FAMILY.getBytes(), column.getBytes()); + Result r = hTable.get(get); + Assert.assertTrue("Should have at least one cell", r.size() >= 1); + + hTable.close(); + } + + /** + * Test Increment operation with fillTimestampInClient disabled (should fail for new row) + */ + @Test + public void testIncrementWithFillTimestampDisabled() throws Exception { + OHTableClient hTable = newOHTableClientWithConfig(false); + + String key = "incrementKey"; + String column = "column1"; + + // First put a value with explicit timestamp + long putTimestamp = System.currentTimeMillis() - 1000; + Put put = new Put(toBytes(key)); + put.addColumn(COLUMN_FAMILY.getBytes(), column.getBytes(), putTimestamp, Bytes.toBytes(10L)); + hTable.put(put); + + // Increment on existing row might succeed, but creating new row should fail + // This depends on implementation - if increment creates new row without timestamp, it should fail + try { + long result = hTable.incrementColumnValue( + toBytes(key + "_new"), + COLUMN_FAMILY.getBytes(), + column.getBytes(), + 5L); + // If it succeeds, the server might have handled it differently + System.out.println("Increment result: " + result); + } catch (Exception e) { + // Expected if increment tries to create new row without timestamp + System.out.println("Expected exception: " + e.getMessage()); + } + + hTable.close(); + } + + /** + * Test Append operation with fillTimestampInClient enabled + */ + @Test + public void testAppendWithFillTimestampEnabled() throws Exception { + OHTableClient hTable = newOHTableClientWithConfig(true); + + String key = "appendKey"; + String column = "column1"; + + // First put a value with explicit timestamp + long putTimestamp = System.currentTimeMillis() - 1000; + Put put = new Put(toBytes(key)); + put.addColumn(COLUMN_FAMILY.getBytes(), column.getBytes(), putTimestamp, toBytes("value")); + hTable.put(put); + + // Append should succeed (creates new row with current time) + Append append = new Append(toBytes(key)); + append.add(COLUMN_FAMILY.getBytes(), column.getBytes(), toBytes("_appended")); + Result result = hTable.append(append); + + Assert.assertNotNull("Append should return result", result); + Assert.assertTrue("Should have cells", result.size() > 0); + + hTable.close(); + } + + /** + * Test Append operation with fillTimestampInClient disabled (should fail for new row) + */ + @Test + public void testAppendWithFillTimestampDisabled() throws Exception { + OHTableClient hTable = newOHTableClientWithConfig(false); + + String key = "appendKey"; + String column = "column1"; + + // First put a value with explicit timestamp + long putTimestamp = System.currentTimeMillis() - 1000; + Put put = new Put(toBytes(key)); + put.addColumn(COLUMN_FAMILY.getBytes(), column.getBytes(), putTimestamp, toBytes("value")); + hTable.put(put); + + // Append on existing row might succeed, but creating new row should fail + try { + Append append = new Append(toBytes(key + "_new")); + append.add(COLUMN_FAMILY.getBytes(), column.getBytes(), toBytes("value")); + Result result = hTable.append(append); + System.out.println("Append result size: " + (result != null ? result.size() : 0)); + } catch (Exception e) { + // Expected if append tries to create new row without timestamp + Assert.assertTrue("Should throw exception about partition or timestamp", + e.getMessage() != null); + } + } +} \ No newline at end of file From af451fefaaa5a32a3b4c14bcc25a4d9e8c653a97 Mon Sep 17 00:00:00 2001 From: GroundWu <1175416256@qq.com> Date: Tue, 13 Jan 2026 17:43:39 +0800 Subject: [PATCH 2/2] add config to control if use put opt --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 9 ++++++++- .../alipay/oceanbase/hbase/constants/OHConstants.java | 9 +++++++++ .../com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java | 7 ++++++- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index c13c9898..0ce22e19 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -162,6 +162,11 @@ public class OHTable implements HTableInterface { */ private int maxKeyValueSize; + /** + * whether to enable put optimization path + */ + private boolean enablePutOptimization; + // i.e., doPut checks the writebuffer every X Puts. /** @@ -414,6 +419,8 @@ private void finishSetUp() { MAX_KEYVALUE_SIZE_DEFAULT); this.writeBufferSize = this.configuration.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); + this.enablePutOptimization = this.configuration.getBoolean(HBASE_HTABLE_USE_PUT_OPTIMIZATION, + HBASE_HTABLE_USE_PUT_OPTIMIZATION_DEFAULT); } public static OHConnectionConfiguration setUserDefinedNamespace(String tableNameString, @@ -750,7 +757,7 @@ public void innerBatchImpl(final List actions, final Object[] res } catch (Exception e) { throw new IOException(tableNameString + " table occurred unexpected error." , e); } - } else if (OHBaseFuncUtils.isAllPut(opType, actions) && OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) { + } else if (OHBaseFuncUtils.isAllPut(opType, actions) && OHBaseFuncUtils.isHBasePutPefSupport(obTableClient, enablePutOptimization)) { // only support Put now ObHbaseRequest request = buildHbaseRequest(actions, opType); try { diff --git a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java index 7cca15ac..72fb5b8b 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java +++ b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java @@ -160,6 +160,13 @@ public final class OHConstants { * Default is false (disabled). */ public static final String HBASE_HTABLE_AUTO_FILL_TIMESTAMP_IN_CLIENT = "hbase.htable.auto.fill.timestamp.in.client"; + + /** + * use to specify whether to enable put optimization path. + * Default is true (enabled). + */ + public static final String HBASE_HTABLE_USE_PUT_OPTIMIZATION = "hbase.htable.use.put.optimization"; + /*-------------------------------------------------------------------------------------------------------------*/ /** @@ -187,4 +194,6 @@ public final class OHConstants { public static final int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds + public static final boolean HBASE_HTABLE_USE_PUT_OPTIMIZATION_DEFAULT = true; + } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java index 9205ff97..734b226f 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java @@ -48,7 +48,12 @@ public static byte[][] extractFamilyFromQualifier(byte[] qualifier) throws Excep return new byte[][] { family, newQualifier }; } - public static boolean isHBasePutPefSupport(ObTableClient tableClient) { + public static boolean isHBasePutPefSupport(ObTableClient tableClient, boolean enablePutOptimization) { + // If client-side optimization is disabled, return false directly + if (!enablePutOptimization) { + return false; + } + if (tableClient.isOdpMode()) { // server version support and distributed capacity is enabled and odp version support return ObGlobal.isHBasePutPerfSupport()