Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 36 additions & 47 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public Result call() throws IOException {
filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(),
get.getMaxVersions(), null);
obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(),
true, -1);
true);
request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString));

Expand All @@ -489,14 +489,12 @@ public Result call() throws IOException {
} else {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap()
.entrySet()) {

family = entry.getKey();

filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(),
get.getMaxVersions(), entry.getValue());

obTableQuery = buildObTableQuery(filter, get.getRow(), true,
get.getRow(), true, -1);
get.getRow(), true);

request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
Expand Down Expand Up @@ -540,8 +538,7 @@ public Result getRowOrBefore(byte[] row, byte[] family) {

@Override
public ResultScanner getScanner(final Scan scan) throws IOException {

if (scan.getFamilyMap().keySet() == null || scan.getFamilyMap().keySet().size() == 0) {
if (scan.getFamilyMap().keySet().isEmpty()) {
// check nothing, use table group;
} else {
checkFamilyViolation(scan.getFamilyMap().keySet());
Expand All @@ -558,20 +555,11 @@ public ResultScanner call() throws IOException {
ObTableQuery obTableQuery;
ObHTableFilter filter;
try {
if (scan.getFamilyMap().keySet() == null
|| scan.getFamilyMap().keySet().size() == 0) {
if (scan.getFamilyMap().keySet().isEmpty()) {
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
scan.getMaxVersions(), null);
if (scan.isReversed()) {
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
scan.getStartRow(), true, scan.getBatch());
} else {
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
scan.getStopRow(), false, scan.getBatch());
}
if (scan.isReversed()) { // reverse scan 时设置为逆序
obTableQuery.setScanOrder(ObScanOrder.Reverse);
}
obTableQuery = buildObTableQuery(filter, scan);

request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
Expand All @@ -584,19 +572,7 @@ public ResultScanner call() throws IOException {
family = entry.getKey();
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
scan.getMaxVersions(), entry.getValue());
if (scan.isReversed()) {
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
scan.getStartRow(), true, scan.getBatch());
} else {
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
scan.getStopRow(), false, scan.getBatch());
}
if (scan.isReversed()) { // reverse scan 时设置为逆序
obTableQuery.setScanOrder(ObScanOrder.Reverse);
}

// no support set maxResultSize.
// obTableQuery.setMaxResultSize(scan.getMaxResultSize());
obTableQuery = buildObTableQuery(filter, scan);

request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
Expand Down Expand Up @@ -824,7 +800,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co
keyValueList.addAll(entry.getValue());
}
}
ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true, -1);
ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true);

ObTableBatchOperation batch = buildObTableBatchOperation(keyValueList, false, null);

Expand Down Expand Up @@ -862,7 +838,7 @@ public Result append(Append append) throws IOException {
true, qualifiers);
// the later hbase has supported timeRange
ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers);
ObTableQuery obTableQuery = buildObTableQuery(filter, r, true, r, true, -1);
ObTableQuery obTableQuery = buildObTableQuery(filter, r, true, r, true);
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate();
queryAndMutate.setTableQuery(obTableQuery);
queryAndMutate.setMutations(batchOperation);
Expand Down Expand Up @@ -921,7 +897,7 @@ public Result increment(Increment increment) throws IOException {
ObHTableFilter filter = buildObHTableFilter(null, increment.getTimeRange(), 1,
qualifiers);

ObTableQuery obTableQuery = buildObTableQuery(filter, rowKey, true, rowKey, true, -1);
ObTableQuery obTableQuery = buildObTableQuery(filter, rowKey, true, rowKey, true);
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate();
queryAndMutate.setMutations(batch);
queryAndMutate.setTableQuery(obTableQuery);
Expand Down Expand Up @@ -971,7 +947,7 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo

ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers);

ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true, -1);
ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true);
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate();
queryAndMutate.setMutations(batch);
queryAndMutate.setTableQuery(obTableQuery);
Expand Down Expand Up @@ -1373,8 +1349,7 @@ private ObHTableFilter buildObHTableFilter(String filterString, TimeRange timeRa
}

private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start,
boolean includeStart, byte[] stop, boolean includeStop,
int batchSize) {
boolean includeStart, byte[] stop, boolean includeStop) {
ObNewRange obNewRange = new ObNewRange();

if (Arrays.equals(start, HConstants.EMPTY_BYTE_ARRAY)) {
Expand All @@ -1394,23 +1369,37 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start,
} else {
obNewRange.setEndKey(ObRowKey.getInstance(stop, ObObj.getMin(), ObObj.getMin()));
}

return buildObTableQuery(filter, obNewRange, batchSize);
}

private ObTableQuery buildObTableQuery(ObHTableFilter filter, ObNewRange obNewRange,
int batchSize) {
ObTableQuery obTableQuery = new ObTableQuery();
obTableQuery.setIndexName("PRIMARY");
obTableQuery.sethTableFilter(filter);
for (String column : ALL_COLUMNS) {
obTableQuery.addSelectColumn(column);
}
if (obNewRange != null) {
obTableQuery.addKeyRange(obNewRange);
obTableQuery.addKeyRange(obNewRange);

return obTableQuery;
}

private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
ObTableQuery obTableQuery;
if (scan.getMaxResultsPerColumnFamily() > 0) {
filter.setLimitPerRowPerCf(scan.getMaxResultsPerColumnFamily());
}
if (scan.getRowOffsetPerColumnFamily() > 0) {
filter.setOffsetPerRowPerCf(scan.getRowOffsetPerColumnFamily());
}
if (scan.isReversed()) {
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false, scan.getStartRow(),
true);
} else {
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, scan.getStopRow(),
false);
}
if (scan.isReversed()) { // reverse scan 时设置为逆序
obTableQuery.setScanOrder(ObScanOrder.Reverse);
}
if (batchSize > 0) {
obTableQuery.setBatchSize(batchSize);
if (scan.getBatch() > 0) {
obTableQuery.setBatchSize(scan.getBatch());
}
return obTableQuery;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.AbstractQueryStreamResult;
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.AbstractClientScanner;
import org.apache.hadoop.hbase.client.Result;
Expand All @@ -36,6 +37,7 @@

import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;

@InterfaceAudience.Private
public class ClientStreamScanner extends AbstractClientScanner {

private static final Logger logger = TableHBaseLoggerFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class OHConnectionImpl implements Connection {

private static final Marker FATAL = MarkerFactory.getMarker("FATAL");

static final int BUFFERED_PARAM_UNSET = -1;
private static final int BUFFERED_PARAM_UNSET = -1;

private volatile boolean closed;
private volatile boolean aborted;
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/hbase-site.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@
<name>hbase.client.connection.impl</name>
<value>com.alipay.oceanbase.hbase.util.OHConnectionImpl</value>
</property>
</configuration>
</configuration>
117 changes: 114 additions & 3 deletions src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.alipay.oceanbase.hbase;

import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
Expand All @@ -41,8 +40,7 @@

import static org.apache.hadoop.hbase.filter.FilterList.Operator.MUST_PASS_ONE;
import static org.apache.hadoop.hbase.util.Bytes.toBytes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;

public abstract class HTableTestBase {

Expand Down Expand Up @@ -399,6 +397,8 @@ public void testMultiPartitionPut() throws IOException {
}
}



@Test
public void testMultiPartitionDel() throws IOException {
String[] keys = new String[] { "putKey1", "putKey2", "putKey3", "putKey4", "putKey5",
Expand Down Expand Up @@ -1026,6 +1026,87 @@ public void testGetFilter() throws Exception {
Assert.assertEquals(1, r.raw().length);
}

@Test
public void testScanSessionClean() throws Exception {
String key1 = "bKey";
String key2 = "cKey";
String key3 = "dKey";
String key4 = "eKey";
String key5 = "fKey";
String column1 = "column1";
String column2 = "column2";
String value1 = "value1";
String family = "family1";

// delete previous data
Delete deleteKey1Family = new Delete(toBytes(key1));
deleteKey1Family.deleteFamily(toBytes(family));
Delete deleteKey2Family = new Delete(toBytes(key2));
deleteKey2Family.deleteFamily(toBytes(family));
Delete deleteKey3Family = new Delete(toBytes(key3));
deleteKey3Family.deleteFamily(toBytes(family));
Delete deleteKey4Family = new Delete(toBytes(key4));
deleteKey4Family.deleteFamily(toBytes(family));
Delete deleteKey5Family = new Delete(toBytes(key5));
deleteKey5Family.deleteFamily(toBytes(family));

hTable.delete(deleteKey1Family);
hTable.delete(deleteKey2Family);
hTable.delete(deleteKey3Family);
hTable.delete(deleteKey4Family);
hTable.delete(deleteKey5Family);

Put putKey1Column1Value1 = new Put(toBytes(key1));
putKey1Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));

Put putKey2Column2Value1 = new Put(toBytes(key2));
putKey2Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));

Put putKey3Column2Value1 = new Put(toBytes(key3));
putKey3Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));

Put putKey4Column2Value1 = new Put(toBytes(key4));
putKey4Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));

Put putKey5Column2Value1 = new Put(toBytes(key5));
putKey5Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));

tryPut(hTable, putKey1Column1Value1);
tryPut(hTable, putKey1Column1Value1);
tryPut(hTable, putKey2Column2Value1);
tryPut(hTable, putKey3Column2Value1);
tryPut(hTable, putKey4Column2Value1);
tryPut(hTable, putKey5Column2Value1);

Scan scan;
scan = new Scan();
scan.addFamily(family.getBytes());
scan.setMaxVersions(10);
scan.setBatch(1);

ResultScanner scanner = hTable.getScanner(scan);
scanner.next();

// The server defaults to a lease of 60 seconds. Therefore, at 20 seconds,
// the transaction is checked to ensure it has not rolled back, and the lease is updated.
// At 55 seconds, the query should still be able to retrieve the data and update the lease.
// If it exceeds 60 seconds (at 61 seconds), the session is deleted.
Thread.sleep(20 * 1000);
scanner.next();

Thread.sleep(55 * 1000);
scanner.next();

Thread.sleep(61 * 1000);
try {
scanner.next();
} catch (IOException e) {
assertTrue(e.getCause().getMessage().contains("OB_HASH_NOT_EXIST"));
} finally {
scanner.close();
}
}

@Test
public void testScan() throws Exception {
String key1 = "scanKey1x";
Expand Down Expand Up @@ -1176,6 +1257,36 @@ public void testScan() throws Exception {
Assert.assertEquals(res_count, 7);
scanner.close();

scan = new Scan();
scan.addFamily(family.getBytes());
scan.setStartRow("scanKey1x".getBytes());
scan.setStopRow("scanKey3x".getBytes());
scan.setMaxVersions(10);
scan.setMaxResultsPerColumnFamily(1);
scanner = hTable.getScanner(scan);
for (Result result : scanner) {
assertEquals(result.rawCells().length,1);
}
scanner.close();

scan = new Scan();
scan.addFamily(family.getBytes());
scan.setStartRow("scanKey1x".getBytes());
scan.setStopRow("scanKey3x".getBytes());
scan.setMaxVersions(10);
scan.setMaxResultsPerColumnFamily(2);
scan.setRowOffsetPerColumnFamily(1);
scanner = hTable.getScanner(scan);
res_count = 0;
for (Result result : scanner) {
for (KeyValue keyValue : result.raw()) {
Arrays.equals(key2.getBytes(), keyValue.getRow());
res_count += 1;
}
}
Assert.assertEquals(res_count, 3);
scanner.close();

// scan with prefixFilter
scan = new Scan();
scan.addFamily(family.getBytes());
Expand Down