Skip to content

New Feature Plan: Automatic Reconnect and Disconnected Buffering #9

@jpwsutton

Description

@jpwsutton

Automatic Reconnect and Disconnected Publishing Plan

Currently, the Paho Java client is lacking two major areas of functionality: Automatic Reconnect and Disconnected (or Offline) Publishing.
The goal is to implement these features in time for the next release Neon.

This issue aims to outline the plan for adding this new functionality into the client and has been modeled off Mike Tran's plan and work for the same functionality in the Javascript Client.

Recap: Possible Client States

There are 5 main potential states that the client can be in. The User will usually desire the client to either be in the connected or disconnected states.

  • never connected: This is the initial state of the client where:
    • The client has not yet sent a connect request.
    • The client has never received a CONNACK after it's initial connect request.
  • connecting: A connect request is in progress.
  • connected: The client is connected and ready to send and receive messages.
  • disconnecting: A disconnect request is in progress.
  • disconnected: The client goes from connected to disconnected state when:
    • A disconnect request has been completed
    • As a result of a networking error, or the server is no longer reachable.

What does it do?

Automatic Reconnect

Will automatically attempt to reconnect to the broker (or one of the servers in the host list) while the client is in disconnected state.

  • The client will not perform automatic reconnection if it is not in disconnected state.
  • When the connection is list, the connectionLost() callback is called before the client starts the reconnect process. Since the state of the client is disconnected, the application is allowed to call the connect function with new connection options if they wish.
  • When disconnect is called while connected, the client goes to the disconnected state and automatic reconnect is disabled.
  • If the client application calls connect after it had reconnected, an invalid state error will be thrown.
  • The client library does not subscribe for the application after it successfully reconnects. A callback will be provided to notify the application when it has been reconnected allowing it to make any subscriptions to topics itself.

Disconnected Publishing

Will allow the client to save messages in a buffer whilst the client is in the disconnected state.

  • Once the client is reconnected (To the same or different broker), the buffered messages are scheduled to be sent.
  • To maintain order of messages, the client library must send buffered messages before sending new messages.
  • The client library does not save any messages while the client is in the never connected state. So it cannot send any messages before it connects for the first time.
  • When disconnect is called while connected, the client goes to disconnected state and Disconnected Publishing remains active if enabled.

API Changes

Automatic Reconnect

  • The following optional attributes will be added to the MqttConnectOptions class:
    • setReconnect(boolean reconnect) : If true, the client will attempt to reconnect when the connection is lost. Default: False
  • A new interface called MqttCallbackExtended will be created which will extend MqttCallback modifying / adding a few methods. This will be set in the existing setCallback method in the client:
    • Addition of a new callback connectionComplete(boolean reconnect, String serverUri). This would not replace the existing IMqttToken that you get if you call connectWithResult (So that we don't break any functionality). However it would serve the same purpose, it would be called every time that the client connects or reconnects to the broker. This will allow the application to re-make any subscriptions that were lost or to send any held messages if it is not using Disconnected Publishing.
      The boolean reconnect attribute is set to true if the connection was the result of an automatic reconnection, else it is false. The String serverUri attribute contains the URI of the server that the connection was re-made to, this is useful in scenarios where multiple server URIs are provided to the MqttConnectOptions.
  • The method reconnect() will be added to the MqttAsyncClient class, this will make the client attempt to reconnect straight away if in the middle of a retry interval. I'm thinking about reseting the retry interval if this is ever called with the assumption that the user knows what they're doing.
  • If the cleanSession flag is false, then any subscriptions would not have to be re-made
  • Once the client has been disconnected, the client will attempt to connect with an increasing time interval. Starting at 1 second, doubling on each failed attempt up to 2 minutes. This prevents both waiting an unnecessary amount of time between reconnect attempts, and also from wasting bandwidth from attempting to connect too frequently.

Disconnected Publising

  • To maintain the order of the messages, the client must ensure that buffered messages are scheduled to be sent before new messages.
  • A new optional class called DisconnectedBufferOptions will be created with the following attributes & relevant getters and setters:
    • disconnectedPublishing: If true, the client will store messages whilst disconnected. Default: False
    • disconnectedBufferSize: The maximum number of messages that will be stored in memory while the client is disconnected. Default: 5000
    • persistDisconnectedBuffer: If true, the client will persist the messages to disk, if false or not present, the messages will only be saved in memory. Default: False
    • deleteOldestBufferedMessages:If true, the client will delete the 0th message in the buffer once it is full and a new message is published. Default: False
  • The following optional methods will be added to the MqttAsyncClient class:
    • void setDisconnectedBufferOptions: Sets the DisconnectedBufferOptions before a connect.
    • int getBufferedMessagesCount : Returns the number of messages in the buffer.
    • MqttMessage getBufferedMessage(int index): Returns the MqttMessage at the index location.
    • void deleteBufferedMessage(int index) : Deletes the buffered message at the index location.

The following change will be made to the MqttAsyncClient class:

  • publish : Currently throws an MqttException. If the buffer is full, this will be thrown containing a message explaining that the Message buffer is full.

Sample application

Example 1

This application wants to use both Automatic Reconnect and Disconnected Publishing. The Application does not want to persist buffered messages.

public static void main( String[] args )
    {

        String topic        = "/greenhouse/temperature";
        String broker       = "tcp://iot.eclipse.org:1883";
        String clientId     = "TemperatureMonitor_42";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);

            sampleClient.setCallback(new MqttCallbackExtended() {

                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // Not used
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                    // Not used
                }

                public void connectionLost(Throwable cause) {
                    System.out.println("Connection Lost: " + cause.getMessage());
                }

                public void connectionComplete(boolean reconnect) {
                    // Make or re-make subscriptions here
                    if(reconnect){
                        System.out.println("Automatically Reconnected to Broker!");
                    } else {
                        System.out.println("Connected To Broker for the first time!");
                    }
                }
            });


            sampleClient.setReconnect(true);  // Enable Automatic Reconnect

            DisconnectedBufferOptions bufferOpts = new DisconnectedBufferOptions();
            bufferOpts.setDisconnectedPublishing(true); // Enable Disconnected Publishing
            bufferOpts.setDisconnectedBufferSize(100);  // Only Store 1000 messages in the buffer
            bufferOpts.setPersistDisconnectedBuffer(false);  // Do not persist the buffer
            bufferOpts.deleteOldestBufferedMessages(true); // Delete oldest messages once the buffer is full

            sampleClient.setDisconnectedBufferOptions(bufferOpts);

            System.out.println("Connecting to broker: "+broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");


            /*
             * Sample code to continually publish messages
             */
            Timer timer = new Timer();
            timer.scheduleAtFixedRate(new TimerTask() {

                @Override
                public void run() {
                    // Publish the current Temperature
                    String temp = String.format("%.2f", getTemperature());
                    System.out.format("Publising temperature %s to topic %s. ", temp, topic);
                    MqttMessage message = new MqttMessage(temp.getBytes());
                    message.setQos(0);
                    try {
                        sampleClient.publish(topic, message);
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            }, 5000, 5000);  // Every 5 seconds



        } catch(MqttException me) {
            System.out.println("reason "+me.getReasonCode());
            System.out.println("msg "+me.getMessage());
            System.out.println("loc "+me.getLocalizedMessage());
            System.out.println("cause "+me.getCause());
            System.out.println("excep "+me);
            me.printStackTrace();
        }
    }

Before I start work on this, I'd be very interested in hearing back from the community. Because of the very nature of the features that need to be implemented, it means adding a lot to the API which would mean a small amount of work for developers upgrading their application to use Neon (For example the addition of a callback to MqttCallback). If anyone can spot anything that might cause issues down the line, or thinks that there might be a better way of accomplishing this functionality please do comment!

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions