Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.chunjun.connector.emqx.conf;

import com.dtstack.chunjun.conf.ChunJunCommonConf;
Expand Down Expand Up @@ -43,6 +42,9 @@ public class EmqxConf extends ChunJunCommonConf {
private int qos = 2;
/** emq codec */
private String codec = "plain";
/** emqx reconnect times */
private int connectRetryTimes = 10;

/**
* Field mapping configuration. The data passed from the reader plug-in to the writer plug-in
* only contains its value attribute. After configuring this parameter, it can be restored to a
Expand Down Expand Up @@ -114,6 +116,14 @@ public void setTableFields(List<String> tableFields) {
this.tableFields = tableFields;
}

public void setConnectRetryTimes(int connectRetryTimes) {
this.connectRetryTimes = connectRetryTimes;
}

public int getConnectRetryTimes() {
return connectRetryTimes;
}

@Override
public String toString() {
return "EmqxConf{"
Expand All @@ -136,6 +146,9 @@ public String toString() {
+ ", codec='"
+ codec
+ '\''
+ ",connectRetryTimes='"
+ connectRetryTimes
+ '\''
+ ", tableFields="
+ tableFields
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,11 @@ public class EmqxOptions {
.stringType()
.defaultValue("writer")
.withDescription("dclient.id.pre");

/** Number of reconnections * */
public static final ConfigOption<Integer> connectRetryTimes =
ConfigOptions.key("connectRetryTimes")
.intType()
.defaultValue(10)
.withDescription(" connectRetryTimes ");
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce
message.setQos(emqxConf.getQos());
client.publish(emqxConf.getTopic(), message);
} catch (MqttException e) {
// When the mqtt client connection exception reports an error, re-establish the
// connection
if (this.client == null || !this.client.isConnected()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果写入每条数据都进行client.isConnected()判断 的话,十分消耗性能

可以对client.publish()进行捕获, 产生错误的时候,再重新建立连接

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

隔了两个星期正式回复一下,我之前提交了两次代码,按照您的指导。 但是我亲自测试,最后的版本运行不超过6h就会失败,报错是未连接! 然后我不断的改地方,发现,如果在异常产生错误的时候,做判断,然后建立连接,是行不通的!!! 然后在每次写入数据的时候判断,测试几个小时,发现还是不行!!!! 然后我把{重新创建连接, 直接调用 MqttConnectUtil.getMqttClient(...) } 改成第一版本的this.openInternal(1, 1); 运行1day都没问题,现在还在运行不报错。第一个版本,之前测试了一个星期都没有问题。 这个是我两个星期测试的结果反馈

// If the mqtt connection is closed, the connection is reestablished
MqttConnectUtil.getMqttClient(
emqxConf,
CLIENT_ID_WRITER.defaultValue() + LocalTime.now().toSecondOfDay() + jobId);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

只重新建立了连接,没有把那条数据也publish, 上面这条数据也需要publish一下

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里行不通的,不超过几个小时就会连接失败,更不要提数据的完整性了!!!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因为业务需求,我一直在测试emqx这个插件的稳定性,根据你的建议,反复的测试,验证,做了很多实验,发现行不通!!! 实验一、最后的版本(在抛出异常的时候判断连接状态,然后建立连接用 MqttConnectUtil.getMqttClient(,把连接次数搞成可调节的参数) 运行不超过3h 实验二、(把判断连接状态放在每次写入数据那里,和实验一相比,其他的不动) 运行不超过5h 实验三、在实验二的基础上,把连接次数改成固定的60 也失败; 实验四 、 在实验三的基础上,把MqttConnectUtil.getMqttClient( 改成this.openInternal(1, 1); 也就是完全的第一个版本。 运行没有问题,超过1d 之前测试都7d了

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些是我实践的结果,但是我不理解,为什么按照你说的行不通,我也觉得很合理!!!!!如果想明白为什么,可以告诉我,我再测试看看,

throw new RuntimeException(e);
} catch (Exception e) {
throw new WriteRecordException("", e, 0, rowData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.QOS;
import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.TOPIC;
import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.USERNAME;
import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.connectRetryTimes;

/**
* @author chuixue
Expand Down Expand Up @@ -127,6 +128,7 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(USERNAME);
optionalOptions.add(PASSWORD);
optionalOptions.add(FORMAT);
optionalOptions.add(connectRetryTimes);
return optionalOptions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class MqttConnectUtil {
*/
public static MqttClient getMqttClient(EmqxConf emqxConf, String clientId) {
MqttClient client = null;
for (int i = 0; i <= 2; i++) {
for (int i = 0; i <= emqxConf.getConnectRetryTimes(); i++) {
try {
client = new MqttClient(emqxConf.getBroker(), clientId);
MqttConnectOptions options = new MqttConnectOptions();
Expand All @@ -73,7 +73,7 @@ public static MqttClient getMqttClient(EmqxConf emqxConf, String clientId) {
} catch (InterruptedException interruptedException) {
throw new RuntimeException(interruptedException);
}
if (i == 2) {
if (i == emqxConf.getConnectRetryTimes()) {
throw new RuntimeException(e);
}
}
Expand Down