MQTT: Publish/Subscribe en tiempo real (+ Ejemplo Java).

La comunicación basada en eventos es uno de los paradigmas mas usados en la actualidad, ayudando a desacoplar los sistemas y poniendo a disposición información en tiempo real. En las sistemas basados en microservicios se emplean los términos event-driving y event sourcing; el primero refiere a que la arquitectura es guiada por eventos del dominio que se producen bajo ciertas condiciones, mientra en la segunda la emisión de eventos es «más agresiva» emitiendo eventos por las acciones que ocurren.

Estos eventos se propagan habitualmente empleando brokers o intermediarios que tienen la capacidad de entregar los eventos que reciben a quienes se suscriben. Para este propósito existen protocolos específicos, que en dependencia del entorno pueden emplearse, los mas comunes son AMQP (Advanced Message Queuing Protocol), WebSockets y MQTT (MQ Telemetry Transport).

En este artículo hablaremos de MQTT, siendo de los tres protocolos uno de los mas empleados aplicaciones IoT (Internet de las Cosas) por su ligereza y calidad de servicio (QoS). Este protocolo por su velocidad también puede ser empleado por comunicar microservicios.

Características principales de MQTT

  • Es un protocolo de comunicación M2M (machine-to-machine) de tipo message queue.
  • Emplea TCP/IP como base para la comunicación.
  • Es un protocolo con estado, cuando se inicia una conexión esta se mantiene abierta y se reutiliza a diferencia de http 1.
  • Presenta capacidades de retención de mensajes no enviados.
  • Emplea por defecto el puerto 1883/8883(TLS).
  • Dispone de tres niveles de calidad de servicio (QoS: 0, 1, 2)
  • Emplea el paradigma Publish / Subscribe (1 a muchos).
  • La comunicación se basa en tópicos en el broker, un tópico puede tener varios suscriptores y el mensaje les llega a todos.
  • MQTT es 93 veces mas rápido que Http. (Cita)

Forma de comunicación

La comunicación entre publicadores y suscriptores se realiza a través del broker de comunicación. A continuación el esquema:

Implementaciones

El protocolo posee varias implementaciones en diversos brokers siendo los más populares:

  • HiveMQ
  • Mosquitto
  • Mosca
  • Moquette
  • VerneQ

Además de los brokers existen varias implementaciones de bibliotecas para usar en las aplicaciones que intercambian mensajes a través de los brokers. En este artículo usaremos Mosquitto como broker MQTT con la biblioteca de Java Eclipse Paho.

Instalando el broker MQTT Mosquitto

Mosquitto, es un MQTT libre implementado por la fundación Eclipse. Para usarlo emplearemos docker, la imagen esta disponible en dockerhub.

Paso 1: Hacer pull a la imagen de mosquitto.

docker pull eclipse-mosquitto

Paso 2: Levantar la instancia de docker, usaremos los parámetros por defecto.

docker run -it -p 1883:1883 eclipse-mosquitto

Listo, ya tenemos nuestro broker funcionando, podemos hacer un netstat -tln y veremos que el puerto 1883 del docker está mapeado a nuestro 1883.

Otra forma de verificar es empleando un cliente MQTT de escritorio, recomiendo MQTT.fx, está escrito en Java y funciona en cualquier plataforma. Cuando lo abrimos, lo conectamos y se ve así:

Creando un publicador en Java

La librería mas popular para trabajar con Java y MQTT es Eclipse Paho, esta posee implementaciones en otros lenguajes también.

Creamos un proyecto Java, basado en Maven y añadimos la dependencia:

  <dependency>
   <groupId>org.eclipse.paho</groupId>
   <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
   <version>1.2.2</version>
 </dependency>

Luego con el siguiente código es suficiente para crear un publicador que se conecte a nuestro broker y escriba en el topic test, el mensaje de la variable content.

public class Publisher2 {

    public static void main(String[] args) {

        String topic        = "test";
        int qos             = 2;
        String broker       = "tcp://localhost:1883";
        String clientId     = "JavaSample";
        String content = "Hola este es un mensaje";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.<em>out</em>.println("Connecting to broker: " + broker);
            sampleClient.connect(connOpts);
            System.<em>out</em>.println("Connected");
            System.<em>out</em>.println("Publishing message");
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            System.<em>out</em>.println("Message published");
            sampleClient.disconnect();
            System.<em>out</em>.println("Disconnected");
            System.<em>exit</em>(0);
        } catch(MqttException me) {
            System.<em>out</em>.println("reason "+me.getReasonCode());
            System.<em>out</em>.println("msg "+me.getMessage());
            System.<em>out</em>.println("loc "+me.getLocalizedMessage());
            System.<em>out</em>.println("cause "+me.getCause());
            System.<em>out</em>.println("excep "+me);
            me.printStackTrace();
        }
    }
}

Por el broker podemos pasar cualquier objeto, que implemente Serializable como arreglo de byte[].

Creando un suscriptor en Java

Ahora crearemos el componente que estará esperando que el broker le «pushee» los mensajes en tiempo real cuando estos sean publicados.

public class MqttSubscriber implements MqttCallback {

    /** The broker url. */
    private static final String brokerUrl ="tcp://localhost:1883";

    /** The client id. */
    private static final String clientId = "clientId";

    /** The topic. */
    private static final String topic = "test";

    public void subscribe() {
        //	logger file name and pattern to log
        MemoryPersistence persistence = new MemoryPersistence();

        try
        {

            MqttClient sampleClient = new MqttClient(brokerUrl, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);

            System.out.println("checking");
            System.out.println("Mqtt Connecting to broker: " + brokerUrl);

            sampleClient.connect(connOpts);
            System.out.println("Mqtt Connected");

            sampleClient.setCallback(this);
            sampleClient.subscribe(topic);

            System.out.println("Subscribed");
            System.out.println("Listening");

        } catch (MqttException me) {
            System.out.println(me);
        }
    }

    //Called when the client lost the connection to the broker
    public void connectionLost(Throwable arg0) {

    }

    //Called when a outgoing publish is complete
    public void deliveryComplete(IMqttDeliveryToken arg0) {

    }

    public void messageArrived(String topic, MqttMessage message) {
        System.out.println("Topic:" + topic);
        System.out.println("Message: " +message.toString());
    }

}

De esta forma hemos creado un suscriptor que queda escuchando todo lo que se publica en el topico «test», podemos usar algunos comodines como por ejemplo «#» para escuchar en «modo promiscuo» todos los tópicos, etc.

Espero te haya gustado el artículo, déjanos tu comentario y/o comparte.

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

SACAViX Tech