From fc2db4b5b299a329e5e0e47dc8d7c94c66d4621c Mon Sep 17 00:00:00 2001 From: stuBirdFly <1065492934@qq.com> Date: Thu, 12 Dec 2024 21:55:15 +0800 Subject: [PATCH 1/2] hbase support cell TTL patch hbase2.x cell ttl to hbase1.x --- .../com/alipay/oceanbase/hbase/OHTable.java | 250 ++++++++++-------- .../hbase/constants/OHConstants.java | 12 +- .../oceanbase/hbase/HTableTestBase.java | 245 ++++++++++++++++- .../alipay/oceanbase/hbase/OHTableTest.java | 2 +- src/test/java/unit_test_db.sql | 9 + 5 files changed, 389 insertions(+), 129 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index f0ae53e6..ab54527f 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -46,11 +46,7 @@ import com.alipay.oceanbase.rpc.table.ObKVParams; import com.alipay.oceanbase.rpc.table.ObTableClientQueryImpl; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; -import jdk.nashorn.internal.objects.Global; +import com.google.protobuf.*; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; @@ -397,8 +393,7 @@ private void finishSetUp() { } private OHConnectionConfiguration setUserDefinedNamespace(String tableNameString, - OHConnectionConfiguration ohConnectionConf) - throws IOException { + OHConnectionConfiguration ohConnectionConf) { if (tableNameString.indexOf(':') != -1) { String[] params = tableNameString.split(":"); if (params.length != 2) { @@ -731,14 +726,13 @@ private List generateGetResult(ObTableSingleOpResult getResult) throws IOE return cells; } - private String getTargetTableName(List actions) { byte[] family = null; for (Row action : actions) { if (action instanceof RowMutations || action instanceof RegionCoprocessorServiceExec) { throw new FeatureNotSupportedException("not supported yet'"); } else { - Set familySet = null; + Set familySet; if (action instanceof Get) { Get get = (Get) action; familySet = get.familySet(); @@ -837,7 +831,7 @@ && compareByteArray(kv.getRow(), (byte[]) row.get(0).getValue()) != 0) { familyAndQualifier[0], // family familyAndQualifier[1], // qualifiermat (Long) row.get(2).getValue(), // T - (byte[]) row.get(3).getValue()// V + (byte[]) row.get(3).getValue() // V ); keyValueList.add(kv); } @@ -875,7 +869,7 @@ private void processColumnFilters(NavigableSet columnFilters, @Override public Result get(final Get get) throws IOException { - if (get.getFamilyMap().keySet() == null || get.getFamilyMap().keySet().isEmpty()) { + if (get.getFamilyMap().keySet().isEmpty()) { // check nothing, use table group; } else { checkFamilyViolation(get.getFamilyMap().keySet(), false); @@ -888,8 +882,7 @@ public Result call() throws IOException { byte[] family = new byte[] {}; ObTableQuery obTableQuery; try { - if (get.getFamilyMap().keySet() == null - || get.getFamilyMap().keySet().isEmpty() + if (get.getFamilyMap().keySet().isEmpty() || get.getFamilyMap().size() > 1) { // In a Get operation where the family map is greater than 1 or equal to 0, // we handle this by appending the column family to the qualifier on the client side. @@ -999,8 +992,7 @@ public ResultScanner call() throws IOException { ObTableQuery obTableQuery; ObHTableFilter filter; try { - if (scan.getFamilyMap().keySet() == null - || scan.getFamilyMap().keySet().isEmpty() + if (scan.getFamilyMap().keySet().isEmpty() || scan.getFamilyMap().size() > 1) { // In a Scan operation where the family map is greater than 1 or equal to 0, // we handle this by appending the column family to the qualifier on the client side. @@ -1081,8 +1073,7 @@ public List call() throws IOException { ObTableQuery obTableQuery; ObHTableFilter filter; try { - if (scan.getFamilyMap().keySet() == null - || scan.getFamilyMap().keySet().isEmpty() + if (scan.getFamilyMap().keySet().isEmpty() || scan.getFamilyMap().size() > 1) { // In a Scan operation where the family map is greater than 1 or equal to 0, // we handle this by appending the column family to the qualifier on the client side. @@ -1336,39 +1327,35 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, } } - private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, + private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, + CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations) throws Exception { - checkArgument(row != null, "row is null"); - checkArgument(isNotBlank(Bytes.toString(family)), "family is blank"); - checkArgument(Bytes.equals(row, rowMutations.getRow()), - "mutation row is not equal check row"); - - checkArgument(!rowMutations.getMutations().isEmpty(), "mutation is empty"); - - byte[] filterString = buildCheckAndMutateFilterString(family, qualifier, compareOp, value); - - ObHTableFilter filter = buildObHTableFilter(filterString, null, 1, qualifier); - List mutations = rowMutations.getMutations(); - List keyValueList = new LinkedList<>(); - // only one family operation is allowed - for (Mutation mutation : mutations) { - checkFamilyViolationForOneFamily(mutation.getFamilyMap().keySet()); - checkArgument(Arrays.equals(family, mutation.getFamilyMap().firstEntry().getKey()), - "mutation family is not equal check family"); - // Support for multiple families in the future - for (Map.Entry> entry : mutation.getFamilyMap().entrySet()) { - keyValueList.addAll(entry.getValue()); - } + checkArgument(row != null, "row is null"); + checkArgument(isNotBlank(Bytes.toString(family)), "family is blank"); + checkArgument(Bytes.equals(row, rowMutations.getRow()), + "mutation row is not equal check row"); + checkArgument(!rowMutations.getMutations().isEmpty(), "mutation is empty"); + List mutations = rowMutations.getMutations(); + // only one family operation is allowed + for (Mutation mutation : mutations) { + if (!(mutation instanceof Put || mutation instanceof Delete)) { + throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + + mutation.getClass().getName()); } - ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true, false); - - ObTableBatchOperation batch = buildObTableBatchOperation(keyValueList, false, null); + checkFamilyViolationForOneFamily(mutation.getFamilyCellMap().keySet()); + checkArgument(Arrays.equals(family, mutation.getFamilyCellMap().firstEntry().getKey()), + "mutation family is not equal check family"); + } + byte[] filterString = buildCheckAndMutateFilterString(family, qualifier, compareOp, value); + ObHTableFilter filter = buildObHTableFilter(filterString, null, 1, qualifier); + ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true, false); + ObTableBatchOperation batch = buildObTableBatchOperation(mutations, null); - ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, - batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration)); - ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient - .execute(request); - return result.getAffectedRows() > 0; + ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, + batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration)); + ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient + .execute(request); + return result.getAffectedRows() > 0; } @Override @@ -1385,17 +1372,17 @@ public void mutateRow(RowMutations rm) { */ @Override public Result append(Append append) throws IOException { - - checkFamilyViolationForOneFamily(append.getFamilyMap().keySet()); - checkArgument(!append.isEmpty(), "append is empty."); + checkArgument(!append.isEmpty(), "Invalid arguments to %s, zero columns specified", + append.toString()); + checkFamilyViolationForOneFamily(append.getFamilyCellMap().keySet()); try { byte[] r = append.getRow(); Map.Entry> entry = append.getFamilyMap().entrySet().iterator() .next(); byte[] f = entry.getKey(); List qualifiers = new ArrayList(); - ObTableBatchOperation batchOperation = buildObTableBatchOperation(entry.getValue(), - true, qualifiers); + ObTableBatchOperation batchOperation = buildObTableBatchOperation( + Collections.singletonList(append), qualifiers); // the later hbase has supported timeRange ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers); ObTableQuery obTableQuery = buildObTableQuery(filter, r, true, r, true, false); @@ -1435,33 +1422,22 @@ public Result append(Append append) throws IOException { */ @Override public Result increment(Increment increment) throws IOException { - - checkFamilyViolationForOneFamily(increment.getFamilyMap().keySet()); + checkArgument(!increment.isEmpty(), "Invalid arguments to %s, zero columns specified", increment.toString()); + checkFamilyViolationForOneFamily(increment.getFamilyCellMap().keySet()); try { - List qualifiers = new ArrayList(); - byte[] rowKey = increment.getRow(); Map.Entry> entry = increment.getFamilyCellMap().entrySet() .iterator().next(); byte[] f = entry.getKey(); - - ObTableBatchOperation batch = new ObTableBatchOperation(); - for (Cell cell : entry.getValue()) { - byte[] qualifier = cell.getQualifier(); - qualifiers.add(qualifier); - batch.addTableOperation(getInstance(INCREMENT, new Object[] { rowKey, qualifier, - Long.MAX_VALUE }, V_COLUMNS, new Object[] { cell.getValue() })); - } + List qualifiers = new ArrayList<>(); + ObTableBatchOperation batch = buildObTableBatchOperation(Collections.singletonList(increment), qualifiers); ObHTableFilter filter = buildObHTableFilter(null, increment.getTimeRange(), 1, qualifiers); ObTableQuery obTableQuery = buildObTableQuery(filter, rowKey, true, rowKey, true, false); - ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate(); - queryAndMutate.setMutations(batch); - queryAndMutate.setTableQuery(obTableQuery); ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, batch, getTargetTableName(tableNameString, Bytes.toString(f), configuration)); @@ -1938,9 +1914,6 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start, } obTableQuery.setIndexName("PRIMARY"); obTableQuery.sethTableFilter(filter); - for (String column : ALL_COLUMNS) { - obTableQuery.addSelectColumn(column); - } obTableQuery.addKeyRange(obNewRange); return obTableQuery; } @@ -1994,32 +1967,67 @@ private ObTableQuery buildObTableQuery(final Get get, Collection columnQ return obTableQuery; } - public static ObTableBatchOperation buildObTableBatchOperation(List keyValueList, - boolean putToAppend, + public static ObTableBatchOperation buildObTableBatchOperation(List rowList, List qualifiers) { ObTableBatchOperation batch = new ObTableBatchOperation(); - for (KeyValue kv : keyValueList) { - if (qualifiers != null) { - qualifiers.add(kv.getQualifier()); + OHOpType opType; + Map indexMap = new HashMap<>(); + for (Mutation row : rowList) { + if (row instanceof Put) { + opType = OHOpType.Put; + } else if (row instanceof Delete) { + opType = OHOpType.Delete; + } else if (row instanceof Increment) { + opType = OHOpType.Increment; + } else if (row instanceof Append) { + opType = OHOpType.Append; + } else { + throw new FeatureNotSupportedException("not supported other type"); } - batch.addTableOperation(buildObTableOperation(kv, putToAppend)); + Set>> familyCellMap = row.getFamilyMap().entrySet(); + + for (Map.Entry> familyWithCells : familyCellMap) { + if (opType == OHOpType.Increment || opType == OHOpType.Append) { + indexMap.clear(); + for (int i = 0; i < familyWithCells.getValue().size(); i++) { + Cell cell = familyWithCells.getValue().get(i); + String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)); + indexMap.put(qualifier, i); + } + for (Map.Entry entry : indexMap.entrySet()) { + qualifiers.add(entry.getKey().getBytes()); + batch.addTableOperation(buildObTableOperation(familyWithCells.getValue().get(entry.getValue()), opType, row.getTTL())); + } + } else { + for (KeyValue cell : familyWithCells.getValue()) { + batch.addTableOperation(buildObTableOperation(cell, opType, row.getTTL())); + } + } + } + } - batch.setSameType(true); batch.setSamePropertiesNames(true); return batch; } private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv, ObTableOperationType operationType, - boolean isTableGroup) { + boolean isTableGroup, Long TTL) { KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType()); switch (operationType) { case INSERT_OR_UPDATE: case APPEND: + case INCREMENT: + String[] property_columns = V_COLUMNS; + Object[] property = new Object[] { CellUtil.cloneValue(kv) }; + if (TTL != Long.MAX_VALUE) { + property_columns = PROPERTY_COLUMNS; + property = new Object[] { CellUtil.cloneValue(kv), TTL }; + } return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(operationType, ROW_KEY_COLUMNS, - new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, V_COLUMNS, - new Object[] { kv.getValue() }); + new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, + property_columns, property); case DEL: switch (kvType) { case Delete: @@ -2134,9 +2142,9 @@ private BatchOperation buildBatchOperation(String tableName, List (Bytes.toString(family) + "." + Bytes.toString(kv.getQualifier())) .getBytes()); batch - .addOperation(buildMutation(new_kv, INSERT_OR_UPDATE, isTableGroup)); + .addOperation(buildMutation(new_kv, INSERT_OR_UPDATE, true, put.getTTL())); } else { - batch.addOperation(buildMutation(kv, INSERT_OR_UPDATE, isTableGroup)); + batch.addOperation(buildMutation(kv, INSERT_OR_UPDATE, false, put.getTTL())); } } } @@ -2146,7 +2154,7 @@ private BatchOperation buildBatchOperation(String tableName, List singleOpResultNum++; KeyValue kv = new KeyValue(delete.getRow(), delete.getTimeStamp(), KeyValue.Type.Maximum); - batch.addOperation(buildMutation(kv, DEL, isTableGroup)); + batch.addOperation(buildMutation(kv, DEL, isTableGroup, Long.MAX_VALUE)); } else { for (Map.Entry> entry : delete.getFamilyMap().entrySet()) { byte[] family = entry.getKey(); @@ -2157,9 +2165,9 @@ private BatchOperation buildBatchOperation(String tableName, List KeyValue new_kv = modifyQualifier(kv, (Bytes.toString(family) + "." + Bytes.toString(kv .getQualifier())).getBytes()); - batch.addOperation(buildMutation(new_kv, DEL, true)); + batch.addOperation(buildMutation(new_kv, DEL, true, Long.MAX_VALUE)); } else { - batch.addOperation(buildMutation(kv, DEL, false)); + batch.addOperation(buildMutation(kv, DEL, false, Long.MAX_VALUE)); } } } @@ -2174,30 +2182,50 @@ private BatchOperation buildBatchOperation(String tableName, List return batch; } - public static ObTableOperation buildObTableOperation(KeyValue kv, boolean putToAppend) { - KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType()); - switch (kvType) { + public static ObTableOperation buildObTableOperation(KeyValue kv, OHOpType operationType, + Long TTL) { + String[] property_columns = V_COLUMNS; + Object[] property = new Object[] { CellUtil.cloneValue(kv) }; + if (TTL != Long.MAX_VALUE) { + property_columns = PROPERTY_COLUMNS; + property = new Object[] { CellUtil.cloneValue(kv), TTL }; + } + switch (operationType) { case Put: - ObTableOperationType operationType; - if (putToAppend) { - operationType = APPEND; + case Increment: + case Append: + ObTableOperationType type; + if (operationType == OHOpType.Put) { + type = INSERT_OR_UPDATE; + } else if (operationType == OHOpType.Increment) { + type = INCREMENT; } else { - operationType = INSERT_OR_UPDATE; + type = APPEND; } - return getInstance(operationType, - new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, V_COLUMNS, - new Object[] { kv.getValue() }); + return getInstance( + type, + new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), + kv.getTimestamp() }, property_columns, property); case Delete: - return getInstance(DEL, - new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, null, null); - case DeleteColumn: - return getInstance(DEL, - new Object[] { kv.getRow(), kv.getQualifier(), -kv.getTimestamp() }, null, null); - case DeleteFamily: - return getInstance(DEL, new Object[] { kv.getRow(), null, -kv.getTimestamp() }, - null, null); + KeyValue.Type delType = KeyValue.Type.codeToType(kv.getTypeByte()); + if (delType == KeyValue.Type.Delete) { + return getInstance( + DEL, + new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), + kv.getTimestamp() }, null, null); + } else if (delType == KeyValue.Type.DeleteColumn) { + return getInstance( + DEL, + new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv), + -kv.getTimestamp() }, null, null); + } else if (delType == KeyValue.Type.DeleteFamily) { + return getInstance(DEL, new Object[] { kv.getRow(), null, -kv.getTimestamp() }, + null, null); + } else { + throw new IllegalArgumentException("illegal delete type " + operationType); + } default: - throw new IllegalArgumentException("illegal mutation type " + kvType); + throw new IllegalArgumentException("illegal mutation type " + operationType); } } @@ -2223,16 +2251,6 @@ private ObTableQueryAsyncRequest buildObTableQueryAsyncRequest(ObTableQuery obTa return asyncRequest; } - public static ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBatchOperation obTableBatchOperation, - String targetTableName) { - ObTableBatchOperationRequest request = new ObTableBatchOperationRequest(); - request.setTableName(targetTableName); - request.setReturningAffectedRows(true); - request.setEntityType(ObTableEntityType.HKV); - request.setBatchOperation(obTableBatchOperation); - return request; - } - private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQuery obTableQuery, ObTableBatchOperation obTableBatchOperation, String targetTableName) { @@ -2317,4 +2335,8 @@ public byte[][] getEndKeys() throws IOException { public Pair getStartEndKeys() throws IOException { return new Pair<>(getStartKeys(), getEndKeys()); } + + public enum OHOpType { + Put, Append, Delete, Increment + } } \ No newline at end of file diff --git a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java index e10ef0b1..e3a22b2a 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java +++ b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java @@ -72,16 +72,6 @@ public final class OHConstants { */ public static final String HBASE_OCEANBASE_DATABASE = "hbase.oceanbase.database"; - /** - * ocenbase hbase model is consist of following columns - * K hbase row key - * Q hbase qualifier - * T hbase timeStamp - * V hbase value - */ - public static final String[] ALL_COLUMNS = new String[] { "K", - "Q", "T", "V" }; - /** * ocenbase hbase model rowkey column is consist of following column * K, Q, T hbase value @@ -94,6 +84,8 @@ public final class OHConstants { * V hbase value */ public static final String[] V_COLUMNS = new String[] { "V" }; + public static final String[] PROPERTY_COLUMNS = new String[] { "V", + "TTL" }; public static final String HBASE_HTABLE_POOL_SEPERATOR = "$"; diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index 45f44797..f732ea9b 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -5108,6 +5108,243 @@ public void testAppend() throws IOException { } } + @Test + public void testCellTTL() throws Exception { + String key1 = "key1"; + String column1 = "cf1"; + String column2 = "cf2"; + String column3 = "cf3"; + String family = "cellTTLFamily"; + String value1 = "v1"; + String value2 = "v2"; + String app = "app"; + + Result r; + Put put1 = new Put(key1.getBytes()); + put1.addColumn(family.getBytes(), column1.getBytes(), toBytes(11L)); + put1.setTTL(5000); + Put put2 = new Put(key1.getBytes()); + put2.addColumn(family.getBytes(), column1.getBytes(), toBytes(22L)); + put2.addColumn(family.getBytes(), column2.getBytes(), toBytes(33L)); + put2.setTTL(10000); + Put put3 = new Put(key1.getBytes()); + put3.addColumn(family.getBytes(), column1.getBytes(), toBytes(11L)); + put3.setTTL(-3000); + Put put4 = new Put(key1.getBytes()); + put4.addColumn(family.getBytes(), column1.getBytes(), toBytes(11L)); + put4.setTTL(0); + Put errorPut = new Put(key1.getBytes()); + errorPut.addColumn("family1".getBytes(), column1.getBytes(), toBytes(11L)); + errorPut.setTTL(10); + + Get get = new Get(key1.getBytes()); + get.addFamily(family.getBytes()); + get.setMaxVersions(10); + try { + tryPut(hTable, errorPut); + } catch (Exception e) { + assertTrue(e.getCause().toString().contains("Unknown column 'TTL'")); + } + // test put and get + tryPut(hTable, put1); + tryPut(hTable, put2); + tryPut(hTable, put3); + tryPut(hTable, put4); + r = hTable.get(get); + assertEquals(3, r.size()); + Thread.sleep(5000); + r = hTable.get(get); + assertEquals(2, r.size()); + Thread.sleep(5000); + r = hTable.get(get); + assertEquals(0, r.size()); + + // test increment + tryPut(hTable, put1); + tryPut(hTable, put2); + Thread.sleep(1000); + Increment increment = new Increment(key1.getBytes()); + increment.addColumn(family.getBytes(), column1.getBytes(), 1L); + increment.addColumn(family.getBytes(), column2.getBytes(), 2L); + increment.addColumn(family.getBytes(), column3.getBytes(), 5L); + increment.setTTL(-5000); + hTable.increment(increment); + increment.setTTL(5000); + hTable.increment(increment); + get.setMaxVersions(1); + r = hTable.get(get); + + assertEquals(3, r.size()); + assertEquals(23L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column1.getBytes()).get(0)))); + assertEquals(35L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column2.getBytes()).get(0)))); + assertEquals(5L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column3.getBytes()).get(0)))); + + Thread.sleep(10000); + r = hTable.get(get); + assertEquals(0, r.size()); + + increment = new Increment(key1.getBytes()); + increment.addColumn(family.getBytes(), column1.getBytes(), 1L); + increment.addColumn(family.getBytes(), column2.getBytes(), 2L); + increment.setTTL(5000); + hTable.increment(increment); + r = hTable.get(get); + + assertEquals(2, r.size()); + assertEquals(1L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column1.getBytes()).get(0)))); + assertEquals(2L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column2.getBytes()).get(0)))); + + Thread.sleep(5000); + r = hTable.get(get); + assertEquals(0, r.size()); + + tryPut(hTable, put1); + tryPut(hTable, put2); + increment.addColumn(family.getBytes(), column1.getBytes(), 4L); + hTable.increment(increment); + + r = hTable.get(get); + assertEquals(2, r.size()); + assertEquals(26L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column1.getBytes()).get(0)))); + assertEquals(35L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column2.getBytes()).get(0)))); + + // test append + Thread.sleep(10000); + r = hTable.get(get); + assertEquals(0, r.size()); + + put3 = new Put(key1.getBytes()); + put3.addColumn(family.getBytes(), column1.getBytes(), toBytes(value1)); + put3.addColumn(family.getBytes(), column2.getBytes(), toBytes(value2)); + put3.setTTL(10000); + tryPut(hTable, put3); + + Append append = new Append(key1.getBytes()); + KeyValue kv = new KeyValue(key1.getBytes(), family.getBytes(), column1.getBytes(), + app.getBytes()); + append.add(kv); + append.setTTL(-3000); + hTable.append(append); + append.setTTL(3000); + hTable.append(append); + + r = hTable.get(get); + assertEquals(2, r.size()); + assertEquals( + value1 + app, + Bytes.toString(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column1.getBytes()).get(0)))); + + Thread.sleep(3000); + r = hTable.get(get); + assertEquals(2, r.size()); + assertEquals( + value1, + Bytes.toString(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column1.getBytes()).get(0)))); + + Thread.sleep(7000); + r = hTable.get(get); + assertEquals(0, r.size()); + + append.add(family.getBytes(), column1.getBytes(), app.getBytes()); + hTable.append(append); + r = hTable.get(get); + assertEquals(1, r.size()); + assertEquals( + app, + Bytes.toString(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column1.getBytes()).get(0)))); + + Thread.sleep(3000); + append.add(family.getBytes(), column2.getBytes(), app.getBytes()); + hTable.append(append); + r = hTable.get(get); + assertEquals(2, r.size()); + assertEquals( + app, + Bytes.toString(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column1.getBytes()).get(0)))); + assertEquals( + app, + Bytes.toString(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column2.getBytes()).get(0)))); + + // test checkAndMutate + Thread.sleep(3000); + r = hTable.get(get); + assertEquals(0, r.size()); + + tryPut(hTable, put1); + RowMutations rowMutations = new RowMutations(key1.getBytes()); + rowMutations.add(put2); + Delete delete = new Delete(key1.getBytes()); + delete.addColumn(family.getBytes(), column1.getBytes()); + rowMutations.add(delete); + boolean succ = hTable.checkAndMutate(key1.getBytes(), family.getBytes(), + column1.getBytes(), CompareFilter.CompareOp.EQUAL, toBytes(11L), rowMutations); + assertTrue(succ); + r = hTable.get(get); + assertEquals(r.size(), 2); + assertEquals(11L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column1.getBytes()).get(0)))); + assertEquals(33L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column2.getBytes()).get(0)))); + + Thread.sleep(10000); + r = hTable.get(get); + assertEquals(r.size(), 0); + + tryPut(hTable, put1); + rowMutations = new RowMutations(key1.getBytes()); + put4 = new Put(key1.getBytes()); + put4.addColumn(family.getBytes(), column1.getBytes(), toBytes(22L)); + put4.addColumn(family.getBytes(), column2.getBytes(), toBytes(33L)); + put4.setTTL(10000); + rowMutations.add(put4); + succ = hTable.checkAndMutate(key1.getBytes(), family.getBytes(), column1.getBytes(), + CompareFilter.CompareOp.EQUAL, toBytes(1L), rowMutations); + assertFalse(succ); + succ = hTable.checkAndMutate(key1.getBytes(), family.getBytes(), column1.getBytes(), + CompareFilter.CompareOp.EQUAL, toBytes(11L), rowMutations); + assertTrue(succ); + + r = hTable.get(get); + assertEquals(r.size(), 2); + assertEquals(22L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column1.getBytes()).get(0)))); + assertEquals(33L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column2.getBytes()).get(0)))); + + Thread.sleep(5000); + r = hTable.get(get); + assertEquals(2, r.size()); + assertEquals(22L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column1.getBytes()).get(0)))); + assertEquals(33L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(), + column2.getBytes()).get(0)))); + + Thread.sleep(5000); + r = hTable.get(get); + assertEquals(r.size(), 0); + put1 = new Put(key1.getBytes()); + put1.addColumn(family.getBytes(), column1.getBytes(), toBytes(11L)); + tryPut(hTable, put1); + + increment = new Increment(key1.getBytes()); + increment.addColumn(family.getBytes(), column1.getBytes(), 1L); + hTable.increment(increment); + r = hTable.get(get); + assertEquals(r.size(), 1); + } + @Test public void testIncrement() throws IOException { String column = "incrementColumn"; @@ -5372,8 +5609,8 @@ public void testFamilyBlank() throws Exception { try { hTable.append(append); fail(); - } catch (FeatureNotSupportedException e) { - Assert.assertTrue(e.getMessage().contains("family is empty")); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("zero columns specified")); } Increment increment = new Increment(key.getBytes()); @@ -5381,8 +5618,8 @@ public void testFamilyBlank() throws Exception { try { hTable.increment(increment); fail(); - } catch (FeatureNotSupportedException e) { - Assert.assertTrue(e.getMessage().contains("family is empty")); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("zero columns specified")); } } diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java index f36969ca..b4df2595 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java @@ -47,7 +47,7 @@ public static void setup() throws Exception { multiCfHTable = new OHTable(c, "test_multi_cf"); List tableGroups = new LinkedList<>(); tableGroups.add("test"); -// tableGroups.add("test_multi_cf"); + tableGroups.add("test_multi_cf"); ObHTableTestUtil.prepareClean(tableGroups); } diff --git a/src/test/java/unit_test_db.sql b/src/test/java/unit_test_db.sql index 8b7269ae..7ddd9c33 100644 --- a/src/test/java/unit_test_db.sql +++ b/src/test/java/unit_test_db.sql @@ -22,6 +22,15 @@ CREATE TABLE `test$family1` ( PRIMARY KEY (`K`, `Q`, `T`) ); +CREATE TABLE `test$cellTTLFamily` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + `TTL` bigint(20) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) +); + CREATE TABLE `test_t$partitionFamily1` ( `K` varbinary(1024) NOT NULL, `Q` varbinary(256) NOT NULL, From 421dfa840781ae645302e24d6b11197f901a124b Mon Sep 17 00:00:00 2001 From: stuBirdFly <1065492934@qq.com> Date: Sat, 8 Feb 2025 11:57:21 +0800 Subject: [PATCH 2/2] Remove the outermost exception wrapping of the batch --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index ab54527f..a465aec2 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -647,7 +647,7 @@ public void batch(final List actions, final Object[] results) thr try { compatOldServerBatch(actions, results, batchError); } catch (Exception e) { - throw new IOException(e); + throw new IOException(tableNameString + " table occurred unexpected error." , e); } } else { String realTableName = getTargetTableName(actions); @@ -657,7 +657,7 @@ public void batch(final List actions, final Object[] results) thr try { tmpResults = batch.execute(); } catch (Exception e) { - throw new IOException(e); + throw new IOException(tableNameString + " table occurred unexpected error." , e); } int index = 0; for (int i = 0; i != actions.size(); ++i) { @@ -1266,7 +1266,6 @@ private void innerDelete(Delete delete) throws IOException { batch(Collections.singletonList(delete)); } catch (Exception e) { logger.error(LCD.convert("01-00004"), tableNameString, e); - throw new IOException("delete table " + tableNameString + " error.", e); } } @@ -1553,9 +1552,6 @@ public void flushCommits() throws IOException { } catch (Exception e) { logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush, writeBuffer.size(), e); - throw new IOException("put table " + tableNameString + " error codes " + null - + "auto flush " + autoFlush + " current buffer size " - + writeBuffer.size(), e); } finally { // mutate list so that it is empty for complete success, or contains // only failed records results are returned in the same order as the