Skip to content

Commit be0a68a

Browse files
committed
use DbType in DruidDataSource
1 parent 4612d1d commit be0a68a

File tree

7 files changed

+34
-47
lines changed

7 files changed

+34
-47
lines changed

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,6 @@ public void init(OuterAdapterConfig configuration, Properties envProperties) {
8585
if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
8686
|| (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
8787
.equalsIgnoreCase(configuration.getKey()))) {
88-
// 在dbMapping中保存db类型
89-
mappingConfig.getDbMapping().setDbType(dbType);
9088
rdbMapping.put(key, mappingConfig);
9189
}
9290
});
@@ -235,7 +233,7 @@ public EtlResult etl(String task, List<String> params) {
235233
public Map<String, Object> count(String task) {
236234
MappingConfig config = rdbMapping.get(task);
237235
MappingConfig.DbMapping dbMapping = config.getDbMapping();
238-
String sql = "SELECT COUNT(1) AS cnt FROM " + SyncUtil.getDbTableName(dbMapping);
236+
String sql = "SELECT COUNT(1) AS cnt FROM " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType());
239237
Connection conn = null;
240238
Map<String, Object> res = new LinkedHashMap<>();
241239
try {
@@ -261,7 +259,7 @@ public Map<String, Object> count(String task) {
261259
}
262260
}
263261
}
264-
res.put("targetTable", SyncUtil.getDbTableName(dbMapping));
262+
res.put("targetTable", SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()));
265263

266264
return res;
267265
}

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import org.apache.commons.lang.StringUtils;
77

8-
import com.alibaba.druid.DbType;
98
import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
109

1110
/**
@@ -110,8 +109,6 @@ public static class DbMapping implements AdapterMapping {
110109
private int readBatch = 5000;
111110
private int commitBatch = 5000; // etl等批量提交大小
112111

113-
private String dbType = DbType.mysql.name();
114-
115112
private Map<String, String> allMapColumns;
116113

117114
public boolean getMirrorDb() {
@@ -224,13 +221,5 @@ public Map<String, String> getAllMapColumns() {
224221
public void setAllMapColumns(Map<String, String> allMapColumns) {
225222
this.allMapColumns = allMapColumns;
226223
}
227-
228-
public String getDbType() {
229-
return dbType;
230-
}
231-
232-
public void setDbType(String dbType) {
233-
this.dbType = dbType;
234-
}
235224
}
236225
}

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import javax.sql.DataSource;
1515

16+
import com.alibaba.druid.pool.DruidDataSource;
1617
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
1718
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
1819
import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
@@ -56,8 +57,11 @@ protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> va
5657
DbMapping dbMapping = (DbMapping) mapping;
5758
Map<String, String> columnsMap = new LinkedHashMap<>();
5859
Map<String, Integer> columnType = new LinkedHashMap<>();
60+
DruidDataSource dataSource = (DruidDataSource) srcDS;
5961

60-
Util.sqlRS(targetDS, "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " LIMIT 1 ", rs -> {
62+
Util.sqlRS(targetDS,
63+
"SELECT * FROM " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()) + " LIMIT 1 ",
64+
rs -> {
6165
try {
6266

6367
ResultSetMetaData rsd = rs.getMetaData();
@@ -83,7 +87,9 @@ protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> va
8387
boolean completed = false;
8488

8589
StringBuilder insertSql = new StringBuilder();
86-
insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
90+
insertSql.append("INSERT INTO ")
91+
.append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()))
92+
.append(" (");
8793
columnsMap
8894
.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
8995

@@ -107,7 +113,7 @@ protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> va
107113
// 删除数据
108114
Map<String, Object> pkVal = new LinkedHashMap<>();
109115
StringBuilder deleteSql = new StringBuilder(
110-
"DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
116+
"DELETE FROM " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()) + " WHERE ");
111117
appendCondition(dbMapping, deleteSql, pkVal, rs);
112118
try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
113119
int k = 1;

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@
77
import java.util.List;
88
import java.util.Map;
99

10-
import javax.sql.DataSource;
11-
1210
import org.apache.commons.lang.StringUtils;
1311
import org.slf4j.Logger;
1412
import org.slf4j.LoggerFactory;
1513

14+
import com.alibaba.druid.pool.DruidDataSource;
1615
import com.alibaba.fastjson.JSON;
1716
import com.alibaba.fastjson.serializer.SerializerFeature;
1817
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
@@ -32,10 +31,10 @@ public class RdbMirrorDbSyncService {
3231
private static final Logger logger = LoggerFactory.getLogger(RdbMirrorDbSyncService.class);
3332

3433
private Map<String, MirrorDbConfig> mirrorDbConfigCache; // 镜像库配置
35-
private DataSource dataSource;
34+
private DruidDataSource dataSource;
3635
private RdbSyncService rdbSyncService; // rdbSyncService代理
3736

38-
public RdbMirrorDbSyncService(Map<String, MirrorDbConfig> mirrorDbConfigCache, DataSource dataSource,
37+
public RdbMirrorDbSyncService(Map<String, MirrorDbConfig> mirrorDbConfigCache, DruidDataSource dataSource,
3938
Integer threads, Map<String, Map<String, Integer>> columnsTypeCache,
4039
boolean skipDupException){
4140
this.mirrorDbConfigCache = mirrorDbConfigCache;
@@ -156,9 +155,7 @@ private void executeDdl(MirrorDbConfig mirrorDbConfig, Dml ddl) {
156155
try (Connection conn = dataSource.getConnection(); Statement statement = conn.createStatement()) {
157156
// 替换反引号
158157
String sql = ddl.getSql();
159-
String backtick = SyncUtil.getBacktickByDbType(mirrorDbConfig.getMappingConfig()
160-
.getDbMapping()
161-
.getDbType());
158+
String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
162159
if (!"`".equals(backtick)) {
163160
sql = sql.replaceAll("`", backtick);
164161
}

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
import java.util.concurrent.Future;
1616
import java.util.function.Function;
1717

18-
import javax.sql.DataSource;
19-
2018
import org.apache.commons.lang.StringUtils;
2119
import org.slf4j.Logger;
2220
import org.slf4j.LoggerFactory;
2321

22+
import com.alibaba.druid.pool.DruidDataSource;
2423
import com.alibaba.fastjson.JSON;
2524
import com.alibaba.fastjson.serializer.SerializerFeature;
2625
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
@@ -41,6 +40,7 @@ public class RdbSyncService {
4140

4241
private static final Logger logger = LoggerFactory.getLogger(RdbSyncService.class);
4342

43+
private DruidDataSource dataSource;
4444
// 源库表字段类型缓存: instance.schema.table -> <columnName, jdbcType>
4545
private Map<String, Map<String, Integer>> columnsTypeCache;
4646

@@ -59,13 +59,14 @@ public Map<String, Map<String, Integer>> getColumnsTypeCache() {
5959
return columnsTypeCache;
6060
}
6161

62-
public RdbSyncService(DataSource dataSource, Integer threads, boolean skipDupException){
62+
public RdbSyncService(DruidDataSource dataSource, Integer threads, boolean skipDupException){
6363
this(dataSource, threads, new ConcurrentHashMap<>(), skipDupException);
6464
}
6565

6666
@SuppressWarnings("unchecked")
67-
public RdbSyncService(DataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache,
67+
public RdbSyncService(DruidDataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache,
6868
boolean skipDupException){
69+
this.dataSource = dataSource;
6970
this.columnsTypeCache = columnsTypeCache;
7071
this.skipDupException = skipDupException;
7172
try {
@@ -241,13 +242,11 @@ private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml
241242
}
242243

243244
DbMapping dbMapping = config.getDbMapping();
244-
String dbTypeName = dbMapping.getDbType();
245-
String backtick = SyncUtil.getBacktickByDbType(dbTypeName);
246-
245+
String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
247246
Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
248247

249248
StringBuilder insertSql = new StringBuilder();
250-
insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
249+
insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())).append(" (");
251250

252251
columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(backtick)
253252
.append(targetColumnName)
@@ -315,15 +314,13 @@ private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml
315314
}
316315

317316
DbMapping dbMapping = config.getDbMapping();
318-
String dbTypeName = dbMapping.getDbType();
319-
String backtick = SyncUtil.getBacktickByDbType(dbTypeName);
320-
317+
String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
321318
Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
322319

323320
Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
324321

325322
StringBuilder updateSql = new StringBuilder();
326-
updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
323+
updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())).append(" SET ");
327324
List<Map<String, ?>> values = new ArrayList<>();
328325
boolean hasMatched = false;
329326
for (String srcColumnName : old.keySet()) {
@@ -373,11 +370,10 @@ private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml
373370
}
374371

375372
DbMapping dbMapping = config.getDbMapping();
376-
377373
Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
378374

379375
StringBuilder sql = new StringBuilder();
380-
sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
376+
sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())).append(" WHERE ");
381377

382378
List<Map<String, ?>> values = new ArrayList<>();
383379
// 拼接主键
@@ -396,7 +392,7 @@ private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml
396392
private void truncate(BatchExecutor batchExecutor, MappingConfig config) throws SQLException {
397393
DbMapping dbMapping = config.getDbMapping();
398394
StringBuilder sql = new StringBuilder();
399-
sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));
395+
sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()));
400396
batchExecutor.execute(sql.toString(), new ArrayList<>());
401397
if (logger.isTraceEnabled()) {
402398
logger.trace("Truncate target table, sql: {}", sql);
@@ -420,7 +416,7 @@ private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig
420416
if (columnType == null) {
421417
columnType = new LinkedHashMap<>();
422418
final Map<String, Integer> columnTypeTmp = columnType;
423-
String sql = "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE 1=2";
419+
String sql = "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()) + " WHERE 1=2";
424420
Util.sqlRS(conn, sql, rs -> {
425421
try {
426422
ResultSetMetaData rsd = rs.getMetaData();
@@ -449,8 +445,7 @@ private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sq
449445

450446
private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
451447
List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
452-
String dbTypeName = dbMapping.getDbType();
453-
String backtick = SyncUtil.getBacktickByDbType(dbTypeName);
448+
String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
454449

455450
// 拼接主键
456451
for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import org.slf4j.Logger;
1515
import org.slf4j.LoggerFactory;
1616

17+
import com.alibaba.druid.pool.DruidDataSource;
18+
1719
/**
1820
* sql批量执行器
1921
*
@@ -24,11 +26,11 @@ public class BatchExecutor implements Closeable {
2426

2527
private static final Logger logger = LoggerFactory.getLogger(BatchExecutor.class);
2628

27-
private DataSource dataSource;
29+
private DruidDataSource dataSource;
2830
private Connection conn;
2931
private AtomicInteger idx = new AtomicInteger(0);
3032

31-
public BatchExecutor(DataSource dataSource){
33+
public BatchExecutor(DruidDataSource dataSource){
3234
this.dataSource = dataSource;
3335
}
3436

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,9 @@ public static void setPStmt(int type, PreparedStatement pstmt, Object value, int
256256
}
257257
}
258258

259-
public static String getDbTableName(MappingConfig.DbMapping dbMapping) {
259+
public static String getDbTableName(MappingConfig.DbMapping dbMapping, String dbType) {
260260
String result = "";
261-
String backtick = getBacktickByDbType(dbMapping.getDbType());
261+
String backtick = getBacktickByDbType(dbType);
262262
if (StringUtils.isNotEmpty(dbMapping.getTargetDb())) {
263263
result += (backtick + dbMapping.getTargetDb() + backtick + ".");
264264
}

0 commit comments

Comments
 (0)