Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 146 additions & 30 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument;
Expand All @@ -84,7 +85,6 @@
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static com.alipay.oceanbase.hbase.filter.HBaseFilterUtils.writeBytesWithEscape;
import static org.apache.hadoop.hbase.KeyValue.Type.*;

public class OHTable implements Table {

Expand Down Expand Up @@ -154,7 +154,7 @@ public class OHTable implements Table {
/**
* the buffer of put request
*/
private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
private final List<Put> writeBuffer = new CopyOnWriteArrayList<>();
/**
* when the put request reach the write buffer size the do put will
* flush commits automatically
Expand All @@ -179,7 +179,7 @@ public class OHTable implements Table {
/**
* current buffer size
*/
private long currentWriteBufferSize;
private AtomicLong currentWriteBufferSize = new AtomicLong(0);

/**
* the max size of put key value
Expand All @@ -195,7 +195,8 @@ public class OHTable implements Table {

private int scannerTimeout;

private RegionLocator regionLocator;
private RegionLocator regionLocator;

/**
* Creates an object to access a HBase table.
* Shares oceanbase table obTableClient and other resources with other OHTable instances
Expand Down Expand Up @@ -460,8 +461,8 @@ private void finishSetUp() {
}

public static OHConnectionConfiguration setUserDefinedNamespace(String tableNameString,
OHConnectionConfiguration ohConnectionConf)
throws IllegalArgumentException {
OHConnectionConfiguration ohConnectionConf)
throws IllegalArgumentException {
if (tableNameString.indexOf(':') != -1) {
String[] params = tableNameString.split(":");
if (params.length != 2) {
Expand Down Expand Up @@ -501,13 +502,15 @@ public Configuration getConfiguration() {

@Override
public HTableDescriptor getTableDescriptor() throws IOException {
OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, obTableClient);
OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString,
obTableClient);
return executor.getTableDescriptor();
}

@Override
public TableDescriptor getDescriptor() throws IOException {
OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, obTableClient);
OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString,
obTableClient);
return executor.getTableDescriptor();
}

Expand Down Expand Up @@ -701,7 +704,7 @@ private void compatOldServerBatch(final List<? extends Row> actions, final Objec

@Override
public void batch(final List<? extends Row> actions, final Object[] results) throws IOException {
if (actions == null) {
if (actions == null || actions.isEmpty()) {
return;
}
if (results != null) {
Expand All @@ -718,6 +721,19 @@ public void batch(final List<? extends Row> actions, final Object[] results) thr
} catch (Exception e) {
throw new IOException(tableNameString + " table occurred unexpected error." , e);
}
} else if (OHBaseFuncUtils.isAllPut(actions) && OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) {
// only support Put now
ObHbaseRequest request = buildHbaseRequest(actions);
try {
ObHbaseResult result = (ObHbaseResult) obTableClient.execute(request);
if (results != null) {
for (int i = 0; i < results.length; ++i) {
results[i] = new Result();
}
}
} catch (Exception e) {
throw new IOException(tableNameString + " table occurred unexpected error." , e);
}
} else {
String realTableName = getTargetTableName(actions);
BatchOperation batch = buildBatchOperation(realTableName, actions,
Expand All @@ -726,7 +742,7 @@ public void batch(final List<? extends Row> actions, final Object[] results) thr
try {
tmpResults = batch.execute();
} catch (Exception e) {
throw new IOException(tableNameString + " table occurred unexpected error." , e);
throw new IOException(tableNameString + " table occurred unexpected error.", e);
}
int index = 0;
for (int i = 0; i != actions.size(); ++i) {
Expand Down Expand Up @@ -994,6 +1010,8 @@ public Result call() throws IOException {
if (get.isCheckExistenceOnly()) {
return Result.create(null, !keyValueList.isEmpty());
}
// sort keyValues
OHBaseFuncUtils.sortHBaseResult(keyValueList);
return Result.create(keyValueList);
}
};
Expand Down Expand Up @@ -1227,16 +1245,24 @@ private void doPut(List<Put> puts) throws IOException {
validatePut(put);
checkFamilyViolation(put.getFamilyCellMap().keySet(), true);
writeBuffer.add(put);
currentWriteBufferSize += put.heapSize();
currentWriteBufferSize.addAndGet(put.heapSize());

// we need to periodically see if the writebuffer is full instead of waiting until the end of the List
n++;
if (n % putWriteBufferCheck == 0 && currentWriteBufferSize > writeBufferSize) {
flushCommits();
if (n % putWriteBufferCheck == 0 && currentWriteBufferSize.get() > writeBufferSize) {
if (OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) {
flushCommitsV2();
} else {
flushCommits();
}
}
}
if (autoFlush || currentWriteBufferSize > writeBufferSize) {
flushCommits();
if (autoFlush || currentWriteBufferSize.get() > writeBufferSize) {
if (OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) {
flushCommitsV2();
} else {
flushCommits();
}
}
}

Expand Down Expand Up @@ -1653,12 +1679,45 @@ public void flushCommits() throws IOException {
} finally {
if (clearBufferOnFail) {
writeBuffer.clear();
currentWriteBufferSize = 0;
currentWriteBufferSize.set(0);
} else {
// the write buffer was adjusted by processBatchOfPuts
currentWriteBufferSize.set(0);
for (Put aPut : writeBuffer) {
currentWriteBufferSize.addAndGet(aPut.heapSize());
}
}
}
}

public void flushCommitsV2() throws IOException {
try {
if (writeBuffer.isEmpty()) {
return;
}
try {
ObHbaseRequest request = buildHbaseRequest(writeBuffer);
try {
ObHbaseResult result = (ObHbaseResult) obTableClient.execute(request);
} catch (Exception e) {
throw new IOException(tableNameString + " table occurred unexpected error.", e);
}
} catch (Exception e) {
logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush,
writeBuffer.size(), e);
if (e instanceof IOException) {
throw (IOException) e;
}
}
} finally {
if (clearBufferOnFail) {
writeBuffer.clear();
currentWriteBufferSize.set(0);
} else {
// the write buffer was adjusted by processBatchOfPuts
currentWriteBufferSize = 0;
currentWriteBufferSize.set(0);
for (Put aPut : writeBuffer) {
currentWriteBufferSize += aPut.heapSize();
currentWriteBufferSize.addAndGet(aPut.heapSize());
}
}
}
Expand Down Expand Up @@ -1884,7 +1943,7 @@ private ObHTableFilter buildObHTableFilter(byte[] filterString, TimeRange timeRa
if (columnQualifier == null) {
obHTableFilter.addSelectColumnQualifier(new byte[0]);
} else {
obHTableFilter.addSelectColumnQualifier(columnQualifier);
obHTableFilter.addSelectColumnQualifier(columnQualifier);
}
}
}
Expand Down Expand Up @@ -1942,11 +2001,11 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
filter.setOffsetPerRowPerCf(scan.getRowOffsetPerColumnFamily());
}
if (scan.isReversed()) {
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), scan.includeStopRow(), scan.getStartRow(),
scan.includeStartRow(), true, ts);
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), scan.includeStopRow(),
scan.getStartRow(), scan.includeStartRow(), true, ts);
} else {
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
scan.includeStopRow(), false, ts);
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), scan.includeStartRow(),
scan.getStopRow(), scan.includeStopRow(), false, ts);
}
obTableQuery.setBatchSize(scan.getBatch());
obTableQuery.setLimit(scan.getLimit());
Expand Down Expand Up @@ -2026,7 +2085,7 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv,
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType().getCode());
com.alipay.oceanbase.rpc.mutation.Mutation tableMutation = buildMutation(kv, operationType,
isTableGroup, family, TTL);
if(isTableGroup) {
if (isTableGroup) {
// construct new_kv otherwise filter will fail to match targeted columns
byte[] oldQualifier = CellUtil.cloneQualifier(kv);
byte[] newQualifier = new byte[family.length + 1/* length of "." */ + oldQualifier.length];
Expand Down Expand Up @@ -2114,13 +2173,10 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv,
range.setEndKey(ObRowKey.getInstance(CellUtil.cloneRow(kv), ObObj.getMax(),
ObObj.getMax()));
if (!isTableGroup) {
filter = buildObHTableFilter(null,
timeRange,
Integer.MAX_VALUE);
filter = buildObHTableFilter(null, timeRange, Integer.MAX_VALUE);
} else {
filter = buildObHTableFilter(null,
timeRange,
Integer.MAX_VALUE, CellUtil.cloneQualifier(kv));
filter = buildObHTableFilter(null, timeRange, Integer.MAX_VALUE,
CellUtil.cloneQualifier(kv));
}
}
break;
Expand Down Expand Up @@ -2318,6 +2374,66 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
return batch;
}

private ObHbaseRequest buildHbaseRequest(List<? extends Row> actions)
throws FeatureNotSupportedException,
IllegalArgumentException,
IOException {
ObHbaseRequest request = new ObHbaseRequest();
ObTableOperationType opType = null;
List<ObObj> keys = new ArrayList<>();
List<ObHbaseCfRows> cfRowsArray = new ArrayList<>();
Map<String, ObHbaseCfRows> cfRowsMap = new HashMap<>();
int keyIndex = 0;
for (Row row : actions) {
if (row instanceof Put) {
opType = INSERT_OR_UPDATE;
Put put = (Put) row;
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to put for item");
}
boolean isCellTTL = false;
long ttl = put.getTTL();
if (ttl != Long.MAX_VALUE) {
isCellTTL = true;
}
keys.add(ObObj.getInstance(put.getRow()));
for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap().entrySet()) {
String family = Bytes.toString(entry.getKey());
ObHbaseCfRows sameCfRows = cfRowsMap.get(family);
if (sameCfRows == null) {
sameCfRows = new ObHbaseCfRows();
String realTableName = getTargetTableName(tableNameString, family, configuration);
sameCfRows.setRealTableName(realTableName);
cfRowsMap.put(family, sameCfRows);
cfRowsArray.add(sameCfRows);
}
List<Cell> keyValueList = entry.getValue();
List<ObHbaseCell> cells = new ArrayList<>();
for (Cell kv : keyValueList) {
ObHbaseCell cell = new ObHbaseCell(isCellTTL);
cell.setQ(ObObj.getInstance(CellUtil.cloneQualifier(kv)));
cell.setT(ObObj.getInstance(-kv.getTimestamp())); // set timestamp as negative
cell.setV(ObObj.getInstance(CellUtil.cloneValue(kv)));
if (isCellTTL) {
cell.setTTL(ObObj.getInstance(ttl));
}
cells.add(cell);
}
sameCfRows.add(keyIndex, cells.size(), cells);
}
} else {
throw new FeatureNotSupportedException(
"not supported other type in batch yet,only support get, put and delete");
}
++keyIndex;
}
request.setTableName(tableNameString);
request.setKeys(keys);
request.setOpType(opType);
request.setCfRows(cfRowsArray);
return request;
}

public static ObTableOperation buildObTableOperation(Cell kv,
ObTableOperationType operationType,
Long TTL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ public Result next() throws IOException {
break;
}
}
// sort keyValues
OHBaseFuncUtils.sortHBaseResult(keyValues);
return Result.create(keyValues);
} catch (Exception e) {
logger.error(LCD.convert("01-00000"), streamResult.getTableName(), e);
Expand Down
59 changes: 59 additions & 0 deletions src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,17 @@

package com.alipay.oceanbase.hbase.util;

import com.alipay.oceanbase.rpc.ObGlobal;
import com.alipay.oceanbase.rpc.ObTableClient;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

@InterfaceAudience.Private
public class OHBaseFuncUtils {
Expand All @@ -42,4 +50,55 @@ public static byte[][] extractFamilyFromQualifier(byte[] qualifier) throws Excep
byte[] newQualifier = Arrays.copyOfRange(qualifier, familyLen + 1, qualifier.length);
return new byte[][] { family, newQualifier };
}

public static boolean isHBasePutPefSupport(ObTableClient tableClient) {
if (tableClient.isOdpMode()) {
// server version support and distributed capacity is enabled and odp version support
return ObGlobal.isHBasePutPerfSupport()
&& tableClient.getServerCapacity().isSupportDistributedExecute()
&& ObGlobal.OB_PROXY_VERSION >= ObGlobal.OB_PROXY_VERSION_4_3_6_0;
} else {
// server version support and distributed capacity is enabled
return ObGlobal.isHBasePutPerfSupport()
&& tableClient.getServerCapacity().isSupportDistributedExecute();
}
}

public static boolean isAllPut(List<? extends Row> actions) {
boolean isAllPut = true;
for (Row action : actions) {
if (!(action instanceof Put)) {
isAllPut = false;
break;
}
}
return isAllPut;
}

public static void sortHBaseResult(List<Cell> cells) {
cells.sort(new Comparator<Cell>() {
@Override
public int compare(Cell cell1, Cell cell2) {
// 1. sort family in lexicographical order
int familyComparison = Bytes.compareTo(cell1.getFamilyArray(),
cell1.getFamilyOffset(), cell1.getFamilyLength(), cell2.getFamilyArray(),
cell2.getFamilyOffset(), cell2.getFamilyLength());
if (familyComparison != 0) {
return familyComparison;
}

// 2: sort qualifier in lexicographical order
int qualifierComparison = Bytes.compareTo(cell1.getQualifierArray(),
cell1.getQualifierOffset(), cell1.getQualifierLength(),
cell2.getQualifierArray(), cell2.getQualifierOffset(),
cell2.getQualifierLength());
if (qualifierComparison != 0) {
return qualifierComparison;
}

// 3: sort timestamp in descend order
return Long.compare(cell2.getTimestamp(), cell1.getTimestamp());
}
});
}
}
Loading