diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/conf/EmqxConf.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/conf/EmqxConf.java index f06150e31d..9d9e90bafa 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/conf/EmqxConf.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/conf/EmqxConf.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.dtstack.chunjun.connector.emqx.conf; import com.dtstack.chunjun.conf.ChunJunCommonConf; @@ -43,6 +42,9 @@ public class EmqxConf extends ChunJunCommonConf { private int qos = 2; /** emq codec */ private String codec = "plain"; + /** emqx reconnect times */ + private int connectRetryTimes = 10; + /** * Field mapping configuration. The data passed from the reader plug-in to the writer plug-in * only contains its value attribute. After configuring this parameter, it can be restored to a @@ -114,6 +116,14 @@ public void setTableFields(List tableFields) { this.tableFields = tableFields; } + public void setConnectRetryTimes(int connectRetryTimes) { + this.connectRetryTimes = connectRetryTimes; + } + + public int getConnectRetryTimes() { + return connectRetryTimes; + } + @Override public String toString() { return "EmqxConf{" @@ -136,6 +146,9 @@ public String toString() { + ", codec='" + codec + '\'' + + ",connectRetryTimes='" + + connectRetryTimes + + '\'' + ", tableFields=" + tableFields + '}'; diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java index caac47f56d..f98ceb3278 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java @@ -79,4 +79,11 @@ public class EmqxOptions { .stringType() .defaultValue("writer") .withDescription("dclient.id.pre"); + + /** Number of reconnections * */ + public static final ConfigOption connectRetryTimes = + ConfigOptions.key("connectRetryTimes") + .intType() + .defaultValue(10) + .withDescription(" connectRetryTimes "); } diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java index a23fca9664..793bff2457 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java @@ -60,6 +60,14 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce message.setQos(emqxConf.getQos()); client.publish(emqxConf.getTopic(), message); } catch (MqttException e) { + // When the mqtt client connection exception reports an error, re-establish the + // connection + if (this.client == null || !this.client.isConnected()) { + // If the mqtt connection is closed, the connection is reestablished + MqttConnectUtil.getMqttClient( + emqxConf, + CLIENT_ID_WRITER.defaultValue() + LocalTime.now().toSecondOfDay() + jobId); + } throw new RuntimeException(e); } catch (Exception e) { throw new WriteRecordException("", e, 0, rowData); diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java index 858c16cb95..95aa568b99 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java @@ -53,6 +53,7 @@ import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.QOS; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.TOPIC; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.USERNAME; +import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.connectRetryTimes; /** * @author chuixue @@ -127,6 +128,7 @@ public Set> optionalOptions() { optionalOptions.add(USERNAME); optionalOptions.add(PASSWORD); optionalOptions.add(FORMAT); + optionalOptions.add(connectRetryTimes); return optionalOptions; } diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java index 0af2dbd627..d191c4254c 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java @@ -49,7 +49,7 @@ public class MqttConnectUtil { */ public static MqttClient getMqttClient(EmqxConf emqxConf, String clientId) { MqttClient client = null; - for (int i = 0; i <= 2; i++) { + for (int i = 0; i <= emqxConf.getConnectRetryTimes(); i++) { try { client = new MqttClient(emqxConf.getBroker(), clientId); MqttConnectOptions options = new MqttConnectOptions(); @@ -73,7 +73,7 @@ public static MqttClient getMqttClient(EmqxConf emqxConf, String clientId) { } catch (InterruptedException interruptedException) { throw new RuntimeException(interruptedException); } - if (i == 2) { + if (i == emqxConf.getConnectRetryTimes()) { throw new RuntimeException(e); } }