From b3a83c7aa0601bef2a10befc5fcc779295e63262 Mon Sep 17 00:00:00 2001 From: stuBirdFly <1065492934@qq.com> Date: Thu, 29 Aug 2024 16:40:32 +0800 Subject: [PATCH] add scan lease test --- .../com/alipay/oceanbase/hbase/OHTable.java | 83 ++++++------- .../hbase/result/ClientStreamScanner.java | 2 + .../hbase/util/OHConnectionImpl.java | 2 +- src/main/resources/hbase-site.xml | 2 +- .../oceanbase/hbase/HTableTestBase.java | 117 +++++++++++++++++- 5 files changed, 154 insertions(+), 52 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 6d94fd06..1d76cf24 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -479,7 +479,7 @@ public Result call() throws IOException { filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(), get.getMaxVersions(), null); obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(), - true, -1); + true); request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString)); @@ -489,14 +489,12 @@ public Result call() throws IOException { } else { for (Map.Entry> entry : get.getFamilyMap() .entrySet()) { - family = entry.getKey(); - filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(), get.getMaxVersions(), entry.getValue()); obTableQuery = buildObTableQuery(filter, get.getRow(), true, - get.getRow(), true, -1); + get.getRow(), true); request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString, Bytes.toString(family))); @@ -540,8 +538,7 @@ public Result getRowOrBefore(byte[] row, byte[] family) { @Override public ResultScanner getScanner(final Scan scan) throws IOException { - - if (scan.getFamilyMap().keySet() == null || scan.getFamilyMap().keySet().size() == 0) { + if (scan.getFamilyMap().keySet().isEmpty()) { // check nothing, use table group; } else { checkFamilyViolation(scan.getFamilyMap().keySet()); @@ -558,20 +555,11 @@ public ResultScanner call() throws IOException { ObTableQuery obTableQuery; ObHTableFilter filter; try { - if (scan.getFamilyMap().keySet() == null - || scan.getFamilyMap().keySet().size() == 0) { + if (scan.getFamilyMap().keySet().isEmpty()) { filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(), scan.getMaxVersions(), null); - if (scan.isReversed()) { - obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false, - scan.getStartRow(), true, scan.getBatch()); - } else { - obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, - scan.getStopRow(), false, scan.getBatch()); - } - if (scan.isReversed()) { // reverse scan 时设置为逆序 - obTableQuery.setScanOrder(ObScanOrder.Reverse); - } + obTableQuery = buildObTableQuery(filter, scan); + request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString)); clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient @@ -584,19 +572,7 @@ public ResultScanner call() throws IOException { family = entry.getKey(); filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(), scan.getMaxVersions(), entry.getValue()); - if (scan.isReversed()) { - obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false, - scan.getStartRow(), true, scan.getBatch()); - } else { - obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, - scan.getStopRow(), false, scan.getBatch()); - } - if (scan.isReversed()) { // reverse scan 时设置为逆序 - obTableQuery.setScanOrder(ObScanOrder.Reverse); - } - - // no support set maxResultSize. - // obTableQuery.setMaxResultSize(scan.getMaxResultSize()); + obTableQuery = buildObTableQuery(filter, scan); request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString, Bytes.toString(family))); @@ -824,7 +800,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co keyValueList.addAll(entry.getValue()); } } - ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true, -1); + ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true); ObTableBatchOperation batch = buildObTableBatchOperation(keyValueList, false, null); @@ -862,7 +838,7 @@ public Result append(Append append) throws IOException { true, qualifiers); // the later hbase has supported timeRange ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers); - ObTableQuery obTableQuery = buildObTableQuery(filter, r, true, r, true, -1); + ObTableQuery obTableQuery = buildObTableQuery(filter, r, true, r, true); ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate(); queryAndMutate.setTableQuery(obTableQuery); queryAndMutate.setMutations(batchOperation); @@ -921,7 +897,7 @@ public Result increment(Increment increment) throws IOException { ObHTableFilter filter = buildObHTableFilter(null, increment.getTimeRange(), 1, qualifiers); - ObTableQuery obTableQuery = buildObTableQuery(filter, rowKey, true, rowKey, true, -1); + ObTableQuery obTableQuery = buildObTableQuery(filter, rowKey, true, rowKey, true); ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate(); queryAndMutate.setMutations(batch); queryAndMutate.setTableQuery(obTableQuery); @@ -971,7 +947,7 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers); - ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true, -1); + ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true); ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate(); queryAndMutate.setMutations(batch); queryAndMutate.setTableQuery(obTableQuery); @@ -1373,8 +1349,7 @@ private ObHTableFilter buildObHTableFilter(String filterString, TimeRange timeRa } private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start, - boolean includeStart, byte[] stop, boolean includeStop, - int batchSize) { + boolean includeStart, byte[] stop, boolean includeStop) { ObNewRange obNewRange = new ObNewRange(); if (Arrays.equals(start, HConstants.EMPTY_BYTE_ARRAY)) { @@ -1394,23 +1369,37 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start, } else { obNewRange.setEndKey(ObRowKey.getInstance(stop, ObObj.getMin(), ObObj.getMin())); } - - return buildObTableQuery(filter, obNewRange, batchSize); - } - - private ObTableQuery buildObTableQuery(ObHTableFilter filter, ObNewRange obNewRange, - int batchSize) { ObTableQuery obTableQuery = new ObTableQuery(); obTableQuery.setIndexName("PRIMARY"); obTableQuery.sethTableFilter(filter); for (String column : ALL_COLUMNS) { obTableQuery.addSelectColumn(column); } - if (obNewRange != null) { - obTableQuery.addKeyRange(obNewRange); + obTableQuery.addKeyRange(obNewRange); + + return obTableQuery; + } + + private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) { + ObTableQuery obTableQuery; + if (scan.getMaxResultsPerColumnFamily() > 0) { + filter.setLimitPerRowPerCf(scan.getMaxResultsPerColumnFamily()); + } + if (scan.getRowOffsetPerColumnFamily() > 0) { + filter.setOffsetPerRowPerCf(scan.getRowOffsetPerColumnFamily()); + } + if (scan.isReversed()) { + obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false, scan.getStartRow(), + true); + } else { + obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, scan.getStopRow(), + false); + } + if (scan.isReversed()) { // reverse scan 时设置为逆序 + obTableQuery.setScanOrder(ObScanOrder.Reverse); } - if (batchSize > 0) { - obTableQuery.setBatchSize(batchSize); + if (scan.getBatch() > 0) { + obTableQuery.setBatchSize(scan.getBatch()); } return obTableQuery; } diff --git a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java index ed58582d..35096378 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java +++ b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java @@ -24,6 +24,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.AbstractQueryStreamResult; import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult; import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.AbstractClientScanner; import org.apache.hadoop.hbase.client.Result; @@ -36,6 +37,7 @@ import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD; +@InterfaceAudience.Private public class ClientStreamScanner extends AbstractClientScanner { private static final Logger logger = TableHBaseLoggerFactory diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java index b8fb5411..12d2abb2 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java @@ -40,7 +40,7 @@ public class OHConnectionImpl implements Connection { private static final Marker FATAL = MarkerFactory.getMarker("FATAL"); - static final int BUFFERED_PARAM_UNSET = -1; + private static final int BUFFERED_PARAM_UNSET = -1; private volatile boolean closed; private volatile boolean aborted; diff --git a/src/main/resources/hbase-site.xml b/src/main/resources/hbase-site.xml index db435ff7..eb81dfea 100644 --- a/src/main/resources/hbase-site.xml +++ b/src/main/resources/hbase-site.xml @@ -22,4 +22,4 @@ hbase.client.connection.impl com.alipay.oceanbase.hbase.util.OHConnectionImpl - + \ No newline at end of file diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index d4ea53e4..8ab41c3b 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -18,7 +18,6 @@ package com.alipay.oceanbase.hbase; import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; @@ -41,8 +40,7 @@ import static org.apache.hadoop.hbase.filter.FilterList.Operator.MUST_PASS_ONE; import static org.apache.hadoop.hbase.util.Bytes.toBytes; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; public abstract class HTableTestBase { @@ -399,6 +397,8 @@ public void testMultiPartitionPut() throws IOException { } } + + @Test public void testMultiPartitionDel() throws IOException { String[] keys = new String[] { "putKey1", "putKey2", "putKey3", "putKey4", "putKey5", @@ -1026,6 +1026,87 @@ public void testGetFilter() throws Exception { Assert.assertEquals(1, r.raw().length); } + @Test + public void testScanSessionClean() throws Exception { + String key1 = "bKey"; + String key2 = "cKey"; + String key3 = "dKey"; + String key4 = "eKey"; + String key5 = "fKey"; + String column1 = "column1"; + String column2 = "column2"; + String value1 = "value1"; + String family = "family1"; + + // delete previous data + Delete deleteKey1Family = new Delete(toBytes(key1)); + deleteKey1Family.deleteFamily(toBytes(family)); + Delete deleteKey2Family = new Delete(toBytes(key2)); + deleteKey2Family.deleteFamily(toBytes(family)); + Delete deleteKey3Family = new Delete(toBytes(key3)); + deleteKey3Family.deleteFamily(toBytes(family)); + Delete deleteKey4Family = new Delete(toBytes(key4)); + deleteKey4Family.deleteFamily(toBytes(family)); + Delete deleteKey5Family = new Delete(toBytes(key5)); + deleteKey5Family.deleteFamily(toBytes(family)); + + hTable.delete(deleteKey1Family); + hTable.delete(deleteKey2Family); + hTable.delete(deleteKey3Family); + hTable.delete(deleteKey4Family); + hTable.delete(deleteKey5Family); + + Put putKey1Column1Value1 = new Put(toBytes(key1)); + putKey1Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1)); + + Put putKey2Column2Value1 = new Put(toBytes(key2)); + putKey2Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1)); + + Put putKey3Column2Value1 = new Put(toBytes(key3)); + putKey3Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1)); + + Put putKey4Column2Value1 = new Put(toBytes(key4)); + putKey4Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1)); + + Put putKey5Column2Value1 = new Put(toBytes(key5)); + putKey5Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1)); + + tryPut(hTable, putKey1Column1Value1); + tryPut(hTable, putKey1Column1Value1); + tryPut(hTable, putKey2Column2Value1); + tryPut(hTable, putKey3Column2Value1); + tryPut(hTable, putKey4Column2Value1); + tryPut(hTable, putKey5Column2Value1); + + Scan scan; + scan = new Scan(); + scan.addFamily(family.getBytes()); + scan.setMaxVersions(10); + scan.setBatch(1); + + ResultScanner scanner = hTable.getScanner(scan); + scanner.next(); + + // The server defaults to a lease of 60 seconds. Therefore, at 20 seconds, + // the transaction is checked to ensure it has not rolled back, and the lease is updated. + // At 55 seconds, the query should still be able to retrieve the data and update the lease. + // If it exceeds 60 seconds (at 61 seconds), the session is deleted. + Thread.sleep(20 * 1000); + scanner.next(); + + Thread.sleep(55 * 1000); + scanner.next(); + + Thread.sleep(61 * 1000); + try { + scanner.next(); + } catch (IOException e) { + assertTrue(e.getCause().getMessage().contains("OB_HASH_NOT_EXIST")); + } finally { + scanner.close(); + } + } + @Test public void testScan() throws Exception { String key1 = "scanKey1x"; @@ -1176,6 +1257,36 @@ public void testScan() throws Exception { Assert.assertEquals(res_count, 7); scanner.close(); + scan = new Scan(); + scan.addFamily(family.getBytes()); + scan.setStartRow("scanKey1x".getBytes()); + scan.setStopRow("scanKey3x".getBytes()); + scan.setMaxVersions(10); + scan.setMaxResultsPerColumnFamily(1); + scanner = hTable.getScanner(scan); + for (Result result : scanner) { + assertEquals(result.rawCells().length,1); + } + scanner.close(); + + scan = new Scan(); + scan.addFamily(family.getBytes()); + scan.setStartRow("scanKey1x".getBytes()); + scan.setStopRow("scanKey3x".getBytes()); + scan.setMaxVersions(10); + scan.setMaxResultsPerColumnFamily(2); + scan.setRowOffsetPerColumnFamily(1); + scanner = hTable.getScanner(scan); + res_count = 0; + for (Result result : scanner) { + for (KeyValue keyValue : result.raw()) { + Arrays.equals(key2.getBytes(), keyValue.getRow()); + res_count += 1; + } + } + Assert.assertEquals(res_count, 3); + scanner.close(); + // scan with prefixFilter scan = new Scan(); scan.addFamily(family.getBytes());