diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index be645afc..11e5728d 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -2127,11 +2127,13 @@ private ObTableQuery buildObTableQuery(final Get get, Collection columnQ /** * Check if the Get or Scan operation is configured for weak read. + * If the query does not have HBASE_HTABLE_READ_CONSISTENCY attribute set, + * it will fall back to the global configuration. * * @param query the Get or Scan object to check * @return true if weak read is enabled, false otherwise */ - public static boolean isWeakRead(Object query) { + public boolean isWeakRead(Object query) { if (query == null) { return false; } @@ -2143,11 +2145,16 @@ public static boolean isWeakRead(Object query) { } else { return false; } + String consistencyStr; if (consistency == null) { - return false; + // fall back to global configuration + consistencyStr = configuration.get(HBASE_HTABLE_READ_CONSISTENCY); + if (consistencyStr == null) { + return false; + } + } else { + consistencyStr = Bytes.toString(consistency); } - String consistencyStr = Bytes.toString(consistency); - System.out.println("consistencyStr: " + consistencyStr); return "weak".equalsIgnoreCase(consistencyStr); } @@ -2360,10 +2367,14 @@ private BatchOperation buildBatchOperation(String tableName, List BatchOperation batch = obTableClient.batchOperation(tableName); int posInList = -1; int singleOpResultNum; + int getOperationNum = 0; + // allGetIsWeakRead is initialized to true, and set to false if any Get is not weak read + boolean allGetIsWeakRead = true; for (Row row : actions) { singleOpResultNum = 0; posInList++; if (row instanceof Get) { + getOperationNum++; if (!ObGlobal.isHBaseBatchGetSupport()) { throw new FeatureNotSupportedException("server does not support batch get"); } @@ -2398,8 +2409,9 @@ private BatchOperation buildBatchOperation(String tableName, List ObTableClientQueryImpl query = new ObTableClientQueryImpl(tableName, obTableQuery, obTableClient); try { query.setRowKey(row(colVal("K", Bytes.toString(get.getRow())), colVal("Q", null), colVal("T", Integer.MAX_VALUE))); - if (isWeakRead(get)) { - batch.setReadConsistency(ObReadConsistency.WEAK); + // if any Get is not weak read, set allGetIsWeakRead to false + if (!isWeakRead(get)) { + allGetIsWeakRead = false; } } catch (Exception e) { throw new IOException(e); @@ -2495,6 +2507,10 @@ private BatchOperation buildBatchOperation(String tableName, List } resultMapSingleOp.add(singleOpResultNum); } + // only set weak read consistency when all operations are Get and all Get operations are weak read + if (getOperationNum == actions.size() && allGetIsWeakRead) { + batch.setReadConsistency(ObReadConsistency.WEAK); + } batch.setEntityType(ObTableEntityType.HKV); batch.setServerCanRetry(OHBaseFuncUtils.serverCanRetry(obTableClient)); batch.setNeedTabletId(OHBaseFuncUtils.needTabletId(obTableClient)); diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java index bfed823a..c0e2333b 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java @@ -127,9 +127,6 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli if (connectionConfig.getRoutePolicy() != null) { obTableClient.setRoutePolicy(ObRoutePolicy.getByName(connectionConfig.getRoutePolicy())); } - if (connectionConfig.getGlobalWeakRead() != null) { - obTableClient.setReadConsistency(ObReadConsistency.getByName(connectionConfig.getGlobalWeakRead())); - } obTableClient.init(); OB_TABLE_CLIENT_INSTANCE.put(obTableClientKey, obTableClient); } diff --git a/src/test/java/com/alipay/oceanbase/hbase/ObTableWeakReadTest.java b/src/test/java/com/alipay/oceanbase/hbase/ObTableWeakReadTest.java index 758bc6e8..ede4d618 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/ObTableWeakReadTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/ObTableWeakReadTest.java @@ -160,18 +160,18 @@ public String toString() { */ public class ObTableWeakReadTest { // 测试配置常量 - private static String FULL_USER_NAME = ""; - private static String PARAM_URL = ""; - private static String PASSWORD = ""; - private static String PROXY_SYS_USER_NAME = "root"; - private static String PROXY_SYS_USER_PASSWORD = ""; + private static String FULL_USER_NAME = ""; + private static String PARAM_URL = ""; + private static String PASSWORD = ""; + private static String PROXY_SYS_USER_NAME = ""; + private static String PROXY_SYS_USER_PASSWORD = ""; private static boolean USE_ODP = false; private static String ODP_IP = "ip-addr"; private static int ODP_PORT = 0; private static int ODP_SQL_PORT = 0; private static String ODP_DATABASE = "database-name"; - private static String JDBC_IP = ""; - private static String JDBC_PORT = ""; + private static String JDBC_IP = ""; + private static String JDBC_PORT = ""; private static String JDBC_DATABASE = "test"; private static String JDBC_URL = "jdbc:mysql://" + JDBC_IP @@ -2098,16 +2098,18 @@ public void testIdcBatchGet1() throws Exception { setZoneIdc(ZONE3, IDC3); // 3. 获取数据 List gets = new ArrayList<>(); - Get get = new Get(rowkey.getBytes()); - get.setAttribute(HBASE_HTABLE_READ_CONSISTENCY, "weak".getBytes()); - get.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); - gets.add(get); - gets.add(get); + Get get1 = new Get(rowkey.getBytes()); + get1.setAttribute(HBASE_HTABLE_READ_CONSISTENCY, "weak".getBytes()); + get1.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + Get get2 = new Get((rowkey + "1").getBytes()); + get2.setAttribute(HBASE_HTABLE_READ_CONSISTENCY, "weak".getBytes()); + get2.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + gets.add(get1); + gets.add(get2); Result[] res = table.get(gets); Assert.assertNotNull(res); Assert.assertEquals(2, res.length); Assert.assertEquals("c2_val", Bytes.toString(res[0].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); - Assert.assertEquals("c2_val", Bytes.toString(res[1].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); // 4. 查询 sql audit,确定读请求发到哪个节点和分区上 SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey, "scan"); @@ -2796,4 +2798,480 @@ public void testRoutePolicyBatchGet5() throws Exception { debugPrint("readReplica: %s", readReplica.toString()); Assert.assertTrue(readReplica.isFollower()); } + + /* + * 测试场景:全局配置为weak,语句级别设置为strong(语句级别优先级大于全局级别) + * 测试预期:语句级别的strong优先生效,读leader + */ + @Test + public void testGlobalConfigOverride1() throws Exception { + Configuration config = HBaseConfiguration.create(); + config.set(HBASE_HTABLE_READ_CONSISTENCY, "weak"); // 设置全局弱一致性读 + initObkvConfig(config, IDC2, FOLLOW_FIRST_ROUTE_POLICY); + org.apache.hadoop.hbase.client.Connection hbaseConnection = ConnectionFactory.createConnection(config); + Table table = hbaseConnection.getTable(TableName.valueOf(TABLE_NAME)); + // 1. 准备数据 + String rowkey = getRandomRowkString(); + insertData(table, rowkey); + Thread.sleep(1000); + // 2. 设置 idc + setZoneIdc(ZONE1, IDC1); + setZoneIdc(ZONE2, IDC2); + setZoneIdc(ZONE3, IDC3); + // 3. 获取数据,语句级别设置为strong,优先级大于全局weak + Get get = new Get(rowkey.getBytes()); + get.setAttribute(HBASE_HTABLE_READ_CONSISTENCY, "strong".getBytes()); + get.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + Result result = table.get(get); + Assert.assertNotNull(result); + Assert.assertNotEquals(true, result.isEmpty()); + byte[] value = result.getValue(FAMILY_NAME.getBytes(), "c2".getBytes()); + Assert.assertEquals("c2_val", Bytes.toString(value)); + // 4. 查询 sql audit + SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey, "scan"); + debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); + // 5. 查询分区的位置信息 + PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId); + debugPrint("partitionLocation: %s", partitionLocation.toString()); + // 6. 校验:语句级别strong优先生效,读leader + ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, + sqlAuditResult.svrPort); + debugPrint("readReplica: %s", readReplica.toString()); + Assert.assertTrue(readReplica.isLeader()); + } + + /* + * 测试场景:全局配置为strong,语句级别设置为weak(语句级别优先级大于全局级别) + * 测试预期:语句级别的weak优先生效,读follower + */ + @Test + public void testGlobalConfigOverride2() throws Exception { + Configuration config = HBaseConfiguration.create(); + config.set(HBASE_HTABLE_READ_CONSISTENCY, "strong"); // 设置全局强一致性读 + initObkvConfig(config, IDC2, FOLLOW_FIRST_ROUTE_POLICY); + org.apache.hadoop.hbase.client.Connection hbaseConnection = ConnectionFactory.createConnection(config); + Table table = hbaseConnection.getTable(TableName.valueOf(TABLE_NAME)); + // 1. 准备数据 + String rowkey = getRandomRowkString(); + insertData(table, rowkey); + Thread.sleep(1000); + // 2. 设置 idc + setZoneIdc(ZONE1, IDC1); + setZoneIdc(ZONE2, IDC2); + setZoneIdc(ZONE3, IDC3); + // 3. 获取数据,语句级别设置为weak,优先级大于全局strong + Get get = new Get(rowkey.getBytes()); + get.setAttribute(HBASE_HTABLE_READ_CONSISTENCY, "weak".getBytes()); + get.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + Result result = table.get(get); + Assert.assertNotNull(result); + Assert.assertNotEquals(true, result.isEmpty()); + byte[] value = result.getValue(FAMILY_NAME.getBytes(), "c2".getBytes()); + Assert.assertEquals("c2_val", Bytes.toString(value)); + // 4. 查询 sql audit + SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey, "scan"); + debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); + // 5. 查询分区的位置信息 + PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId); + debugPrint("partitionLocation: %s", partitionLocation.toString()); + // 6. 校验:语句级别weak优先生效,读follower + ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, + sqlAuditResult.svrPort); + debugPrint("readReplica: %s", readReplica.toString()); + Assert.assertTrue(readReplica.isFollower()); + Assert.assertEquals(IDC2, readReplica.getIdc()); + } + + /* + * 测试场景:全局配置为weak,Scan操作不设置属性 + * 测试预期:使用全局配置weak,读follower + */ + @Test + public void testGlobalConfigScan() throws Exception { + Configuration config = HBaseConfiguration.create(); + config.set(HBASE_HTABLE_READ_CONSISTENCY, "weak"); // 设置全局弱一致性读 + initObkvConfig(config, IDC2, FOLLOW_FIRST_ROUTE_POLICY); + org.apache.hadoop.hbase.client.Connection hbaseConnection = ConnectionFactory.createConnection(config); + Table table = hbaseConnection.getTable(TableName.valueOf(TABLE_NAME)); + // 1. 准备数据 + String rowkey = getRandomRowkString(); + insertData(table, rowkey); + Thread.sleep(1000); + // 2. 设置 idc + setZoneIdc(ZONE1, IDC1); + setZoneIdc(ZONE2, IDC2); + setZoneIdc(ZONE3, IDC3); + // 3. Scan不设置READ_CONSISTENCY属性,使用全局配置 + Scan scan = new Scan(); + scan.withStartRow(rowkey.getBytes()); + // 不设置 READ_CONSISTENCY 属性 + scan.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + ResultScanner resultScanner = table.getScanner(scan); + Result result; + while ((result = resultScanner.next()) != null) { + Assert.assertNotNull(result); + Assert.assertNotEquals(true, result.isEmpty()); + byte[] value = result.getValue(FAMILY_NAME.getBytes(), "c2".getBytes()); + Assert.assertEquals("c2_val", Bytes.toString(value)); + } + // 4. 查询 sql audit + SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey, "scan"); + debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); + // 5. 查询分区的位置信息 + PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId); + debugPrint("partitionLocation: %s", partitionLocation.toString()); + // 6. 校验:应该使用全局weak配置,读follower + ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, + sqlAuditResult.svrPort); + debugPrint("readReplica: %s", readReplica.toString()); + Assert.assertTrue(readReplica.isFollower()); + Assert.assertEquals(IDC2, readReplica.getIdc()); + } + + /* + * 测试场景:batch get中部分语句设置weak,部分语句设置strong + * 测试预期:只要有一个语句不是weak,整个batch就不设置弱读一致性,读leader + * 注意:语句级别的配置项优先级大于全局级别 + */ + @Test + public void testBatchGetMixedConsistency1() throws Exception { + Configuration config = HBaseConfiguration.create(); + initObkvConfig(config, IDC2, FOLLOW_FIRST_ROUTE_POLICY); + org.apache.hadoop.hbase.client.Connection hbaseConnection = ConnectionFactory.createConnection(config); + Table table = hbaseConnection.getTable(TableName.valueOf(TABLE_NAME)); + // 1. 准备数据 + String rowkey1 = getRandomRowkString(); + String rowkey2 = rowkey1 + "_2"; + insertData(table, rowkey1); + insertData(table, rowkey2); + Thread.sleep(1000); + // 2. 设置 idc + setZoneIdc(ZONE1, IDC1); + setZoneIdc(ZONE2, IDC2); + setZoneIdc(ZONE3, IDC3); + // 3. batch get:get1设置weak,get2设置strong + List gets = new ArrayList<>(); + Get get1 = new Get(rowkey1.getBytes()); + get1.setAttribute(HBASE_HTABLE_READ_CONSISTENCY, "weak".getBytes()); + get1.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + Get get2 = new Get(rowkey2.getBytes()); + get2.setAttribute(HBASE_HTABLE_READ_CONSISTENCY, "strong".getBytes()); + get2.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + gets.add(get1); + gets.add(get2); + Result[] res = table.get(gets); + Assert.assertNotNull(res); + Assert.assertEquals(2, res.length); + Assert.assertEquals("c2_val", Bytes.toString(res[0].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + Assert.assertEquals("c2_val", Bytes.toString(res[1].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + // 4. 查询 sql audit + SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey1, "scan"); + debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); + // 5. 查询分区的位置信息 + PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId); + debugPrint("partitionLocation: %s", partitionLocation.toString()); + // 6. 校验:由于存在非weak读的语句,整个batch不设置弱读一致性,读leader + ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, + sqlAuditResult.svrPort); + debugPrint("readReplica: %s", readReplica.toString()); + Assert.assertTrue(readReplica.isLeader()); + } + + /* + * 测试场景:batch get中所有语句都设置weak + * 测试预期:所有语句都是weak,整个batch使用weak一致性,读follower + */ + @Test + public void testBatchGetMixedConsistency2() throws Exception { + Configuration config = HBaseConfiguration.create(); + initObkvConfig(config, IDC2, FOLLOW_FIRST_ROUTE_POLICY); + org.apache.hadoop.hbase.client.Connection hbaseConnection = ConnectionFactory.createConnection(config); + Table table = hbaseConnection.getTable(TableName.valueOf(TABLE_NAME)); + // 1. 准备数据 + String rowkey1 = getRandomRowkString(); + String rowkey2 = rowkey1 + "_2"; + insertData(table, rowkey1); + insertData(table, rowkey2); + Thread.sleep(1000); + // 2. 设置 idc + setZoneIdc(ZONE1, IDC1); + setZoneIdc(ZONE2, IDC2); + setZoneIdc(ZONE3, IDC3); + // 3. batch get:所有get都设置weak + List gets = new ArrayList<>(); + Get get1 = new Get(rowkey1.getBytes()); + get1.setAttribute(HBASE_HTABLE_READ_CONSISTENCY, "weak".getBytes()); + get1.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + Get get2 = new Get(rowkey2.getBytes()); + get2.setAttribute(HBASE_HTABLE_READ_CONSISTENCY, "weak".getBytes()); + get2.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + gets.add(get1); + gets.add(get2); + Result[] res = table.get(gets); + Assert.assertNotNull(res); + Assert.assertEquals(2, res.length); + Assert.assertEquals("c2_val", Bytes.toString(res[0].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + Assert.assertEquals("c2_val", Bytes.toString(res[1].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + // 4. 查询 sql audit + SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey1, "scan"); + debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); + // 5. 查询分区的位置信息 + PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId); + debugPrint("partitionLocation: %s", partitionLocation.toString()); + // 6. 校验:所有都是weak,应该读follower + ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, + sqlAuditResult.svrPort); + debugPrint("readReplica: %s", readReplica.toString()); + Assert.assertTrue(readReplica.isFollower()); + Assert.assertEquals(IDC2, readReplica.getIdc()); + } + + /* + * 测试场景:batch get中部分语句设置weak,部分语句不设置属性(fallback到全局weak配置) + * 测试预期:语句级别优先,不设置属性的语句使用全局weak配置,所有语句都是weak,整个batch使用weak一致性 + */ + @Test + public void testBatchGetMixedConsistency3() throws Exception { + Configuration config = HBaseConfiguration.create(); + config.set(HBASE_HTABLE_READ_CONSISTENCY, "weak"); // 设置全局弱一致性读 + initObkvConfig(config, IDC2, FOLLOW_FIRST_ROUTE_POLICY); + org.apache.hadoop.hbase.client.Connection hbaseConnection = ConnectionFactory.createConnection(config); + Table table = hbaseConnection.getTable(TableName.valueOf(TABLE_NAME)); + // 1. 准备数据 + String rowkey1 = getRandomRowkString(); + String rowkey2 = rowkey1 + "_2"; + insertData(table, rowkey1); + insertData(table, rowkey2); + Thread.sleep(1000); + // 2. 设置 idc + setZoneIdc(ZONE1, IDC1); + setZoneIdc(ZONE2, IDC2); + setZoneIdc(ZONE3, IDC3); + // 3. batch get:get1设置weak,get2不设置属性(使用全局配置) + List gets = new ArrayList<>(); + Get get1 = new Get(rowkey1.getBytes()); + get1.setAttribute(HBASE_HTABLE_READ_CONSISTENCY, "weak".getBytes()); + get1.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + Get get2 = new Get(rowkey2.getBytes()); + // get2不设置READ_CONSISTENCY属性,使用全局weak配置 + get2.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + gets.add(get1); + gets.add(get2); + Result[] res = table.get(gets); + Assert.assertNotNull(res); + Assert.assertEquals(2, res.length); + Assert.assertEquals("c2_val", Bytes.toString(res[0].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + Assert.assertEquals("c2_val", Bytes.toString(res[1].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + // 4. 查询 sql audit + SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey1, "scan"); + debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); + // 5. 查询分区的位置信息 + PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId); + debugPrint("partitionLocation: %s", partitionLocation.toString()); + // 6. 校验:都是weak,应该读follower + ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, + sqlAuditResult.svrPort); + debugPrint("readReplica: %s", readReplica.toString()); + Assert.assertTrue(readReplica.isFollower()); + Assert.assertEquals(IDC2, readReplica.getIdc()); + } + + /* + * 测试场景:batch get中部分语句不设置属性(fallback到全局weak配置),部分语句设置strong + * 测试预期:语句级别优先,存在非weak的语句,整个batch不设置弱读一致性,读leader + */ + @Test + public void testBatchGetMixedConsistency4() throws Exception { + Configuration config = HBaseConfiguration.create(); + config.set(HBASE_HTABLE_READ_CONSISTENCY, "weak"); // 设置全局弱一致性读 + initObkvConfig(config, IDC2, FOLLOW_FIRST_ROUTE_POLICY); + org.apache.hadoop.hbase.client.Connection hbaseConnection = ConnectionFactory.createConnection(config); + Table table = hbaseConnection.getTable(TableName.valueOf(TABLE_NAME)); + // 1. 准备数据 + String rowkey1 = getRandomRowkString(); + String rowkey2 = rowkey1 + "_2"; + insertData(table, rowkey1); + insertData(table, rowkey2); + Thread.sleep(1000); + // 2. 设置 idc + setZoneIdc(ZONE1, IDC1); + setZoneIdc(ZONE2, IDC2); + setZoneIdc(ZONE3, IDC3); + // 3. batch get:get1不设置属性(使用全局weak配置),get2设置strong + List gets = new ArrayList<>(); + Get get1 = new Get(rowkey1.getBytes()); + // get1不设置READ_CONSISTENCY属性,使用全局weak配置 + get1.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + Get get2 = new Get(rowkey2.getBytes()); + get2.setAttribute(HBASE_HTABLE_READ_CONSISTENCY, "strong".getBytes()); + get2.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + gets.add(get1); + gets.add(get2); + Result[] res = table.get(gets); + Assert.assertNotNull(res); + Assert.assertEquals(2, res.length); + Assert.assertEquals("c2_val", Bytes.toString(res[0].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + Assert.assertEquals("c2_val", Bytes.toString(res[1].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + // 4. 查询 sql audit + SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey1, "scan"); + debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); + // 5. 查询分区的位置信息 + PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId); + debugPrint("partitionLocation: %s", partitionLocation.toString()); + // 6. 校验:由于存在非weak的语句,整个batch不设置弱读一致性,读leader + ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, + sqlAuditResult.svrPort); + debugPrint("readReplica: %s", readReplica.toString()); + Assert.assertTrue(readReplica.isLeader()); + } + + /* + * 测试场景:全局配置为strong,batch get中所有语句都不设置属性 + * 测试预期:所有语句都fallback到全局strong配置,整个batch不设置弱读一致性,读leader + */ + @Test + public void testBatchGetGlobalStrong() throws Exception { + Configuration config = HBaseConfiguration.create(); + config.set(HBASE_HTABLE_READ_CONSISTENCY, "strong"); // 设置全局强一致性读 + initObkvConfig(config, IDC2, FOLLOW_FIRST_ROUTE_POLICY); + org.apache.hadoop.hbase.client.Connection hbaseConnection = ConnectionFactory.createConnection(config); + Table table = hbaseConnection.getTable(TableName.valueOf(TABLE_NAME)); + // 1. 准备数据 + String rowkey1 = getRandomRowkString(); + String rowkey2 = rowkey1 + "_2"; + insertData(table, rowkey1); + insertData(table, rowkey2); + Thread.sleep(1000); + // 2. 设置 idc + setZoneIdc(ZONE1, IDC1); + setZoneIdc(ZONE2, IDC2); + setZoneIdc(ZONE3, IDC3); + // 3. batch get:所有get都不设置属性,使用全局strong配置 + List gets = new ArrayList<>(); + Get get1 = new Get(rowkey1.getBytes()); + get1.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + Get get2 = new Get(rowkey2.getBytes()); + get2.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + gets.add(get1); + gets.add(get2); + Result[] res = table.get(gets); + Assert.assertNotNull(res); + Assert.assertEquals(2, res.length); + Assert.assertEquals("c2_val", Bytes.toString(res[0].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + Assert.assertEquals("c2_val", Bytes.toString(res[1].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + // 4. 查询 sql audit + SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey1, "scan"); + debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); + // 5. 查询分区的位置信息 + PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId); + debugPrint("partitionLocation: %s", partitionLocation.toString()); + // 6. 校验:使用全局strong配置,应该读leader + ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, + sqlAuditResult.svrPort); + debugPrint("readReplica: %s", readReplica.toString()); + Assert.assertTrue(readReplica.isLeader()); + } + + /* + * 测试场景:全局配置为weak,batch get中3个语句:1个设置weak,1个设置strong,1个不设置(fallback到全局weak) + * 测试预期:语句级别优先,存在非weak的语句(strong),整个batch不设置弱读一致性,读leader + */ + @Test + public void testBatchGetMixedConsistency5() throws Exception { + Configuration config = HBaseConfiguration.create(); + config.set(HBASE_HTABLE_READ_CONSISTENCY, "weak"); // 设置全局弱一致性读 + initObkvConfig(config, IDC2, FOLLOW_FIRST_ROUTE_POLICY); + org.apache.hadoop.hbase.client.Connection hbaseConnection = ConnectionFactory.createConnection(config); + Table table = hbaseConnection.getTable(TableName.valueOf(TABLE_NAME)); + // 1. 准备数据 + String rowkey1 = getRandomRowkString(); + String rowkey2 = rowkey1 + "_2"; + String rowkey3 = rowkey1 + "_3"; + insertData(table, rowkey1); + insertData(table, rowkey2); + insertData(table, rowkey3); + Thread.sleep(1000); + // 2. 设置 idc + setZoneIdc(ZONE1, IDC1); + setZoneIdc(ZONE2, IDC2); + setZoneIdc(ZONE3, IDC3); + // 3. batch get:get1设置weak,get2设置strong,get3不设置(使用全局weak) + List gets = new ArrayList<>(); + Get get1 = new Get(rowkey1.getBytes()); + get1.setAttribute(HBASE_HTABLE_READ_CONSISTENCY, "weak".getBytes()); + get1.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + Get get2 = new Get(rowkey2.getBytes()); + get2.setAttribute(HBASE_HTABLE_READ_CONSISTENCY, "strong".getBytes()); + get2.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + Get get3 = new Get(rowkey3.getBytes()); + // get3不设置READ_CONSISTENCY属性,使用全局weak配置 + get3.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + gets.add(get1); + gets.add(get2); + gets.add(get3); + Result[] res = table.get(gets); + Assert.assertNotNull(res); + Assert.assertEquals(3, res.length); + Assert.assertEquals("c2_val", Bytes.toString(res[0].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + Assert.assertEquals("c2_val", Bytes.toString(res[1].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + Assert.assertEquals("c2_val", Bytes.toString(res[2].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + // 4. 查询 sql audit + SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey1, "scan"); + debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); + // 5. 查询分区的位置信息 + PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId); + debugPrint("partitionLocation: %s", partitionLocation.toString()); + // 6. 校验:由于存在非weak的语句,整个batch不设置弱读一致性,读leader + ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, + sqlAuditResult.svrPort); + debugPrint("readReplica: %s", readReplica.toString()); + Assert.assertTrue(readReplica.isLeader()); + } + + /* + * 测试场景:不设置全局配置,batch get中所有语句都不设置属性 + * 测试预期:语句级别和全局配置都未设置,使用默认strong一致性,整个batch不设置弱读一致性,读leader + */ + @Test + public void testBatchGetNoConfig() throws Exception { + Configuration config = HBaseConfiguration.create(); + // 不设置全局READ_CONSISTENCY配置 + initObkvConfig(config, IDC2, FOLLOW_FIRST_ROUTE_POLICY); + org.apache.hadoop.hbase.client.Connection hbaseConnection = ConnectionFactory.createConnection(config); + Table table = hbaseConnection.getTable(TableName.valueOf(TABLE_NAME)); + // 1. 准备数据 + String rowkey1 = getRandomRowkString(); + String rowkey2 = rowkey1 + "_2"; + insertData(table, rowkey1); + insertData(table, rowkey2); + Thread.sleep(1000); + // 2. 设置 idc + setZoneIdc(ZONE1, IDC1); + setZoneIdc(ZONE2, IDC2); + setZoneIdc(ZONE3, IDC3); + // 3. batch get:所有get都不设置属性 + List gets = new ArrayList<>(); + Get get1 = new Get(rowkey1.getBytes()); + get1.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + Get get2 = new Get(rowkey2.getBytes()); + get2.addColumn(FAMILY_NAME.getBytes(), "c2".getBytes()); + gets.add(get1); + gets.add(get2); + Result[] res = table.get(gets); + Assert.assertNotNull(res); + Assert.assertEquals(2, res.length); + Assert.assertEquals("c2_val", Bytes.toString(res[0].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + Assert.assertEquals("c2_val", Bytes.toString(res[1].getValue(FAMILY_NAME.getBytes(), "c2".getBytes()))); + // 4. 查询 sql audit + SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey1, "scan"); + debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); + // 5. 查询分区的位置信息 + PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId); + debugPrint("partitionLocation: %s", partitionLocation.toString()); + // 6. 校验:默认应该是strong,读leader + ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, + sqlAuditResult.svrPort); + debugPrint("readReplica: %s", readReplica.toString()); + Assert.assertTrue(readReplica.isLeader()); + } }