diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 7efb7669..77122af9 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; @@ -503,24 +505,10 @@ public Result call() throws IOException { ObTableClientQueryStreamResult clientQueryStreamResult; ObTableQueryRequest request; ObTableQuery obTableQuery; - ObHTableFilter filter; try { if (get.getFamilyMap().keySet() == null || get.getFamilyMap().keySet().size() == 0) { - filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(), - get.getMaxVersions(), null); - if (get.isClosestRowBefore()) { - Scan scan = new Scan(); - scan.setStartRow(get.getRow()); - scan.setCaching(1); - scan.setReversed(true); - obTableQuery = buildObTableQuery(filter, scan); - obTableQuery.setObKVParams(buildOBKVParams(scan)); - } else { - obTableQuery = buildObTableQuery(filter, get.getRow(), true, - get.getRow(), true); - obTableQuery.setObKVParams(buildOBKVParams(get)); - } + obTableQuery = buildObTableQuery(get, null); request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString)); @@ -531,19 +519,7 @@ public Result call() throws IOException { for (Map.Entry> entry : get.getFamilyMap() .entrySet()) { family = entry.getKey(); - filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(), - get.getMaxVersions(), entry.getValue()); - if (get.isClosestRowBefore()) { - Scan scan = new Scan(get.getRow()); - scan.setCaching(1); - scan.setReversed(true); - obTableQuery = buildObTableQuery(filter, scan); - obTableQuery.setObKVParams(buildOBKVParams(scan)); - } else { - obTableQuery = buildObTableQuery(filter, get.getRow(), true, - get.getRow(), true); - obTableQuery.setObKVParams(buildOBKVParams(get)); - } + obTableQuery = buildObTableQuery(get, entry.getValue()); request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString, Bytes.toString(family))); clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient @@ -870,7 +846,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co keyValueList.addAll(entry.getValue()); } } - ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true); + ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true, false); ObTableBatchOperation batch = buildObTableBatchOperation(keyValueList, false, null); @@ -908,7 +884,7 @@ public Result append(Append append) throws IOException { true, qualifiers); // the later hbase has supported timeRange ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers); - ObTableQuery obTableQuery = buildObTableQuery(filter, r, true, r, true); + ObTableQuery obTableQuery = buildObTableQuery(filter, r, true, r, true, false); ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate(); queryAndMutate.setTableQuery(obTableQuery); queryAndMutate.setMutations(batchOperation); @@ -963,11 +939,11 @@ public Result increment(Increment increment) throws IOException { batch.addTableOperation(getInstance(INCREMENT, new Object[] { rowKey, qualifier, Long.MAX_VALUE }, V_COLUMNS, new Object[] { cell.getValue() })); }); - + ObHTableFilter filter = buildObHTableFilter(null, increment.getTimeRange(), 1, qualifiers); - ObTableQuery obTableQuery = buildObTableQuery(filter, rowKey, true, rowKey, true); + ObTableQuery obTableQuery = buildObTableQuery(filter, rowKey, true, rowKey, true, false); ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate(); queryAndMutate.setMutations(batch); queryAndMutate.setTableQuery(obTableQuery); @@ -1017,7 +993,7 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers); - ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true); + ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true, false); ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate(); queryAndMutate.setMutations(batch); queryAndMutate.setTableQuery(obTableQuery); @@ -1419,7 +1395,8 @@ private ObHTableFilter buildObHTableFilter(String filterString, TimeRange timeRa } private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start, - boolean includeStart, byte[] stop, boolean includeStop) { + boolean includeStart, byte[] stop, boolean includeStop, + boolean isReversed) { ObNewRange obNewRange = new ObNewRange(); if (Arrays.equals(start, HConstants.EMPTY_BYTE_ARRAY)) { @@ -1440,6 +1417,9 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start, obNewRange.setEndKey(ObRowKey.getInstance(stop, ObObj.getMin(), ObObj.getMin())); } ObTableQuery obTableQuery = new ObTableQuery(); + if (isReversed) { + obTableQuery.setScanOrder(ObScanOrder.Reverse); + } obTableQuery.setIndexName("PRIMARY"); obTableQuery.sethTableFilter(filter); for (String column : ALL_COLUMNS) { @@ -1459,11 +1439,10 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) { } if (scan.isReversed()) { obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false, scan.getStartRow(), - true); - obTableQuery.setScanOrder(ObScanOrder.Reverse); + true, true); } else { obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, scan.getStopRow(), - false); + false, false); } if (scan.getBatch() > 0) { obTableQuery.setBatchSize(scan.getBatch()); @@ -1475,6 +1454,29 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) { return obTableQuery; } + private ObTableQuery buildObTableQuery(final Get get, Collection columnQualifiers) { + ObTableQuery obTableQuery; + if (get.isClosestRowBefore()) { + PageFilter pageFilter = new PageFilter(1); + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); + filterList.addFilter(pageFilter); + if (null != get.getFilter()) { + filterList.addFilter(get.getFilter()); + } + get.setFilter(filterList); + } + ObHTableFilter filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(), + get.getMaxVersions(), columnQualifiers); + if (get.isClosestRowBefore()) { + obTableQuery = buildObTableQuery(filter, HConstants.EMPTY_BYTE_ARRAY, true, + get.getRow(), true, true); + } else { + obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(), true, false); + } + obTableQuery.setObKVParams(buildOBKVParams(get)); + return obTableQuery; + } + private ObTableBatchOperation buildObTableBatchOperation(List keyValueList, boolean putToAppend, List qualifiers) { diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index 66fe49e8..9074f233 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -495,6 +495,7 @@ public void testFilter() throws Exception { Get get; Result r; + ColumnPrefixFilter filter; hTable.delete(deleteKey1Family); hTable.delete(deleteKey2Family); @@ -508,31 +509,31 @@ public void testFilter() throws Exception { tryPut(hTable, putKey2Column2Value1); tryPut(hTable, putKey2Column2Value2); - ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("a")); + filter = new ColumnPrefixFilter(Bytes.toBytes("e")); get = new Get(toBytes(key1)); get.setMaxVersions(10); get.addFamily(toBytes(family)); get.setFilter(filter); r = hTable.get(get); - Assert.assertEquals(3, r.raw().length); + Assert.assertEquals(0, r.raw().length); - filter = new ColumnPrefixFilter(Bytes.toBytes("d")); + filter = new ColumnPrefixFilter(Bytes.toBytes("a")); get = new Get(toBytes(key1)); get.setMaxVersions(10); get.addFamily(toBytes(family)); get.setFilter(filter); r = hTable.get(get); - Assert.assertEquals(4, r.raw().length); + Assert.assertEquals(3, r.raw().length); - filter = new ColumnPrefixFilter(Bytes.toBytes("b")); + filter = new ColumnPrefixFilter(Bytes.toBytes("d")); get = new Get(toBytes(key1)); get.setMaxVersions(10); get.addFamily(toBytes(family)); get.setFilter(filter); r = hTable.get(get); - Assert.assertEquals(0, r.raw().length); + Assert.assertEquals(4, r.raw().length); - filter = new ColumnPrefixFilter(Bytes.toBytes("e")); + filter = new ColumnPrefixFilter(Bytes.toBytes("b")); get = new Get(toBytes(key1)); get.setMaxVersions(10); get.addFamily(toBytes(family)); @@ -1732,6 +1733,7 @@ public void testScan() throws Exception { Get get; Scan scan; Result r; + ResultScanner scanner; int res_count = 0; tryPut(hTable, putKey1Column1Value1); @@ -1769,6 +1771,21 @@ public void testScan() throws Exception { //| scanKey3x | column2 | -1709714409977 | value1 | //+-----------+---------+----------------+--------+ + scan = new Scan(); + scan.addFamily(family.getBytes()); + scan.setStartRow("scanKey1x".getBytes()); + scan.setStopRow("scanKey3x".getBytes()); + scan.setMaxVersions(10); + scan.setMaxResultsPerColumnFamily(2); + scan.setRowOffsetPerColumnFamily(1); + scanner = hTable.getScanner(scan); + res_count = 0; + for (Result result : scanner) { + res_count += result.size(); + } + Assert.assertEquals(3, res_count); + scanner.close(); + // check insert ok get = new Get(toBytes(key1)); get.addFamily(toBytes(family)); @@ -1790,7 +1807,7 @@ public void testScan() throws Exception { scan.setStartRow("scanKey1x".getBytes()); scan.setStopRow("scanKey2x".getBytes()); scan.setMaxVersions(10); - ResultScanner scanner = hTable.getScanner(scan); + scanner = hTable.getScanner(scan); res_count = 0; for (Result result : scanner) { for (KeyValue keyValue : result.raw()) { @@ -1852,23 +1869,6 @@ public void testScan() throws Exception { scanner.close(); - scan = new Scan(); - scan.addFamily(family.getBytes()); - scan.setStartRow("scanKey1x".getBytes()); - scan.setStopRow("scanKey3x".getBytes()); - scan.setMaxVersions(10); - scan.setMaxResultsPerColumnFamily(2); - scan.setRowOffsetPerColumnFamily(1); - scanner = hTable.getScanner(scan); - res_count = 0; - for (Result result : scanner) { - for (KeyValue keyValue : result.raw()) { - res_count += 1; - } - } - Assert.assertEquals(3, res_count); - scanner.close(); - // scan with prefixFilter scan = new Scan(); scan.addFamily(family.getBytes());