Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import com.alipay.oceanbase.rpc.constant.Constants;
import com.google.common.base.Objects;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;


import java.io.IOException;
import java.util.Map;
Expand All @@ -38,7 +37,7 @@
public class ObTableClientManager {

public static final ConcurrentHashMap<ObTableClientKey, ReentrantLock> OB_TABLE_CLIENT_LOCK = new ConcurrentHashMap<ObTableClientKey, ReentrantLock>();
public static final Map<ObTableClientKey, ObTableClient> OB_TABLE_CLIENT_INSTANCE = new ConcurrentHashMap<ObTableClientKey, ObTableClient>();
public static final Map<ObTableClientKey, ObTableClient> OB_TABLE_CLIENT_INSTANCE = new ConcurrentHashMap<>();

public static ObTableClient getOrCreateObTableClient(OHConnectionConfiguration connectionConfig)
throws IllegalArgumentException,
Expand All @@ -58,22 +57,27 @@ public static ObTableClient getOrCreateObTableClient(OHConnectionConfiguration c
obTableClientKey.setDatabase(connectionConfig.getDatabase());
} else {
checkArgument(isNotBlank(connectionConfig.getParamUrl()), HBASE_OCEANBASE_PARAM_URL
+ " is blank");
obTableClientKey = new ObTableClientKey();
String paramUrl = connectionConfig.getParamUrl();
if (!paramUrl.contains("database")) {
paramUrl += "&database=default";
}
obTableClientKey.setParamUrl(paramUrl);
obTableClientKey.setSysUserName(connectionConfig.getSysUsername());
if (connectionConfig.getSysPassword() == null) {
obTableClientKey.setSysPassword(Constants.EMPTY_STRING);
} else {
obTableClientKey.setSysPassword(connectionConfig.getSysPassword());
}
+ " is blank");
obTableClientKey = generateObTableClientKey(connectionConfig);
}
return getOrCreateObTableClient(obTableClientKey, connectionConfig.getRpcConnectTimeout());
}

public static ObTableClientKey generateObTableClientKey(OHConnectionConfiguration connectionConfig) {
ObTableClientKey obTableClientKey = new ObTableClientKey();
String paramUrl = connectionConfig.getParamUrl();
if (!paramUrl.contains("database")) {
paramUrl += "&database=default";
}
obTableClientKey.setParamUrl(paramUrl);
obTableClientKey.setSysUserName(connectionConfig.getSysUsername());
if (connectionConfig.getSysPassword() == null) {
obTableClientKey.setSysPassword(Constants.EMPTY_STRING);
} else {
obTableClientKey.setSysPassword(connectionConfig.getSysPassword());
}
checkArgument(isNotBlank(connectionConfig.getFullUsername()),
HBASE_OCEANBASE_FULL_USER_NAME + " is blank");
HBASE_OCEANBASE_FULL_USER_NAME + " is blank");
obTableClientKey.setFullUserName(connectionConfig.getFullUsername());

if (connectionConfig.getPassword() == null) {
Expand All @@ -85,8 +89,8 @@ public static ObTableClient getOrCreateObTableClient(OHConnectionConfiguration c
for (Map.Entry<Object, Object> property : connectionConfig.getProperties().entrySet()) {
obTableClientKey.getProperties().put(property.getKey(), property.getValue());
}

return getOrCreateObTableClient(obTableClientKey, connectionConfig.getRpcConnectTimeout());
return obTableClientKey;
}

public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableClientKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alipay.oceanbase.hbase.OHTableClient;
import com.alipay.oceanbase.hbase.util.ObHTableTestUtil;
import com.alipay.oceanbase.hbase.util.TableTemplateManager;
import com.google.common.base.Strings;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.*;
Expand All @@ -34,6 +35,7 @@

import static com.alipay.oceanbase.hbase.util.ObHTableSecondaryPartUtil.*;
import static com.alipay.oceanbase.hbase.util.ObHTableTestUtil.FOR_EACH;
import static com.alipay.oceanbase.hbase.util.ObHTableTestUtil.getConnection;
import static com.alipay.oceanbase.hbase.util.TableTemplateManager.NORMAL_TABLES;
import static org.junit.Assert.*;

Expand All @@ -53,7 +55,7 @@ public static void before() throws Exception {
@AfterClass
public static void finish() throws Exception {
closeDistributedExecute();
dropTables(tableNames, group2tableNames);
// dropTables(tableNames, group2tableNames);
}

@Before
Expand All @@ -68,133 +70,139 @@ private static void assertNullResult(Result result) throws Exception {
private static void testAppend(String tableName) throws Exception {
OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(tableName));
hTable.init();
try {
byte[] FAMILY = getColumnFamilyName(tableName).getBytes();
byte[] ROW = "appendKey".getBytes();
byte[] v1 = Bytes.toBytes("42");
byte[] v2 = Bytes.toBytes("23");
byte[][] QUALIFIERS = new byte[][]{Bytes.toBytes("b"), Bytes.toBytes("a"),
Bytes.toBytes("c")};
Append a = new Append(ROW);
a.add(FAMILY, QUALIFIERS[0], v1);
a.add(FAMILY, QUALIFIERS[1], v2);
a.setReturnResults(false);
assertNullResult(hTable.append(a));

byte[] FAMILY = getColumnFamilyName(tableName).getBytes();
byte[] ROW = "appendKey".getBytes();
byte[] v1 = Bytes.toBytes("42");
byte[] v2 = Bytes.toBytes("23");
byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("b"), Bytes.toBytes("a"),
Bytes.toBytes("c") };
Append a = new Append(ROW);
a.add(FAMILY, QUALIFIERS[0], v1);
a.add(FAMILY, QUALIFIERS[1], v2);
a.setReturnResults(false);
assertNullResult(hTable.append(a));

a = new Append(ROW);
a.add(FAMILY, QUALIFIERS[0], v2);
a.add(FAMILY, QUALIFIERS[1], v1);
a.add(FAMILY, QUALIFIERS[2], v2);
Result r = hTable.append(a);
assertEquals(0, Bytes.compareTo(Bytes.add(v1, v2), r.getValue(FAMILY, QUALIFIERS[0])));
assertEquals(0, Bytes.compareTo(Bytes.add(v2, v1), r.getValue(FAMILY, QUALIFIERS[1])));
// QUALIFIERS[2] previously not exist, verify both value and timestamp are correct
assertEquals(0, Bytes.compareTo(v2, r.getValue(FAMILY, QUALIFIERS[2])));
assertEquals(r.getColumnLatest(FAMILY, QUALIFIERS[0]).getTimestamp(),
r.getColumnLatest(FAMILY, QUALIFIERS[2]).getTimestamp());

Get get = new Get(ROW);
get.setMaxVersions(10);
get.addFamily(FAMILY);
Result result = hTable.get(get);
assertEquals(2, result.getColumnCells(FAMILY, QUALIFIERS[0]).size());
assertEquals(2, result.getColumnCells(FAMILY, QUALIFIERS[1]).size());
assertEquals(1, result.getColumnCells(FAMILY, QUALIFIERS[2]).size());
assertEquals(
0,
Bytes.compareTo(Bytes.add(v1, v2), result.getColumnCells(FAMILY, QUALIFIERS[0]).get(0)
.getValue()));
assertEquals(0,
Bytes.compareTo(v2, result.getColumnCells(FAMILY, QUALIFIERS[2]).get(0).getValue()));
a = new Append(ROW);
a.add(FAMILY, QUALIFIERS[0], v2);
a.add(FAMILY, QUALIFIERS[1], v1);
a.add(FAMILY, QUALIFIERS[2], v2);
Result r = hTable.append(a);
assertEquals(0, Bytes.compareTo(Bytes.add(v1, v2), r.getValue(FAMILY, QUALIFIERS[0])));
assertEquals(0, Bytes.compareTo(Bytes.add(v2, v1), r.getValue(FAMILY, QUALIFIERS[1])));
// QUALIFIERS[2] previously not exist, verify both value and timestamp are correct
assertEquals(0, Bytes.compareTo(v2, r.getValue(FAMILY, QUALIFIERS[2])));
assertEquals(r.getColumnLatest(FAMILY, QUALIFIERS[0]).getTimestamp(),
r.getColumnLatest(FAMILY, QUALIFIERS[2]).getTimestamp());

hTable.close();
Get get = new Get(ROW);
get.setMaxVersions(10);
get.addFamily(FAMILY);
Result result = hTable.get(get);
assertEquals(2, result.getColumnCells(FAMILY, QUALIFIERS[0]).size());
assertEquals(2, result.getColumnCells(FAMILY, QUALIFIERS[1]).size());
assertEquals(1, result.getColumnCells(FAMILY, QUALIFIERS[2]).size());
assertEquals(
0,
Bytes.compareTo(Bytes.add(v1, v2), result.getColumnCells(FAMILY, QUALIFIERS[0]).get(0)
.getValue()));
assertEquals(0,
Bytes.compareTo(v2, result.getColumnCells(FAMILY, QUALIFIERS[2]).get(0).getValue()));
} finally {
hTable.close();
}
}

private static void testAppendBorder(String tableName) throws Exception {
OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(tableName));
hTable.init();

byte[] FAMILY = getColumnFamilyName(tableName).getBytes();
byte[] ROW = "appendKey".getBytes();
byte[] v1 = Bytes.toBytes("ab");
byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("b"), Bytes.toBytes("a"),
Bytes.toBytes("c") };
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIERS[1], v1);
hTable.put(put);
Append a = new Append(ROW);
a.add(FAMILY, QUALIFIERS[1], v1);
a.add(FAMILY, QUALIFIERS[2], "".getBytes());
hTable.append(a);
Get get = new Get(ROW);
get.setMaxVersions(10);
get.addFamily(FAMILY);
Result result = hTable.get(get);
assertEquals(3, result.size());

a = new Append(ROW);
a.add(FAMILY, QUALIFIERS[2], v1);
a.add(FAMILY, QUALIFIERS[2], "".getBytes());
hTable.append(a);
get = new Get(ROW);
get.setMaxVersions(10);
get.addFamily(FAMILY);
result = hTable.get(get);
assertEquals(4, result.size());

byte[] randomBytes = new byte[1025];
Random random = new Random();
random.nextBytes(randomBytes);
a = new Append(ROW);
a.add(FAMILY, QUALIFIERS[2], randomBytes);
try {
byte[] FAMILY = getColumnFamilyName(tableName).getBytes();
byte[] ROW = "appendKey".getBytes();
byte[] v1 = Bytes.toBytes("ab");
byte[][] QUALIFIERS = new byte[][]{Bytes.toBytes("b"), Bytes.toBytes("a"),
Bytes.toBytes("c")};
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIERS[1], v1);
hTable.put(put);
Append a = new Append(ROW);
a.add(FAMILY, QUALIFIERS[1], v1);
a.add(FAMILY, QUALIFIERS[2], "".getBytes());
hTable.append(a);
fail("unexpect error, too long data should fail");
} catch (IOException e) {
assertTrue(e.getCause().getMessage().contains("Data too long for column 'V'"));
}
Get get = new Get(ROW);
get.setMaxVersions(10);
get.addFamily(FAMILY);
Result result = hTable.get(get);
assertEquals(3, result.size());

hTable.close();
a = new Append(ROW);
a.add(FAMILY, QUALIFIERS[2], v1);
a.add(FAMILY, QUALIFIERS[2], "".getBytes());
hTable.append(a);
get = new Get(ROW);
get.setMaxVersions(10);
get.addFamily(FAMILY);
result = hTable.get(get);
assertEquals(4, result.size());

byte[] randomBytes = new byte[1025];
Random random = new Random();
random.nextBytes(randomBytes);
a = new Append(ROW);
a.add(FAMILY, QUALIFIERS[2], randomBytes);
try {
hTable.append(a);
fail("unexpect error, too long data should fail");
} catch (IOException e) {
assertTrue(e.getCause().getMessage().contains("Data too long for column 'V'"));
}
} finally {
hTable.close();
}

}

private static void testAppendCon(String tableName) throws Exception {
OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(tableName));
hTable.init();
byte[] FAMILY = getColumnFamilyName(tableName).getBytes();
String column = "appColumn";
byte[] ROW = "appendKey".getBytes();
byte[] v = "a".getBytes();
byte[] expect = "a".getBytes();
ThreadPoolExecutor threadPoolExecutor = OHTable.createDefaultThreadPoolExecutor(1, 100,100);
AtomicInteger atomicInteger = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
Append append = new Append(ROW);
append.add(FAMILY, column.getBytes(), v);
threadPoolExecutor.submit(() -> {
try {
hTable.append(append);
atomicInteger.incrementAndGet();
} catch (Exception e) {
if (!e.getCause().getMessage().contains("OB_TRY_LOCK_ROW_CONFLICT") && !e.getCause().getMessage().contains("OB_TIMEOUT")) {
throw new RuntimeException(e);
try {
hTable.init();
byte[] FAMILY = getColumnFamilyName(tableName).getBytes();
String column = "appColumn";
byte[] ROW = "appendKey".getBytes();
byte[] v = "a".getBytes();
ThreadPoolExecutor threadPoolExecutor = OHTable.createDefaultThreadPoolExecutor(1, 100, 100);
AtomicInteger atomicInteger = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(100);

for (int i = 0; i < 100; i++) {
Append append = new Append(ROW);
append.add(FAMILY, column.getBytes(), v);
threadPoolExecutor.submit(() -> {
try {
hTable.append(append);
} catch (Exception e) {
if (!e.getCause().getMessage().contains("OB_TRY_LOCK_ROW_CONFLICT")
&& !e.getCause().getMessage().contains("OB_TIMEOUT")) {
throw new RuntimeException(e);
}
} finally {
atomicInteger.incrementAndGet();
countDownLatch.countDown();
}
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(100000, TimeUnit.MILLISECONDS);
for (int i = 0; i < atomicInteger.get() - 1; i++) {
expect = Bytes.add(expect, v);
});
}
threadPoolExecutor.shutdown();
countDownLatch.await(100000, TimeUnit.MILLISECONDS);
final byte[] expect = Strings.repeat("a", atomicInteger.get()).getBytes();
System.out.println("atomicInteger: " + atomicInteger.get());
Get get = new Get(ROW);
get.setMaxVersions(1);
get.addColumn(FAMILY, column.getBytes());
Result result = hTable.get(get);
ObHTableTestUtil.Assert(tableName, ()-> assertTrue(0 <= Bytes.compareTo(expect, result.getColumnCells(FAMILY, column.getBytes()).get(0).getValue())));
} finally {
hTable.close();
}
Get get = new Get(ROW);
get.setMaxVersions(1);
get.addColumn(FAMILY, column.getBytes());
Result result = hTable.get(get);
assertEquals(0, Bytes.compareTo(expect, result.getColumnCells(FAMILY, column.getBytes()).get(0).getValue()));
hTable.close();
}

private static void testAppendMultiCF(Map.Entry<String, List<String>> entry) throws Exception {
Expand Down Expand Up @@ -260,8 +268,8 @@ public void testAppendMultiCF() throws Throwable {

@Test
public void testAppendSeires() throws Throwable {
createTables(TableTemplateManager.TableType.SECONDARY_PARTITIONED_TIME_RANGE_KEY, series_tables, group2tableNames, true);
createTables(TableTemplateManager.TableType.SECONDARY_PARTITIONED_TIME_RANGE_KEY, series_tables, null, true);
FOR_EACH(series_tables, OHTableSecondaryPartAppendTest::testAppendSeires);
dropTables(series_tables, group2tableNames);
dropTables(series_tables, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class OHTableSecondaryPartBatchGetTest {
@BeforeClass
public static void before() throws Exception {
openDistributedExecute();
for (TableTemplateManager.TableType type : TableTemplateManager.NORMAL_AND_SERIES_TABLES) {
for (TableTemplateManager.TableType type : TableTemplateManager.NORMAL_TABLES) {
createTables(type, tableNames, group2tableNames, true);
}
}
Expand Down Expand Up @@ -94,8 +94,9 @@ public static void testBatchGetImpl(String tableName) throws Exception {
System.out.println("Column: " + Q + ", Value: " + V);
}
}
hTable.close();
}

@Test
public void testBatchGet() throws Throwable {
FOR_EACH(tableNames, OHTableSecondaryPartBatchGetTest::testBatchGetImpl);
Expand Down
Loading