Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d22a241
add metrics and callback
JackShi148 Oct 24, 2025
448751f
add hbaseOpType into request
JackShi148 Oct 24, 2025
7a21a28
fix metrics bug and add test interface
JackShi148 Nov 10, 2025
672530e
adapt batch put to origin hbase 2.x and add test case
JackShi148 Nov 13, 2025
9a42a50
add new jmx test and metrics exporter test
JackShi148 Nov 14, 2025
0cf3a86
hotkey get opt case
maochongxin Nov 11, 2025
1fce32d
support weak read
WeiXinChan Nov 17, 2025
0d01752
review advice
JackShi148 Nov 18, 2025
bce2601
Merge pull request #296 from oceanbase/hbase_metrics_2.0
WeiXinChan Nov 18, 2025
1ddf863
Merge remote-tracking branch 'origin/44x_feat' into obkv_442_features…
WeiXinChan Nov 18, 2025
c6d3510
Merge pull request #299 from WeiXinChan/obkv_442_features_2.0
WeiXinChan Nov 18, 2025
f3b16ff
Merge pull request #297 from oceanbase/hotkey-opt-case-2x
WeiXinChan Nov 26, 2025
ace9d91
adapt weak read new interface
WeiXinChan Nov 27, 2025
4bc5d66
Merge pull request #303 from WeiXinChan/obkv_442_feat_bugfix
WeiXinChan Nov 27, 2025
cd110dc
fix weak read in batch get
WeiXinChan Dec 4, 2025
00dfd90
Merge pull request #304 from WeiXinChan/bug_1204_442
WeiXinChan Dec 4, 2025
6130bd7
change domain to keep the same as 1x
JackShi148 Dec 9, 2025
8c3ebff
fix weak read bug in batch get
WeiXinChan Dec 10, 2025
6e568d2
Merge pull request #306 from WeiXinChan/bug_1209
WeiXinChan Dec 10, 2025
610c03e
fix jmx test bug
JackShi148 Dec 9, 2025
f54d3a1
Merge pull request #307 from oceanbase/adapt_1x_jmx_domain
JackShi148 Dec 10, 2025
5987415
Add client configuration to control hot key optimization enablement
maochongxin Dec 11, 2025
3e750c1
add batch get case
maochongxin Dec 12, 2025
7ea2622
Merge pull request #309 from oceanbase/hot_key_get
maochongxin Dec 12, 2025
463ec28
Merge branch 'hbase_2.0' into obkv_442_features_2.0
JackShi148 Dec 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,025 changes: 580 additions & 445 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.alipay.oceanbase.hbase;

import com.alipay.oceanbase.hbase.core.Lifecycle;
import com.alipay.oceanbase.hbase.metrics.OHMetrics;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
Expand All @@ -30,6 +31,7 @@
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -411,4 +413,9 @@ public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
checkStatus();
return this.ohTable.getStartEndKeys();
}

@VisibleForTesting
public OHMetrics getMetrics() {
return ohTable.getMetrics();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,32 @@ public final class OHConstants {
*/
public static final String HBASE_HTABLE_QUERY_HOT_ONLY = "hbase.htable.query.hot_only";

/**
* use to specify the read consistency when performing a query.
*/
public static final String HBASE_HTABLE_READ_CONSISTENCY = "hbase.htable.read.consistency";


/**
* use to specify whether to enable the hotkey get optimize when performing a query.
*/
public static final String HBASE_HTABLE_HOTKEY_GET_OPTIMIZE_ENABLE = "hbase.htable.hotkey.get.optimize.enable";

/**
* 开启后不指定时间戳的读写都将以此时间戳进行写入和查询, 默认为兼容原行为
*/
public static final String HBASE_HTABLE_HOTKEY_GET_OPTIMIZE_ENABLE_GLOBAL = "hbase.htable.hotkey.get.optimize.enable.global";

/**
* use to specify the idc when performing a query.
*/
public static final String HBASE_HTABLE_CLIENT_IDC = "hbase.htable.client.idc";

/**
* use to specify the route policy when performing a query.
*/
public static final String HBASE_HTABLE_CLIENT_ROUTE_POLICY = "hbase.htable.client.route.policy";

/*-------------------------------------------------------------------------------------------------------------*/

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.alipay.oceanbase.hbase.util.OHBaseExceptionUtil;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest;
import com.alipay.oceanbase.rpc.meta.ObTableMetaResponse;
import com.alipay.oceanbase.rpc.table.ObTable;
Expand Down
150 changes: 150 additions & 0 deletions src/main/java/com/alipay/oceanbase/hbase/metrics/MetricsExporter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package com.alipay.oceanbase.hbase.metrics;

import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;

public class MetricsExporter {
private final long totalOpCount;
private final double averageOps;
private final double oneMinuteAverageOps;
private final double fiveMinuteAverageOps;
private final double fifteenMinuteAverageOps;
private final double averageLatency; // ms
private final long maxLatency; // ms
private final long minLatency; // ms
private final double medianLatency; // ms
private final double P75thPercentile; // ms
private final double P95thPercentile; // ms
private final double P98thPercentile; // ms
private final double P99thPercentile; // ms
private final double P999thPercentile; // ms
private long failCount;
private long totalRuntime; // ms
private double failRate;
private double averageSingleOpCount;

public MetricsExporter(Timer latencyHistogram) {
this.totalOpCount = latencyHistogram.getCount();
this.averageOps = latencyHistogram.getMeanRate();
this.oneMinuteAverageOps = latencyHistogram.getOneMinuteRate();
this.fiveMinuteAverageOps = latencyHistogram.getFiveMinuteRate();
this.fifteenMinuteAverageOps = latencyHistogram.getFifteenMinuteRate();
Snapshot snapshot = latencyHistogram.getSnapshot();
// Time unit of duration stored in Timer is nanosecond, convert it to millisecond
double nanosecondsToMilliseconds = 1_000_000.0;
this.averageLatency = snapshot.getMean() / nanosecondsToMilliseconds;
this.maxLatency = (long) (snapshot.getMax() / nanosecondsToMilliseconds);
this.minLatency = (long) (snapshot.getMin() / nanosecondsToMilliseconds);
this.medianLatency = snapshot.getMedian() / nanosecondsToMilliseconds;
this.P75thPercentile = snapshot.get75thPercentile() / nanosecondsToMilliseconds;
this.P95thPercentile = snapshot.get95thPercentile() / nanosecondsToMilliseconds;
this.P98thPercentile = snapshot.get98thPercentile() / nanosecondsToMilliseconds;
this.P99thPercentile = snapshot.get99thPercentile() / nanosecondsToMilliseconds;
this.P999thPercentile = snapshot.get999thPercentile() / nanosecondsToMilliseconds;
this.failRate = 0;
this.failCount = 0L;
this.totalRuntime = 0L;
this.averageSingleOpCount = 0;
}

public void setFailRate(double failRate) {
this.failRate = failRate;
}

public void setFailCount(long failCount) {
this.failCount = failCount;
}

public void setTotalRuntime(long totalRuntime) {
this.totalRuntime = totalRuntime;
}

public void setAverageSingleOpCount(double averageSingleOpCount) {
this.averageSingleOpCount = averageSingleOpCount;
}

public long getCount() {
return totalOpCount;
}

public double getAverageOps() {
return averageOps;
}

public double getOneMinuteAverageOps() {
return oneMinuteAverageOps;
}

public double getFiveMinuteAverageOps() {
return fiveMinuteAverageOps;
}

public double getFifteenMinuteAverageOps() {
return fifteenMinuteAverageOps;
}

public double getFailRate() {
return failRate;
}

public long getFailCount() {
return failCount;
}

public long getTotalRuntime() {
return totalRuntime;
}

public double getAverageSingleOpCount() {
return averageSingleOpCount;
}

public double getAverageLatency() {
return averageLatency;
}

public long getMaxLatency() {
return maxLatency;
}

public long getMinLatency() {
return minLatency;
}

public double getMedian() {
return medianLatency;
}

public double get75thPercentile() {
return P75thPercentile;
}

public double get95thPercentile() {
return P95thPercentile;
}

public double get98thPercentile() {
return P98thPercentile;
}

public double get99thPercentile() {
return P99thPercentile;
}

public double get999thPercentile() {
return P999thPercentile;
}

public static MetricsExporter getInstanceOf(double averageSingleOpCount,
double failRate,
long failCount,
long totalRuntime,
Timer latencyHistogram) {
MetricsExporter exporter = new MetricsExporter(latencyHistogram);
exporter.setAverageSingleOpCount(averageSingleOpCount);
exporter.setFailRate(failRate);
exporter.setFailCount(failCount);
exporter.setTotalRuntime(totalRuntime);
return exporter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.alipay.oceanbase.hbase.metrics;

public class MetricsImporter {
private boolean isFailedOp;
private long duration;
private long batchSize;

public MetricsImporter() {
this.isFailedOp = false;
this.duration = 0;
this.batchSize = 0;
}

public void setIsFailedOp(boolean isFailedOp) {
this.isFailedOp = isFailedOp;
}

public void setDuration(long duration) {
this.duration = duration;
}

public void setBatchSize(long batchSize) {
this.batchSize = batchSize;
}

public boolean isFailedOp() {
return isFailedOp;
}

public long getDuration() {
return duration;
}

public long getBatchSize() {
return batchSize;
}
}
89 changes: 89 additions & 0 deletions src/main/java/com/alipay/oceanbase/hbase/metrics/OHMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.alipay.oceanbase.hbase.metrics;

import com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory;
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.OHOperationType;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import org.slf4j.Logger;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class OHMetrics {
private static final Logger logger = TableHBaseLoggerFactory.getLogger(OHMetrics.class);
private final String metricsName;
private final MetricRegistry registry;
private final JmxReporter reporter;
private static OHMetricsTracker[] trackers;
private final ConcurrentLinkedQueue<ObPair<OHOperationType, MetricsImporter>> metricsQueue = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

public OHMetrics(String metricsName) {
this.metricsName = metricsName;
this.registry = new MetricRegistry();
trackers = new OHMetricsTracker[OHOperationType.values().length - 1];
// OHOperationType(0) is INVALID, skip it
for (int i = 1; i <= trackers.length; ++i) {
OHOperationType opType = OHOperationType.valueOf(i);
trackers[i - 1] = new OHMetricsTracker(this.registry,
metricsName,
opType);
}
this.reporter = JmxReporter.forRegistry(this.registry)
.inDomain("com.alipay.oceanbase.hbase.metrics")
.build();
this.reporter.start();
scheduler.scheduleWithFixedDelay(this::updateMetrics, 0, 10, TimeUnit.SECONDS);
}
// get the size of current queue,only update these metrics to corresponding trackers, ignore those concurrently added metrics
private void updateMetrics() {
try {
long size = metricsQueue.size();
for (long i = 0; i < size; ++i) {
ObPair<OHOperationType, MetricsImporter> pair = null;
if ((pair = metricsQueue.poll()) != null) {
OHMetricsTracker tracker = getTracker(pair.getLeft());
MetricsImporter importer = pair.getRight();
tracker.update(importer);
} else {
break;
}
}
} catch (Exception e) {
logger.warn("update metrics meets exception", e);
}
}

private OHMetricsTracker getTracker(OHOperationType opType) {
return trackers[opType.getValue() - 1];
}

public String getMetricsName() { return this.metricsName; }

// add metrics into queue asynchronously
public void update(ObPair<OHOperationType, MetricsImporter> importerPair) {
metricsQueue.add(importerPair);
}
// OPS,RT,P99,failures
public MetricsExporter acquireMetrics(OHOperationType opType) {
OHMetricsTracker tracker = getTracker(opType);
return tracker.acquireMetrics();
}

public void stop() {
reporter.stop();
try {
scheduler.shutdown();
// wait at most 500 ms to close the scheduler
if (!scheduler.awaitTermination(500, TimeUnit.MILLISECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
logger.warn("scheduler await for terminate interrupted: {}.", e.getMessage());
scheduler.shutdownNow();
}
}
}
Loading