Skip to content

Commit 9117112

Browse files
authored
修复rds高可用主从切换和oss相关问题 (#3480)
* 修复rds oos 拉取的binlog 的消费逻辑以及本地消费到直连消费的逻辑衔接 * 去除无用代码
1 parent 63407dc commit 9117112

File tree

5 files changed

+64
-23
lines changed

5 files changed

+64
-23
lines changed

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import java.io.File;
44
import java.io.IOException;
5+
import java.util.HashSet;
56
import java.util.List;
7+
import java.util.Set;
68

79
import org.apache.commons.lang.NotImplementedException;
810
import org.apache.commons.lang.StringUtils;
@@ -25,7 +27,7 @@
2527

2628
/**
2729
* local bin log connection (not real connection)
28-
*
30+
*
2931
* @author yuanzu Date: 12-9-27 Time: 下午6:14
3032
*/
3133
public class LocalBinLogConnection implements ErosaConnection {
@@ -37,11 +39,28 @@ public class LocalBinLogConnection implements ErosaConnection {
3739
private int bufferSize = 16 * 1024;
3840
private boolean running = false;
3941
private long serverId;
42+
43+
/** rdsOosMode binlog 的 serverId 是两个 */
44+
private boolean isRdsOssMode = false;
45+
46+
/** rdsOosMode 主从信息 */
47+
private final Set<Long> rdsOssMasterSlaveInfo = new HashSet<>(4);
48+
49+
private boolean firstUpdateRdsOssMasterSlave = true;
50+
4051
private FileParserListener parserListener;
4152

4253
public LocalBinLogConnection(){
4354
}
4455

56+
public boolean isRdsOssMode() {
57+
return isRdsOssMode;
58+
}
59+
60+
public void setRdsOssMode(boolean rdsOssMode) {
61+
isRdsOssMode = rdsOssMode;
62+
}
63+
4564
public LocalBinLogConnection(String directory, boolean needWait){
4665
this.needWait = needWait;
4766
this.directory = directory;
@@ -94,9 +113,7 @@ public void dump(String binlogfilename, Long binlogPosition, SinkFunction func)
94113
if (event == null) {
95114
continue;
96115
}
97-
if (serverId != 0 && event.getServerId() != serverId) {
98-
throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
99-
}
116+
checkServerId(event);
100117

101118
if (!func.sink(event)) {
102119
needContinue = false;
@@ -131,6 +148,30 @@ public void dump(String binlogfilename, Long binlogPosition, SinkFunction func)
131148
}
132149
}
133150

151+
/**
152+
* 1. 非 rdsOos 模式下需要要校验 serverId 是否一致 防止解析其他实例的 binlog
153+
* 2. rdsOos 高可用模式下解析 binlog 会有两个 serverId,分别对应着主从节点 binlog解析出来的 serverId
154+
* 主从的关系可能会变, 但是 serverId一直都会是这两个 serverId
155+
*
156+
* @param event
157+
*/
158+
private void checkServerId(LogEvent event) {
159+
if (serverId != 0 && event.getServerId() != serverId) {
160+
if (isRdsOssMode()) {
161+
// 第一次添加主从信息
162+
if (firstUpdateRdsOssMasterSlave) {
163+
firstUpdateRdsOssMasterSlave = false;
164+
rdsOssMasterSlaveInfo.add(event.getServerId());
165+
} else if (!rdsOssMasterSlaveInfo.contains(event.getServerId())) {
166+
// 主从节点信息之外的节点信息
167+
throw new ServerIdNotMatchException("unexpected rds serverId " + serverId + " in binlog file !");
168+
}
169+
} else {
170+
throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
171+
}
172+
}
173+
}
174+
134175
public void dump(long timestampMills, SinkFunction func) throws IOException {
135176
List<File> currentBinlogs = binlogs.currentBinlogs();
136177
File current = currentBinlogs.get(currentBinlogs.size() - 1);
@@ -158,9 +199,7 @@ public void dump(long timestampMills, SinkFunction func) throws IOException {
158199
while (fetcher.fetch()) {
159200
LogEvent event = decoder.decode(fetcher, context);
160201
if (event != null) {
161-
if (serverId != 0 && event.getServerId() != serverId) {
162-
throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
163-
}
202+
checkServerId(event);
164203

165204
if (event.getWhen() > timestampSeconds) {
166205
break;
@@ -234,9 +273,7 @@ public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocess
234273
if (event == null) {
235274
continue;
236275
}
237-
if (serverId != 0 && event.getServerId() != serverId) {
238-
throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
239-
}
276+
checkServerId(event);
240277

241278
if (!coprocessor.publish(event)) {
242279
needContinue = false;
@@ -304,9 +341,7 @@ public void dump(long timestampMills, MultiStageCoprocessor coprocessor) throws
304341
while (fetcher.fetch()) {
305342
LogEvent event = decoder.decode(fetcher, context);
306343
if (event != null) {
307-
if (serverId != 0 && event.getServerId() != serverId) {
308-
throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
309-
}
344+
checkServerId(event);
310345

311346
if (event.getWhen() > timestampSeconds) {
312347
break;
@@ -404,6 +439,7 @@ public long getServerId() {
404439

405440
public void setServerId(long serverId) {
406441
this.serverId = serverId;
442+
rdsOssMasterSlaveInfo.add(serverId);
407443
}
408444

409445
public void setParserListener(FileParserListener parserListener) {

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,10 @@ protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
464464
}
465465

466466
if (specificLogFilePosition == null) {
467+
if (isRdsOssMode()) {
468+
// 如果binlog位点不存在,并且属于timestamp不为空,可以返回null走到oss binlog处理
469+
return null;
470+
}
467471
// position不存在,从文件头开始
468472
entryPosition.setPosition(BINLOG_START_OFFEST);
469473
return entryPosition;

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@
1818

1919
/**
2020
* 维护binlog文件列表
21-
*
21+
*
2222
* @author jianghang 2012-7-7 下午03:48:05
2323
* @version 1.0.0
2424
*/
2525
public class BinLogFileQueue {
2626

2727
private String baseName = "mysql-bin.";
2828
private List<File> binlogs = new ArrayList<>();
29+
private Pattern binLogPattern = Pattern.compile(baseName + "\\d+$");
2930
private File directory;
3031
private ReentrantLock lock = new ReentrantLock();
3132
private Condition nextCondition = lock.newCondition();
@@ -77,7 +78,7 @@ public void run() {
7778

7879
/**
7980
* 根据前一个文件,获取符合条件的下一个binlog文件
80-
*
81+
*
8182
* @param pre
8283
* @return
8384
*/
@@ -141,7 +142,7 @@ public File getBefore(File file) {
141142

142143
/**
143144
* 根据前一个文件,获取符合条件的下一个binlog文件
144-
*
145+
*
145146
* @param pre
146147
* @return
147148
* @throws InterruptedException
@@ -219,9 +220,8 @@ private List<File> listBinlogFiles() {
219220
files.addAll(FileUtils.listFiles(directory, new IOFileFilter() {
220221

221222
public boolean accept(File file) {
222-
Pattern pattern = Pattern.compile("\\d+$");
223-
Matcher matcher = pattern.matcher(file.getName());
224-
return file.getName().startsWith(baseName) && matcher.find();
223+
Matcher matcher = binLogPattern.matcher(file.getName());
224+
return matcher.find();
225225
}
226226

227227
public boolean accept(File dir, String name) {

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/BinlogDownloadQueue.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ private void filter(String hostInstanceId) {
123123
public boolean isLastFile(String fileName) {
124124
String needCompareName = lastDownload;
125125
if (StringUtils.isNotEmpty(needCompareName) && StringUtils.endsWith(needCompareName, "tar")) {
126-
needCompareName = needCompareName.substring(0, needCompareName.indexOf("."));
126+
needCompareName = needCompareName.substring(0, needCompareName.lastIndexOf("."));
127127
}
128128
return (needCompareName == null || fileName.equalsIgnoreCase(needCompareName)) && binlogList.isEmpty();
129129
}
@@ -223,7 +223,7 @@ private static void saveFile(File parentFile, String fileName, HttpResponse resp
223223
TarArchiveEntry tarArchiveEntry = null;
224224
while ((tarArchiveEntry = tais.getNextTarEntry()) != null) {
225225
String name = tarArchiveEntry.getName();
226-
File tarFile = new File(parentFile, name + ".tmp");
226+
File tarFile = new File(parentFile, name);
227227
logger.info("start to download file " + tarFile.getName());
228228
if (tarFile.exists()) {
229229
tarFile.delete();
@@ -244,7 +244,7 @@ private static void saveFile(File parentFile, String fileName, HttpResponse resp
244244
}
245245
tais.close();
246246
} else {
247-
File file = new File(parentFile, fileName + ".tmp");
247+
File file = new File(parentFile, fileName);
248248
if (file.exists()) {
249249
file.delete();
250250
}

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
/**
2323
* 基于rds binlog备份文件的复制
24-
*
24+
*
2525
* @author agapple 2017年10月15日 下午1:27:36
2626
* @since 1.0.25
2727
*/
@@ -117,6 +117,7 @@ protected ErosaConnection buildErosaConnection() {
117117
localBinLogConnection.setNeedWait(true);
118118
localBinLogConnection.setServerId(serverId);
119119
localBinLogConnection.setParserListener(this);
120+
localBinLogConnection.setRdsOssMode(true);
120121
}
121122
return connection;
122123
}

0 commit comments

Comments
 (0)