From 88da8e3ec683a2bcaea8ffb38191c19772f52aff Mon Sep 17 00:00:00 2001 From: stuBirdFly <1065492934@qq.com> Date: Mon, 10 Feb 2025 10:55:59 +0800 Subject: [PATCH] Fix the error caused by putting a KeyValue with a delete type --- .../com/alipay/oceanbase/hbase/OHTable.java | 131 +++++++----------- .../oceanbase/hbase/HTableTestBase.java | 46 ++++++ .../hbase/util/ObHTableTestUtil.java | 44 +++--- 3 files changed, 121 insertions(+), 100 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index a465aec2..3851012c 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -1966,24 +1966,23 @@ private ObTableQuery buildObTableQuery(final Get get, Collection columnQ public static ObTableBatchOperation buildObTableBatchOperation(List rowList, List qualifiers) { ObTableBatchOperation batch = new ObTableBatchOperation(); - OHOpType opType; + ObTableOperationType opType; Map indexMap = new HashMap<>(); for (Mutation row : rowList) { if (row instanceof Put) { - opType = OHOpType.Put; + opType = INSERT_OR_UPDATE; } else if (row instanceof Delete) { - opType = OHOpType.Delete; + opType = DEL; } else if (row instanceof Increment) { - opType = OHOpType.Increment; + opType = INCREMENT; } else if (row instanceof Append) { - opType = OHOpType.Append; + opType = APPEND; } else { throw new FeatureNotSupportedException("not supported other type"); } Set>> familyCellMap = row.getFamilyMap().entrySet(); - for (Map.Entry> familyWithCells : familyCellMap) { - if (opType == OHOpType.Increment || opType == OHOpType.Append) { + if (opType == INCREMENT || opType == APPEND) { indexMap.clear(); for (int i = 0; i < familyWithCells.getValue().size(); i++) { Cell cell = familyWithCells.getValue().get(i); @@ -1992,15 +1991,17 @@ public static ObTableBatchOperation buildObTableBatchOperation(List ro } for (Map.Entry entry : indexMap.entrySet()) { qualifiers.add(entry.getKey().getBytes()); - batch.addTableOperation(buildObTableOperation(familyWithCells.getValue().get(entry.getValue()), opType, row.getTTL())); + batch.addTableOperation(buildObTableOperation( + familyWithCells.getValue().get(entry.getValue()), opType, + row.getTTL())); } } else { for (KeyValue cell : familyWithCells.getValue()) { - batch.addTableOperation(buildObTableOperation(cell, opType, row.getTTL())); + batch.addTableOperation( + buildObTableOperation(cell, opType, row.getTTL())); } } } - } batch.setSamePropertiesNames(true); return batch; @@ -2010,10 +2011,8 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv, ObTableOperationType operationType, boolean isTableGroup, Long TTL) { KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType()); - switch (operationType) { - case INSERT_OR_UPDATE: - case APPEND: - case INCREMENT: + switch (kvType) { + case Put: String[] property_columns = V_COLUMNS; Object[] property = new Object[] { CellUtil.cloneValue(kv) }; if (TTL != Long.MAX_VALUE) { @@ -2024,36 +2023,29 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv, ROW_KEY_COLUMNS, new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, property_columns, property); - case DEL: - switch (kvType) { - case Delete: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, - ROW_KEY_COLUMNS, - new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, - null, null); - case Maximum: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, - ROW_KEY_COLUMNS, - new Object[] { kv.getRow(), null, -kv.getTimestamp() }, null, null); - case DeleteColumn: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, - ROW_KEY_COLUMNS, - new Object[] { kv.getRow(), kv.getQualifier(), -kv.getTimestamp() }, - null, null); - case DeleteFamily: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, - ROW_KEY_COLUMNS, - new Object[] { kv.getRow(), isTableGroup ? kv.getQualifier() : null, - -kv.getTimestamp() }, null, null); - case DeleteFamilyVersion: - return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance( - DEL, - ROW_KEY_COLUMNS, - new Object[] { kv.getRow(), isTableGroup ? kv.getQualifier() : null, - kv.getTimestamp() }, null, null); - default: - throw new IllegalArgumentException("illegal mutation type " + kvType); - } + case Delete: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, ROW_KEY_COLUMNS, + new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, null, null); + case Maximum: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, ROW_KEY_COLUMNS, + new Object[] { kv.getRow(), null, -kv.getTimestamp() }, null, null); + case DeleteColumn: + return com.alipay.oceanbase.rpc.mutation.Mutation + .getInstance(DEL, ROW_KEY_COLUMNS, + new Object[] { kv.getRow(), kv.getQualifier(), -kv.getTimestamp() }, null, + null); + case DeleteFamily: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance( + DEL, + ROW_KEY_COLUMNS, + new Object[] { kv.getRow(), isTableGroup ? kv.getQualifier() : null, + -kv.getTimestamp() }, null, null); + case DeleteFamilyVersion: + return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance( + DEL, + ROW_KEY_COLUMNS, + new Object[] { kv.getRow(), isTableGroup ? kv.getQualifier() : null, + kv.getTimestamp() }, null, null); default: throw new IllegalArgumentException("illegal mutation type " + operationType); } @@ -2178,48 +2170,35 @@ private BatchOperation buildBatchOperation(String tableName, List return batch; } - public static ObTableOperation buildObTableOperation(KeyValue kv, OHOpType operationType, + public static ObTableOperation buildObTableOperation(KeyValue kv, + ObTableOperationType operationType, Long TTL) { + KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType()); String[] property_columns = V_COLUMNS; Object[] property = new Object[] { CellUtil.cloneValue(kv) }; if (TTL != Long.MAX_VALUE) { property_columns = PROPERTY_COLUMNS; property = new Object[] { CellUtil.cloneValue(kv), TTL }; } - switch (operationType) { + switch (kvType) { case Put: - case Increment: - case Append: - ObTableOperationType type; - if (operationType == OHOpType.Put) { - type = INSERT_OR_UPDATE; - } else if (operationType == OHOpType.Increment) { - type = INCREMENT; - } else { - type = APPEND; - } return getInstance( - type, + operationType, new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp() }, property_columns, property); case Delete: - KeyValue.Type delType = KeyValue.Type.codeToType(kv.getTypeByte()); - if (delType == KeyValue.Type.Delete) { - return getInstance( - DEL, - new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), - kv.getTimestamp() }, null, null); - } else if (delType == KeyValue.Type.DeleteColumn) { - return getInstance( - DEL, - new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), - -kv.getTimestamp() }, null, null); - } else if (delType == KeyValue.Type.DeleteFamily) { - return getInstance(DEL, new Object[] { kv.getRow(), null, -kv.getTimestamp() }, - null, null); - } else { - throw new IllegalArgumentException("illegal delete type " + operationType); - } + return getInstance( + DEL, + new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), + kv.getTimestamp() }, null, null); + case DeleteColumn: + return getInstance( + DEL, + new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), + -kv.getTimestamp() }, null, null); + case DeleteFamily: + return getInstance(DEL, new Object[] { kv.getRow(), null, -kv.getTimestamp() }, + null, null); default: throw new IllegalArgumentException("illegal mutation type " + operationType); } @@ -2331,8 +2310,4 @@ public byte[][] getEndKeys() throws IOException { public Pair getStartEndKeys() throws IOException { return new Pair<>(getStartKeys(), getEndKeys()); } - - public enum OHOpType { - Put, Append, Delete, Increment - } } \ No newline at end of file diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index f732ea9b..ec197fce 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -5108,6 +5108,52 @@ public void testAppend() throws IOException { } } + @Test + public void testHbasePutDeleteCell() throws Exception { + final byte[] rowKey = Bytes.toBytes("12345"); + final byte[] family = Bytes.toBytes("family1"); + + Put put = new Put(rowKey); + put.add(family, Bytes.toBytes("A"), Bytes.toBytes("a")); + put.add(family, Bytes.toBytes("B"), Bytes.toBytes("b")); + put.add(family, Bytes.toBytes("C"), Bytes.toBytes("c")); + put.add(family, Bytes.toBytes("D"), Bytes.toBytes("d")); + hTable.put(put); + // get row back and assert the values + Get get = new Get(rowKey); + get.addFamily(family); + Result result = hTable.get(get); + assertTrue("Column A value should be a", + Bytes.toString(result.getValue(family, Bytes.toBytes("A"))).equals("a")); + assertTrue("Column B value should be b", + Bytes.toString(result.getValue(family, Bytes.toBytes("B"))).equals("b")); + assertTrue("Column C value should be c", + Bytes.toString(result.getValue(family, Bytes.toBytes("C"))).equals("c")); + assertTrue("Column D value should be d", + Bytes.toString(result.getValue(family, Bytes.toBytes("D"))).equals("d")); + // put the same row again with C column deleted + put = new Put(rowKey); + put.add(family, Bytes.toBytes("A"), Bytes.toBytes("a1")); + put.add(family, Bytes.toBytes("B"), Bytes.toBytes("b1")); + KeyValue marker = new KeyValue(rowKey, family, Bytes.toBytes("C"), + HConstants.LATEST_TIMESTAMP, KeyValue.Type.DeleteColumn); + put.add(family, Bytes.toBytes("D"), Bytes.toBytes("d1")); + put.add(marker); + hTable.put(put); + // get row back and assert the values + get = new Get(rowKey); + get.addFamily(family); + result = hTable.get(get); + assertTrue("Column A value should be a1", + Bytes.toString(result.getValue(family, Bytes.toBytes("A"))).equals("a1")); + assertTrue("Column B value should be b1", + Bytes.toString(result.getValue(family, Bytes.toBytes("B"))).equals("b1")); + System.out.println(result.getValue(family, Bytes.toBytes("C"))); + assertTrue("Column C should not exist", result.getValue(family, Bytes.toBytes("C")) == null); + assertTrue("Column D value should be d1", + Bytes.toString(result.getValue(family, Bytes.toBytes("D"))).equals("d1")); + } + @Test public void testCellTTL() throws Exception { String key1 = "key1"; diff --git a/src/test/java/com/alipay/oceanbase/hbase/util/ObHTableTestUtil.java b/src/test/java/com/alipay/oceanbase/hbase/util/ObHTableTestUtil.java index 41e68bb2..44583f74 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/util/ObHTableTestUtil.java +++ b/src/test/java/com/alipay/oceanbase/hbase/util/ObHTableTestUtil.java @@ -70,7 +70,7 @@ public class ObHTableTestUtil { public static void prepareClean(List tableGroupList) throws Exception { for (String tableGroup : tableGroupList) { - tableNameList.addAll(getOTableNameList(tableGroup)); + tableNameList.addAll(getOHTableNameList(tableGroup)); } } @@ -120,30 +120,30 @@ public static OHTableClient newOHTableClient(String tableName) { return new OHTableClient(tableName, newConfiguration()); } - static public List getOTableNameList(String tableGroup) throws IOException { - // 读取建表语句 - List res = new LinkedList<>(); - String sql = new String(Files.readAllBytes(Paths.get(NativeHBaseUtil.SQL_PATH))); - String[] sqlList = sql.split(";"); - Map tableMap = new LinkedHashMap<>(); - for (String singleSql : sqlList) { - String realTableName; - if (singleSql.contains("CREATE TABLE ")) { - singleSql.trim(); - String[] splits = singleSql.split(" "); - String tableGroupName = splits[2].substring(1, splits[2].length() - 1); - if (tableGroupName.contains(":")) { - String[] tmpStr = tableGroupName.split(":", 2); - tableGroupName = tmpStr[1]; - } - realTableName = tableGroupName.split("\\$", 2)[0]; - if (realTableName.equals(tableGroup)) { - res.add(tableGroupName); - } + static public List getOHTableNameList(String tableGroup) throws IOException { + // 读取建表语句 + List res = new LinkedList<>(); + String sql = new String(Files.readAllBytes(Paths.get(NativeHBaseUtil.SQL_PATH))); + String[] sqlList = sql.split(";"); + Map tableMap = new LinkedHashMap<>(); + for (String singleSql : sqlList) { + String realTableName; + if (singleSql.contains("CREATE TABLE ")) { + singleSql.trim(); + String[] splits = singleSql.split(" "); + String tableGroupName = splits[2].substring(1, splits[2].length() - 1); + if (tableGroupName.contains(":")) { + String[] tmpStr = tableGroupName.split(":", 2); + tableGroupName = tmpStr[1]; + } + realTableName = tableGroupName.split("\\$", 2)[0]; + if (realTableName.equals(tableGroup)) { + res.add(tableGroupName); } } - return res; } + return res; + } static public Connection getConnection() { try {