Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 39 additions & 37 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -503,24 +505,10 @@ public Result call() throws IOException {
ObTableClientQueryStreamResult clientQueryStreamResult;
ObTableQueryRequest request;
ObTableQuery obTableQuery;
ObHTableFilter filter;
try {
if (get.getFamilyMap().keySet() == null
|| get.getFamilyMap().keySet().size() == 0) {
filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(),
get.getMaxVersions(), null);
if (get.isClosestRowBefore()) {
Scan scan = new Scan();
scan.setStartRow(get.getRow());
scan.setCaching(1);
scan.setReversed(true);
obTableQuery = buildObTableQuery(filter, scan);
obTableQuery.setObKVParams(buildOBKVParams(scan));
} else {
obTableQuery = buildObTableQuery(filter, get.getRow(), true,
get.getRow(), true);
obTableQuery.setObKVParams(buildOBKVParams(get));
}
obTableQuery = buildObTableQuery(get, null);
request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString));

Expand All @@ -531,19 +519,7 @@ public Result call() throws IOException {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap()
.entrySet()) {
family = entry.getKey();
filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(),
get.getMaxVersions(), entry.getValue());
if (get.isClosestRowBefore()) {
Scan scan = new Scan(get.getRow());
scan.setCaching(1);
scan.setReversed(true);
obTableQuery = buildObTableQuery(filter, scan);
obTableQuery.setObKVParams(buildOBKVParams(scan));
} else {
obTableQuery = buildObTableQuery(filter, get.getRow(), true,
get.getRow(), true);
obTableQuery.setObKVParams(buildOBKVParams(get));
}
obTableQuery = buildObTableQuery(get, entry.getValue());
request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
Expand Down Expand Up @@ -870,7 +846,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co
keyValueList.addAll(entry.getValue());
}
}
ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true);
ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true, false);

ObTableBatchOperation batch = buildObTableBatchOperation(keyValueList, false, null);

Expand Down Expand Up @@ -908,7 +884,7 @@ public Result append(Append append) throws IOException {
true, qualifiers);
// the later hbase has supported timeRange
ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers);
ObTableQuery obTableQuery = buildObTableQuery(filter, r, true, r, true);
ObTableQuery obTableQuery = buildObTableQuery(filter, r, true, r, true, false);
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate();
queryAndMutate.setTableQuery(obTableQuery);
queryAndMutate.setMutations(batchOperation);
Expand Down Expand Up @@ -963,11 +939,11 @@ public Result increment(Increment increment) throws IOException {
batch.addTableOperation(getInstance(INCREMENT, new Object[] { rowKey, qualifier,
Long.MAX_VALUE }, V_COLUMNS, new Object[] { cell.getValue() }));
});

ObHTableFilter filter = buildObHTableFilter(null, increment.getTimeRange(), 1,
qualifiers);

ObTableQuery obTableQuery = buildObTableQuery(filter, rowKey, true, rowKey, true);
ObTableQuery obTableQuery = buildObTableQuery(filter, rowKey, true, rowKey, true, false);
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate();
queryAndMutate.setMutations(batch);
queryAndMutate.setTableQuery(obTableQuery);
Expand Down Expand Up @@ -1017,7 +993,7 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo

ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers);

ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true);
ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true, false);
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate();
queryAndMutate.setMutations(batch);
queryAndMutate.setTableQuery(obTableQuery);
Expand Down Expand Up @@ -1419,7 +1395,8 @@ private ObHTableFilter buildObHTableFilter(String filterString, TimeRange timeRa
}

private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start,
boolean includeStart, byte[] stop, boolean includeStop) {
boolean includeStart, byte[] stop, boolean includeStop,
boolean isReversed) {
ObNewRange obNewRange = new ObNewRange();

if (Arrays.equals(start, HConstants.EMPTY_BYTE_ARRAY)) {
Expand All @@ -1440,6 +1417,9 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start,
obNewRange.setEndKey(ObRowKey.getInstance(stop, ObObj.getMin(), ObObj.getMin()));
}
ObTableQuery obTableQuery = new ObTableQuery();
if (isReversed) {
obTableQuery.setScanOrder(ObScanOrder.Reverse);
}
obTableQuery.setIndexName("PRIMARY");
obTableQuery.sethTableFilter(filter);
for (String column : ALL_COLUMNS) {
Expand All @@ -1459,11 +1439,10 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
}
if (scan.isReversed()) {
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false, scan.getStartRow(),
true);
obTableQuery.setScanOrder(ObScanOrder.Reverse);
true, true);
} else {
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, scan.getStopRow(),
false);
false, false);
}
if (scan.getBatch() > 0) {
obTableQuery.setBatchSize(scan.getBatch());
Expand All @@ -1475,6 +1454,29 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
return obTableQuery;
}

private ObTableQuery buildObTableQuery(final Get get, Collection<byte[]> columnQualifiers) {
ObTableQuery obTableQuery;
if (get.isClosestRowBefore()) {
PageFilter pageFilter = new PageFilter(1);
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(pageFilter);
if (null != get.getFilter()) {
filterList.addFilter(get.getFilter());
}
get.setFilter(filterList);
}
ObHTableFilter filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(),
get.getMaxVersions(), columnQualifiers);
if (get.isClosestRowBefore()) {
obTableQuery = buildObTableQuery(filter, HConstants.EMPTY_BYTE_ARRAY, true,
get.getRow(), true, true);
} else {
obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(), true, false);
}
obTableQuery.setObKVParams(buildOBKVParams(get));
return obTableQuery;
}

private ObTableBatchOperation buildObTableBatchOperation(List<KeyValue> keyValueList,
boolean putToAppend,
List<byte[]> qualifiers) {
Expand Down
50 changes: 25 additions & 25 deletions src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ public void testFilter() throws Exception {

Get get;
Result r;
ColumnPrefixFilter filter;

hTable.delete(deleteKey1Family);
hTable.delete(deleteKey2Family);
Expand All @@ -508,31 +509,31 @@ public void testFilter() throws Exception {
tryPut(hTable, putKey2Column2Value1);
tryPut(hTable, putKey2Column2Value2);

ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("a"));
filter = new ColumnPrefixFilter(Bytes.toBytes("e"));
get = new Get(toBytes(key1));
get.setMaxVersions(10);
get.addFamily(toBytes(family));
get.setFilter(filter);
r = hTable.get(get);
Assert.assertEquals(3, r.raw().length);
Assert.assertEquals(0, r.raw().length);

filter = new ColumnPrefixFilter(Bytes.toBytes("d"));
filter = new ColumnPrefixFilter(Bytes.toBytes("a"));
get = new Get(toBytes(key1));
get.setMaxVersions(10);
get.addFamily(toBytes(family));
get.setFilter(filter);
r = hTable.get(get);
Assert.assertEquals(4, r.raw().length);
Assert.assertEquals(3, r.raw().length);

filter = new ColumnPrefixFilter(Bytes.toBytes("b"));
filter = new ColumnPrefixFilter(Bytes.toBytes("d"));
get = new Get(toBytes(key1));
get.setMaxVersions(10);
get.addFamily(toBytes(family));
get.setFilter(filter);
r = hTable.get(get);
Assert.assertEquals(0, r.raw().length);
Assert.assertEquals(4, r.raw().length);

filter = new ColumnPrefixFilter(Bytes.toBytes("e"));
filter = new ColumnPrefixFilter(Bytes.toBytes("b"));
get = new Get(toBytes(key1));
get.setMaxVersions(10);
get.addFamily(toBytes(family));
Expand Down Expand Up @@ -1732,6 +1733,7 @@ public void testScan() throws Exception {
Get get;
Scan scan;
Result r;
ResultScanner scanner;
int res_count = 0;

tryPut(hTable, putKey1Column1Value1);
Expand Down Expand Up @@ -1769,6 +1771,21 @@ public void testScan() throws Exception {
//| scanKey3x | column2 | -1709714409977 | value1 |
//+-----------+---------+----------------+--------+

scan = new Scan();
scan.addFamily(family.getBytes());
scan.setStartRow("scanKey1x".getBytes());
scan.setStopRow("scanKey3x".getBytes());
scan.setMaxVersions(10);
scan.setMaxResultsPerColumnFamily(2);
scan.setRowOffsetPerColumnFamily(1);
scanner = hTable.getScanner(scan);
res_count = 0;
for (Result result : scanner) {
res_count += result.size();
}
Assert.assertEquals(3, res_count);
scanner.close();

// check insert ok
get = new Get(toBytes(key1));
get.addFamily(toBytes(family));
Expand All @@ -1790,7 +1807,7 @@ public void testScan() throws Exception {
scan.setStartRow("scanKey1x".getBytes());
scan.setStopRow("scanKey2x".getBytes());
scan.setMaxVersions(10);
ResultScanner scanner = hTable.getScanner(scan);
scanner = hTable.getScanner(scan);
res_count = 0;
for (Result result : scanner) {
for (KeyValue keyValue : result.raw()) {
Expand Down Expand Up @@ -1852,23 +1869,6 @@ public void testScan() throws Exception {

scanner.close();

scan = new Scan();
scan.addFamily(family.getBytes());
scan.setStartRow("scanKey1x".getBytes());
scan.setStopRow("scanKey3x".getBytes());
scan.setMaxVersions(10);
scan.setMaxResultsPerColumnFamily(2);
scan.setRowOffsetPerColumnFamily(1);
scanner = hTable.getScanner(scan);
res_count = 0;
for (Result result : scanner) {
for (KeyValue keyValue : result.raw()) {
res_count += 1;
}
}
Assert.assertEquals(3, res_count);
scanner.close();

// scan with prefixFilter
scan = new Scan();
scan.addFamily(family.getBytes());
Expand Down