diff --git a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java index 793bff2457..0b2d7376a4 100644 --- a/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxOutputFormat.java @@ -25,6 +25,8 @@ import org.apache.flink.table.data.RowData; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -51,6 +53,38 @@ protected void openInternal(int taskNumber, int numTasks) { MqttConnectUtil.getMqttClient( emqxConf, CLIENT_ID_WRITER.defaultValue() + LocalTime.now().toSecondOfDay() + jobId); + + client.setCallback( + new MqttCallback() { + + @Override + public void connectionLost(Throwable cause) { + LOG.warn("connection lost and reconnect , e = {}", cause.getMessage()); + if (client != null && client.isConnected()) { + try { + client.disconnect(); + } catch (MqttException e) { + LOG.error(e.getMessage()); + } + } + + try { + client = MqttConnectUtil.getMqttClient(emqxConf, jobId); + } catch (Exception e) { + LOG.error( + e.getMessage() + + "\n" + + " can not reconnect success, please restart job!!!"); + } + } + + @Override + public void messageArrived(String s, MqttMessage mqttMessage) + throws Exception {} + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {} + }); } @Override @@ -59,16 +93,6 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce MqttMessage message = (MqttMessage) rowConverter.toExternal(rowData, new MqttMessage()); message.setQos(emqxConf.getQos()); client.publish(emqxConf.getTopic(), message); - } catch (MqttException e) { - // When the mqtt client connection exception reports an error, re-establish the - // connection - if (this.client == null || !this.client.isConnected()) { - // If the mqtt connection is closed, the connection is reestablished - MqttConnectUtil.getMqttClient( - emqxConf, - CLIENT_ID_WRITER.defaultValue() + LocalTime.now().toSecondOfDay() + jobId); - } - throw new RuntimeException(e); } catch (Exception e) { throw new WriteRecordException("", e, 0, rowData); }