From 7a9a35c02238bd8ae32fd29465128a0f78194af5 Mon Sep 17 00:00:00 2001 From: "2038373094@qq.com" <2038373094@qq.com> Date: Mon, 15 Aug 2022 13:25:04 +0800 Subject: [PATCH 1/5] =?UTF-8?q?emqx=E6=96=AD=E5=BC=80=E4=B8=8D=E8=83=BD?= =?UTF-8?q?=E9=87=8D=E8=BF=9Ebug=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../chunjun/connector/emqx/sink/EmqxOutputFormat.java | 6 ++++++ .../chunjun/connector/emqx/util/MqttConnectUtil.java | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java index a23fca9664..8384206d6b 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java @@ -56,11 +56,17 @@ protected void openInternal(int taskNumber, int numTasks) { @Override protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordException { try { + // 如果断开就重新连接 + if (this.client == null || !this.client.isConnected()) { + // 如果关闭的话,就重新连接 + this.openInternal(1, 1); + } MqttMessage message = (MqttMessage) rowConverter.toExternal(rowData, new MqttMessage()); message.setQos(emqxConf.getQos()); client.publish(emqxConf.getTopic(), message); } catch (MqttException e) { throw new RuntimeException(e); + } catch (Exception e) { throw new WriteRecordException("", e, 0, rowData); } diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java index 0af2dbd627..f0c2d0cc3a 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java @@ -49,7 +49,7 @@ public class MqttConnectUtil { */ public static MqttClient getMqttClient(EmqxConf emqxConf, String clientId) { MqttClient client = null; - for (int i = 0; i <= 2; i++) { + for (int i = 0; i <= 60; i++) { try { client = new MqttClient(emqxConf.getBroker(), clientId); MqttConnectOptions options = new MqttConnectOptions(); @@ -73,7 +73,7 @@ public static MqttClient getMqttClient(EmqxConf emqxConf, String clientId) { } catch (InterruptedException interruptedException) { throw new RuntimeException(interruptedException); } - if (i == 2) { + if (i == 60) { throw new RuntimeException(e); } } From bbd16a1e320949aaba143311ab1a292bd47415cb Mon Sep 17 00:00:00 2001 From: "2038373094@qq.com" <2038373094@qq.com> Date: Wed, 24 Aug 2022 10:49:06 +0800 Subject: [PATCH 2/5] =?UTF-8?q?emqx=E4=BA=8C=E6=AC=A1=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../chunjun/connector/emqx/conf/EmqxConf.java | 17 +++++++++++++++-- .../connector/emqx/options/EmqxOptions.java | 3 +++ .../connector/emqx/sink/EmqxOutputFormat.java | 4 +++- .../emqx/table/EmqxDynamicTableFactory.java | 2 ++ .../connector/emqx/util/MqttConnectUtil.java | 2 +- mintty.exe.stackdump | 19 +++++++++++++++++++ 6 files changed, 43 insertions(+), 4 deletions(-) create mode 100644 mintty.exe.stackdump diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/conf/EmqxConf.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/conf/EmqxConf.java index f06150e31d..84a49d5b0c 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/conf/EmqxConf.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/conf/EmqxConf.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.dtstack.chunjun.connector.emqx.conf; import com.dtstack.chunjun.conf.ChunJunCommonConf; @@ -40,9 +39,12 @@ public class EmqxConf extends ChunJunCommonConf { /** emq clean session */ private boolean isCleanSession = true; /** emq EXACTLY_ONCE */ - private int qos = 2; + private int qos = 1; /** emq codec */ private String codec = "plain"; + /** emqx reconnect times */ + private int times = 10; + /** * Field mapping configuration. The data passed from the reader plug-in to the writer plug-in * only contains its value attribute. After configuring this parameter, it can be restored to a @@ -114,6 +116,14 @@ public void setTableFields(List tableFields) { this.tableFields = tableFields; } + public void setTimes(int times) { + this.times = times; + } + + public int getTimes() { + return times; + } + @Override public String toString() { return "EmqxConf{" @@ -136,6 +146,9 @@ public String toString() { + ", codec='" + codec + '\'' + + ",times='" + + times + + '\'' + ", tableFields=" + tableFields + '}'; diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java index caac47f56d..c31533b72b 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java @@ -79,4 +79,7 @@ public class EmqxOptions { .stringType() .defaultValue("writer") .withDescription("dclient.id.pre"); + /** 重连接的次数 * */ + public static final ConfigOption TIMES = + ConfigOptions.key("times").intType().defaultValue(10).withDescription(" times "); } diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java index 8384206d6b..9353dee2b7 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java @@ -59,7 +59,9 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce // 如果断开就重新连接 if (this.client == null || !this.client.isConnected()) { // 如果关闭的话,就重新连接 - this.openInternal(1, 1); + MqttConnectUtil.getMqttClient( + emqxConf, + CLIENT_ID_WRITER.defaultValue() + LocalTime.now().toSecondOfDay() + jobId); } MqttMessage message = (MqttMessage) rowConverter.toExternal(rowData, new MqttMessage()); message.setQos(emqxConf.getQos()); diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java index 858c16cb95..66c28b99e1 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java @@ -51,6 +51,7 @@ import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.ISCLEANSESSION; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.PASSWORD; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.QOS; +import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.TIMES; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.TOPIC; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.USERNAME; @@ -116,6 +117,7 @@ public Set> requiredOptions() { Set> requiredOptions = new HashSet<>(); requiredOptions.add(BROKER); requiredOptions.add(TOPIC); + requiredOptions.add(TIMES); return requiredOptions; } diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java index f0c2d0cc3a..14bcb40e60 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java @@ -49,7 +49,7 @@ public class MqttConnectUtil { */ public static MqttClient getMqttClient(EmqxConf emqxConf, String clientId) { MqttClient client = null; - for (int i = 0; i <= 60; i++) { + for (int i = 0; i <= emqxConf.getTimes(); i++) { try { client = new MqttClient(emqxConf.getBroker(), clientId); MqttConnectOptions options = new MqttConnectOptions(); diff --git a/mintty.exe.stackdump b/mintty.exe.stackdump new file mode 100644 index 0000000000..488fc09807 --- /dev/null +++ b/mintty.exe.stackdump @@ -0,0 +1,19 @@ +Stack trace: +Frame Function Args +000FFFF1EBC 00180062B0E (001802AB0FB, 00180270E51, 0000000FDE9, 000FFFF0CE0) +000FFFF1EBC 0018004846A (00000000000, 000FFFF1D58, 0000011400C, 00000000000) +000FFFF1EBC 001800484A2 (00000000032, 000FFFF1F50, 0000000FDE9, 00000000010) +000FFFF1EBC 001801647A4 (00000000000, 00000000000, 00000000000, 00000000000) +000FFFF1EBC 001800BC88A (008002582F0, 000FFFF1F50, 000FFFF1ED0, 00800000010) +000FFFF1EBC 001800C58C1 (00000000001, 00000000000, 0000000043C, 7FF941B36F90) +000FFFFC3B0 001800C9038 (001004028AC, 000003C0E92, 7FF941703BB0, 000010A0A65) +000FFFFC3B0 001801473E7 (000FFFFC170, 00000000060, 000000001CD, 474E494E5241575B) +000FFFFC3B0 0018019598B (000FFFFC170, 00000000060, 000000001CD, 474E494E5241575B) +000FFFFC3B0 001004055AA (00000000000, 00000000070, 7FF9416EA5C3, FFFFFFFFFFFFFFFF) +001004A19F6 00100421476 (00000000003, 001004535FD, 000FFFFC5D0, 000FFFFC5D0) +001004A19F6 001004046EA (000FFFFC550, 001004EDC84, 7FF9416E9980, 00800000001) +000FFFFC550 0010046321D (001801BCF1A, 00000000000, 0000000000E, 000FFFFCD30) +000FFFFCD30 00180049B91 (00000000000, 00000000000, 00000000000, 00000000000) +000FFFFFFF0 00180047716 (00000000000, 00000000000, 00000000000, 00000000000) +000FFFFFFF0 001800477C4 (00000000000, 00000000000, 00000000000, 00000000000) +End of stack trace From 49b779b4e5e34b5cb013745b4d2ec6f206114cc3 Mon Sep 17 00:00:00 2001 From: "2038373094@qq.com" <2038373094@qq.com> Date: Thu, 25 Aug 2022 16:40:31 +0800 Subject: [PATCH 3/5] =?UTF-8?q?1=E3=80=81emqx=E7=9A=84=E9=87=8D=E8=BF=9E?= =?UTF-8?q?=E6=AC=A1=E6=95=B0=E5=8F=82=E6=95=B0=E9=87=8D=E5=91=BD=E5=90=8D?= =?UTF-8?q?=202=E3=80=81=E8=AE=BE=E7=BD=AE=E9=87=8D=E8=BF=9E=E6=AC=A1?= =?UTF-8?q?=E6=95=B0=E4=B8=BA=E9=9D=9E=E5=BF=85=E8=A6=81=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E9=A1=B9=203=E3=80=81=E6=8A=8A60=E6=94=B9=E4=B8=BA=E5=8F=98?= =?UTF-8?q?=E9=87=8F=204=E3=80=81=E5=88=A0=E9=99=A4=E4=BA=86=E9=A2=9D?= =?UTF-8?q?=E5=A4=96=E7=9A=84=E6=96=87=E4=BB=B6=205=E3=80=81=E6=8A=8A?= =?UTF-8?q?=E6=AF=8F=E6=AC=A1=E5=86=99=E5=85=A5=E6=95=B0=E6=8D=AE=E9=83=BD?= =?UTF-8?q?=E5=88=A4=E6=96=AD=E7=8A=B6=E6=80=81=E8=BF=99=E7=A7=8D=E8=80=97?= =?UTF-8?q?=E8=B5=84=E6=BA=90=E7=9A=84=E5=86=99=E6=B3=95=E6=94=B9=E4=B8=BA?= =?UTF-8?q?=E6=8D=95=E8=8E=B7=E9=94=99=E8=AF=AF=E7=9A=84=E6=97=B6=E5=80=99?= =?UTF-8?q?=E5=86=8D=E9=87=8D=E6=96=B0=E5=BB=BA=E7=AB=8B=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../chunjun/connector/emqx/conf/EmqxConf.java | 16 ++++++++-------- .../connector/emqx/options/EmqxOptions.java | 4 ++-- .../connector/emqx/sink/EmqxOutputFormat.java | 11 +++++------ .../emqx/table/EmqxDynamicTableFactory.java | 4 ++-- .../connector/emqx/util/MqttConnectUtil.java | 4 ++-- mintty.exe.stackdump | 19 ------------------- 6 files changed, 19 insertions(+), 39 deletions(-) delete mode 100644 mintty.exe.stackdump diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/conf/EmqxConf.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/conf/EmqxConf.java index 84a49d5b0c..9d9e90bafa 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/conf/EmqxConf.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/conf/EmqxConf.java @@ -39,11 +39,11 @@ public class EmqxConf extends ChunJunCommonConf { /** emq clean session */ private boolean isCleanSession = true; /** emq EXACTLY_ONCE */ - private int qos = 1; + private int qos = 2; /** emq codec */ private String codec = "plain"; /** emqx reconnect times */ - private int times = 10; + private int connectRetryTimes = 10; /** * Field mapping configuration. The data passed from the reader plug-in to the writer plug-in @@ -116,12 +116,12 @@ public void setTableFields(List tableFields) { this.tableFields = tableFields; } - public void setTimes(int times) { - this.times = times; + public void setConnectRetryTimes(int connectRetryTimes) { + this.connectRetryTimes = connectRetryTimes; } - public int getTimes() { - return times; + public int getConnectRetryTimes() { + return connectRetryTimes; } @Override @@ -146,8 +146,8 @@ public String toString() { + ", codec='" + codec + '\'' - + ",times='" - + times + + ",connectRetryTimes='" + + connectRetryTimes + '\'' + ", tableFields=" + tableFields diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java index c31533b72b..4f3571a7ec 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java @@ -80,6 +80,6 @@ public class EmqxOptions { .defaultValue("writer") .withDescription("dclient.id.pre"); /** 重连接的次数 * */ - public static final ConfigOption TIMES = - ConfigOptions.key("times").intType().defaultValue(10).withDescription(" times "); + public static final ConfigOption connectRetryTimes = + ConfigOptions.key("connectRetryTimes").intType().defaultValue(10).withDescription(" connectRetryTimes "); } diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java index 9353dee2b7..a3c796269f 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java @@ -56,19 +56,18 @@ protected void openInternal(int taskNumber, int numTasks) { @Override protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordException { try { - // 如果断开就重新连接 + MqttMessage message = (MqttMessage) rowConverter.toExternal(rowData, new MqttMessage()); + message.setQos(emqxConf.getQos()); + client.publish(emqxConf.getTopic(), message); + } catch (MqttException e) { + //当mqtt客户端连接异常报错的时候,再重新建立连接 if (this.client == null || !this.client.isConnected()) { // 如果关闭的话,就重新连接 MqttConnectUtil.getMqttClient( emqxConf, CLIENT_ID_WRITER.defaultValue() + LocalTime.now().toSecondOfDay() + jobId); } - MqttMessage message = (MqttMessage) rowConverter.toExternal(rowData, new MqttMessage()); - message.setQos(emqxConf.getQos()); - client.publish(emqxConf.getTopic(), message); - } catch (MqttException e) { throw new RuntimeException(e); - } catch (Exception e) { throw new WriteRecordException("", e, 0, rowData); } diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java index 66c28b99e1..a982c96a2d 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java @@ -51,7 +51,7 @@ import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.ISCLEANSESSION; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.PASSWORD; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.QOS; -import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.TIMES; +import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.connectRetryTimes; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.TOPIC; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.USERNAME; @@ -117,7 +117,6 @@ public Set> requiredOptions() { Set> requiredOptions = new HashSet<>(); requiredOptions.add(BROKER); requiredOptions.add(TOPIC); - requiredOptions.add(TIMES); return requiredOptions; } @@ -129,6 +128,7 @@ public Set> optionalOptions() { optionalOptions.add(USERNAME); optionalOptions.add(PASSWORD); optionalOptions.add(FORMAT); + optionalOptions.add(connectRetryTimes); return optionalOptions; } diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java index 14bcb40e60..d191c4254c 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/util/MqttConnectUtil.java @@ -49,7 +49,7 @@ public class MqttConnectUtil { */ public static MqttClient getMqttClient(EmqxConf emqxConf, String clientId) { MqttClient client = null; - for (int i = 0; i <= emqxConf.getTimes(); i++) { + for (int i = 0; i <= emqxConf.getConnectRetryTimes(); i++) { try { client = new MqttClient(emqxConf.getBroker(), clientId); MqttConnectOptions options = new MqttConnectOptions(); @@ -73,7 +73,7 @@ public static MqttClient getMqttClient(EmqxConf emqxConf, String clientId) { } catch (InterruptedException interruptedException) { throw new RuntimeException(interruptedException); } - if (i == 60) { + if (i == emqxConf.getConnectRetryTimes()) { throw new RuntimeException(e); } } diff --git a/mintty.exe.stackdump b/mintty.exe.stackdump deleted file mode 100644 index 488fc09807..0000000000 --- a/mintty.exe.stackdump +++ /dev/null @@ -1,19 +0,0 @@ -Stack trace: -Frame Function Args -000FFFF1EBC 00180062B0E (001802AB0FB, 00180270E51, 0000000FDE9, 000FFFF0CE0) -000FFFF1EBC 0018004846A (00000000000, 000FFFF1D58, 0000011400C, 00000000000) -000FFFF1EBC 001800484A2 (00000000032, 000FFFF1F50, 0000000FDE9, 00000000010) -000FFFF1EBC 001801647A4 (00000000000, 00000000000, 00000000000, 00000000000) -000FFFF1EBC 001800BC88A (008002582F0, 000FFFF1F50, 000FFFF1ED0, 00800000010) -000FFFF1EBC 001800C58C1 (00000000001, 00000000000, 0000000043C, 7FF941B36F90) -000FFFFC3B0 001800C9038 (001004028AC, 000003C0E92, 7FF941703BB0, 000010A0A65) -000FFFFC3B0 001801473E7 (000FFFFC170, 00000000060, 000000001CD, 474E494E5241575B) -000FFFFC3B0 0018019598B (000FFFFC170, 00000000060, 000000001CD, 474E494E5241575B) -000FFFFC3B0 001004055AA (00000000000, 00000000070, 7FF9416EA5C3, FFFFFFFFFFFFFFFF) -001004A19F6 00100421476 (00000000003, 001004535FD, 000FFFFC5D0, 000FFFFC5D0) -001004A19F6 001004046EA (000FFFFC550, 001004EDC84, 7FF9416E9980, 00800000001) -000FFFFC550 0010046321D (001801BCF1A, 00000000000, 0000000000E, 000FFFFCD30) -000FFFFCD30 00180049B91 (00000000000, 00000000000, 00000000000, 00000000000) -000FFFFFFF0 00180047716 (00000000000, 00000000000, 00000000000, 00000000000) -000FFFFFFF0 001800477C4 (00000000000, 00000000000, 00000000000, 00000000000) -End of stack trace From 7a3e06f2fbfba136ef032d32362384d93c07e3fb Mon Sep 17 00:00:00 2001 From: "2038373094@qq.com" <2038373094@qq.com> Date: Thu, 25 Aug 2022 17:03:28 +0800 Subject: [PATCH 4/5] =?UTF-8?q?emqx=E5=9C=A8=E6=8D=95=E8=8E=B7=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E7=9A=84=E6=97=B6=E5=80=99=E9=87=8D=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=EF=BC=8C=E6=94=B9=E9=9D=9E=E5=BF=85=E8=A6=81=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dtstack/chunjun/connector/emqx/options/EmqxOptions.java | 5 ++++- .../chunjun/connector/emqx/sink/EmqxOutputFormat.java | 2 +- .../connector/emqx/table/EmqxDynamicTableFactory.java | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java index 4f3571a7ec..828fb17a51 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java @@ -81,5 +81,8 @@ public class EmqxOptions { .withDescription("dclient.id.pre"); /** 重连接的次数 * */ public static final ConfigOption connectRetryTimes = - ConfigOptions.key("connectRetryTimes").intType().defaultValue(10).withDescription(" connectRetryTimes "); + ConfigOptions.key("connectRetryTimes") + .intType() + .defaultValue(10) + .withDescription(" connectRetryTimes "); } diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java index a3c796269f..fcf85256f2 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java @@ -60,7 +60,7 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce message.setQos(emqxConf.getQos()); client.publish(emqxConf.getTopic(), message); } catch (MqttException e) { - //当mqtt客户端连接异常报错的时候,再重新建立连接 + // 当mqtt客户端连接异常报错的时候,再重新建立连接 if (this.client == null || !this.client.isConnected()) { // 如果关闭的话,就重新连接 MqttConnectUtil.getMqttClient( diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java index a982c96a2d..95aa568b99 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/table/EmqxDynamicTableFactory.java @@ -51,9 +51,9 @@ import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.ISCLEANSESSION; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.PASSWORD; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.QOS; -import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.connectRetryTimes; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.TOPIC; import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.USERNAME; +import static com.dtstack.chunjun.connector.emqx.options.EmqxOptions.connectRetryTimes; /** * @author chuixue From 399932c262792f38dd3d9e76a701f6aafa8e541a Mon Sep 17 00:00:00 2001 From: "2038373094@qq.com" <2038373094@qq.com> Date: Thu, 25 Aug 2022 17:39:55 +0800 Subject: [PATCH 5/5] =?UTF-8?q?emqx=E6=A8=A1=E5=9D=97---=E4=B8=AD=E6=96=87?= =?UTF-8?q?=E6=B3=A8=E9=87=8A=E5=8F=98=E6=88=90=E8=8B=B1=E6=96=87=E6=B3=A8?= =?UTF-8?q?=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dtstack/chunjun/connector/emqx/options/EmqxOptions.java | 3 ++- .../chunjun/connector/emqx/sink/EmqxOutputFormat.java | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java index 828fb17a51..f98ceb3278 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/options/EmqxOptions.java @@ -79,7 +79,8 @@ public class EmqxOptions { .stringType() .defaultValue("writer") .withDescription("dclient.id.pre"); - /** 重连接的次数 * */ + + /** Number of reconnections * */ public static final ConfigOption connectRetryTimes = ConfigOptions.key("connectRetryTimes") .intType() diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java index fcf85256f2..793bff2457 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java @@ -60,9 +60,10 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce message.setQos(emqxConf.getQos()); client.publish(emqxConf.getTopic(), message); } catch (MqttException e) { - // 当mqtt客户端连接异常报错的时候,再重新建立连接 + // When the mqtt client connection exception reports an error, re-establish the + // connection if (this.client == null || !this.client.isConnected()) { - // 如果关闭的话,就重新连接 + // If the mqtt connection is closed, the connection is reestablished MqttConnectUtil.getMqttClient( emqxConf, CLIENT_ID_WRITER.defaultValue() + LocalTime.now().toSecondOfDay() + jobId);