Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ public class OHTable implements HTableInterface {
*/
private int maxKeyValueSize;

/**
* whether to enable put optimization path
*/
private boolean enablePutOptimization;

// i.e., doPut checks the writebuffer every X Puts.

/**
Expand All @@ -186,6 +191,12 @@ public class OHTable implements HTableInterface {

private final OHMetrics metrics;

/**
* whether to fill current system time when timestamp is not specified during write operations.
* Default is false (disabled).
*/
private final boolean fillTimestampInClient;

/**
* Creates an object to access a HBase table.
* Shares oceanbase table obTableClient and other resources with other OHTable instances
Expand Down Expand Up @@ -222,7 +233,7 @@ public OHTable(Configuration configuration, String tableName) throws IOException
} else {
this.metrics = null;
}

this.fillTimestampInClient = configuration.getBoolean(HBASE_HTABLE_AUTO_FILL_TIMESTAMP_IN_CLIENT, false);
finishSetUp();
}

Expand Down Expand Up @@ -279,7 +290,7 @@ public OHTable(Configuration configuration, final byte[] tableName,
} else {
this.metrics = null;
}

this.fillTimestampInClient = configuration.getBoolean(HBASE_HTABLE_AUTO_FILL_TIMESTAMP_IN_CLIENT, false);
finishSetUp();
}

Expand Down Expand Up @@ -309,6 +320,7 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient,
this.obTableClient = obTableClient;
this.configuration = new Configuration();
this.metrics = null;
this.fillTimestampInClient = configuration.getBoolean(HBASE_HTABLE_AUTO_FILL_TIMESTAMP_IN_CLIENT, false);
finishSetUp();
}

Expand Down Expand Up @@ -353,7 +365,7 @@ public OHTable(TableName tableName, Connection connection,
} else {
this.metrics = null;
}

this.fillTimestampInClient = configuration.getBoolean(HBASE_HTABLE_AUTO_FILL_TIMESTAMP_IN_CLIENT, false);
finishSetUp();
}

Expand Down Expand Up @@ -407,6 +419,8 @@ private void finishSetUp() {
MAX_KEYVALUE_SIZE_DEFAULT);
this.writeBufferSize = this.configuration.getLong(WRITE_BUFFER_SIZE_KEY,
WRITE_BUFFER_SIZE_DEFAULT);
this.enablePutOptimization = this.configuration.getBoolean(HBASE_HTABLE_USE_PUT_OPTIMIZATION,
HBASE_HTABLE_USE_PUT_OPTIMIZATION_DEFAULT);
}

public static OHConnectionConfiguration setUserDefinedNamespace(String tableNameString,
Expand Down Expand Up @@ -743,7 +757,7 @@ public void innerBatchImpl(final List<? extends Row> actions, final Object[] res
} catch (Exception e) {
throw new IOException(tableNameString + " table occurred unexpected error." , e);
}
} else if (OHBaseFuncUtils.isAllPut(opType, actions) && OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) {
} else if (OHBaseFuncUtils.isAllPut(opType, actions) && OHBaseFuncUtils.isHBasePutPefSupport(obTableClient, enablePutOptimization)) {
// only support Put now
ObHbaseRequest request = buildHbaseRequest(actions, opType);
try {
Expand Down Expand Up @@ -1667,7 +1681,7 @@ Long execute() throws IOException {

ObTableBatchOperation batch = new ObTableBatchOperation();
batch.addTableOperation(getInstance(INCREMENT, new Object[] { row, qualifier,
Long.MAX_VALUE }, V_COLUMNS, new Object[] { Bytes.toBytes(amount) }));
getEffectiveTimestampForWrite(Long.MAX_VALUE) }, V_COLUMNS, new Object[] { Bytes.toBytes(amount) }));

ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers);

Expand Down Expand Up @@ -2182,7 +2196,7 @@ public boolean isWeakRead(Object query) {
return "weak".equalsIgnoreCase(consistencyStr);
}

public static ObTableBatchOperation buildObTableBatchOperation(List<Mutation> rowList,
public ObTableBatchOperation buildObTableBatchOperation(List<Mutation> rowList,
List<byte[]> qualifiers) {
ObTableBatchOperation batch = new ObTableBatchOperation();
ObTableOperationType opType;
Expand Down Expand Up @@ -2341,7 +2355,7 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv,
}
return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(operationType,
ROW_KEY_COLUMNS,
new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() },
new Object[] { kv.getRow(), kv.getQualifier(), getEffectiveTimestampForWrite(kv.getTimestamp()) },
property_columns, property);
case Delete:
return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, ROW_KEY_COLUMNS,
Expand Down Expand Up @@ -2371,6 +2385,22 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv,
}
}


/**
* Get effective timestamp for write operations (Put, Increment, Append).
* If fillTimestampInClient is enabled and timestamp is Long.MAX_VALUE,
* returns current system time, otherwise returns the original timestamp.
*
* @param timestamp the original timestamp
* @return effective timestamp to use
*/
private long getEffectiveTimestampForWrite(long timestamp) {
if (fillTimestampInClient && timestamp == Long.MAX_VALUE) {
return System.currentTimeMillis();
}
return timestamp;
}

private KeyValue modifyQualifier(KeyValue original, byte[] newQualifier) {
// Extract existing components
byte[] row = original.getRow();
Expand Down Expand Up @@ -2579,7 +2609,7 @@ private ObHbaseRequest buildHbaseRequest(List<? extends Row> actions, OHOperatio
for (Cell kv : keyValueList) {
ObHbaseCell cell = new ObHbaseCell(isCellTTL);
cell.setQ(ObObj.getInstance(CellUtil.cloneQualifier(kv)));
cell.setT(ObObj.getInstance(-kv.getTimestamp())); // set timestamp as negative
cell.setT(ObObj.getInstance(-getEffectiveTimestampForWrite(kv.getTimestamp()))); // set timestamp as negative
cell.setV(ObObj.getInstance(CellUtil.cloneValue(kv)));
if (isCellTTL) {
cell.setTTL(ObObj.getInstance(ttl));
Expand All @@ -2603,7 +2633,7 @@ private ObHbaseRequest buildHbaseRequest(List<? extends Row> actions, OHOperatio
return request;
}

public static ObTableOperation buildObTableOperation(KeyValue kv,
public ObTableOperation buildObTableOperation(KeyValue kv,
ObTableOperationType operationType,
Long TTL) {
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType());
Expand All @@ -2618,7 +2648,7 @@ public static ObTableOperation buildObTableOperation(KeyValue kv,
return getInstance(
operationType,
new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv),
kv.getTimestamp() }, property_columns, property);
getEffectiveTimestampForWrite(kv.getTimestamp()) }, property_columns, property);
case Delete:
return getInstance(
DEL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,18 @@ public final class OHConstants {
* use to specify the route policy when performing a query.
*/
public static final String HBASE_HTABLE_CLIENT_ROUTE_POLICY = "hbase.htable.client.route.policy";

/**
* use to specify whether to automatically fill current system time in client when timestamp is not specified during write operations.
* Default is false (disabled).
*/
public static final String HBASE_HTABLE_AUTO_FILL_TIMESTAMP_IN_CLIENT = "hbase.htable.auto.fill.timestamp.in.client";

/**
* use to specify whether to enable put optimization path.
* Default is true (enabled).
*/
public static final String HBASE_HTABLE_USE_PUT_OPTIMIZATION = "hbase.htable.use.put.optimization";

/*-------------------------------------------------------------------------------------------------------------*/

Expand Down Expand Up @@ -182,4 +194,6 @@ public final class OHConstants {

public static final int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds

public static final boolean HBASE_HTABLE_USE_PUT_OPTIMIZATION_DEFAULT = true;

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ public static byte[][] extractFamilyFromQualifier(byte[] qualifier) throws Excep
return new byte[][] { family, newQualifier };
}

public static boolean isHBasePutPefSupport(ObTableClient tableClient) {
public static boolean isHBasePutPefSupport(ObTableClient tableClient, boolean enablePutOptimization) {
// If client-side optimization is disabled, return false directly
if (!enablePutOptimization) {
return false;
}

if (tableClient.isOdpMode()) {
// server version support and distributed capacity is enabled and odp version support
return ObGlobal.isHBasePutPerfSupport()
Expand Down
Loading