Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 88 additions & 131 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class OHTablePool implements Closeable {
* Default Constructor. Default HBaseConfiguration and no limit on pool size.
*/
public OHTablePool() {
this(new Configuration(), Integer.MAX_VALUE);
this(HBaseConfiguration.create(), Integer.MAX_VALUE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,6 @@ public final class OHConstants {

public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";

public static final String HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK = "hbase.htable.put.write.buffer.check";

public static final int DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK = 10;

public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152L;

public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1;
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/com/alipay/oceanbase/hbase/util/BatchError.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*-
* #%L
* com.oceanbase:obkv-hbase-client
* %%
* Copyright (C) 2022 - 2025 OceanBase Group
* %%
* OBKV HBase Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package com.alipay.oceanbase.hbase.util;

import org.apache.hadoop.hbase.ServerName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package com.alipay.oceanbase.hbase.util;

import com.alipay.oceanbase.hbase.OHTable;
import com.google.common.annotations.VisibleForTesting;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperation;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.slf4j.Logger;
Expand All @@ -31,7 +32,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.LCD;

import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
import static com.alipay.oceanbase.rpc.ObGlobal.*;

@InterfaceAudience.Private
public class OHBufferedMutatorImpl implements BufferedMutator {
Expand All @@ -40,13 +43,11 @@ public class OHBufferedMutatorImpl implements BufferedMutator {

private final ExceptionListener listener;

private final OHTable ohTable;
private final TableName tableName;
private volatile Configuration conf;

@VisibleForTesting
private OHTable ohTable;
final ConcurrentLinkedQueue<Mutation> asyncWriteBuffer = new ConcurrentLinkedQueue<Mutation>();
@VisibleForTesting
AtomicLong currentAsyncBufferSize = new AtomicLong(0);

private long writeBufferSize;
Expand All @@ -55,9 +56,11 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
private final ExecutorService pool;
private int rpcTimeout;
private int operationTimeout;
private static final long OB_VERSION_4_2_5_1 = calcVersion(4, (short) 2,
(byte) 5, (byte) 1);

public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params)
throws IOException {
public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params,
OHTable ohTable) throws IOException {
if (ohConnection == null || ohConnection.isClosed()) {
throw new IllegalArgumentException("Connection is null or closed.");
}
Expand All @@ -77,7 +80,11 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam
.getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize();

// create an OHTable object to do batch work
this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool);
if (ohTable != null) {
this.ohTable = ohTable;
} else {
this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool);
}
}

@Override
Expand Down Expand Up @@ -119,14 +126,12 @@ public void mutate(List<? extends Mutation> mutations) throws IOException {
validateOperation(m);
toAddSize += m.heapSize();
}

currentAsyncBufferSize.addAndGet(toAddSize);
asyncWriteBuffer.addAll(mutations);

if (currentAsyncBufferSize.get() > writeBufferSize) {
execute(false);
batchExecute(false);
}

}

/**
Expand All @@ -142,10 +147,18 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException {
}
if (mt instanceof Put) {
// family empty check is in validatePut
HTable.validatePut((Put) mt, maxKeyValueSize);
OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), true);
OHTable.validatePut((Put) mt, maxKeyValueSize);
if (isMultiFamilySupport()) {
OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), true);
} else {
OHTable.checkFamilyViolationForOneFamily(mt.getFamilyMap().keySet());
}
} else {
OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), false);
if (isMultiFamilySupport()) {
OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), false);
} else {
OHTable.checkFamilyViolationForOneFamily(mt.getFamilyMap().keySet());
}
}
}

Expand All @@ -156,7 +169,7 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException {
* @param flushAll - if true, sends all the writes and wait for all of them to finish before
* returning.
*/
private void execute(boolean flushAll) throws IOException {
private void batchExecute(boolean flushAll) throws IOException {
LinkedList<Mutation> execBuffer = new LinkedList<>();
long dequeuedSize = 0L;
try {
Expand All @@ -172,19 +185,16 @@ private void execute(boolean flushAll) throws IOException {
if (execBuffer.isEmpty()) {
return;
}
ohTable.batch(execBuffer);
Object[] results = new Object[execBuffer.size()];
ohTable.batch(execBuffer, results);
// if commit all successfully, clean execBuffer
execBuffer.clear();
} catch (Exception ex) {
LOGGER.error(LCD.convert("01-00026"), ex);
// do not recollect error operations, notify outside
LOGGER.error("error happens: table name = ", tableName.getNameAsString(), ex);
if (ex.getCause() instanceof RetriesExhaustedWithDetailsException) {
LOGGER.error(tableName + ": One or more of the operations have failed after retries.");
LOGGER.error(tableName.getNameAsString() + ": One or more of the operations have failed after retries.", ex);
RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException) ex.getCause();
// recollect mutations
execBuffer.clear();
for (int i = 0; i < retryException.getNumExceptions(); ++i) {
execBuffer.add((Mutation) retryException.getRow(i));
}
if (listener != null) {
listener.onException(retryException, this);
} else {
Expand All @@ -194,12 +204,6 @@ private void execute(boolean flushAll) throws IOException {
LOGGER.error("Errors unrelated to operations occur during mutation operation", ex);
throw ex;
}
} finally {
for (Mutation mutation : execBuffer) {
long size = mutation.heapSize();
currentAsyncBufferSize.addAndGet(size);
asyncWriteBuffer.add(mutation);
}
}
}

Expand All @@ -209,7 +213,7 @@ public void close() throws IOException {
return;
}
try {
execute(true);
batchExecute(true);
} finally {
// the pool in ObTableClient will be shut down too
this.pool.shutdown();
Expand All @@ -234,13 +238,21 @@ public void setWriteBufferSize(long writeBufferSize) throws IOException {
}
}

/**
* Only 4_2_5 BP1 - 4_3_0 and after 4_3_4 support multi-cf
* */
boolean isMultiFamilySupport() {
return (OB_VERSION >= OB_VERSION_4_2_5_1 && OB_VERSION < OB_VERSION_4_3_0_0)
|| (OB_VERSION >= OB_VERSION_4_3_4_0);
}

/**
* Force to commit all operations
* do not care whether the pool is shut down or this BufferedMutator is closed
*/
@Override
public void flush() throws IOException {
execute(true);
batchExecute(true);
}

@Override
Expand All @@ -258,6 +270,10 @@ public void setOperationTimeout(int operationTimeout) {
this.ohTable.setOperationTimeout(operationTimeout);
}

public long getCurrentBufferSize() {
return currentAsyncBufferSize.get();
}

@Deprecated
public List<Row> getWriteBuffer() {
return Arrays.asList(asyncWriteBuffer.toArray(new Row[0]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ public BufferedMutator getBufferedMutator(TableName tableName) throws IOExceptio

@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
return getBufferedMutator(params, null);
}

public BufferedMutator getBufferedMutator(BufferedMutatorParams params, OHTable ohTable)
throws IOException {
if (params.getTableName() == null) {
throw new IllegalArgumentException("TableName cannot be null.");
}
Expand All @@ -138,7 +143,7 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I
if (params.getMaxKeyValueSize() == BUFFERED_PARAM_UNSET) {
params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
}
return new OHBufferedMutatorImpl(this, params);
return new OHBufferedMutatorImpl(this, params, ohTable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alipay.oceanbase.hbase.OHTablePool;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableFactory;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -170,7 +171,7 @@ private Configuration adjustConfiguration(Configuration configuration, String ta
}

private Configuration copyConfiguration(Configuration origin) {
Configuration copy = new Configuration();
Configuration copy = HBaseConfiguration.create();

for (Map.Entry<String, String> entry : origin) {
copy.set(entry.getKey(), entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import com.alipay.oceanbase.rpc.constant.Constants;
import com.google.common.base.Objects;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;

import java.io.IOException;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5189,7 +5189,7 @@ public void testCellTTL() throws Exception {
try {
tryPut(hTable, errorPut);
} catch (Exception e) {
assertTrue(e.getCause().toString().contains("Unknown column 'TTL'"));
assertTrue(e.getCause().getCause().toString().contains("Unknown column 'TTL'"));
}
// test put and get
tryPut(hTable, put1);
Expand Down
10 changes: 8 additions & 2 deletions src/test/java/com/alipay/oceanbase/hbase/NativeHBaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;

Expand Down Expand Up @@ -39,8 +40,13 @@ public void cleanData() throws IOException {

@AfterClass
public static void finish() throws IOException {
hTable.close();
multiCfHTable.close();
try {
hTable.close();
multiCfHTable.close();
} catch (Exception e) {
Assert.assertSame(e.getClass(), IOException.class);
Assert.assertTrue(e.getMessage().contains("put table"));
}
}

}
14 changes: 10 additions & 4 deletions src/test/java/com/alipay/oceanbase/hbase/OHTableClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alipay.oceanbase.hbase.util.ObHTableTestUtil;
import org.junit.*;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

Expand All @@ -37,7 +38,7 @@ public static void before() throws Exception {
((OHTableClient) multiCfHTable).init();
List<String> tableGroups = new LinkedList<>();
tableGroups.add("test");
tableGroups.add("test_multi_cf");
// tableGroups.add("test_multi_cf");
ObHTableTestUtil.prepareClean(tableGroups);
}

Expand Down Expand Up @@ -77,8 +78,13 @@ public void testNew() throws Exception {

@AfterClass
public static void finish() throws Exception {
hTable.close();
multiCfHTable.close();
ObHTableTestUtil.closeConn();
try {
hTable.close();
multiCfHTable.close();
ObHTableTestUtil.closeConn();
} catch (Exception e) {
Assert.assertSame(e.getClass(), IOException.class);
Assert.assertTrue(e.getMessage().contains("put table"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,14 @@ public void test_refresh_table_entry() throws Exception {

@AfterClass
public static void after() throws Exception {
hTable.close();
multiCfHTable.close();
ObHTableTestUtil.closeConn();
try {
hTable.close();
multiCfHTable.close();
ObHTableTestUtil.closeConn();
} catch (Exception e) {
Assert.assertSame(e.getClass(), IOException.class);
Assert.assertTrue(e.getMessage().contains("put table"));
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,13 @@ public void testNew() throws IOException {

@AfterClass
public static void finish() throws IOException, SQLException {
hTable.close();
multiCfHTable.close();
ObHTableTestUtil.closeConn();
try {
hTable.close();
multiCfHTable.close();
ObHTableTestUtil.closeConn();
} catch (Exception e) {
Assert.assertSame(e.getClass(), IOException.class);
Assert.assertTrue(e.getMessage().contains("put table"));
}
}
}
Loading